add detailed metrics in slow operations

Peter Zhang 2024-10-29 12:48:09 +08:00
parent 0443547147
commit daba22ed56
2 changed files with 28 additions and 29 deletions


@@ -8,6 +8,4 @@ lazy_static::lazy_static! {
     pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_manager_put_tx_inner");
     pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes");
+    pub static ref FlOW_CONTRACT_ROOT: Arc<dyn Timer> = register_timer("log_manager_flow_contract_root");
 }
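Note: the new FlOW_CONTRACT_ROOT timer is driven the same way as the other timers registered here: capture an Instant before the slow call and report the elapsed time afterwards. A minimal, self-contained sketch of that pattern follows; the Timer trait and slow_operation below are local stand-ins for illustration only, not the repository's actual metrics crate.

use std::time::{Duration, Instant};

// Local stand-in for the metrics crate's `Timer`; the real handle comes from
// `register_timer(..)` as in the lazy_static block above.
trait Timer {
    fn update_since(&self, start: Instant);
}

struct LogTimer;

impl Timer for LogTimer {
    fn update_since(&self, start: Instant) {
        // The real implementation records the duration into a metric;
        // here we simply print it.
        let elapsed: Duration = start.elapsed();
        println!("flow_contract_root took {:?}", elapsed);
    }
}

fn slow_operation() {
    // Hypothetical placeholder for the timed call (e.g. an RPC to the flow contract).
    std::thread::sleep(Duration::from_millis(10));
}

fn main() {
    let timer = LogTimer;
    let start = Instant::now(); // same pattern as `flow_time` in the second file
    slow_operation();
    timer.update_since(start); // mirrors `metrics::FlOW_CONTRACT_ROOT.update_since(flow_time)`
}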


@@ -26,6 +26,7 @@ const RETRY_WAIT_MS: u64 = 500;
 // Each tx has less than 10KB, so the cache size should be acceptable.
 const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
 const CATCH_UP_END_GAP: u64 = 10;
+const CHECK_ROOT_INTERVAL: u64 = 500;

 /// Errors while handle data
 #[derive(Error, Debug)]
@@ -515,40 +516,40 @@ impl LogSyncManager {
         // Check if the computed data root matches on-chain state.
         // If the call fails, we won't check the root here and return `true` directly.
-        let flow_contract = self.log_fetcher.flow_contract();
-        match flow_contract
-            .get_flow_root_by_tx_seq(tx.seq.into())
-            .call()
-            .await
-        {
-            Ok(contract_root_bytes) => {
-                let contract_root = H256::from_slice(&contract_root_bytes);
-                // contract_root is zero for tx submitted before upgrading.
-                if !contract_root.is_zero() {
-                    match self.store.get_context() {
-                        Ok((local_root, _)) => {
-                            if contract_root != local_root {
-                                error!(
-                                    ?contract_root,
-                                    ?local_root,
-                                    "local flow root and on-chain flow root mismatch"
-                                );
-                                return false;
-                            }
-                        }
-                        Err(e) => {
-                            warn!(?e, "fail to read the local flow root");
-                        }
-                    }
-                }
-            }
-            Err(e) => {
-                warn!(?e, "fail to read the on-chain flow root");
-            }
-        }
+        if self.next_tx_seq % CHECK_ROOT_INTERVAL == 0 {
+            let flow_contract = self.log_fetcher.flow_contract();
+            let flow_time = Instant::now();
+            match flow_contract
+                .get_flow_root_by_tx_seq(tx.seq.into())
+                .call()
+                .await
+            {
+                Ok(contract_root_bytes) => {
+                    let contract_root = H256::from_slice(&contract_root_bytes);
+                    // contract_root is zero for tx submitted before upgrading.
+                    if !contract_root.is_zero() {
+                        match self.store.get_context() {
+                            Ok((local_root, _)) => {
+                                if contract_root != local_root {
+                                    error!(
+                                        ?contract_root,
+                                        ?local_root,
+                                        "local flow root and on-chain flow root mismatch"
+                                    );
+                                    return false;
+                                }
+                            }
+                            Err(e) => {
+                                warn!(?e, "fail to read the local flow root");
+                            }
+                        }
+                    }
+                }
+                Err(e) => {
+                    warn!(?e, "fail to read the on-chain flow root");
+                }
+            }
+            metrics::FlOW_CONTRACT_ROOT.update_since(flow_time);
+        }

         metrics::STORE_PUT_TX_SPEED_IN_BYTES
             .update((tx.size / start_time.elapsed().as_millis() as u64) as usize);
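Net effect of the hunk above: the on-chain flow-root comparison now runs only for every CHECK_ROOT_INTERVAL-th transaction instead of for every one, and the time spent on the contract call is recorded on the new FlOW_CONTRACT_ROOT timer. A minimal sketch of the gating pattern, with a hypothetical verify_root standing in for the contract query:

const CHECK_ROOT_INTERVAL: u64 = 500;

// Hypothetical stand-in for the flow-root comparison; returns false on mismatch.
fn verify_root(tx_seq: u64) -> bool {
    println!("checking root at tx {tx_seq}");
    true
}

fn handle_tx(next_tx_seq: u64) -> bool {
    // Only every CHECK_ROOT_INTERVAL-th sequence number pays the cost of the
    // on-chain query; all other transactions skip the check entirely.
    if next_tx_seq % CHECK_ROOT_INTERVAL == 0 {
        if !verify_root(next_tx_seq) {
            return false;
        }
    }
    true
}

fn main() {
    // tx 1000 triggers the check, tx 1001 does not.
    assert!(handle_tx(1000));
    assert!(handle_tx(1001));
}

Sampling the check keeps the common path free of an extra RPC while still catching a divergent local root within at most CHECK_ROOT_INTERVAL transactions.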