Compare commits

..

4 Commits

Author SHA1 Message Date
0g-peterzhb
b20ae0c4cc
Merge e2085d8b21 into 9b68a8b7d7 2024-10-28 15:27:00 +00:00
Peter Zhang
e2085d8b21 add detailed metrics for storage layer 2024-10-28 23:27:04 +08:00
Peter Zhang
c9624c3a1e add detailed metrics for storage layer 2024-10-28 22:39:04 +08:00
Peter Zhang
53fee08143 add detailed metrics for storage layer 2024-10-28 21:56:34 +08:00
4 changed files with 22 additions and 3 deletions

View File

@ -1,7 +1,11 @@
use std::sync::Arc;
use metrics::{register_timer, Timer};
use metrics::{register_timer, Gauge, GaugeUsize, Timer};
lazy_static::lazy_static! {
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_store_put_tx");
pub static ref LOG_MANAGER_HANDLE_DATA_TRANSACTION: Arc<dyn Timer> = register_timer("log_manager_handle_data_transaction");
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_manager_put_tx_inner");
pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes");
}

View File

@ -408,6 +408,7 @@ impl LogSyncManager {
}
LogFetchProgress::Transaction((tx, block_number)) => {
let mut stop = false;
let start_time = Instant::now();
match self.put_tx(tx.clone()).await {
Some(false) => stop = true,
Some(true) => {
@ -441,6 +442,8 @@ impl LogSyncManager {
// no receivers will be created.
warn!("log sync broadcast error, error={:?}", e);
}
metrics::LOG_MANAGER_HANDLE_DATA_TRANSACTION.update_since(start_time);
}
LogFetchProgress::Reverted(reverted) => {
self.process_reverted(reverted).await;
@ -453,7 +456,6 @@ impl LogSyncManager {
async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
let start_time = Instant::now();
let result = self.store.put_tx(tx.clone());
metrics::STORE_PUT_TX.update_since(start_time);
if let Err(e) = result {
error!("put_tx error: e={:?}", e);
@ -514,6 +516,7 @@ impl LogSyncManager {
// Check if the computed data root matches on-chain state.
// If the call fails, we won't check the root here and return `true` directly.
let flow_contract = self.log_fetcher.flow_contract();
match flow_contract
.get_flow_root_by_tx_seq(tx.seq.into())
.call()
@ -545,6 +548,10 @@ impl LogSyncManager {
}
}
metrics::STORE_PUT_TX_SPEED_IN_BYTES
.update((tx.size / start_time.elapsed().as_millis() as u64) as usize);
metrics::STORE_PUT_TX.update_since(start_time);
true
}
}

View File

@ -192,6 +192,7 @@ impl LogStoreChunkWrite for LogManager {
chunks: ChunkArray,
maybe_file_proof: Option<FlowProof>,
) -> Result<bool> {
let start_time = Instant::now();
let mut merkle = self.merkle.write();
let tx = self
.tx_store
@ -224,6 +225,7 @@ impl LogStoreChunkWrite for LogManager {
)?;
self.flow_store.put_mpt_node_list(updated_node_list)?;
}
metrics::PUT_CHUNKS.update_since(start_time);
Ok(true)
}
@ -254,6 +256,7 @@ impl LogStoreWrite for LogManager {
/// `put_tx` for the last tx when we restart the node to ensure that it succeeds.
///
fn put_tx(&self, tx: Transaction) -> Result<()> {
let start_time = Instant::now();
let mut merkle = self.merkle.write();
debug!("put_tx: tx={:?}", tx);
let expected_seq = self.tx_store.next_tx_seq();
@ -283,6 +286,7 @@ impl LogStoreWrite for LogManager {
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
}
}
metrics::PUT_TX.update_since(start_time);
Ok(())
}

View File

@ -3,6 +3,10 @@ use std::sync::Arc;
use metrics::{register_timer, Timer};
lazy_static::lazy_static! {
pub static ref PUT_TX: Arc<dyn Timer> = register_timer("log_store_put_tx");
pub static ref PUT_CHUNKS: Arc<dyn Timer> = register_timer("log_store_put_chunks");
pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");
pub static ref CHECK_TX_COMPLETED: Arc<dyn Timer> =