add detailed metrics in slow operations

This commit is contained in:
Peter Zhang 2024-10-24 13:38:02 +08:00
parent cc6cf0fdb2
commit 0f3cca9140
6 changed files with 35 additions and 2 deletions

2
Cargo.lock generated
View File

@ -7272,8 +7272,10 @@ dependencies = [
"kvdb", "kvdb",
"kvdb-memorydb", "kvdb-memorydb",
"kvdb-rocksdb", "kvdb-rocksdb",
"lazy_static",
"merkle_light", "merkle_light",
"merkle_tree", "merkle_tree",
"metrics",
"parking_lot 0.12.3", "parking_lot 0.12.3",
"rand 0.8.5", "rand 0.8.5",
"rayon", "rayon",

View File

@ -31,6 +31,8 @@ parking_lot = "0.12.3"
serde_json = "1.0.127" serde_json = "1.0.127"
tokio = { version = "1.38.0", features = ["full"] } tokio = { version = "1.38.0", features = ["full"] }
task_executor = { path = "../../common/task_executor" } task_executor = { path = "../../common/task_executor" }
lazy_static = "1.4.0"
metrics = { workspace = true }
[dev-dependencies] [dev-dependencies]
rand = "0.8.5" rand = "0.8.5"

View File

@ -23,10 +23,12 @@ use std::collections::BTreeMap;
use std::path::Path; use std::path::Path;
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, error, info, instrument, trace, warn}; use tracing::{debug, error, info, instrument, trace, warn};
use super::tx_store::BlockHashAndSubmissionIndex; use crate::log_store::tx_store::BlockHashAndSubmissionIndex;
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask}; use crate::log_store::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
use crate::log_store::metrics;
/// 256 Bytes /// 256 Bytes
pub const ENTRY_SIZE: usize = 256; pub const ENTRY_SIZE: usize = 256;
@ -880,6 +882,7 @@ impl LogManager {
if merkle_list.is_empty() { if merkle_list.is_empty() {
return Ok(()); return Ok(());
} }
let start_time = Instant::now();
self.pad_tx(tx_start_index, &mut *merkle)?; self.pad_tx(tx_start_index, &mut *merkle)?;
@ -925,6 +928,8 @@ impl LogManager {
} }
} }
self.flow_store.put_batch_root_list(batch_root_map)?; self.flow_store.put_batch_root_list(batch_root_map)?;
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
Ok(()) Ok(())
} }
@ -1148,6 +1153,8 @@ impl LogManager {
} }
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> { fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
let start_time = Instant::now();
let mut merkle = self.merkle.write(); let mut merkle = self.merkle.write();
let shard_config = self.flow_store.get_shard_config(); let shard_config = self.flow_store.get_shard_config();
// We have all the data need for this tx, so just copy them. // We have all the data need for this tx, so just copy them.
@ -1196,6 +1203,8 @@ impl LogManager {
for (seq, _) in to_tx_offset_list { for (seq, _) in to_tx_offset_list {
self.tx_store.finalize_tx(seq)?; self.tx_store.finalize_tx(seq)?;
} }
metrics::COPY_TX_AND_FINALIZE.update_since(start_time);
Ok(()) Ok(())
} }

View File

@ -0,0 +1,14 @@
use std::sync::Arc;
use metrics::{register_timer, Timer};
lazy_static::lazy_static! {
pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> =
register_timer("log_store_log_manager_append_subtree_list");
pub static ref COPY_TX_AND_FINALIZE: Arc<dyn Timer> =
register_timer("log_store_log_manager_copy_tx_and_finalize");
}

View File

@ -16,6 +16,7 @@ mod flow_store;
mod load_chunk; mod load_chunk;
pub mod log_manager; pub mod log_manager;
mod seal_task_manager; mod seal_task_manager;
mod metrics;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
pub mod tx_store; pub mod tx_store;

View File

@ -3,6 +3,7 @@ use crate::log_store::log_manager::{
data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED, data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE, COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
}; };
use crate::log_store::metrics;
use crate::{try_option, LogManager, ZgsKeyValueDB}; use crate::{try_option, LogManager, ZgsKeyValueDB};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm}; use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
@ -15,6 +16,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use tracing::{error, instrument}; use tracing::{error, instrument};
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress"; const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
@ -51,6 +53,8 @@ impl TransactionStore {
#[instrument(skip(self))] #[instrument(skip(self))]
/// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root. /// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root.
pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> { pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> {
let start_time = Instant::now();
let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?; let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) { if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) {
// The last tx is inserted again, so no need to process it. // The last tx is inserted again, so no need to process it.
@ -86,6 +90,7 @@ impl TransactionStore {
); );
self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst); self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
self.kvdb.write(db_tx)?; self.kvdb.write(db_tx)?;
metrics::TX_STORE_PUT.update_since(start_time);
Ok(old_tx_seq_list) Ok(old_tx_seq_list)
} }