Add more metrics for file sync (#164)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run

* add metrics in log sync package

* udpate auto sync metrics

* Add metrics for completed file sync

* add more metrics for serial file sync

* adjust default timeout value for auto sync

* fix metrics rpc for Timer type

* add metrics for channel

* refactor channel metrics

* add timeout metrics for segment sync

* refactor channel receiver
This commit is contained in:
Bo QIU 2024-08-22 10:42:15 +08:00 committed by GitHub
parent c1f465e009
commit 82fef674ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 268 additions and 91 deletions

3
Cargo.lock generated
View File

@ -844,6 +844,7 @@ dependencies = [
name = "channel" name = "channel"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"metrics",
"tokio", "tokio",
] ]
@ -4538,6 +4539,8 @@ dependencies = [
"futures-core", "futures-core",
"futures-util", "futures-util",
"jsonrpsee", "jsonrpsee",
"lazy_static",
"metrics",
"serde_json", "serde_json",
"shared_types", "shared_types",
"storage", "storage",

View File

@ -5,3 +5,4 @@ edition = "2021"
[dependencies] [dependencies]
tokio = { version = "1.19.2", features = ["sync", "time"] } tokio = { version = "1.19.2", features = ["sync", "time"] }
metrics = { workspace = true }

View File

@ -1,7 +1,9 @@
use crate::error::Error; use crate::error::Error;
use crate::metrics::unbounded_channel;
use metrics::{Counter, CounterUsize};
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::oneshot;
use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout; use tokio::time::timeout;
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(3); const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
@ -19,20 +21,30 @@ pub struct Channel<N, Req, Res> {
} }
impl<N, Req, Res> Channel<N, Req, Res> { impl<N, Req, Res> Channel<N, Req, Res> {
pub fn unbounded() -> (Sender<N, Req, Res>, Receiver<N, Req, Res>) { pub fn unbounded(name: &str) -> (Sender<N, Req, Res>, Receiver<N, Req, Res>) {
let (sender, receiver) = mpsc::unbounded_channel(); let metrics_group = format!("common_channel_{}", name);
(Sender { chan: sender }, Receiver { chan: receiver }) let (sender, receiver) = unbounded_channel(metrics_group.as_str());
let metrics_timeout = CounterUsize::register_with_group(metrics_group.as_str(), "timeout");
(
Sender {
chan: sender,
metrics_timeout,
},
receiver,
)
} }
} }
pub struct Sender<N, Req, Res> { pub struct Sender<N, Req, Res> {
chan: mpsc::UnboundedSender<Message<N, Req, Res>>, chan: crate::metrics::Sender<Message<N, Req, Res>>,
metrics_timeout: Arc<dyn Counter<usize>>,
} }
impl<N, Req, Res> Clone for Sender<N, Req, Res> { impl<N, Req, Res> Clone for Sender<N, Req, Res> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Sender { Sender {
chan: self.chan.clone(), chan: self.chan.clone(),
metrics_timeout: self.metrics_timeout.clone(),
} }
} }
} }
@ -53,24 +65,15 @@ impl<N, Req, Res> Sender<N, Req, Res> {
timeout(DEFAULT_REQUEST_TIMEOUT, receiver) timeout(DEFAULT_REQUEST_TIMEOUT, receiver)
.await .await
.map_err(|_| Error::TimeoutError)? .map_err(|_| {
self.metrics_timeout.inc(1);
Error::TimeoutError
})?
.map_err(|e| Error::RecvError(e)) .map_err(|e| Error::RecvError(e))
} }
} }
pub struct Receiver<N, Req, Res> { pub type Receiver<N, Req, Res> = crate::metrics::Receiver<Message<N, Req, Res>>;
chan: mpsc::UnboundedReceiver<Message<N, Req, Res>>,
}
impl<N, Req, Res> Receiver<N, Req, Res> {
pub async fn recv(&mut self) -> Option<Message<N, Req, Res>> {
self.chan.recv().await
}
pub fn try_recv(&mut self) -> Result<Message<N, Req, Res>, TryRecvError> {
self.chan.try_recv()
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
@ -91,7 +94,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn request_response() { async fn request_response() {
let (tx, mut rx) = Channel::<Notification, Request, Response>::unbounded(); let (tx, mut rx) = Channel::<Notification, Request, Response>::unbounded("test");
let task1 = async move { let task1 = async move {
match rx.recv().await.expect("not dropped") { match rx.recv().await.expect("not dropped") {

View File

@ -1,5 +1,6 @@
mod channel; mod channel;
pub mod error; pub mod error;
pub mod metrics;
pub mod test_util; pub mod test_util;
pub use crate::channel::{Channel, Message, Receiver, ResponseSender, Sender}; pub use crate::channel::{Channel, Message, Receiver, ResponseSender, Sender};

View File

@ -0,0 +1,112 @@
use std::{fmt::Debug, sync::Arc, time::Instant};
use metrics::{register_meter_with_group, Counter, CounterUsize, Histogram, Meter, Sample};
use tokio::sync::mpsc::{
error::{SendError, TryRecvError},
unbounded_channel as new_unbounded_channel, UnboundedReceiver, UnboundedSender,
};
pub fn unbounded_channel<T>(metric_name: &str) -> (Sender<T>, Receiver<T>) {
let (sender, receiver) = new_unbounded_channel();
let metrics_queued = CounterUsize::register_with_group(metric_name, "size");
(
Sender::new(sender, metric_name, metrics_queued.clone()),
Receiver::new(receiver, metric_name, metrics_queued),
)
}
pub struct Sender<T> {
sender: UnboundedSender<(Instant, T)>,
metrics_send_qps: Arc<dyn Meter>,
metrics_queued: Arc<dyn Counter<usize>>,
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
metrics_send_qps: self.metrics_send_qps.clone(),
metrics_queued: self.metrics_queued.clone(),
}
}
}
impl<T> Debug for Sender<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.sender)
}
}
impl<T> Sender<T> {
pub(crate) fn new(
sender: UnboundedSender<(Instant, T)>,
metrics_group: &str,
metrics_queued: Arc<dyn Counter<usize>>,
) -> Self {
Self {
sender,
metrics_send_qps: register_meter_with_group(metrics_group, "send"),
metrics_queued,
}
}
pub fn send(&self, value: T) -> Result<(), SendError<T>> {
match self.sender.send((Instant::now(), value)) {
Ok(()) => {
self.metrics_send_qps.mark(1);
self.metrics_queued.inc(1);
Ok(())
}
Err(e) => Err(SendError(e.0 .1)),
}
}
}
pub struct Receiver<T> {
receiver: UnboundedReceiver<(Instant, T)>,
metrics_recv_qps: Arc<dyn Meter>,
metrics_queued: Arc<dyn Counter<usize>>,
metrics_queue_latency: Arc<dyn Histogram>,
}
impl<T> Debug for Receiver<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.receiver)
}
}
impl<T> Receiver<T> {
pub(crate) fn new(
receiver: UnboundedReceiver<(Instant, T)>,
metrics_group: &str,
metrics_queued: Arc<dyn Counter<usize>>,
) -> Self {
Self {
receiver,
metrics_recv_qps: register_meter_with_group(metrics_group, "recv"),
metrics_queued,
metrics_queue_latency: Sample::ExpDecay(0.015).register_with_group(
metrics_group,
"latency",
1024,
),
}
}
fn on_recv(&self, value: (Instant, T)) -> T {
self.metrics_recv_qps.mark(1);
self.metrics_queued.dec(1);
self.metrics_queue_latency.update_since(value.0);
value.1
}
pub async fn recv(&mut self) -> Option<T> {
let value = self.receiver.recv().await?;
Some(self.on_recv(value))
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let value = self.receiver.try_recv()?;
Ok(self.on_recv(value))
}
}

View File

@ -21,4 +21,6 @@ storage = { path = "../storage" }
contract-interface = { path = "../../common/contract-interface" } contract-interface = { path = "../../common/contract-interface" }
futures-core = "0.3.28" futures-core = "0.3.28"
futures-util = "0.3.28" futures-util = "0.3.28"
thiserror = "1.0.44" thiserror = "1.0.44"
lazy_static = "1.4.0"
metrics = { workspace = true }

View File

@ -0,0 +1,7 @@
use std::sync::Arc;
use metrics::{register_timer, Timer};
lazy_static::lazy_static! {
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_store_put_tx");
}

View File

@ -11,7 +11,7 @@ use std::collections::BTreeMap;
use std::fmt::Debug; use std::fmt::Debug;
use std::future::Future; use std::future::Future;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::{Duration, Instant};
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store}; use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
use task_executor::{ShutdownReason, TaskExecutor}; use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::broadcast; use tokio::sync::broadcast;
@ -358,7 +358,11 @@ impl LogSyncManager {
} }
async fn put_tx_inner(&mut self, tx: Transaction) -> bool { async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
if let Err(e) = self.store.put_tx(tx.clone()) { let start_time = Instant::now();
let result = self.store.put_tx(tx.clone());
metrics::STORE_PUT_TX.update_since(start_time);
if let Err(e) = result {
error!("put_tx error: e={:?}", e); error!("put_tx error: e={:?}", e);
false false
} else { } else {
@ -458,3 +462,4 @@ pub(crate) mod config;
mod data_cache; mod data_cache;
mod log_entry_fetcher; mod log_entry_fetcher;
mod log_query; mod log_query;
mod metrics;

View File

@ -815,7 +815,7 @@ mod tests {
let runtime = TestRuntime::default(); let runtime = TestRuntime::default();
let (network_globals, keypair) = Context::new_network_globals(); let (network_globals, keypair) = Context::new_network_globals();
let (network_send, network_recv) = mpsc::unbounded_channel(); let (network_send, network_recv) = mpsc::unbounded_channel();
let (sync_send, sync_recv) = channel::Channel::unbounded(); let (sync_send, sync_recv) = channel::Channel::unbounded("test");
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel(); let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
let store = LogManager::memorydb(LogConfig::default()).unwrap(); let store = LogManager::memorydb(LogConfig::default()).unwrap();
Self { Self {

View File

@ -4,7 +4,7 @@ use crate::{error, Context};
use futures::prelude::*; use futures::prelude::*;
use jsonrpsee::core::async_trait; use jsonrpsee::core::async_trait;
use jsonrpsee::core::RpcResult; use jsonrpsee::core::RpcResult;
use metrics::DEFAULT_REGISTRY; use metrics::{DEFAULT_GROUPING_REGISTRY, DEFAULT_REGISTRY};
use network::{multiaddr::Protocol, Multiaddr}; use network::{multiaddr::Protocol, Multiaddr};
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::net::IpAddr; use std::net::IpAddr;
@ -266,6 +266,21 @@ impl RpcServer for RpcServerImpl {
} }
} }
for (group_name, metrics) in DEFAULT_GROUPING_REGISTRY.read().get_all() {
for (metric_name, metric) in metrics.iter() {
let name = format!("{}.{}", group_name, metric_name);
match &maybe_prefix {
Some(prefix) if !name.starts_with(prefix) => {}
_ => {
result.insert(
name,
format!("{} {}", metric.get_type(), metric.get_value()),
);
}
}
}
}
Ok(result) Ok(result)
} }
} }

View File

@ -1,7 +1,7 @@
use crate::{controllers::SyncState, Config, SyncRequest, SyncResponse, SyncSender}; use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{collections::HashSet, fmt::Debug, sync::Arc}; use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration};
use storage_async::Store; use storage_async::Store;
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -15,18 +15,23 @@ pub enum SyncResult {
/// Supports to sync files concurrently. /// Supports to sync files concurrently.
#[derive(Clone)] #[derive(Clone)]
pub struct Batcher { pub struct Batcher {
pub(crate) config: Config,
capacity: usize, capacity: usize,
find_peer_timeout: Duration,
tasks: Arc<RwLock<HashSet<u64>>>, // files to sync tasks: Arc<RwLock<HashSet<u64>>>, // files to sync
store: Store, store: Store,
sync_send: SyncSender, sync_send: SyncSender,
} }
impl Batcher { impl Batcher {
pub fn new(config: Config, capacity: usize, store: Store, sync_send: SyncSender) -> Self { pub fn new(
capacity: usize,
find_peer_timeout: Duration,
store: Store,
sync_send: SyncSender,
) -> Self {
Self { Self {
config,
capacity, capacity,
find_peer_timeout,
tasks: Default::default(), tasks: Default::default(),
store, store,
sync_send, sync_send,
@ -128,7 +133,7 @@ impl Batcher {
// finding peers timeout // finding peers timeout
Some(SyncState::FindingPeers { origin, .. }) Some(SyncState::FindingPeers { origin, .. })
if origin.elapsed() > self.config.find_peer_timeout => if origin.elapsed() > self.find_peer_timeout =>
{ {
debug!(%tx_seq, "Terminate file sync due to finding peers timeout"); debug!(%tx_seq, "Terminate file sync due to finding peers timeout");
self.terminate_file_sync(tx_seq, false).await; self.terminate_file_sync(tx_seq, false).await;
@ -137,7 +142,7 @@ impl Batcher {
// connecting peers timeout // connecting peers timeout
Some(SyncState::ConnectingPeers { origin, .. }) Some(SyncState::ConnectingPeers { origin, .. })
if origin.elapsed() > self.config.find_peer_timeout => if origin.elapsed() > self.find_peer_timeout =>
{ {
debug!(%tx_seq, "Terminate file sync due to connecting peers timeout"); debug!(%tx_seq, "Terminate file sync due to connecting peers timeout");
self.terminate_file_sync(tx_seq, false).await; self.terminate_file_sync(tx_seq, false).await;

View File

@ -22,6 +22,7 @@ pub struct RandomBatcherState {
#[derive(Clone)] #[derive(Clone)]
pub struct RandomBatcher { pub struct RandomBatcher {
config: Config,
batcher: Batcher, batcher: Batcher,
sync_store: Arc<SyncStore>, sync_store: Arc<SyncStore>,
} }
@ -34,7 +35,13 @@ impl RandomBatcher {
sync_store: Arc<SyncStore>, sync_store: Arc<SyncStore>,
) -> Self { ) -> Self {
Self { Self {
batcher: Batcher::new(config, config.max_random_workers, store, sync_send), config,
batcher: Batcher::new(
config.max_random_workers,
config.random_find_peer_timeout,
store,
sync_send,
),
sync_store, sync_store,
} }
} }
@ -56,7 +63,7 @@ impl RandomBatcher {
// disable file sync until catched up // disable file sync until catched up
if !catched_up.load(Ordering::Relaxed) { if !catched_up.load(Ordering::Relaxed) {
trace!("Cannot sync file in catch-up phase"); trace!("Cannot sync file in catch-up phase");
sleep(self.batcher.config.auto_sync_idle_interval).await; sleep(self.config.auto_sync_idle_interval).await;
continue; continue;
} }
@ -73,11 +80,11 @@ impl RandomBatcher {
"File sync still in progress or idle, state = {:?}", "File sync still in progress or idle, state = {:?}",
self.get_state().await self.get_state().await
); );
sleep(self.batcher.config.auto_sync_idle_interval).await; sleep(self.config.auto_sync_idle_interval).await;
} }
Err(err) => { Err(err) => {
warn!(%err, "Failed to sync file once, state = {:?}", self.get_state().await); warn!(%err, "Failed to sync file once, state = {:?}", self.get_state().await);
sleep(self.batcher.config.auto_sync_error_interval).await; sleep(self.config.auto_sync_error_interval).await;
} }
} }
} }
@ -96,7 +103,7 @@ impl RandomBatcher {
debug!(%tx_seq, ?sync_result, "Completed to sync file, state = {:?}", self.get_state().await); debug!(%tx_seq, ?sync_result, "Completed to sync file, state = {:?}", self.get_state().await);
match sync_result { match sync_result {
SyncResult::Completed => metrics::RANDOM_SYNC_RESULT_COMPLETED.inc(1), SyncResult::Completed => metrics::RANDOM_SYNC_RESULT_COMPLETED.mark(1),
SyncResult::Failed => metrics::RANDOM_SYNC_RESULT_FAILED.inc(1), SyncResult::Failed => metrics::RANDOM_SYNC_RESULT_FAILED.inc(1),
SyncResult::Timeout => metrics::RANDOM_SYNC_RESULT_TIMEOUT.inc(1), SyncResult::Timeout => metrics::RANDOM_SYNC_RESULT_TIMEOUT.inc(1),
} }

View File

@ -23,6 +23,7 @@ use tokio::{
/// Supports to sync files in sequence concurrently. /// Supports to sync files in sequence concurrently.
#[derive(Clone)] #[derive(Clone)]
pub struct SerialBatcher { pub struct SerialBatcher {
config: Config,
batcher: Batcher, batcher: Batcher,
/// Next tx seq to sync. /// Next tx seq to sync.
@ -80,13 +81,17 @@ impl SerialBatcher {
sync_send: SyncSender, sync_send: SyncSender,
sync_store: Arc<SyncStore>, sync_store: Arc<SyncStore>,
) -> Result<Self> { ) -> Result<Self> {
let capacity = config.max_sequential_workers;
// continue file sync from break point in db // continue file sync from break point in db
let (next_tx_seq, max_tx_seq) = sync_store.get_tx_seq_range().await?; let (next_tx_seq, max_tx_seq) = sync_store.get_tx_seq_range().await?;
Ok(Self { Ok(Self {
batcher: Batcher::new(config, capacity, store, sync_send), config,
batcher: Batcher::new(
config.max_sequential_workers,
config.sequential_find_peer_timeout,
store,
sync_send,
),
next_tx_seq: Arc::new(AtomicU64::new(next_tx_seq.unwrap_or(0))), next_tx_seq: Arc::new(AtomicU64::new(next_tx_seq.unwrap_or(0))),
max_tx_seq: Arc::new(AtomicU64::new(max_tx_seq.unwrap_or(u64::MAX))), max_tx_seq: Arc::new(AtomicU64::new(max_tx_seq.unwrap_or(u64::MAX))),
pending_completed_txs: Default::default(), pending_completed_txs: Default::default(),
@ -136,7 +141,7 @@ impl SerialBatcher {
// disable file sync until catched up // disable file sync until catched up
if !catched_up.load(Ordering::Relaxed) { if !catched_up.load(Ordering::Relaxed) {
trace!("Cannot sync file in catch-up phase"); trace!("Cannot sync file in catch-up phase");
sleep(self.batcher.config.auto_sync_idle_interval).await; sleep(self.config.auto_sync_idle_interval).await;
continue; continue;
} }
@ -157,11 +162,11 @@ impl SerialBatcher {
"File sync still in progress or idle, state = {:?}", "File sync still in progress or idle, state = {:?}",
self.get_state().await self.get_state().await
); );
sleep(self.batcher.config.auto_sync_idle_interval).await; sleep(self.config.auto_sync_idle_interval).await;
} }
Err(err) => { Err(err) => {
warn!(%err, "Failed to sync file once, state = {:?}", self.get_state().await); warn!(%err, "Failed to sync file once, state = {:?}", self.get_state().await);
sleep(self.batcher.config.auto_sync_error_interval).await; sleep(self.config.auto_sync_error_interval).await;
} }
} }
} }
@ -257,7 +262,7 @@ impl SerialBatcher {
info!(%tx_seq, ?sync_result, "Completed to sync file, state = {:?}", self.get_state().await); info!(%tx_seq, ?sync_result, "Completed to sync file, state = {:?}", self.get_state().await);
match sync_result { match sync_result {
SyncResult::Completed => metrics::SEQUENTIAL_SYNC_RESULT_COMPLETED.inc(1), SyncResult::Completed => metrics::SEQUENTIAL_SYNC_RESULT_COMPLETED.mark(1),
SyncResult::Failed => metrics::SEQUENTIAL_SYNC_RESULT_FAILED.inc(1), SyncResult::Failed => metrics::SEQUENTIAL_SYNC_RESULT_FAILED.inc(1),
SyncResult::Timeout => metrics::SEQUENTIAL_SYNC_RESULT_TIMEOUT.inc(1), SyncResult::Timeout => metrics::SEQUENTIAL_SYNC_RESULT_TIMEOUT.inc(1),
} }

View File

@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use metrics::{Counter, CounterUsize, Gauge, GaugeUsize, Histogram, Sample}; use metrics::{register_meter, Counter, CounterUsize, Gauge, GaugeUsize, Histogram, Meter, Sample};
lazy_static::lazy_static! { lazy_static::lazy_static! {
// sequential auto sync // sequential auto sync
@ -9,8 +9,7 @@ lazy_static::lazy_static! {
pub static ref SEQUENTIAL_STATE_TXS_PENDING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_sequential_state_txs_pending", 1024); pub static ref SEQUENTIAL_STATE_TXS_PENDING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_sequential_state_txs_pending", 1024);
pub static ref SEQUENTIAL_STATE_GAP_NEXT_DB: Arc<dyn Gauge<usize>> = GaugeUsize::register("sync_auto_sequential_state_gap_next_db"); pub static ref SEQUENTIAL_STATE_GAP_NEXT_DB: Arc<dyn Gauge<usize>> = GaugeUsize::register("sync_auto_sequential_state_gap_next_db");
pub static ref SEQUENTIAL_SYNC_RESULT_TOTAL: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_total"); pub static ref SEQUENTIAL_SYNC_RESULT_COMPLETED: Arc<dyn Meter> = register_meter("sync_auto_sequential_sync_result_completed");
pub static ref SEQUENTIAL_SYNC_RESULT_COMPLETED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_completed");
pub static ref SEQUENTIAL_SYNC_RESULT_FAILED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_failed"); pub static ref SEQUENTIAL_SYNC_RESULT_FAILED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_failed");
pub static ref SEQUENTIAL_SYNC_RESULT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_timeout"); pub static ref SEQUENTIAL_SYNC_RESULT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_timeout");
@ -19,8 +18,7 @@ lazy_static::lazy_static! {
pub static ref RANDOM_STATE_TXS_READY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_ready", 1024); pub static ref RANDOM_STATE_TXS_READY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_ready", 1024);
pub static ref RANDOM_STATE_TXS_PENDING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_pending", 1024); pub static ref RANDOM_STATE_TXS_PENDING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_pending", 1024);
pub static ref RANDOM_SYNC_RESULT_TOTAL: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_total"); pub static ref RANDOM_SYNC_RESULT_COMPLETED: Arc<dyn Meter> = register_meter("sync_auto_random_sync_result_completed");
pub static ref RANDOM_SYNC_RESULT_COMPLETED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_completed");
pub static ref RANDOM_SYNC_RESULT_FAILED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_failed"); pub static ref RANDOM_SYNC_RESULT_FAILED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_failed");
pub static ref RANDOM_SYNC_RESULT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_timeout"); pub static ref RANDOM_SYNC_RESULT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_timeout");
} }

View File

@ -0,0 +1,11 @@
use std::sync::Arc;
use metrics::{register_timer, Counter, CounterUsize, Histogram, Sample, Timer};
lazy_static::lazy_static! {
pub static ref SERIAL_SYNC_FILE_COMPLETED: Arc<dyn Timer> = register_timer("sync_controllers_serial_sync_file_completed");
pub static ref SERIAL_SYNC_CHUNKS_COMPLETED: Arc<dyn Timer> = register_timer("sync_controllers_serial_sync_chunks_completed");
pub static ref SERIAL_SYNC_SEGMENT_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_controllers_serial_sync_segment_latency", 1024);
pub static ref SERIAL_SYNC_SEGMENT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_controllers_serial_sync_segment_timeout");
pub static ref SERIAL_SYNC_UNEXPECTED_ERRORS: Arc<dyn Counter<usize>> = CounterUsize::register("sync_controllers_serial_sync_unexpected_errors");
}

View File

@ -1,3 +1,4 @@
mod metrics;
mod peers; mod peers;
mod serial; mod serial;

View File

@ -1,6 +1,6 @@
use crate::context::SyncNetworkContext; use crate::context::SyncNetworkContext;
use crate::controllers::peers::{PeerState, SyncPeers}; use crate::controllers::peers::{PeerState, SyncPeers};
use crate::controllers::{FileSyncGoal, FileSyncInfo}; use crate::controllers::{metrics, FileSyncGoal, FileSyncInfo};
use crate::{Config, InstantWrapper}; use crate::{Config, InstantWrapper};
use file_location_cache::FileLocationCache; use file_location_cache::FileLocationCache;
use libp2p::swarm::DialError; use libp2p::swarm::DialError;
@ -311,15 +311,15 @@ impl SerialSyncController {
.peers .peers
.add_new_peer_with_config(peer_id, addr.clone(), shard_config) .add_new_peer_with_config(peer_id, addr.clone(), shard_config)
{ {
info!(%self.tx_seq, %peer_id, %addr, "Found new peer"); debug!(%self.tx_seq, %peer_id, %addr, "Found new peer");
true true
} else { } else {
// e.g. multiple `AnnounceFile` messages propagated // e.g. multiple `AnnounceFile` messages propagated
debug!(%self.tx_seq, %peer_id, %addr, "Found an existing peer"); trace!(%self.tx_seq, %peer_id, %addr, "Found an existing peer");
false false
} }
} else { } else {
debug!(%self.tx_seq, %peer_id, %addr, "Found peer without shard config"); info!(%self.tx_seq, %peer_id, %addr, "Found peer without shard config");
false false
} }
} }
@ -406,7 +406,6 @@ impl SerialSyncController {
} }
pub async fn on_response(&mut self, from_peer_id: PeerId, response: ChunkArrayWithProof) { pub async fn on_response(&mut self, from_peer_id: PeerId, response: ChunkArrayWithProof) {
debug!(%self.tx_seq, %from_peer_id, "Received RPC response");
if self.handle_on_response_mismatch(from_peer_id) { if self.handle_on_response_mismatch(from_peer_id) {
return; return;
} }
@ -429,6 +428,7 @@ impl SerialSyncController {
let data_len = response.chunks.data.len(); let data_len = response.chunks.data.len();
if data_len == 0 || data_len % CHUNK_SIZE > 0 { if data_len == 0 || data_len % CHUNK_SIZE > 0 {
warn!(%from_peer_id, %self.tx_seq, %data_len, "Invalid chunk response data length"); warn!(%from_peer_id, %self.tx_seq, %data_len, "Invalid chunk response data length");
metrics::SERIAL_SYNC_UNEXPECTED_ERRORS.inc(1);
self.ban_peer(from_peer_id, "Invalid chunk response data length"); self.ban_peer(from_peer_id, "Invalid chunk response data length");
self.state = SyncState::Idle; self.state = SyncState::Idle;
return; return;
@ -466,6 +466,7 @@ impl SerialSyncController {
} }
Err(err) => { Err(err) => {
warn!(%err, %self.tx_seq, "Failed to validate chunks response"); warn!(%err, %self.tx_seq, "Failed to validate chunks response");
metrics::SERIAL_SYNC_UNEXPECTED_ERRORS.inc(1);
self.ban_peer(from_peer_id, "Chunk array validation failed"); self.ban_peer(from_peer_id, "Chunk array validation failed");
self.state = SyncState::Idle; self.state = SyncState::Idle;
return; return;
@ -474,6 +475,8 @@ impl SerialSyncController {
self.failures = 0; self.failures = 0;
metrics::SERIAL_SYNC_SEGMENT_LATENCY.update_since(since.0);
let shard_config = self.store.get_store().flow().get_shard_config(); let shard_config = self.store.get_store().flow().get_shard_config();
let next_chunk = shard_config.next_segment_index( let next_chunk = shard_config.next_segment_index(
(from_chunk / PORA_CHUNK_SIZE as u64) as usize, (from_chunk / PORA_CHUNK_SIZE as u64) as usize,
@ -488,6 +491,7 @@ impl SerialSyncController {
Ok(true) => self.next_chunk = next_chunk as u64, Ok(true) => self.next_chunk = next_chunk as u64,
Ok(false) => { Ok(false) => {
warn!(%self.tx_seq, ?self.tx_id, "Transaction reverted while storing chunks"); warn!(%self.tx_seq, ?self.tx_id, "Transaction reverted while storing chunks");
metrics::SERIAL_SYNC_UNEXPECTED_ERRORS.inc(1);
self.state = SyncState::Failed { self.state = SyncState::Failed {
reason: FailureReason::TxReverted(self.tx_id), reason: FailureReason::TxReverted(self.tx_id),
}; };
@ -495,6 +499,7 @@ impl SerialSyncController {
} }
Err(err) => { Err(err) => {
error!(%err, %self.tx_seq, "Unexpected DB error while storing chunks"); error!(%err, %self.tx_seq, "Unexpected DB error while storing chunks");
metrics::SERIAL_SYNC_UNEXPECTED_ERRORS.inc(1);
self.state = SyncState::Failed { self.state = SyncState::Failed {
reason: FailureReason::DBError(err.to_string()), reason: FailureReason::DBError(err.to_string()),
}; };
@ -511,6 +516,7 @@ impl SerialSyncController {
// completed to download chunks // completed to download chunks
if !self.goal.is_all_chunks() { if !self.goal.is_all_chunks() {
self.state = SyncState::Completed; self.state = SyncState::Completed;
metrics::SERIAL_SYNC_CHUNKS_COMPLETED.update_since(self.since.0);
return; return;
} }
@ -523,15 +529,18 @@ impl SerialSyncController {
Ok(true) => { Ok(true) => {
info!(%self.tx_seq, "Succeeded to finalize file"); info!(%self.tx_seq, "Succeeded to finalize file");
self.state = SyncState::Completed; self.state = SyncState::Completed;
metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0);
} }
Ok(false) => { Ok(false) => {
warn!(?self.tx_id, %self.tx_seq, "Transaction reverted during finalize_tx"); warn!(?self.tx_id, %self.tx_seq, "Transaction reverted during finalize_tx");
metrics::SERIAL_SYNC_UNEXPECTED_ERRORS.inc(1);
self.state = SyncState::Failed { self.state = SyncState::Failed {
reason: FailureReason::TxReverted(self.tx_id), reason: FailureReason::TxReverted(self.tx_id),
}; };
} }
Err(err) => { Err(err) => {
error!(%err, %self.tx_seq, "Unexpected error during finalize_tx"); error!(%err, %self.tx_seq, "Unexpected error during finalize_tx");
metrics::SERIAL_SYNC_UNEXPECTED_ERRORS.inc(1);
self.state = SyncState::Failed { self.state = SyncState::Failed {
reason: FailureReason::DBError(err.to_string()), reason: FailureReason::DBError(err.to_string()),
}; };
@ -675,6 +684,7 @@ impl SerialSyncController {
debug!(%self.tx_seq, "No peer to continue downloading and try to find other peers to download"); debug!(%self.tx_seq, "No peer to continue downloading and try to find other peers to download");
self.state = SyncState::Idle; self.state = SyncState::Idle;
} else if since.elapsed() >= self.config.peer_chunks_download_timeout { } else if since.elapsed() >= self.config.peer_chunks_download_timeout {
metrics::SERIAL_SYNC_SEGMENT_TIMEOUT.inc(1);
self.handle_response_failure(peer_id, "RPC timeout"); self.handle_response_failure(peer_id, "RPC timeout");
} else { } else {
completed = true; completed = true;

View File

@ -52,7 +52,9 @@ pub struct Config {
pub max_sequential_workers: usize, pub max_sequential_workers: usize,
pub max_random_workers: usize, pub max_random_workers: usize,
#[serde(deserialize_with = "deserialize_duration")] #[serde(deserialize_with = "deserialize_duration")]
pub find_peer_timeout: Duration, pub sequential_find_peer_timeout: Duration,
#[serde(deserialize_with = "deserialize_duration")]
pub random_find_peer_timeout: Duration,
} }
impl Default for Config { impl Default for Config {
@ -61,26 +63,27 @@ impl Default for Config {
// sync service config // sync service config
heartbeat_interval: Duration::from_secs(5), heartbeat_interval: Duration::from_secs(5),
auto_sync_enabled: false, auto_sync_enabled: false,
max_sync_files: 16, max_sync_files: 32,
sync_file_by_rpc_enabled: true, sync_file_by_rpc_enabled: true,
sync_file_on_announcement_enabled: false, sync_file_on_announcement_enabled: false,
// serial sync config // serial sync config
max_chunks_to_request: 2 * 1024, max_chunks_to_request: 2 * 1024,
max_request_failures: 5, max_request_failures: 5,
peer_connect_timeout: Duration::from_secs(5), peer_connect_timeout: Duration::from_secs(15),
peer_disconnect_timeout: Duration::from_secs(5), peer_disconnect_timeout: Duration::from_secs(15),
peer_find_timeout: Duration::from_secs(5), peer_find_timeout: Duration::from_secs(30),
peer_chunks_download_timeout: Duration::from_secs(5), peer_chunks_download_timeout: Duration::from_secs(15),
peer_wait_outgoing_connection_timeout: Duration::from_secs(10), peer_wait_outgoing_connection_timeout: Duration::from_secs(10),
peer_next_chunks_request_wait_timeout: Duration::from_secs(3), peer_next_chunks_request_wait_timeout: Duration::from_secs(3),
// auto sync config // auto sync config
auto_sync_idle_interval: Duration::from_secs(3), auto_sync_idle_interval: Duration::from_secs(3),
auto_sync_error_interval: Duration::from_secs(10), auto_sync_error_interval: Duration::from_secs(10),
max_sequential_workers: 8, max_sequential_workers: 24,
max_random_workers: 4, max_random_workers: 8,
find_peer_timeout: Duration::from_secs(10), sequential_find_peer_timeout: Duration::from_secs(60),
random_find_peer_timeout: Duration::from_secs(500),
} }
} }
} }

View File

@ -157,7 +157,7 @@ impl SyncService {
event_recv: broadcast::Receiver<LogSyncEvent>, event_recv: broadcast::Receiver<LogSyncEvent>,
catch_up_end_recv: oneshot::Receiver<()>, catch_up_end_recv: oneshot::Receiver<()>,
) -> Result<SyncSender> { ) -> Result<SyncSender> {
let (sync_send, sync_recv) = channel::Channel::unbounded(); let (sync_send, sync_recv) = channel::Channel::unbounded("sync");
let store = Store::new(store, executor.clone()); let store = Store::new(store, executor.clone());
// init auto sync // init auto sync
@ -912,7 +912,7 @@ mod tests {
create_file_location_cache(init_peer_id, vec![txs[0].id()]); create_file_location_cache(init_peer_id, vec![txs[0].id()]);
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>(); let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (_, sync_recv) = channel::Channel::unbounded(); let (_, sync_recv) = channel::Channel::unbounded("test");
let mut sync = SyncService { let mut sync = SyncService {
config: Config::default(), config: Config::default(),
@ -941,7 +941,7 @@ mod tests {
create_file_location_cache(init_peer_id, vec![txs[0].id()]); create_file_location_cache(init_peer_id, vec![txs[0].id()]);
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>(); let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (_, sync_recv) = channel::Channel::unbounded(); let (_, sync_recv) = channel::Channel::unbounded("test");
let mut sync = SyncService { let mut sync = SyncService {
config: Config::default(), config: Config::default(),

View File

@ -228,11 +228,7 @@ reward_contract_address = "0x0496D0817BD8519e0de4894Dc379D35c35275609"
auto_sync_enabled = true auto_sync_enabled = true
# Maximum number of files in sync from other peers simultaneously. # Maximum number of files in sync from other peers simultaneously.
max_sync_files = 32 # max_sync_files = 32
# Timeout to terminate a file sync when automatically sync from other peers.
# If timeout, terminated file sync will be triggered later.
# find_peer_timeout = "10s"
# Enable to start a file sync via RPC (e.g. `admin_startSyncFile`). # Enable to start a file sync via RPC (e.g. `admin_startSyncFile`).
# sync_file_by_rpc_enabled = true # sync_file_by_rpc_enabled = true
@ -241,10 +237,10 @@ max_sync_files = 32
# sync_file_on_announcement_enabled = false # sync_file_on_announcement_enabled = false
# Maximum threads to sync files in sequence. # Maximum threads to sync files in sequence.
max_sequential_workers = 24 # max_sequential_workers = 24
# Maximum threads to sync files randomly. # Maximum threads to sync files randomly.
# max_random_workers = 4 # max_random_workers = 8
####################################################################### #######################################################################
### File Location Cache Options ### ### File Location Cache Options ###
@ -265,4 +261,4 @@ max_sequential_workers = 24
# Validity period of location information. # Validity period of location information.
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache. # If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
# entry_expiration_time_secs = 3600 # entry_expiration_time_secs = 3600

View File

@ -228,11 +228,7 @@ reward_contract_address = "0x51998C4d486F406a788B766d93510980ae1f9360"
auto_sync_enabled = true auto_sync_enabled = true
# Maximum number of files in sync from other peers simultaneously. # Maximum number of files in sync from other peers simultaneously.
max_sync_files = 32 # max_sync_files = 32
# Timeout to terminate a file sync when automatically sync from other peers.
# If timeout, terminated file sync will be triggered later.
# find_peer_timeout = "10s"
# Enable to start a file sync via RPC (e.g. `admin_startSyncFile`). # Enable to start a file sync via RPC (e.g. `admin_startSyncFile`).
# sync_file_by_rpc_enabled = true # sync_file_by_rpc_enabled = true
@ -241,10 +237,10 @@ max_sync_files = 32
# sync_file_on_announcement_enabled = false # sync_file_on_announcement_enabled = false
# Maximum threads to sync files in sequence. # Maximum threads to sync files in sequence.
max_sequential_workers = 24 # max_sequential_workers = 24
# Maximum threads to sync files randomly. # Maximum threads to sync files randomly.
# max_random_workers = 4 # max_random_workers = 8
####################################################################### #######################################################################
### File Location Cache Options ### ### File Location Cache Options ###
@ -265,4 +261,4 @@ max_sequential_workers = 24
# Validity period of location information. # Validity period of location information.
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache. # If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
# entry_expiration_time_secs = 3600 # entry_expiration_time_secs = 3600

View File

@ -227,11 +227,7 @@
# auto_sync_enabled = false # auto_sync_enabled = false
# Maximum number of files in sync from other peers simultaneously. # Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 16 # max_sync_files = 32
# Timeout to terminate a file sync when automatically sync from other peers.
# If timeout, terminated file sync will be triggered later.
# find_peer_timeout = "10s"
# Enable to start a file sync via RPC (e.g. `admin_startSyncFile`). # Enable to start a file sync via RPC (e.g. `admin_startSyncFile`).
# sync_file_by_rpc_enabled = true # sync_file_by_rpc_enabled = true
@ -240,10 +236,10 @@
# sync_file_on_announcement_enabled = false # sync_file_on_announcement_enabled = false
# Maximum threads to sync files in sequence. # Maximum threads to sync files in sequence.
# max_sequential_workers = 8 # max_sequential_workers = 24
# Maximum threads to sync files randomly. # Maximum threads to sync files randomly.
# max_random_workers = 4 # max_random_workers = 8
####################################################################### #######################################################################
### File Location Cache Options ### ### File Location Cache Options ###
@ -264,4 +260,4 @@
# Validity period of location information. # Validity period of location information.
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache. # If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
# entry_expiration_time_secs = 3600 # entry_expiration_time_secs = 3600