From e15143e0a4b9a265f7340fa600409c08dd492d4e Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Wed, 23 Oct 2024 16:39:59 +0800 Subject: [PATCH 01/21] Add new P2P protocol NewFile --- node/network/src/types/pubsub.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index d5dd4e1..5705d8d 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -123,6 +123,14 @@ pub struct NewFile { pub timestamp: u32, } +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] +pub struct NewFile { + pub tx_id: TxID, + pub num_shard: usize, + pub shard_id: usize, + pub timestamp: u32, +} + #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] pub struct FindFile { pub tx_id: TxID, From 57ba8f9bb77734a4d28b806b06b028feaab843a1 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Wed, 23 Oct 2024 19:00:47 +0800 Subject: [PATCH 02/21] handle NewFile in sync service to write in db --- node/sync/src/auto_sync/manager.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/node/sync/src/auto_sync/manager.rs b/node/sync/src/auto_sync/manager.rs index 68e3da9..51a3da0 100644 --- a/node/sync/src/auto_sync/manager.rs +++ b/node/sync/src/auto_sync/manager.rs @@ -53,8 +53,15 @@ impl AutoSyncManager { let catched_up = Arc::new(AtomicBool::new(false)); // handle new file + let sync_store_cloned = sync_store.clone(); executor.spawn( - Self::handle_new_file(new_file_recv, sync_store.clone()), + async move { + while let Some(tx_seq) = new_file_recv.recv().await { + if let Err(err) = sync_store_cloned.insert(tx_seq, Queue::Ready).await { + warn!(?err, %tx_seq, "Failed to insert new file to ready queue"); + } + } + }, "auto_sync_handle_new_file", ); From e338dca31836a52a9d2c4b00e5e49584672d880b Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 15:30:07 +0800 Subject: [PATCH 03/21] Disable sequential sync and store new file in v2
sync store --- node/sync/src/auto_sync/manager.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/node/sync/src/auto_sync/manager.rs b/node/sync/src/auto_sync/manager.rs index 51a3da0..99003a6 100644 --- a/node/sync/src/auto_sync/manager.rs +++ b/node/sync/src/auto_sync/manager.rs @@ -35,7 +35,7 @@ impl AutoSyncManager { executor: &TaskExecutor, store: Store, sync_send: SyncSender, - log_sync_recv: broadcast::Receiver, + _log_sync_recv: broadcast::Receiver, catch_up_end_recv: oneshot::Receiver<()>, ) -> Result { let (file_announcement_send, file_announcement_recv) = unbounded_channel(); @@ -53,15 +53,8 @@ impl AutoSyncManager { let catched_up = Arc::new(AtomicBool::new(false)); // handle new file - let sync_store_cloned = sync_store.clone(); executor.spawn( - async move { - while let Some(tx_seq) = new_file_recv.recv().await { - if let Err(err) = sync_store_cloned.insert(tx_seq, Queue::Ready).await { - warn!(?err, %tx_seq, "Failed to insert new file to ready queue"); - } - } - }, + Self::handle_new_file(new_file_recv, sync_store.clone()), "auto_sync_handle_new_file", ); @@ -93,7 +86,7 @@ impl AutoSyncManager { ); Ok(Self { - serial, + serial: None, random, file_announcement_send, new_file_send, From 5409d8f4186fca15b0daaa592014b0d9f9c754de Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 18:38:41 +0800 Subject: [PATCH 04/21] do not propagate FindFile to whole network --- node/router/src/libp2p_event_handler.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 439dafa..60270b5 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -343,7 +343,7 @@ impl Libp2pEventHandler { PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, PubsubMessage::NewFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1); - self.on_new_file(propagation_source, 
msg).await + self.on_new_file(source, msg).await } PubsubMessage::FindFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1); @@ -616,6 +616,10 @@ impl Libp2pEventHandler { }; } + // update peer shard config in cache + self.file_location_cache + .insert_peer_config(peer_id, announced_shard_config); + // check if we have it if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) { if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await { From 5a48c5ba9445543977e2d7830d31b4b4cf6bbb5f Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 19:29:08 +0800 Subject: [PATCH 05/21] Mark peer connected if FileAnnouncement RPC message received --- node/sync/src/controllers/serial.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 876f6d7..7343444 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -89,6 +89,9 @@ pub struct SerialSyncController { /// Cache for storing and serving gossip messages. file_location_cache: Arc, + + /// Whether to find files from neighbors only. + neighbors_only: bool, } impl SerialSyncController { @@ -115,6 +118,7 @@ impl SerialSyncController { ctx, store, file_location_cache, + neighbors_only: true, } } From 5dc7c52d41eb65deecd1e92024d59095ccfc3a15 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 20:01:57 +0800 Subject: [PATCH 06/21] fix unit test failures --- node/sync/src/controllers/serial.rs | 4 ---- node/sync/src/service.rs | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 7343444..876f6d7 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -89,9 +89,6 @@ pub struct SerialSyncController { /// Cache for storing and serving gossip messages. 
file_location_cache: Arc, - - /// Whether to find files from neighbors only. - neighbors_only: bool, } impl SerialSyncController { @@ -118,7 +115,6 @@ impl SerialSyncController { ctx, store, file_location_cache, - neighbors_only: true, } } diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 3fd12f6..9c4ae18 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -1571,7 +1571,7 @@ mod tests { .request(SyncRequest::SyncFile { tx_seq }) .await .unwrap(); - + receive_dial(&mut runtime, &sync_send).await; receive_chunk_request( From a7a2f24784b75c584bafc66368f6273cdb94ed70 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 11:09:30 +0800 Subject: [PATCH 07/21] Add py test for auto sync v2 --- tests/sync_auto_random_v2_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/sync_auto_random_v2_test.py b/tests/sync_auto_random_v2_test.py index 0196327..39a12c5 100644 --- a/tests/sync_auto_random_v2_test.py +++ b/tests/sync_auto_random_v2_test.py @@ -3,6 +3,7 @@ from test_framework.test_framework import TestFramework from utility.utils import wait_until + class AutoRandomSyncV2Test(TestFramework): def setup_params(self): self.num_nodes = 4 @@ -30,5 +31,6 @@ class AutoRandomSyncV2Test(TestFramework): wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None) wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"]) + if __name__ == "__main__": AutoRandomSyncV2Test().main() From a872480faa323d2d84c6ea0acd707eed8d33c1c3 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 11:31:54 +0800 Subject: [PATCH 08/21] fmt code --- node/sync/src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 9c4ae18..3fd12f6 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -1571,7 +1571,7 @@ mod tests { .request(SyncRequest::SyncFile { tx_seq }) .await 
.unwrap(); - + receive_dial(&mut runtime, &sync_send).await; receive_chunk_request( From 06ae2450d096fe8caceaf63176ad4f5599a33fb7 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 12:23:08 +0800 Subject: [PATCH 09/21] fix random test failure --- node/sync/src/auto_sync/manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/sync/src/auto_sync/manager.rs b/node/sync/src/auto_sync/manager.rs index 99003a6..68e3da9 100644 --- a/node/sync/src/auto_sync/manager.rs +++ b/node/sync/src/auto_sync/manager.rs @@ -35,7 +35,7 @@ impl AutoSyncManager { executor: &TaskExecutor, store: Store, sync_send: SyncSender, - _log_sync_recv: broadcast::Receiver, + log_sync_recv: broadcast::Receiver, catch_up_end_recv: oneshot::Receiver<()>, ) -> Result { let (file_announcement_send, file_announcement_recv) = unbounded_channel(); @@ -86,7 +86,7 @@ impl AutoSyncManager { ); Ok(Self { - serial: None, + serial, random, file_announcement_send, new_file_send, From 9a199ba71406d1147c824923d3eed2e1f99470a5 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 14:57:30 +0800 Subject: [PATCH 10/21] Add comments --- node/router/src/libp2p_event_handler.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 60270b5..439dafa 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -343,7 +343,7 @@ impl Libp2pEventHandler { PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, PubsubMessage::NewFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1); - self.on_new_file(source, msg).await + self.on_new_file(propagation_source, msg).await } PubsubMessage::FindFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1); @@ -616,10 +616,6 @@ impl Libp2pEventHandler { }; } - // update peer shard config in cache - self.file_location_cache - 
.insert_peer_config(peer_id, announced_shard_config); - // check if we have it if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) { if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await { From cb7de7f60e3c86492bef539d69fd317fb685064a Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Thu, 24 Oct 2024 13:38:02 +0800 Subject: [PATCH 11/21] add detailed metrics in slow operations --- Cargo.lock | 2 ++ node/storage/Cargo.toml | 2 ++ node/storage/src/log_store/log_manager.rs | 8 ++++++++ node/storage/src/log_store/metrics.rs | 14 ++++++++++++++ node/storage/src/log_store/mod.rs | 1 + node/storage/src/log_store/tx_store.rs | 5 +++++ 6 files changed, 32 insertions(+) create mode 100644 node/storage/src/log_store/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index d6fa5cc..7975f95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7300,9 +7300,11 @@ dependencies = [ "kvdb", "kvdb-memorydb", "kvdb-rocksdb", + "lazy_static", "merkle_light", "merkle_tree", "once_cell", + "metrics", "parking_lot 0.12.3", "rand 0.8.5", "rayon", diff --git a/node/storage/Cargo.toml b/node/storage/Cargo.toml index aed9c68..c36afb4 100644 --- a/node/storage/Cargo.toml +++ b/node/storage/Cargo.toml @@ -31,6 +31,8 @@ parking_lot = "0.12.3" serde_json = "1.0.127" tokio = { version = "1.38.0", features = ["full"] } task_executor = { path = "../../common/task_executor" } +lazy_static = "1.4.0" +metrics = { workspace = true } once_cell = { version = "1.19.0", features = [] } [dev-dependencies] diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 118bedb..75ab1e9 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -26,6 +26,7 @@ use std::collections::BTreeMap; use std::path::Path; use std::sync::mpsc; use std::sync::Arc; +use std::time::Instant; use tracing::{debug, error, info, instrument, trace, warn}; /// 256 Bytes @@ -875,6 +876,7 @@ impl LogManager { if 
merkle_list.is_empty() { return Ok(()); } + let start_time = Instant::now(); self.pad_tx(tx_start_index, &mut *merkle)?; @@ -920,6 +922,8 @@ impl LogManager { } } self.flow_store.put_batch_root_list(batch_root_map)?; + + metrics::APPEND_SUBTREE_LIST.update_since(start_time); Ok(()) } @@ -1142,6 +1146,8 @@ impl LogManager { } fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec) -> Result<()> { + let start_time = Instant::now(); + let mut merkle = self.merkle.write(); let shard_config = self.flow_store.get_shard_config(); // We have all the data need for this tx, so just copy them. @@ -1190,6 +1196,8 @@ impl LogManager { for (seq, _) in to_tx_offset_list { self.tx_store.finalize_tx(seq)?; } + + metrics::COPY_TX_AND_FINALIZE.update_since(start_time); Ok(()) } diff --git a/node/storage/src/log_store/metrics.rs b/node/storage/src/log_store/metrics.rs new file mode 100644 index 0000000..9757615 --- /dev/null +++ b/node/storage/src/log_store/metrics.rs @@ -0,0 +1,14 @@ +use std::sync::Arc; + +use metrics::{register_timer, Timer}; + +lazy_static::lazy_static! 
{ + pub static ref TX_STORE_PUT: Arc = register_timer("log_store_tx_store_put_tx"); + + + pub static ref APPEND_SUBTREE_LIST: Arc = + register_timer("log_store_log_manager_append_subtree_list"); + + pub static ref COPY_TX_AND_FINALIZE: Arc = + register_timer("log_store_log_manager_copy_tx_and_finalize"); +} diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index 19f99ee..a930204 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -16,6 +16,7 @@ mod flow_store; mod load_chunk; pub mod log_manager; mod seal_task_manager; +mod metrics; #[cfg(test)] mod tests; pub mod tx_store; diff --git a/node/storage/src/log_store/tx_store.rs b/node/storage/src/log_store/tx_store.rs index 37d4b45..ca6039c 100644 --- a/node/storage/src/log_store/tx_store.rs +++ b/node/storage/src/log_store/tx_store.rs @@ -3,6 +3,7 @@ use crate::log_store::log_manager::{ data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED, COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE, }; +use crate::log_store::metrics; use crate::{try_option, LogManager, ZgsKeyValueDB}; use anyhow::{anyhow, Result}; use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm}; @@ -15,6 +16,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::time::Instant; use tracing::{error, instrument}; const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress"; @@ -51,6 +53,8 @@ impl TransactionStore { #[instrument(skip(self))] /// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root. pub fn put_tx(&self, mut tx: Transaction) -> Result> { + let start_time = Instant::now(); + let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?; if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) { // The last tx is inserted again, so no need to process it. 
@@ -86,6 +90,7 @@ impl TransactionStore { ); self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst); self.kvdb.write(db_tx)?; + metrics::TX_STORE_PUT.update_since(start_time); Ok(old_tx_seq_list) } From 55fb210da993b3119d4447da057062665e5a5a1e Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Thu, 24 Oct 2024 14:47:57 +0800 Subject: [PATCH 12/21] format code --- Cargo.lock | 2 +- node/storage/src/log_store/log_manager.rs | 4 ++++ node/storage/src/log_store/mod.rs | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7975f95..c964d02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7303,8 +7303,8 @@ dependencies = [ "lazy_static", "merkle_light", "merkle_tree", - "once_cell", "metrics", + "once_cell", "parking_lot 0.12.3", "rand 0.8.5", "rayon", diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 75ab1e9..2dfecee 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -29,6 +29,10 @@ use std::sync::Arc; use std::time::Instant; use tracing::{debug, error, info, instrument, trace, warn}; +use crate::log_store::metrics; +use crate::log_store::tx_store::BlockHashAndSubmissionIndex; +use crate::log_store::{FlowSeal, MineLoadChunk, SealAnswer, SealTask}; + /// 256 Bytes pub const ENTRY_SIZE: usize = 256; /// 1024 Entries. 
diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index a930204..74df4c4 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -15,8 +15,8 @@ pub mod config; mod flow_store; mod load_chunk; pub mod log_manager; -mod seal_task_manager; mod metrics; +mod seal_task_manager; #[cfg(test)] mod tests; pub mod tx_store; From 57d4d81ebf1d03e722529b8f0721341adc38c3e3 Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Fri, 25 Oct 2024 11:59:26 +0800 Subject: [PATCH 13/21] add detailed metrics in slow operations --- node/storage/src/log_store/flow_store.rs | 24 +++++++++++++++++------ node/storage/src/log_store/log_manager.rs | 3 +++ node/storage/src/log_store/metrics.rs | 11 +++++++++++ node/storage/src/log_store/tx_store.rs | 8 ++++++-- 4 files changed, 38 insertions(+), 8 deletions(-) diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index 88b11e9..62bf9ad 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -1,13 +1,14 @@ -use super::load_chunk::EntryBatch; -use super::seal_task_manager::SealTaskManager; -use super::{MineLoadChunk, SealAnswer, SealTask}; use crate::config::ShardConfig; use crate::error::Error; +use crate::log_store::load_chunk::EntryBatch; use crate::log_store::log_manager::{ bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT, COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE, }; -use crate::log_store::{FlowRead, FlowSeal, FlowWrite}; +use crate::log_store::seal_task_manager::SealTaskManager; +use crate::log_store::{ + metrics, FlowRead, FlowSeal, FlowWrite, MineLoadChunk, SealAnswer, SealTask, +}; use crate::{try_option, ZgsKeyValueDB}; use any::Any; use anyhow::{anyhow, bail, Result}; @@ -22,6 +23,7 @@ use std::cmp::Ordering; use std::collections::BTreeMap; use std::fmt::Debug; use std::sync::Arc; +use std::time::Instant; use std::{any, cmp, mem}; use 
tracing::{debug, error, trace}; use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL}; @@ -42,7 +44,11 @@ impl FlowStore { } pub fn put_batch_root_list(&self, root_map: BTreeMap) -> Result<()> { - self.db.put_batch_root_list(root_map) + let start_time = Instant::now(); + let res = self.db.put_batch_root_list(root_map); + + metrics::PUT_BATCH_ROOT_LIST.update_since(start_time); + res } pub fn insert_subtree_list_for_batch( @@ -50,6 +56,7 @@ impl FlowStore { batch_index: usize, subtree_list: Vec<(usize, usize, DataRoot)>, ) -> Result<()> { + let start_time = Instant::now(); let mut batch = self .db .get_entry_batch(batch_index as u64)? @@ -57,6 +64,8 @@ impl FlowStore { batch.set_subtree_list(subtree_list); self.db.put_entry_raw(vec![(batch_index as u64, batch)])?; + metrics::INSERT_SUBTREE_LIST.update_since(start_time); + Ok(()) } @@ -75,7 +84,10 @@ impl FlowStore { } pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> { - self.db.put_mpt_node_list(node_list) + let start_time = Instant::now(); + let res = self.db.put_mpt_node_list(node_list); + metrics::PUT_MPT_NODE.update_since(start_time); + res } pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> { diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 2dfecee..75d2be5 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -934,6 +934,7 @@ impl LogManager { #[instrument(skip(self, merkle))] fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> { // Check if we need to pad the flow. 
+ let start_time = Instant::now(); let mut tx_start_flow_index = merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64; let pad_size = tx_start_index - tx_start_flow_index; @@ -1022,6 +1023,8 @@ impl LogManager { merkle.pora_chunks_merkle.leaves(), merkle.last_chunk_merkle.leaves() ); + + metrics::PAD_TX.update_since(start_time); Ok(()) } diff --git a/node/storage/src/log_store/metrics.rs b/node/storage/src/log_store/metrics.rs index 9757615..2b9ead4 100644 --- a/node/storage/src/log_store/metrics.rs +++ b/node/storage/src/log_store/metrics.rs @@ -5,10 +5,21 @@ use metrics::{register_timer, Timer}; lazy_static::lazy_static! { pub static ref TX_STORE_PUT: Arc = register_timer("log_store_tx_store_put_tx"); + pub static ref CHECK_TX_COMPLETED: Arc = + register_timer("log_store_log_manager_check_tx_completed"); pub static ref APPEND_SUBTREE_LIST: Arc = register_timer("log_store_log_manager_append_subtree_list"); pub static ref COPY_TX_AND_FINALIZE: Arc = register_timer("log_store_log_manager_copy_tx_and_finalize"); + + pub static ref PAD_TX: Arc = register_timer("log_store_log_manager_pad_tx"); + + pub static ref PUT_BATCH_ROOT_LIST: Arc = register_timer("log_store_flow_store_put_batch_root_list"); + + pub static ref INSERT_SUBTREE_LIST: Arc = + register_timer("log_store_log_manager_insert_subtree_list"); + + pub static ref PUT_MPT_NODE: Arc = register_timer("log_store_log_manager_put_mpt_node"); } diff --git a/node/storage/src/log_store/tx_store.rs b/node/storage/src/log_store/tx_store.rs index ca6039c..a617a37 100644 --- a/node/storage/src/log_store/tx_store.rs +++ b/node/storage/src/log_store/tx_store.rs @@ -180,8 +180,12 @@ impl TransactionStore { } pub fn check_tx_completed(&self, tx_seq: u64) -> Result { - Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? - == Some(vec![TX_STATUS_FINALIZED])) + let start_time = Instant::now(); + let res = self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? 
+ == Some(vec![TX_STATUS_FINALIZED]); + + metrics::CHECK_TX_COMPLETED.update_since(start_time); + Ok(res) } pub fn check_tx_pruned(&self, tx_seq: u64) -> Result { From daf261de8d480fe6306b47fb1444a9beeaadfab9 Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Fri, 25 Oct 2024 13:03:45 +0800 Subject: [PATCH 14/21] add detailed metrics in slow operations --- Cargo.lock | 1 + common/append_merkle/Cargo.toml | 3 +++ common/append_merkle/src/lib.rs | 15 +++++++++++++++ common/append_merkle/src/metrics.rs | 11 +++++++++++ node/network/src/types/pubsub.rs | 8 -------- node/storage/src/log_store/flow_store.rs | 5 +++++ node/storage/src/log_store/log_manager.rs | 5 +++-- node/storage/src/log_store/metrics.rs | 12 ++++++++++-- 8 files changed, 48 insertions(+), 12 deletions(-) create mode 100644 common/append_merkle/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index c964d02..64f030e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -226,6 +226,7 @@ dependencies = [ "itertools 0.13.0", "lazy_static", "lru 0.12.5", + "metrics", "once_cell", "serde", "tiny-keccak", diff --git a/common/append_merkle/Cargo.toml b/common/append_merkle/Cargo.toml index 2b1b8d6..7033875 100644 --- a/common/append_merkle/Cargo.toml +++ b/common/append_merkle/Cargo.toml @@ -13,5 +13,8 @@ serde = { version = "1.0.137", features = ["derive"] } lazy_static = "1.4.0" tracing = "0.1.36" once_cell = "1.19.0" + +metrics = { workspace = true } + itertools = "0.13.0" lru = "0.12.5" \ No newline at end of file diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index 86b550d..ccb7f6d 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -1,4 +1,5 @@ mod merkle_tree; +mod metrics; mod node_manager; mod proof; mod sha3; @@ -10,6 +11,7 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; +use std::time::Instant; use tracing::{trace, warn}; use crate::merkle_tree::MerkleTreeWrite; @@ -145,6 
+147,7 @@ impl> AppendMerkleTree { } pub fn append(&mut self, new_leaf: E) { + let start_time = Instant::now(); if new_leaf == E::null() { // appending null is not allowed. return; @@ -152,10 +155,13 @@ impl> AppendMerkleTree { self.node_manager.start_transaction(); self.node_manager.push_node(0, new_leaf); self.recompute_after_append_leaves(self.leaves() - 1); + self.node_manager.commit(); + metrics::APPEND.update_since(start_time); } pub fn append_list(&mut self, leaf_list: Vec) { + let start_time = Instant::now(); if leaf_list.contains(&E::null()) { // appending null is not allowed. return; @@ -165,6 +171,7 @@ impl> AppendMerkleTree { self.node_manager.append_nodes(0, &leaf_list); self.recompute_after_append_leaves(start_index); self.node_manager.commit(); + metrics::APPEND_LIST.update_since(start_time); } /// Append a leaf list by providing their intermediate node hash. @@ -173,6 +180,7 @@ impl> AppendMerkleTree { /// Other nodes in the subtree will be set to `null` nodes. /// TODO: Optimize to avoid storing the `null` nodes? pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> { + let start_time = Instant::now(); if subtree_root == E::null() { // appending null is not allowed. bail!("subtree_root is null"); @@ -182,10 +190,13 @@ impl> AppendMerkleTree { self.append_subtree_inner(subtree_depth, subtree_root)?; self.recompute_after_append_subtree(start_index, subtree_depth - 1); self.node_manager.commit(); + metrics::APPEND_SUBTREE.update_since(start_time); + Ok(()) } pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> { + let start_time = Instant::now(); if subtree_list.iter().any(|(_, root)| root == &E::null()) { // appending null is not allowed. 
bail!("subtree_list contains null"); @@ -197,12 +208,15 @@ impl> AppendMerkleTree { self.recompute_after_append_subtree(start_index, subtree_depth - 1); } self.node_manager.commit(); + metrics::APPEND_SUBTREE_LIST.update_since(start_time); + Ok(()) } /// Change the value of the last leaf and return the new merkle root. /// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves. pub fn update_last(&mut self, updated_leaf: E) { + let start_time = Instant::now(); if updated_leaf == E::null() { // updating to null is not allowed. return; @@ -216,6 +230,7 @@ impl> AppendMerkleTree { } self.recompute_after_append_leaves(self.leaves() - 1); self.node_manager.commit(); + metrics::UPDATE_LAST.update_since(start_time); } /// Fill an unknown `null` leaf with its real value. diff --git a/common/append_merkle/src/metrics.rs b/common/append_merkle/src/metrics.rs new file mode 100644 index 0000000..cafa42f --- /dev/null +++ b/common/append_merkle/src/metrics.rs @@ -0,0 +1,11 @@ +use std::sync::Arc; + +use metrics::{register_timer, Timer}; + +lazy_static::lazy_static! 
{ + pub static ref APPEND: Arc = register_timer("append_merkle_append"); + pub static ref APPEND_LIST: Arc = register_timer("append_merkle_append_list"); + pub static ref APPEND_SUBTREE: Arc = register_timer("append_merkle_append_subtree"); + pub static ref APPEND_SUBTREE_LIST: Arc = register_timer("append_merkle_append_subtree_list"); + pub static ref UPDATE_LAST: Arc = register_timer("append_merkle_update_last"); +} diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index 5705d8d..d5dd4e1 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -123,14 +123,6 @@ pub struct NewFile { pub timestamp: u32, } -#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] -pub struct NewFile { - pub tx_id: TxID, - pub num_shard: usize, - pub shard_id: usize, - pub timestamp: u32, -} - #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] pub struct FindFile { pub tx_id: TxID, diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index 62bf9ad..4bbd58e 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -239,6 +239,7 @@ impl FlowWrite for FlowStore { /// Return the roots of completed chunks. The order is guaranteed to be increasing /// by chunk index. 
fn append_entries(&self, data: ChunkArray) -> Result> { + let start_time = Instant::now(); let mut to_seal_set = self.seal_manager.to_seal_set.write(); trace!("append_entries: {} {}", data.start_index, data.data.len()); if data.data.len() % BYTES_PER_SECTOR != 0 { @@ -281,6 +282,8 @@ impl FlowWrite for FlowStore { batch_list.push((chunk_index, batch)); } + + metrics::APPEND_ENTRIES.update_since(start_time); self.db.put_entry_batch_list(batch_list) } @@ -390,6 +393,7 @@ impl FlowDBStore { &self, batch_list: Vec<(u64, EntryBatch)>, ) -> Result> { + let start_time = Instant::now(); let mut completed_batches = Vec::new(); let mut tx = self.kvdb.transaction(); for (batch_index, batch) in batch_list { @@ -410,6 +414,7 @@ impl FlowDBStore { } } self.kvdb.write(tx)?; + metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time); Ok(completed_batches) } diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 75d2be5..7001fed 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -1,5 +1,3 @@ -use super::tx_store::BlockHashAndSubmissionIndex; -use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask}; use crate::config::ShardConfig; use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore}; use crate::log_store::tx_store::TransactionStore; @@ -1277,6 +1275,7 @@ pub fn sub_merkle_tree(leaf_data: &[u8]) -> Result { } pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result> { + let start_time = Instant::now(); if leaf_data.len() % ENTRY_SIZE != 0 { bail!("merkle_tree: mismatched data size"); } @@ -1292,6 +1291,8 @@ pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result> { .map(Sha3Algorithm::leaf) .collect() }; + + metrics::DATA_TO_MERKLE_LEAVES.update_since(start_time); Ok(r) } diff --git a/node/storage/src/log_store/metrics.rs b/node/storage/src/log_store/metrics.rs index 2b9ead4..64c480e 100644 --- a/node/storage/src/log_store/metrics.rs +++ 
b/node/storage/src/log_store/metrics.rs @@ -11,6 +11,9 @@ lazy_static::lazy_static! { pub static ref APPEND_SUBTREE_LIST: Arc = register_timer("log_store_log_manager_append_subtree_list"); + pub static ref DATA_TO_MERKLE_LEAVES: Arc = + register_timer("log_store_log_manager_data_to_merkle_leaves"); + pub static ref COPY_TX_AND_FINALIZE: Arc = register_timer("log_store_log_manager_copy_tx_and_finalize"); @@ -19,7 +22,12 @@ lazy_static::lazy_static! { pub static ref PUT_BATCH_ROOT_LIST: Arc = register_timer("log_store_flow_store_put_batch_root_list"); pub static ref INSERT_SUBTREE_LIST: Arc = - register_timer("log_store_log_manager_insert_subtree_list"); + register_timer("log_store_flow_store_insert_subtree_list"); - pub static ref PUT_MPT_NODE: Arc = register_timer("log_store_log_manager_put_mpt_node"); + pub static ref PUT_MPT_NODE: Arc = register_timer("log_store_flow_store_put_mpt_node"); + + pub static ref PUT_ENTRY_BATCH_LIST: Arc = + register_timer("log_store_flow_store_put_entry_batch_list"); + + pub static ref APPEND_ENTRIES: Arc = register_timer("log_store_flow_store_append_entries"); } From 53fee08143164f584b83a51100bd755f6581b4a3 Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Mon, 28 Oct 2024 21:56:34 +0800 Subject: [PATCH 15/21] add detailed metrics for storage layer --- node/log_entry_sync/src/sync_manager/metrics.rs | 8 ++++++-- node/log_entry_sync/src/sync_manager/mod.rs | 9 ++++++++- node/storage/src/log_store/log_manager.rs | 2 ++ node/storage/src/log_store/metrics.rs | 2 ++ 4 files changed, 18 insertions(+), 3 deletions(-) diff --git a/node/log_entry_sync/src/sync_manager/metrics.rs b/node/log_entry_sync/src/sync_manager/metrics.rs index 37bee24..6634065 100644 --- a/node/log_entry_sync/src/sync_manager/metrics.rs +++ b/node/log_entry_sync/src/sync_manager/metrics.rs @@ -1,7 +1,11 @@ use std::sync::Arc; -use metrics::{register_timer, Timer}; +use metrics::{register_timer, Gauge, GaugeUsize, Timer}; lazy_static::lazy_static! 
{ - pub static ref STORE_PUT_TX: Arc = register_timer("log_entry_sync_store_put_tx"); + pub static ref LOG_MANAGER_HANDLE_DATA_TRANSACTION: Arc = register_timer("log_manager_handle_data_transaction"); + + pub static ref STORE_PUT_TX: Arc = register_timer("log_entry_sync_manager_put_tx_inner"); + + pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes"); } diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs index 8c07d8d..7cbf1c2 100644 --- a/node/log_entry_sync/src/sync_manager/mod.rs +++ b/node/log_entry_sync/src/sync_manager/mod.rs @@ -408,6 +408,7 @@ impl LogSyncManager { } LogFetchProgress::Transaction((tx, block_number)) => { let mut stop = false; + let start_time = Instant::now(); match self.put_tx(tx.clone()).await { Some(false) => stop = true, Some(true) => { @@ -441,6 +442,8 @@ impl LogSyncManager { // no receivers will be created. warn!("log sync broadcast error, error={:?}", e); } + + metrics::LOG_MANAGER_HANDLE_DATA_TRANSACTION.update_since(start_time); } LogFetchProgress::Reverted(reverted) => { self.process_reverted(reverted).await; @@ -453,7 +456,6 @@ impl LogSyncManager { async fn put_tx_inner(&mut self, tx: Transaction) -> bool { let start_time = Instant::now(); let result = self.store.put_tx(tx.clone()); - metrics::STORE_PUT_TX.update_since(start_time); if let Err(e) = result { error!("put_tx error: e={:?}", e); @@ -514,6 +516,7 @@ impl LogSyncManager { // Check if the computed data root matches on-chain state. // If the call fails, we won't check the root here and return `true` directly. 
let flow_contract = self.log_fetcher.flow_contract(); + match flow_contract .get_flow_root_by_tx_seq(tx.seq.into()) .call() @@ -545,6 +548,10 @@ impl LogSyncManager { } } + metrics::STORE_PUT_TX_SPEED_IN_BYTES + .update((tx.size / start_time.elapsed().as_secs()) as usize); + metrics::STORE_PUT_TX.update_since(start_time); + true } } diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 7001fed..c8abea5 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -254,6 +254,7 @@ impl LogStoreWrite for LogManager { /// `put_tx` for the last tx when we restart the node to ensure that it succeeds. /// fn put_tx(&self, tx: Transaction) -> Result<()> { + let start_time = Instant::now(); let mut merkle = self.merkle.write(); debug!("put_tx: tx={:?}", tx); let expected_seq = self.tx_store.next_tx_seq(); @@ -283,6 +284,7 @@ impl LogStoreWrite for LogManager { self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?; } } + metrics::PUT_TX.update_since(start_time); Ok(()) } diff --git a/node/storage/src/log_store/metrics.rs b/node/storage/src/log_store/metrics.rs index 64c480e..deaa4d1 100644 --- a/node/storage/src/log_store/metrics.rs +++ b/node/storage/src/log_store/metrics.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use metrics::{register_timer, Timer}; lazy_static::lazy_static! 
{ + pub static ref PUT_TX: Arc = register_timer("log_store_put_tx"); + pub static ref TX_STORE_PUT: Arc = register_timer("log_store_tx_store_put_tx"); pub static ref CHECK_TX_COMPLETED: Arc = From c9624c3a1e7bc77ffa73d3e1f6ad691f067380b8 Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Mon, 28 Oct 2024 22:39:04 +0800 Subject: [PATCH 16/21] add detailed metrics for storage layer --- node/log_entry_sync/src/sync_manager/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs index 7cbf1c2..189d236 100644 --- a/node/log_entry_sync/src/sync_manager/mod.rs +++ b/node/log_entry_sync/src/sync_manager/mod.rs @@ -549,7 +549,7 @@ impl LogSyncManager { } metrics::STORE_PUT_TX_SPEED_IN_BYTES - .update((tx.size / start_time.elapsed().as_secs()) as usize); + .update((tx.size / start_time.elapsed().as_millis() as u64) as usize); metrics::STORE_PUT_TX.update_since(start_time); true From e2085d8b212ca294432efef27fd1324217d01968 Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Mon, 28 Oct 2024 23:27:04 +0800 Subject: [PATCH 17/21] add detailed metrics for storage layer --- node/storage/src/log_store/log_manager.rs | 2 ++ node/storage/src/log_store/metrics.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index c8abea5..0546e14 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -192,6 +192,7 @@ impl LogStoreChunkWrite for LogManager { chunks: ChunkArray, maybe_file_proof: Option, ) -> Result { + let start_time = Instant::now(); let mut merkle = self.merkle.write(); let tx = self .tx_store @@ -224,6 +225,7 @@ impl LogStoreChunkWrite for LogManager { )?; self.flow_store.put_mpt_node_list(updated_node_list)?; } + metrics::PUT_CHUNKS.update_since(start_time); Ok(true) } diff --git a/node/storage/src/log_store/metrics.rs 
b/node/storage/src/log_store/metrics.rs index deaa4d1..ca05772 100644 --- a/node/storage/src/log_store/metrics.rs +++ b/node/storage/src/log_store/metrics.rs @@ -5,6 +5,8 @@ use metrics::{register_timer, Timer}; lazy_static::lazy_static! { pub static ref PUT_TX: Arc = register_timer("log_store_put_tx"); + pub static ref PUT_CHUNKS: Arc = register_timer("log_store_put_chunks"); + pub static ref TX_STORE_PUT: Arc = register_timer("log_store_tx_store_put_tx"); pub static ref CHECK_TX_COMPLETED: Arc = From 04435471478798a75393870431603ab626c2ca2b Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Tue, 29 Oct 2024 10:09:04 +0800 Subject: [PATCH 18/21] add detailed metrics for storage layer --- node/log_entry_sync/src/sync_manager/metrics.rs | 2 ++ node/log_entry_sync/src/sync_manager/mod.rs | 2 ++ node/storage/src/log_store/log_manager.rs | 2 ++ node/storage/src/log_store/metrics.rs | 2 ++ 4 files changed, 8 insertions(+) diff --git a/node/log_entry_sync/src/sync_manager/metrics.rs b/node/log_entry_sync/src/sync_manager/metrics.rs index 6634065..4409cc7 100644 --- a/node/log_entry_sync/src/sync_manager/metrics.rs +++ b/node/log_entry_sync/src/sync_manager/metrics.rs @@ -8,4 +8,6 @@ lazy_static::lazy_static! { pub static ref STORE_PUT_TX: Arc = register_timer("log_entry_sync_manager_put_tx_inner"); pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes"); + + pub static ref FlOW_CONTRACT_ROOT: Arc = register_timer("log_manager_flow_contract_root"); } diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs index 189d236..13c7f04 100644 --- a/node/log_entry_sync/src/sync_manager/mod.rs +++ b/node/log_entry_sync/src/sync_manager/mod.rs @@ -517,6 +517,7 @@ impl LogSyncManager { // If the call fails, we won't check the root here and return `true` directly. 
let flow_contract = self.log_fetcher.flow_contract(); + let flow_time = Instant::now(); match flow_contract .get_flow_root_by_tx_seq(tx.seq.into()) .call() @@ -547,6 +548,7 @@ impl LogSyncManager { warn!(?e, "fail to read the on-chain flow root"); } } + metrics::FlOW_CONTRACT_ROOT.update_since(flow_time); metrics::STORE_PUT_TX_SPEED_IN_BYTES .update((tx.size / start_time.elapsed().as_millis() as u64) as usize); diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 0546e14..4bcd8ff 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -317,6 +317,7 @@ impl LogStoreWrite for LogManager { } fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> crate::error::Result { + let start_time = Instant::now(); trace!( "finalize_tx_with_hash: tx_seq={} tx_hash={:?}", tx_seq, @@ -345,6 +346,7 @@ impl LogStoreWrite for LogManager { if same_root_seq_list.first() == Some(&tx_seq) { self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?; } + metrics::FINALIZE_TX_WITH_HASH.update_since(start_time); Ok(true) } else { bail!("finalize tx hash with data missing: tx_seq={}", tx_seq) diff --git a/node/storage/src/log_store/metrics.rs b/node/storage/src/log_store/metrics.rs index ca05772..55706a3 100644 --- a/node/storage/src/log_store/metrics.rs +++ b/node/storage/src/log_store/metrics.rs @@ -34,4 +34,6 @@ lazy_static::lazy_static! 
{ register_timer("log_store_flow_store_put_entry_batch_list"); pub static ref APPEND_ENTRIES: Arc = register_timer("log_store_flow_store_append_entries"); + + pub static ref FINALIZE_TX_WITH_HASH: Arc = register_timer("log_store_log_manager_finalize_tx_with_hash"); } From daba22ed5603565e3932d769a06511bad8d3879e Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Tue, 29 Oct 2024 12:48:09 +0800 Subject: [PATCH 19/21] add detailed metrics in slow operations --- .../src/sync_manager/metrics.rs | 2 - node/log_entry_sync/src/sync_manager/mod.rs | 55 ++++++++++--------- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/node/log_entry_sync/src/sync_manager/metrics.rs b/node/log_entry_sync/src/sync_manager/metrics.rs index 4409cc7..6634065 100644 --- a/node/log_entry_sync/src/sync_manager/metrics.rs +++ b/node/log_entry_sync/src/sync_manager/metrics.rs @@ -8,6 +8,4 @@ lazy_static::lazy_static! { pub static ref STORE_PUT_TX: Arc = register_timer("log_entry_sync_manager_put_tx_inner"); pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes"); - - pub static ref FlOW_CONTRACT_ROOT: Arc = register_timer("log_manager_flow_contract_root"); } diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs index 13c7f04..3b10b49 100644 --- a/node/log_entry_sync/src/sync_manager/mod.rs +++ b/node/log_entry_sync/src/sync_manager/mod.rs @@ -26,6 +26,7 @@ const RETRY_WAIT_MS: u64 = 500; // Each tx has less than 10KB, so the cache size should be acceptable. const BROADCAST_CHANNEL_CAPACITY: usize = 25000; const CATCH_UP_END_GAP: u64 = 10; +const CHECK_ROOT_INTERVAL: u64 = 500; /// Errors while handle data #[derive(Error, Debug)] @@ -515,40 +516,40 @@ impl LogSyncManager { // Check if the computed data root matches on-chain state. // If the call fails, we won't check the root here and return `true` directly. 
- let flow_contract = self.log_fetcher.flow_contract(); + if self.next_tx_seq % CHECK_ROOT_INTERVAL == 0 { + let flow_contract = self.log_fetcher.flow_contract(); - let flow_time = Instant::now(); - match flow_contract - .get_flow_root_by_tx_seq(tx.seq.into()) - .call() - .await - { - Ok(contract_root_bytes) => { - let contract_root = H256::from_slice(&contract_root_bytes); - // contract_root is zero for tx submitted before upgrading. - if !contract_root.is_zero() { - match self.store.get_context() { - Ok((local_root, _)) => { - if contract_root != local_root { - error!( - ?contract_root, - ?local_root, - "local flow root and on-chain flow root mismatch" - ); - return false; + match flow_contract + .get_flow_root_by_tx_seq(tx.seq.into()) + .call() + .await + { + Ok(contract_root_bytes) => { + let contract_root = H256::from_slice(&contract_root_bytes); + // contract_root is zero for tx submitted before upgrading. + if !contract_root.is_zero() { + match self.store.get_context() { + Ok((local_root, _)) => { + if contract_root != local_root { + error!( + ?contract_root, + ?local_root, + "local flow root and on-chain flow root mismatch" + ); + return false; + } + } + Err(e) => { + warn!(?e, "fail to read the local flow root"); } - } - Err(e) => { - warn!(?e, "fail to read the local flow root"); } } } - } - Err(e) => { - warn!(?e, "fail to read the on-chain flow root"); + Err(e) => { + warn!(?e, "fail to read the on-chain flow root"); + } } } - metrics::FlOW_CONTRACT_ROOT.update_since(flow_time); metrics::STORE_PUT_TX_SPEED_IN_BYTES .update((tx.size / start_time.elapsed().as_millis() as u64) as usize); From 737dd3c44f5d3d16eade526184d2fe6022a067d7 Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Tue, 29 Oct 2024 13:01:00 +0800 Subject: [PATCH 20/21] add detailed metrics in slow operations --- node/log_entry_sync/src/sync_manager/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/log_entry_sync/src/sync_manager/mod.rs 
b/node/log_entry_sync/src/sync_manager/mod.rs index 3b10b49..36b7b7e 100644 --- a/node/log_entry_sync/src/sync_manager/mod.rs +++ b/node/log_entry_sync/src/sync_manager/mod.rs @@ -552,7 +552,7 @@ impl LogSyncManager { } metrics::STORE_PUT_TX_SPEED_IN_BYTES - .update((tx.size / start_time.elapsed().as_millis() as u64) as usize); + .update((tx.size * 1000 / start_time.elapsed().as_micros() as u64) as usize); metrics::STORE_PUT_TX.update_since(start_time); true From 9c305a7743d04881a0591de9c2ddcb1b4fbf3333 Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Tue, 29 Oct 2024 16:05:16 +0800 Subject: [PATCH 21/21] add detailed metrics in slow operations --- .../src/sync_manager/log_entry_fetcher.rs | 15 +++++++-------- node/log_entry_sync/src/sync_manager/metrics.rs | 2 ++ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs index 29f8287..72f8fa1 100644 --- a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs +++ b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs @@ -1,5 +1,5 @@ use crate::sync_manager::log_query::LogQuery; -use crate::sync_manager::RETRY_WAIT_MS; +use crate::sync_manager::{metrics, RETRY_WAIT_MS}; use crate::ContractAddress; use anyhow::{anyhow, bail, Result}; use append_merkle::{Algorithm, Sha3Algorithm}; @@ -14,15 +14,12 @@ use shared_types::{DataRoot, Transaction}; use std::collections::{BTreeMap, HashMap}; use std::str::FromStr; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store}; use task_executor::TaskExecutor; -use tokio::{ - sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - RwLock, - }, - time::Instant, +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + RwLock, }; pub struct LogEntryFetcher { @@ -242,6 +239,7 @@ impl LogEntryFetcher { ); let (mut block_hash_sent, mut 
block_number_sent) = (None, None); while let Some(maybe_log) = stream.next().await { + let start_time = Instant::now(); match maybe_log { Ok(log) => { let sync_progress = @@ -301,6 +299,7 @@ impl LogEntryFetcher { tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await; } } + metrics::RECOVER_LOG.update_since(start_time); } info!("log recover end"); diff --git a/node/log_entry_sync/src/sync_manager/metrics.rs b/node/log_entry_sync/src/sync_manager/metrics.rs index 6634065..c2ba946 100644 --- a/node/log_entry_sync/src/sync_manager/metrics.rs +++ b/node/log_entry_sync/src/sync_manager/metrics.rs @@ -8,4 +8,6 @@ lazy_static::lazy_static! { pub static ref STORE_PUT_TX: Arc = register_timer("log_entry_sync_manager_put_tx_inner"); pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes"); + + pub static ref RECOVER_LOG: Arc = register_timer("log_entry_sync_manager_recover_log"); }