diff --git a/node/log_entry_sync/src/sync_manager/metrics.rs b/node/log_entry_sync/src/sync_manager/metrics.rs index 37bee24..6634065 100644 --- a/node/log_entry_sync/src/sync_manager/metrics.rs +++ b/node/log_entry_sync/src/sync_manager/metrics.rs @@ -1,7 +1,11 @@ use std::sync::Arc; -use metrics::{register_timer, Timer}; +use metrics::{register_timer, Gauge, GaugeUsize, Timer}; lazy_static::lazy_static! { - pub static ref STORE_PUT_TX: Arc = register_timer("log_entry_sync_store_put_tx"); + pub static ref LOG_MANAGER_HANDLE_DATA_TRANSACTION: Arc = register_timer("log_manager_handle_data_transaction"); + + pub static ref STORE_PUT_TX: Arc = register_timer("log_entry_sync_manager_put_tx_inner"); + + pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes"); } diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs index 8c07d8d..7cbf1c2 100644 --- a/node/log_entry_sync/src/sync_manager/mod.rs +++ b/node/log_entry_sync/src/sync_manager/mod.rs @@ -408,6 +408,7 @@ impl LogSyncManager { } LogFetchProgress::Transaction((tx, block_number)) => { let mut stop = false; + let start_time = Instant::now(); match self.put_tx(tx.clone()).await { Some(false) => stop = true, Some(true) => { @@ -441,6 +442,8 @@ impl LogSyncManager { // no receivers will be created. warn!("log sync broadcast error, error={:?}", e); } + + metrics::LOG_MANAGER_HANDLE_DATA_TRANSACTION.update_since(start_time); } LogFetchProgress::Reverted(reverted) => { self.process_reverted(reverted).await; @@ -453,7 +456,6 @@ impl LogSyncManager { async fn put_tx_inner(&mut self, tx: Transaction) -> bool { let start_time = Instant::now(); let result = self.store.put_tx(tx.clone()); - metrics::STORE_PUT_TX.update_since(start_time); if let Err(e) = result { error!("put_tx error: e={:?}", e); @@ -514,6 +516,7 @@ impl LogSyncManager { // Check if the computed data root matches on-chain state. // If the call fails, we won't check the root here and return `true` directly. let flow_contract = self.log_fetcher.flow_contract(); + match flow_contract .get_flow_root_by_tx_seq(tx.seq.into()) .call() @@ -545,6 +548,10 @@ impl LogSyncManager { } } + metrics::STORE_PUT_TX_SPEED_IN_BYTES + .update((tx.size / start_time.elapsed().as_secs()) as usize); + metrics::STORE_PUT_TX.update_since(start_time); + true } } diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index f94483c..8de28ea 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -252,6 +252,7 @@ impl LogStoreWrite for LogManager { /// `put_tx` for the last tx when we restart the node to ensure that it succeeds. /// fn put_tx(&self, tx: Transaction) -> Result<()> { + let start_time = Instant::now(); let mut merkle = self.merkle.write(); debug!("put_tx: tx={:?}", tx); let expected_seq = self.tx_store.next_tx_seq(); @@ -281,6 +282,7 @@ impl LogStoreWrite for LogManager { self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?; } } + metrics::PUT_TX.update_since(start_time); Ok(()) } diff --git a/node/storage/src/log_store/metrics.rs b/node/storage/src/log_store/metrics.rs index 64c480e..deaa4d1 100644 --- a/node/storage/src/log_store/metrics.rs +++ b/node/storage/src/log_store/metrics.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use metrics::{register_timer, Timer}; lazy_static::lazy_static! { + pub static ref PUT_TX: Arc = register_timer("log_store_put_tx"); + pub static ref TX_STORE_PUT: Arc = register_timer("log_store_tx_store_put_tx"); pub static ref CHECK_TX_COMPLETED: Arc =