Compare commits

...

4 Commits

Author SHA1 Message Date
0g-peterzhb
8f8b6db72a
Merge 9c305a7743 into 9b68a8b7d7 2024-10-29 08:05:26 +00:00
Peter Zhang
9c305a7743 add detailed metrics in slow operations 2024-10-29 16:05:16 +08:00
Peter Zhang
737dd3c44f add detailed metrics in slow operations 2024-10-29 13:01:00 +08:00
Peter Zhang
daba22ed56 add detailed metrics in slow operations 2024-10-29 12:48:09 +08:00
3 changed files with 37 additions and 37 deletions

View File

@ -1,5 +1,5 @@
use crate::sync_manager::log_query::LogQuery; use crate::sync_manager::log_query::LogQuery;
use crate::sync_manager::RETRY_WAIT_MS; use crate::sync_manager::{metrics, RETRY_WAIT_MS};
use crate::ContractAddress; use crate::ContractAddress;
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
use append_merkle::{Algorithm, Sha3Algorithm}; use append_merkle::{Algorithm, Sha3Algorithm};
@ -14,15 +14,12 @@ use shared_types::{DataRoot, Transaction};
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::{Duration, Instant};
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store}; use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio::{ use tokio::sync::{
sync::{
mpsc::{UnboundedReceiver, UnboundedSender}, mpsc::{UnboundedReceiver, UnboundedSender},
RwLock, RwLock,
},
time::Instant,
}; };
pub struct LogEntryFetcher { pub struct LogEntryFetcher {
@ -242,6 +239,7 @@ impl LogEntryFetcher {
); );
let (mut block_hash_sent, mut block_number_sent) = (None, None); let (mut block_hash_sent, mut block_number_sent) = (None, None);
while let Some(maybe_log) = stream.next().await { while let Some(maybe_log) = stream.next().await {
let start_time = Instant::now();
match maybe_log { match maybe_log {
Ok(log) => { Ok(log) => {
let sync_progress = let sync_progress =
@ -301,6 +299,7 @@ impl LogEntryFetcher {
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await; tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
} }
} }
metrics::RECOVER_LOG.update_since(start_time);
} }
info!("log recover end"); info!("log recover end");

View File

@ -9,5 +9,5 @@ lazy_static::lazy_static! {
pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes"); pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes");
pub static ref FlOW_CONTRACT_ROOT: Arc<dyn Timer> = register_timer("log_manager_flow_contract_root"); pub static ref RECOVER_LOG: Arc<dyn Timer> = register_timer("log_entry_sync_manager_recover_log");
} }

View File

@ -26,6 +26,7 @@ const RETRY_WAIT_MS: u64 = 500;
// Each tx has less than 10KB, so the cache size should be acceptable. // Each tx has less than 10KB, so the cache size should be acceptable.
const BROADCAST_CHANNEL_CAPACITY: usize = 25000; const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
const CATCH_UP_END_GAP: u64 = 10; const CATCH_UP_END_GAP: u64 = 10;
const CHECK_ROOT_INTERVAL: u64 = 500;
/// Errors while handle data /// Errors while handle data
#[derive(Error, Debug)] #[derive(Error, Debug)]
@ -515,9 +516,9 @@ impl LogSyncManager {
// Check if the computed data root matches on-chain state. // Check if the computed data root matches on-chain state.
// If the call fails, we won't check the root here and return `true` directly. // If the call fails, we won't check the root here and return `true` directly.
if self.next_tx_seq % CHECK_ROOT_INTERVAL == 0 {
let flow_contract = self.log_fetcher.flow_contract(); let flow_contract = self.log_fetcher.flow_contract();
let flow_time = Instant::now();
match flow_contract match flow_contract
.get_flow_root_by_tx_seq(tx.seq.into()) .get_flow_root_by_tx_seq(tx.seq.into())
.call() .call()
@ -548,10 +549,10 @@ impl LogSyncManager {
warn!(?e, "fail to read the on-chain flow root"); warn!(?e, "fail to read the on-chain flow root");
} }
} }
metrics::FlOW_CONTRACT_ROOT.update_since(flow_time); }
metrics::STORE_PUT_TX_SPEED_IN_BYTES metrics::STORE_PUT_TX_SPEED_IN_BYTES
.update((tx.size / start_time.elapsed().as_millis() as u64) as usize); .update((tx.size * 1000 / start_time.elapsed().as_micros() as u64) as usize);
metrics::STORE_PUT_TX.update_since(start_time); metrics::STORE_PUT_TX.update_since(start_time);
true true