diff --git a/Cargo.lock b/Cargo.lock index d6fa5cc..64f030e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -226,6 +226,7 @@ dependencies = [ "itertools 0.13.0", "lazy_static", "lru 0.12.5", + "metrics", "once_cell", "serde", "tiny-keccak", @@ -7300,8 +7301,10 @@ dependencies = [ "kvdb", "kvdb-memorydb", "kvdb-rocksdb", + "lazy_static", "merkle_light", "merkle_tree", + "metrics", "once_cell", "parking_lot 0.12.3", "rand 0.8.5", diff --git a/common/append_merkle/Cargo.toml b/common/append_merkle/Cargo.toml index 2b1b8d6..7033875 100644 --- a/common/append_merkle/Cargo.toml +++ b/common/append_merkle/Cargo.toml @@ -13,5 +13,8 @@ serde = { version = "1.0.137", features = ["derive"] } lazy_static = "1.4.0" tracing = "0.1.36" once_cell = "1.19.0" + +metrics = { workspace = true } + itertools = "0.13.0" lru = "0.12.5" \ No newline at end of file diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index 86b550d..ccb7f6d 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -1,4 +1,5 @@ mod merkle_tree; +mod metrics; mod node_manager; mod proof; mod sha3; @@ -10,6 +11,7 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; +use std::time::Instant; use tracing::{trace, warn}; use crate::merkle_tree::MerkleTreeWrite; @@ -145,6 +147,7 @@ impl> AppendMerkleTree { } pub fn append(&mut self, new_leaf: E) { + let start_time = Instant::now(); if new_leaf == E::null() { // appending null is not allowed. return; @@ -152,10 +155,13 @@ impl> AppendMerkleTree { self.node_manager.start_transaction(); self.node_manager.push_node(0, new_leaf); self.recompute_after_append_leaves(self.leaves() - 1); + self.node_manager.commit(); + metrics::APPEND.update_since(start_time); } pub fn append_list(&mut self, leaf_list: Vec) { + let start_time = Instant::now(); if leaf_list.contains(&E::null()) { // appending null is not allowed. return; @@ -165,6 +171,7 @@ impl> AppendMerkleTree { self.node_manager.append_nodes(0, &leaf_list); self.recompute_after_append_leaves(start_index); self.node_manager.commit(); + metrics::APPEND_LIST.update_since(start_time); } /// Append a leaf list by providing their intermediate node hash. @@ -173,6 +180,7 @@ impl> AppendMerkleTree { /// Other nodes in the subtree will be set to `null` nodes. /// TODO: Optimize to avoid storing the `null` nodes? pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> { + let start_time = Instant::now(); if subtree_root == E::null() { // appending null is not allowed. bail!("subtree_root is null"); @@ -182,10 +190,13 @@ impl> AppendMerkleTree { self.append_subtree_inner(subtree_depth, subtree_root)?; self.recompute_after_append_subtree(start_index, subtree_depth - 1); self.node_manager.commit(); + metrics::APPEND_SUBTREE.update_since(start_time); + Ok(()) } pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> { + let start_time = Instant::now(); if subtree_list.iter().any(|(_, root)| root == &E::null()) { // appending null is not allowed. bail!("subtree_list contains null"); @@ -197,12 +208,15 @@ impl> AppendMerkleTree { self.recompute_after_append_subtree(start_index, subtree_depth - 1); } self.node_manager.commit(); + metrics::APPEND_SUBTREE_LIST.update_since(start_time); + Ok(()) } /// Change the value of the last leaf and return the new merkle root. /// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves. pub fn update_last(&mut self, updated_leaf: E) { + let start_time = Instant::now(); if updated_leaf == E::null() { // updating to null is not allowed. return; @@ -216,6 +230,7 @@ impl> AppendMerkleTree { } self.recompute_after_append_leaves(self.leaves() - 1); self.node_manager.commit(); + metrics::UPDATE_LAST.update_since(start_time); } /// Fill an unknown `null` leaf with its real value. diff --git a/common/append_merkle/src/metrics.rs b/common/append_merkle/src/metrics.rs new file mode 100644 index 0000000..cafa42f --- /dev/null +++ b/common/append_merkle/src/metrics.rs @@ -0,0 +1,11 @@ +use std::sync::Arc; + +use metrics::{register_timer, Timer}; + +lazy_static::lazy_static! { + pub static ref APPEND: Arc = register_timer("append_merkle_append"); + pub static ref APPEND_LIST: Arc = register_timer("append_merkle_append_list"); + pub static ref APPEND_SUBTREE: Arc = register_timer("append_merkle_append_subtree"); + pub static ref APPEND_SUBTREE_LIST: Arc = register_timer("append_merkle_append_subtree_list"); + pub static ref UPDATE_LAST: Arc = register_timer("append_merkle_update_last"); +} diff --git a/node/storage/Cargo.toml b/node/storage/Cargo.toml index aed9c68..c36afb4 100644 --- a/node/storage/Cargo.toml +++ b/node/storage/Cargo.toml @@ -31,6 +31,8 @@ parking_lot = "0.12.3" serde_json = "1.0.127" tokio = { version = "1.38.0", features = ["full"] } task_executor = { path = "../../common/task_executor" } +lazy_static = "1.4.0" +metrics = { workspace = true } once_cell = { version = "1.19.0", features = [] } [dev-dependencies] diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index 88b11e9..4bbd58e 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -1,13 +1,14 @@ -use super::load_chunk::EntryBatch; -use super::seal_task_manager::SealTaskManager; -use super::{MineLoadChunk, SealAnswer, SealTask}; use crate::config::ShardConfig; use crate::error::Error; +use crate::log_store::load_chunk::EntryBatch; use crate::log_store::log_manager::{ bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT, COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE, }; -use crate::log_store::{FlowRead, FlowSeal, FlowWrite}; +use crate::log_store::seal_task_manager::SealTaskManager; +use crate::log_store::{ + metrics, FlowRead, FlowSeal, FlowWrite, MineLoadChunk, SealAnswer, SealTask, +}; use crate::{try_option, ZgsKeyValueDB}; use any::Any; use anyhow::{anyhow, bail, Result}; @@ -22,6 +23,7 @@ use std::cmp::Ordering; use std::collections::BTreeMap; use std::fmt::Debug; use std::sync::Arc; +use std::time::Instant; use std::{any, cmp, mem}; use tracing::{debug, error, trace}; use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL}; @@ -42,7 +44,11 @@ impl FlowStore { } pub fn put_batch_root_list(&self, root_map: BTreeMap) -> Result<()> { - self.db.put_batch_root_list(root_map) + let start_time = Instant::now(); + let res = self.db.put_batch_root_list(root_map); + + metrics::PUT_BATCH_ROOT_LIST.update_since(start_time); + res } pub fn insert_subtree_list_for_batch( @@ -50,6 +56,7 @@ impl FlowStore { batch_index: usize, subtree_list: Vec<(usize, usize, DataRoot)>, ) -> Result<()> { + let start_time = Instant::now(); let mut batch = self .db .get_entry_batch(batch_index as u64)? @@ -57,6 +64,8 @@ impl FlowStore { batch.set_subtree_list(subtree_list); self.db.put_entry_raw(vec![(batch_index as u64, batch)])?; + metrics::INSERT_SUBTREE_LIST.update_since(start_time); + Ok(()) } @@ -75,7 +84,10 @@ impl FlowStore { } pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> { - self.db.put_mpt_node_list(node_list) + let start_time = Instant::now(); + let res = self.db.put_mpt_node_list(node_list); + metrics::PUT_MPT_NODE.update_since(start_time); + res } pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> { @@ -227,6 +239,7 @@ impl FlowWrite for FlowStore { /// Return the roots of completed chunks. The order is guaranteed to be increasing /// by chunk index. fn append_entries(&self, data: ChunkArray) -> Result> { + let start_time = Instant::now(); let mut to_seal_set = self.seal_manager.to_seal_set.write(); trace!("append_entries: {} {}", data.start_index, data.data.len()); if data.data.len() % BYTES_PER_SECTOR != 0 { @@ -269,6 +282,8 @@ impl FlowWrite for FlowStore { batch_list.push((chunk_index, batch)); } + + metrics::APPEND_ENTRIES.update_since(start_time); self.db.put_entry_batch_list(batch_list) } @@ -378,6 +393,7 @@ impl FlowDBStore { &self, batch_list: Vec<(u64, EntryBatch)>, ) -> Result> { + let start_time = Instant::now(); let mut completed_batches = Vec::new(); let mut tx = self.kvdb.transaction(); for (batch_index, batch) in batch_list { @@ -398,6 +414,7 @@ impl FlowDBStore { } } self.kvdb.write(tx)?; + metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time); Ok(completed_batches) } diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 118bedb..7001fed 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -1,5 +1,3 @@ -use super::tx_store::BlockHashAndSubmissionIndex; -use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask}; use crate::config::ShardConfig; use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore}; use crate::log_store::tx_store::TransactionStore; @@ -26,8 +24,13 @@ use std::collections::BTreeMap; use std::path::Path; use std::sync::mpsc; use std::sync::Arc; +use std::time::Instant; use tracing::{debug, error, info, instrument, trace, warn}; +use crate::log_store::metrics; +use crate::log_store::tx_store::BlockHashAndSubmissionIndex; +use crate::log_store::{FlowSeal, MineLoadChunk, SealAnswer, SealTask}; + /// 256 Bytes pub const ENTRY_SIZE: usize = 256; /// 1024 Entries. @@ -875,6 +878,7 @@ impl LogManager { if merkle_list.is_empty() { return Ok(()); } + let start_time = Instant::now(); self.pad_tx(tx_start_index, &mut *merkle)?; @@ -920,12 +924,15 @@ impl LogManager { } } self.flow_store.put_batch_root_list(batch_root_map)?; + + metrics::APPEND_SUBTREE_LIST.update_since(start_time); Ok(()) } #[instrument(skip(self, merkle))] fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> { // Check if we need to pad the flow. + let start_time = Instant::now(); let mut tx_start_flow_index = merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64; let pad_size = tx_start_index - tx_start_flow_index; @@ -1014,6 +1021,8 @@ impl LogManager { merkle.pora_chunks_merkle.leaves(), merkle.last_chunk_merkle.leaves() ); + + metrics::PAD_TX.update_since(start_time); Ok(()) } @@ -1142,6 +1151,8 @@ impl LogManager { } fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec) -> Result<()> { + let start_time = Instant::now(); + let mut merkle = self.merkle.write(); let shard_config = self.flow_store.get_shard_config(); // We have all the data need for this tx, so just copy them. @@ -1190,6 +1201,8 @@ impl LogManager { for (seq, _) in to_tx_offset_list { self.tx_store.finalize_tx(seq)?; } + + metrics::COPY_TX_AND_FINALIZE.update_since(start_time); Ok(()) } @@ -1262,6 +1275,7 @@ pub fn sub_merkle_tree(leaf_data: &[u8]) -> Result { } pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result> { + let start_time = Instant::now(); if leaf_data.len() % ENTRY_SIZE != 0 { bail!("merkle_tree: mismatched data size"); } @@ -1277,6 +1291,8 @@ pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result> { .map(Sha3Algorithm::leaf) .collect() }; + + metrics::DATA_TO_MERKLE_LEAVES.update_since(start_time); Ok(r) } diff --git a/node/storage/src/log_store/metrics.rs b/node/storage/src/log_store/metrics.rs new file mode 100644 index 0000000..64c480e --- /dev/null +++ b/node/storage/src/log_store/metrics.rs @@ -0,0 +1,33 @@ +use std::sync::Arc; + +use metrics::{register_timer, Timer}; + +lazy_static::lazy_static! { + pub static ref TX_STORE_PUT: Arc = register_timer("log_store_tx_store_put_tx"); + + pub static ref CHECK_TX_COMPLETED: Arc = + register_timer("log_store_log_manager_check_tx_completed"); + + pub static ref APPEND_SUBTREE_LIST: Arc = + register_timer("log_store_log_manager_append_subtree_list"); + + pub static ref DATA_TO_MERKLE_LEAVES: Arc = + register_timer("log_store_log_manager_data_to_merkle_leaves"); + + pub static ref COPY_TX_AND_FINALIZE: Arc = + register_timer("log_store_log_manager_copy_tx_and_finalize"); + + pub static ref PAD_TX: Arc = register_timer("log_store_log_manager_pad_tx"); + + pub static ref PUT_BATCH_ROOT_LIST: Arc = register_timer("log_store_flow_store_put_batch_root_list"); + + pub static ref INSERT_SUBTREE_LIST: Arc = + register_timer("log_store_flow_store_insert_subtree_list"); + + pub static ref PUT_MPT_NODE: Arc = register_timer("log_store_flow_store_put_mpt_node"); + + pub static ref PUT_ENTRY_BATCH_LIST: Arc = + register_timer("log_store_flow_store_put_entry_batch_list"); + + pub static ref APPEND_ENTRIES: Arc = register_timer("log_store_flow_store_append_entries"); +} diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index 19f99ee..74df4c4 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -15,6 +15,7 @@ pub mod config; mod flow_store; mod load_chunk; pub mod log_manager; +mod metrics; mod seal_task_manager; #[cfg(test)] mod tests; diff --git a/node/storage/src/log_store/tx_store.rs b/node/storage/src/log_store/tx_store.rs index 37d4b45..a617a37 100644 --- a/node/storage/src/log_store/tx_store.rs +++ b/node/storage/src/log_store/tx_store.rs @@ -3,6 +3,7 @@ use crate::log_store::log_manager::{ data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED, COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE, }; +use crate::log_store::metrics; use crate::{try_option, LogManager, ZgsKeyValueDB}; use anyhow::{anyhow, Result}; use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm}; @@ -15,6 +16,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::time::Instant; use tracing::{error, instrument}; const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress"; @@ -51,6 +53,8 @@ impl TransactionStore { #[instrument(skip(self))] /// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root. pub fn put_tx(&self, mut tx: Transaction) -> Result> { + let start_time = Instant::now(); + let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?; if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) { // The last tx is inserted again, so no need to process it. @@ -86,6 +90,7 @@ impl TransactionStore { ); self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst); self.kvdb.write(db_tx)?; + metrics::TX_STORE_PUT.update_since(start_time); Ok(old_tx_seq_list) } @@ -175,8 +180,12 @@ impl TransactionStore { } pub fn check_tx_completed(&self, tx_seq: u64) -> Result { - Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? - == Some(vec![TX_STATUS_FINALIZED])) + let start_time = Instant::now(); + let res = self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? + == Some(vec![TX_STATUS_FINALIZED]); + + metrics::CHECK_TX_COMPLETED.update_since(start_time); + Ok(res) } pub fn check_tx_pruned(&self, tx_seq: u64) -> Result { diff --git a/tests/sync_auto_random_v2_test.py b/tests/sync_auto_random_v2_test.py index 0196327..39a12c5 100644 --- a/tests/sync_auto_random_v2_test.py +++ b/tests/sync_auto_random_v2_test.py @@ -3,6 +3,7 @@ from test_framework.test_framework import TestFramework from utility.utils import wait_until + class AutoRandomSyncV2Test(TestFramework): def setup_params(self): self.num_nodes = 4 @@ -30,5 +31,6 @@ class AutoRandomSyncV2Test(TestFramework): wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None) wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"]) + if __name__ == "__main__": AutoRandomSyncV2Test().main()