Compare commits

...

21 Commits

Author SHA1 Message Date
Peter Zhang
9c305a7743 add detailed metrics in slow operations 2024-10-29 16:05:16 +08:00
Peter Zhang
737dd3c44f add detailed metrics in slow operations 2024-10-29 13:01:00 +08:00
Peter Zhang
daba22ed56 add detailed metrics in slow operations 2024-10-29 12:48:09 +08:00
Peter Zhang
0443547147 add detailed metrics for storage layer 2024-10-29 10:09:04 +08:00
Peter Zhang
e2085d8b21 add detailed metrics for storage layer 2024-10-28 23:27:04 +08:00
Peter Zhang
c9624c3a1e add detailed metrics for storage layer 2024-10-28 22:39:04 +08:00
Peter Zhang
53fee08143 add detailed metrics for storage layer 2024-10-28 21:56:34 +08:00
Peter Zhang
daf261de8d add detailed metrics in slow operations 2024-10-28 15:26:25 +08:00
Peter Zhang
57d4d81ebf add detailed metrics in slow operations 2024-10-28 15:19:06 +08:00
Peter Zhang
55fb210da9 format code 2024-10-28 15:18:42 +08:00
Peter Zhang
cb7de7f60e add detailed metrics in slow operations 2024-10-28 15:17:55 +08:00
boqiu
9a199ba714 Add comments 2024-10-28 15:17:13 +08:00
boqiu
06ae2450d0 fix random test failure 2024-10-28 15:16:53 +08:00
boqiu
a872480faa fmt code 2024-10-28 15:16:37 +08:00
boqiu
a7a2f24784 Add py test for auto sync v2 2024-10-28 15:16:21 +08:00
boqiu
5dc7c52d41 fix unit test failures 2024-10-28 15:15:53 +08:00
boqiu
5a48c5ba94 Mark peer connected if FileAnnouncement RPC message received 2024-10-28 15:15:19 +08:00
boqiu
5409d8f418 do not propagate FindFile to whole network 2024-10-28 15:14:52 +08:00
boqiu
e338dca318 Disable sequential sync and store new file in v2 sync store 2024-10-28 15:12:56 +08:00
boqiu
57ba8f9bb7 handle NewFile in sync servic to write in db 2024-10-28 15:11:33 +08:00
boqiu
e15143e0a4 Add new P2P protocol NewFile 2024-10-28 15:03:14 +08:00
14 changed files with 185 additions and 46 deletions

3
Cargo.lock generated
View File

@ -226,6 +226,7 @@ dependencies = [
"itertools 0.13.0", "itertools 0.13.0",
"lazy_static", "lazy_static",
"lru 0.12.5", "lru 0.12.5",
"metrics",
"once_cell", "once_cell",
"serde", "serde",
"tiny-keccak", "tiny-keccak",
@ -7300,8 +7301,10 @@ dependencies = [
"kvdb", "kvdb",
"kvdb-memorydb", "kvdb-memorydb",
"kvdb-rocksdb", "kvdb-rocksdb",
"lazy_static",
"merkle_light", "merkle_light",
"merkle_tree", "merkle_tree",
"metrics",
"once_cell", "once_cell",
"parking_lot 0.12.3", "parking_lot 0.12.3",
"rand 0.8.5", "rand 0.8.5",

View File

@ -13,5 +13,8 @@ serde = { version = "1.0.137", features = ["derive"] }
lazy_static = "1.4.0" lazy_static = "1.4.0"
tracing = "0.1.36" tracing = "0.1.36"
once_cell = "1.19.0" once_cell = "1.19.0"
metrics = { workspace = true }
itertools = "0.13.0" itertools = "0.13.0"
lru = "0.12.5" lru = "0.12.5"

View File

@ -1,4 +1,5 @@
mod merkle_tree; mod merkle_tree;
mod metrics;
mod node_manager; mod node_manager;
mod proof; mod proof;
mod sha3; mod sha3;
@ -10,6 +11,7 @@ use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug; use std::fmt::Debug;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use tracing::{trace, warn}; use tracing::{trace, warn};
use crate::merkle_tree::MerkleTreeWrite; use crate::merkle_tree::MerkleTreeWrite;
@ -145,6 +147,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
} }
pub fn append(&mut self, new_leaf: E) { pub fn append(&mut self, new_leaf: E) {
let start_time = Instant::now();
if new_leaf == E::null() { if new_leaf == E::null() {
// appending null is not allowed. // appending null is not allowed.
return; return;
@ -152,10 +155,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.node_manager.start_transaction(); self.node_manager.start_transaction();
self.node_manager.push_node(0, new_leaf); self.node_manager.push_node(0, new_leaf);
self.recompute_after_append_leaves(self.leaves() - 1); self.recompute_after_append_leaves(self.leaves() - 1);
self.node_manager.commit(); self.node_manager.commit();
metrics::APPEND.update_since(start_time);
} }
pub fn append_list(&mut self, leaf_list: Vec<E>) { pub fn append_list(&mut self, leaf_list: Vec<E>) {
let start_time = Instant::now();
if leaf_list.contains(&E::null()) { if leaf_list.contains(&E::null()) {
// appending null is not allowed. // appending null is not allowed.
return; return;
@ -165,6 +171,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.node_manager.append_nodes(0, &leaf_list); self.node_manager.append_nodes(0, &leaf_list);
self.recompute_after_append_leaves(start_index); self.recompute_after_append_leaves(start_index);
self.node_manager.commit(); self.node_manager.commit();
metrics::APPEND_LIST.update_since(start_time);
} }
/// Append a leaf list by providing their intermediate node hash. /// Append a leaf list by providing their intermediate node hash.
@ -173,6 +180,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
/// Other nodes in the subtree will be set to `null` nodes. /// Other nodes in the subtree will be set to `null` nodes.
/// TODO: Optimize to avoid storing the `null` nodes? /// TODO: Optimize to avoid storing the `null` nodes?
pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> { pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> {
let start_time = Instant::now();
if subtree_root == E::null() { if subtree_root == E::null() {
// appending null is not allowed. // appending null is not allowed.
bail!("subtree_root is null"); bail!("subtree_root is null");
@ -182,10 +190,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.append_subtree_inner(subtree_depth, subtree_root)?; self.append_subtree_inner(subtree_depth, subtree_root)?;
self.recompute_after_append_subtree(start_index, subtree_depth - 1); self.recompute_after_append_subtree(start_index, subtree_depth - 1);
self.node_manager.commit(); self.node_manager.commit();
metrics::APPEND_SUBTREE.update_since(start_time);
Ok(()) Ok(())
} }
pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> { pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> {
let start_time = Instant::now();
if subtree_list.iter().any(|(_, root)| root == &E::null()) { if subtree_list.iter().any(|(_, root)| root == &E::null()) {
// appending null is not allowed. // appending null is not allowed.
bail!("subtree_list contains null"); bail!("subtree_list contains null");
@ -197,12 +208,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.recompute_after_append_subtree(start_index, subtree_depth - 1); self.recompute_after_append_subtree(start_index, subtree_depth - 1);
} }
self.node_manager.commit(); self.node_manager.commit();
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
Ok(()) Ok(())
} }
/// Change the value of the last leaf and return the new merkle root. /// Change the value of the last leaf and return the new merkle root.
/// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves. /// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves.
pub fn update_last(&mut self, updated_leaf: E) { pub fn update_last(&mut self, updated_leaf: E) {
let start_time = Instant::now();
if updated_leaf == E::null() { if updated_leaf == E::null() {
// updating to null is not allowed. // updating to null is not allowed.
return; return;
@ -216,6 +230,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
} }
self.recompute_after_append_leaves(self.leaves() - 1); self.recompute_after_append_leaves(self.leaves() - 1);
self.node_manager.commit(); self.node_manager.commit();
metrics::UPDATE_LAST.update_since(start_time);
} }
/// Fill an unknown `null` leaf with its real value. /// Fill an unknown `null` leaf with its real value.

View File

@ -0,0 +1,11 @@
use std::sync::Arc;
use metrics::{register_timer, Timer};
lazy_static::lazy_static! {
pub static ref APPEND: Arc<dyn Timer> = register_timer("append_merkle_append");
pub static ref APPEND_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_list");
pub static ref APPEND_SUBTREE: Arc<dyn Timer> = register_timer("append_merkle_append_subtree");
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_subtree_list");
pub static ref UPDATE_LAST: Arc<dyn Timer> = register_timer("append_merkle_update_last");
}

View File

@ -1,5 +1,5 @@
use crate::sync_manager::log_query::LogQuery; use crate::sync_manager::log_query::LogQuery;
use crate::sync_manager::RETRY_WAIT_MS; use crate::sync_manager::{metrics, RETRY_WAIT_MS};
use crate::ContractAddress; use crate::ContractAddress;
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
use append_merkle::{Algorithm, Sha3Algorithm}; use append_merkle::{Algorithm, Sha3Algorithm};
@ -14,15 +14,12 @@ use shared_types::{DataRoot, Transaction};
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::{Duration, Instant};
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store}; use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio::{ use tokio::sync::{
sync::{ mpsc::{UnboundedReceiver, UnboundedSender},
mpsc::{UnboundedReceiver, UnboundedSender}, RwLock,
RwLock,
},
time::Instant,
}; };
pub struct LogEntryFetcher { pub struct LogEntryFetcher {
@ -242,6 +239,7 @@ impl LogEntryFetcher {
); );
let (mut block_hash_sent, mut block_number_sent) = (None, None); let (mut block_hash_sent, mut block_number_sent) = (None, None);
while let Some(maybe_log) = stream.next().await { while let Some(maybe_log) = stream.next().await {
let start_time = Instant::now();
match maybe_log { match maybe_log {
Ok(log) => { Ok(log) => {
let sync_progress = let sync_progress =
@ -301,6 +299,7 @@ impl LogEntryFetcher {
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await; tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
} }
} }
metrics::RECOVER_LOG.update_since(start_time);
} }
info!("log recover end"); info!("log recover end");

View File

@ -1,7 +1,13 @@
use std::sync::Arc; use std::sync::Arc;
use metrics::{register_timer, Timer}; use metrics::{register_timer, Gauge, GaugeUsize, Timer};
lazy_static::lazy_static! { lazy_static::lazy_static! {
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_store_put_tx"); pub static ref LOG_MANAGER_HANDLE_DATA_TRANSACTION: Arc<dyn Timer> = register_timer("log_manager_handle_data_transaction");
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_manager_put_tx_inner");
pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes");
pub static ref RECOVER_LOG: Arc<dyn Timer> = register_timer("log_entry_sync_manager_recover_log");
} }

View File

@ -26,6 +26,7 @@ const RETRY_WAIT_MS: u64 = 500;
// Each tx has less than 10KB, so the cache size should be acceptable. // Each tx has less than 10KB, so the cache size should be acceptable.
const BROADCAST_CHANNEL_CAPACITY: usize = 25000; const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
const CATCH_UP_END_GAP: u64 = 10; const CATCH_UP_END_GAP: u64 = 10;
const CHECK_ROOT_INTERVAL: u64 = 500;
/// Errors while handle data /// Errors while handle data
#[derive(Error, Debug)] #[derive(Error, Debug)]
@ -408,6 +409,7 @@ impl LogSyncManager {
} }
LogFetchProgress::Transaction((tx, block_number)) => { LogFetchProgress::Transaction((tx, block_number)) => {
let mut stop = false; let mut stop = false;
let start_time = Instant::now();
match self.put_tx(tx.clone()).await { match self.put_tx(tx.clone()).await {
Some(false) => stop = true, Some(false) => stop = true,
Some(true) => { Some(true) => {
@ -441,6 +443,8 @@ impl LogSyncManager {
// no receivers will be created. // no receivers will be created.
warn!("log sync broadcast error, error={:?}", e); warn!("log sync broadcast error, error={:?}", e);
} }
metrics::LOG_MANAGER_HANDLE_DATA_TRANSACTION.update_since(start_time);
} }
LogFetchProgress::Reverted(reverted) => { LogFetchProgress::Reverted(reverted) => {
self.process_reverted(reverted).await; self.process_reverted(reverted).await;
@ -453,7 +457,6 @@ impl LogSyncManager {
async fn put_tx_inner(&mut self, tx: Transaction) -> bool { async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
let start_time = Instant::now(); let start_time = Instant::now();
let result = self.store.put_tx(tx.clone()); let result = self.store.put_tx(tx.clone());
metrics::STORE_PUT_TX.update_since(start_time);
if let Err(e) = result { if let Err(e) = result {
error!("put_tx error: e={:?}", e); error!("put_tx error: e={:?}", e);
@ -513,38 +516,45 @@ impl LogSyncManager {
// Check if the computed data root matches on-chain state. // Check if the computed data root matches on-chain state.
// If the call fails, we won't check the root here and return `true` directly. // If the call fails, we won't check the root here and return `true` directly.
let flow_contract = self.log_fetcher.flow_contract(); if self.next_tx_seq % CHECK_ROOT_INTERVAL == 0 {
match flow_contract let flow_contract = self.log_fetcher.flow_contract();
.get_flow_root_by_tx_seq(tx.seq.into())
.call() match flow_contract
.await .get_flow_root_by_tx_seq(tx.seq.into())
{ .call()
Ok(contract_root_bytes) => { .await
let contract_root = H256::from_slice(&contract_root_bytes); {
// contract_root is zero for tx submitted before upgrading. Ok(contract_root_bytes) => {
if !contract_root.is_zero() { let contract_root = H256::from_slice(&contract_root_bytes);
match self.store.get_context() { // contract_root is zero for tx submitted before upgrading.
Ok((local_root, _)) => { if !contract_root.is_zero() {
if contract_root != local_root { match self.store.get_context() {
error!( Ok((local_root, _)) => {
?contract_root, if contract_root != local_root {
?local_root, error!(
"local flow root and on-chain flow root mismatch" ?contract_root,
); ?local_root,
return false; "local flow root and on-chain flow root mismatch"
);
return false;
}
}
Err(e) => {
warn!(?e, "fail to read the local flow root");
} }
}
Err(e) => {
warn!(?e, "fail to read the local flow root");
} }
} }
} }
} Err(e) => {
Err(e) => { warn!(?e, "fail to read the on-chain flow root");
warn!(?e, "fail to read the on-chain flow root"); }
} }
} }
metrics::STORE_PUT_TX_SPEED_IN_BYTES
.update((tx.size * 1000 / start_time.elapsed().as_micros() as u64) as usize);
metrics::STORE_PUT_TX.update_since(start_time);
true true
} }
} }

View File

@ -31,6 +31,8 @@ parking_lot = "0.12.3"
serde_json = "1.0.127" serde_json = "1.0.127"
tokio = { version = "1.38.0", features = ["full"] } tokio = { version = "1.38.0", features = ["full"] }
task_executor = { path = "../../common/task_executor" } task_executor = { path = "../../common/task_executor" }
lazy_static = "1.4.0"
metrics = { workspace = true }
once_cell = { version = "1.19.0", features = [] } once_cell = { version = "1.19.0", features = [] }
[dev-dependencies] [dev-dependencies]

View File

@ -1,13 +1,14 @@
use super::load_chunk::EntryBatch;
use super::seal_task_manager::SealTaskManager;
use super::{MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig; use crate::config::ShardConfig;
use crate::error::Error; use crate::error::Error;
use crate::log_store::load_chunk::EntryBatch;
use crate::log_store::log_manager::{ use crate::log_store::log_manager::{
bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT, bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE, COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
}; };
use crate::log_store::{FlowRead, FlowSeal, FlowWrite}; use crate::log_store::seal_task_manager::SealTaskManager;
use crate::log_store::{
metrics, FlowRead, FlowSeal, FlowWrite, MineLoadChunk, SealAnswer, SealTask,
};
use crate::{try_option, ZgsKeyValueDB}; use crate::{try_option, ZgsKeyValueDB};
use any::Any; use any::Any;
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
@ -22,6 +23,7 @@ use std::cmp::Ordering;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fmt::Debug; use std::fmt::Debug;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use std::{any, cmp, mem}; use std::{any, cmp, mem};
use tracing::{debug, error, trace}; use tracing::{debug, error, trace};
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL}; use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
@ -42,7 +44,11 @@ impl FlowStore {
} }
pub fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> { pub fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
self.db.put_batch_root_list(root_map) let start_time = Instant::now();
let res = self.db.put_batch_root_list(root_map);
metrics::PUT_BATCH_ROOT_LIST.update_since(start_time);
res
} }
pub fn insert_subtree_list_for_batch( pub fn insert_subtree_list_for_batch(
@ -50,6 +56,7 @@ impl FlowStore {
batch_index: usize, batch_index: usize,
subtree_list: Vec<(usize, usize, DataRoot)>, subtree_list: Vec<(usize, usize, DataRoot)>,
) -> Result<()> { ) -> Result<()> {
let start_time = Instant::now();
let mut batch = self let mut batch = self
.db .db
.get_entry_batch(batch_index as u64)? .get_entry_batch(batch_index as u64)?
@ -57,6 +64,8 @@ impl FlowStore {
batch.set_subtree_list(subtree_list); batch.set_subtree_list(subtree_list);
self.db.put_entry_raw(vec![(batch_index as u64, batch)])?; self.db.put_entry_raw(vec![(batch_index as u64, batch)])?;
metrics::INSERT_SUBTREE_LIST.update_since(start_time);
Ok(()) Ok(())
} }
@ -75,7 +84,10 @@ impl FlowStore {
} }
pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> { pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
self.db.put_mpt_node_list(node_list) let start_time = Instant::now();
let res = self.db.put_mpt_node_list(node_list);
metrics::PUT_MPT_NODE.update_since(start_time);
res
} }
pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> { pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
@ -227,6 +239,7 @@ impl FlowWrite for FlowStore {
/// Return the roots of completed chunks. The order is guaranteed to be increasing /// Return the roots of completed chunks. The order is guaranteed to be increasing
/// by chunk index. /// by chunk index.
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> { fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
let start_time = Instant::now();
let mut to_seal_set = self.seal_manager.to_seal_set.write(); let mut to_seal_set = self.seal_manager.to_seal_set.write();
trace!("append_entries: {} {}", data.start_index, data.data.len()); trace!("append_entries: {} {}", data.start_index, data.data.len());
if data.data.len() % BYTES_PER_SECTOR != 0 { if data.data.len() % BYTES_PER_SECTOR != 0 {
@ -269,6 +282,8 @@ impl FlowWrite for FlowStore {
batch_list.push((chunk_index, batch)); batch_list.push((chunk_index, batch));
} }
metrics::APPEND_ENTRIES.update_since(start_time);
self.db.put_entry_batch_list(batch_list) self.db.put_entry_batch_list(batch_list)
} }
@ -378,6 +393,7 @@ impl FlowDBStore {
&self, &self,
batch_list: Vec<(u64, EntryBatch)>, batch_list: Vec<(u64, EntryBatch)>,
) -> Result<Vec<(u64, DataRoot)>> { ) -> Result<Vec<(u64, DataRoot)>> {
let start_time = Instant::now();
let mut completed_batches = Vec::new(); let mut completed_batches = Vec::new();
let mut tx = self.kvdb.transaction(); let mut tx = self.kvdb.transaction();
for (batch_index, batch) in batch_list { for (batch_index, batch) in batch_list {
@ -398,6 +414,7 @@ impl FlowDBStore {
} }
} }
self.kvdb.write(tx)?; self.kvdb.write(tx)?;
metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time);
Ok(completed_batches) Ok(completed_batches)
} }

View File

@ -1,5 +1,3 @@
use super::tx_store::BlockHashAndSubmissionIndex;
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig; use crate::config::ShardConfig;
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore}; use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
use crate::log_store::tx_store::TransactionStore; use crate::log_store::tx_store::TransactionStore;
@ -26,8 +24,13 @@ use std::collections::BTreeMap;
use std::path::Path; use std::path::Path;
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, error, info, instrument, trace, warn}; use tracing::{debug, error, info, instrument, trace, warn};
use crate::log_store::metrics;
use crate::log_store::tx_store::BlockHashAndSubmissionIndex;
use crate::log_store::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
/// 256 Bytes /// 256 Bytes
pub const ENTRY_SIZE: usize = 256; pub const ENTRY_SIZE: usize = 256;
/// 1024 Entries. /// 1024 Entries.
@ -189,6 +192,7 @@ impl LogStoreChunkWrite for LogManager {
chunks: ChunkArray, chunks: ChunkArray,
maybe_file_proof: Option<FlowProof>, maybe_file_proof: Option<FlowProof>,
) -> Result<bool> { ) -> Result<bool> {
let start_time = Instant::now();
let mut merkle = self.merkle.write(); let mut merkle = self.merkle.write();
let tx = self let tx = self
.tx_store .tx_store
@ -221,6 +225,7 @@ impl LogStoreChunkWrite for LogManager {
)?; )?;
self.flow_store.put_mpt_node_list(updated_node_list)?; self.flow_store.put_mpt_node_list(updated_node_list)?;
} }
metrics::PUT_CHUNKS.update_since(start_time);
Ok(true) Ok(true)
} }
@ -251,6 +256,7 @@ impl LogStoreWrite for LogManager {
/// `put_tx` for the last tx when we restart the node to ensure that it succeeds. /// `put_tx` for the last tx when we restart the node to ensure that it succeeds.
/// ///
fn put_tx(&self, tx: Transaction) -> Result<()> { fn put_tx(&self, tx: Transaction) -> Result<()> {
let start_time = Instant::now();
let mut merkle = self.merkle.write(); let mut merkle = self.merkle.write();
debug!("put_tx: tx={:?}", tx); debug!("put_tx: tx={:?}", tx);
let expected_seq = self.tx_store.next_tx_seq(); let expected_seq = self.tx_store.next_tx_seq();
@ -280,6 +286,7 @@ impl LogStoreWrite for LogManager {
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?; self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
} }
} }
metrics::PUT_TX.update_since(start_time);
Ok(()) Ok(())
} }
@ -310,6 +317,7 @@ impl LogStoreWrite for LogManager {
} }
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> crate::error::Result<bool> { fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> crate::error::Result<bool> {
let start_time = Instant::now();
trace!( trace!(
"finalize_tx_with_hash: tx_seq={} tx_hash={:?}", "finalize_tx_with_hash: tx_seq={} tx_hash={:?}",
tx_seq, tx_seq,
@ -338,6 +346,7 @@ impl LogStoreWrite for LogManager {
if same_root_seq_list.first() == Some(&tx_seq) { if same_root_seq_list.first() == Some(&tx_seq) {
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?; self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
} }
metrics::FINALIZE_TX_WITH_HASH.update_since(start_time);
Ok(true) Ok(true)
} else { } else {
bail!("finalize tx hash with data missing: tx_seq={}", tx_seq) bail!("finalize tx hash with data missing: tx_seq={}", tx_seq)
@ -875,6 +884,7 @@ impl LogManager {
if merkle_list.is_empty() { if merkle_list.is_empty() {
return Ok(()); return Ok(());
} }
let start_time = Instant::now();
self.pad_tx(tx_start_index, &mut *merkle)?; self.pad_tx(tx_start_index, &mut *merkle)?;
@ -920,12 +930,15 @@ impl LogManager {
} }
} }
self.flow_store.put_batch_root_list(batch_root_map)?; self.flow_store.put_batch_root_list(batch_root_map)?;
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
Ok(()) Ok(())
} }
#[instrument(skip(self, merkle))] #[instrument(skip(self, merkle))]
fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> { fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
// Check if we need to pad the flow. // Check if we need to pad the flow.
let start_time = Instant::now();
let mut tx_start_flow_index = let mut tx_start_flow_index =
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64; merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
let pad_size = tx_start_index - tx_start_flow_index; let pad_size = tx_start_index - tx_start_flow_index;
@ -1014,6 +1027,8 @@ impl LogManager {
merkle.pora_chunks_merkle.leaves(), merkle.pora_chunks_merkle.leaves(),
merkle.last_chunk_merkle.leaves() merkle.last_chunk_merkle.leaves()
); );
metrics::PAD_TX.update_since(start_time);
Ok(()) Ok(())
} }
@ -1142,6 +1157,8 @@ impl LogManager {
} }
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> { fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
let start_time = Instant::now();
let mut merkle = self.merkle.write(); let mut merkle = self.merkle.write();
let shard_config = self.flow_store.get_shard_config(); let shard_config = self.flow_store.get_shard_config();
// We have all the data need for this tx, so just copy them. // We have all the data need for this tx, so just copy them.
@ -1190,6 +1207,8 @@ impl LogManager {
for (seq, _) in to_tx_offset_list { for (seq, _) in to_tx_offset_list {
self.tx_store.finalize_tx(seq)?; self.tx_store.finalize_tx(seq)?;
} }
metrics::COPY_TX_AND_FINALIZE.update_since(start_time);
Ok(()) Ok(())
} }
@ -1262,6 +1281,7 @@ pub fn sub_merkle_tree(leaf_data: &[u8]) -> Result<FileMerkleTree> {
} }
pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> { pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
let start_time = Instant::now();
if leaf_data.len() % ENTRY_SIZE != 0 { if leaf_data.len() % ENTRY_SIZE != 0 {
bail!("merkle_tree: mismatched data size"); bail!("merkle_tree: mismatched data size");
} }
@ -1277,6 +1297,8 @@ pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
.map(Sha3Algorithm::leaf) .map(Sha3Algorithm::leaf)
.collect() .collect()
}; };
metrics::DATA_TO_MERKLE_LEAVES.update_since(start_time);
Ok(r) Ok(r)
} }

View File

@ -0,0 +1,39 @@
use std::sync::Arc;
use metrics::{register_timer, Timer};
lazy_static::lazy_static! {
pub static ref PUT_TX: Arc<dyn Timer> = register_timer("log_store_put_tx");
pub static ref PUT_CHUNKS: Arc<dyn Timer> = register_timer("log_store_put_chunks");
pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");
pub static ref CHECK_TX_COMPLETED: Arc<dyn Timer> =
register_timer("log_store_log_manager_check_tx_completed");
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> =
register_timer("log_store_log_manager_append_subtree_list");
pub static ref DATA_TO_MERKLE_LEAVES: Arc<dyn Timer> =
register_timer("log_store_log_manager_data_to_merkle_leaves");
pub static ref COPY_TX_AND_FINALIZE: Arc<dyn Timer> =
register_timer("log_store_log_manager_copy_tx_and_finalize");
pub static ref PAD_TX: Arc<dyn Timer> = register_timer("log_store_log_manager_pad_tx");
pub static ref PUT_BATCH_ROOT_LIST: Arc<dyn Timer> = register_timer("log_store_flow_store_put_batch_root_list");
pub static ref INSERT_SUBTREE_LIST: Arc<dyn Timer> =
register_timer("log_store_flow_store_insert_subtree_list");
pub static ref PUT_MPT_NODE: Arc<dyn Timer> = register_timer("log_store_flow_store_put_mpt_node");
pub static ref PUT_ENTRY_BATCH_LIST: Arc<dyn Timer> =
register_timer("log_store_flow_store_put_entry_batch_list");
pub static ref APPEND_ENTRIES: Arc<dyn Timer> = register_timer("log_store_flow_store_append_entries");
pub static ref FINALIZE_TX_WITH_HASH: Arc<dyn Timer> = register_timer("log_store_log_manager_finalize_tx_with_hash");
}

View File

@ -15,6 +15,7 @@ pub mod config;
mod flow_store; mod flow_store;
mod load_chunk; mod load_chunk;
pub mod log_manager; pub mod log_manager;
mod metrics;
mod seal_task_manager; mod seal_task_manager;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;

View File

@ -3,6 +3,7 @@ use crate::log_store::log_manager::{
data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED, data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE, COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
}; };
use crate::log_store::metrics;
use crate::{try_option, LogManager, ZgsKeyValueDB}; use crate::{try_option, LogManager, ZgsKeyValueDB};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm}; use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
@ -15,6 +16,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use tracing::{error, instrument}; use tracing::{error, instrument};
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress"; const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
@ -51,6 +53,8 @@ impl TransactionStore {
#[instrument(skip(self))] #[instrument(skip(self))]
/// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root. /// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root.
pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> { pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> {
let start_time = Instant::now();
let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?; let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) { if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) {
// The last tx is inserted again, so no need to process it. // The last tx is inserted again, so no need to process it.
@ -86,6 +90,7 @@ impl TransactionStore {
); );
self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst); self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
self.kvdb.write(db_tx)?; self.kvdb.write(db_tx)?;
metrics::TX_STORE_PUT.update_since(start_time);
Ok(old_tx_seq_list) Ok(old_tx_seq_list)
} }
@ -175,8 +180,12 @@ impl TransactionStore {
} }
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> { pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? let start_time = Instant::now();
== Some(vec![TX_STATUS_FINALIZED])) let res = self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
== Some(vec![TX_STATUS_FINALIZED]);
metrics::CHECK_TX_COMPLETED.update_since(start_time);
Ok(res)
} }
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> { pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {

View File

@ -3,6 +3,7 @@
from test_framework.test_framework import TestFramework from test_framework.test_framework import TestFramework
from utility.utils import wait_until from utility.utils import wait_until
class AutoRandomSyncV2Test(TestFramework): class AutoRandomSyncV2Test(TestFramework):
def setup_params(self): def setup_params(self):
self.num_nodes = 4 self.num_nodes = 4
@ -30,5 +31,6 @@ class AutoRandomSyncV2Test(TestFramework):
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None) wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"]) wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
if __name__ == "__main__": if __name__ == "__main__":
AutoRandomSyncV2Test().main() AutoRandomSyncV2Test().main()