Compare commits

...

4 Commits

Author SHA1 Message Date
Peter Zhang
b3fbaa4306 add detailed metrics in slow operations 2024-10-25 13:03:45 +08:00
Peter Zhang
95efa47517 add detailed metrics in slow operations 2024-10-25 11:59:26 +08:00
Peter Zhang
09c6bc2da1 format code 2024-10-24 14:47:57 +08:00
Peter Zhang
09095c245a add detailed metrics in slow operations 2024-10-24 13:42:03 +08:00
10 changed files with 118 additions and 11 deletions

3
Cargo.lock generated
View File

@ -224,6 +224,7 @@ dependencies = [
"eth2_ssz_derive",
"ethereum-types 0.14.1",
"lazy_static",
"metrics",
"once_cell",
"serde",
"tiny-keccak",
@ -7272,8 +7273,10 @@ dependencies = [
"kvdb",
"kvdb-memorydb",
"kvdb-rocksdb",
"lazy_static",
"merkle_light",
"merkle_tree",
"metrics",
"parking_lot 0.12.3",
"rand 0.8.5",
"rayon",

View File

@ -12,4 +12,6 @@ eth2_ssz_derive = "0.3.0"
serde = { version = "1.0.137", features = ["derive"] }
lazy_static = "1.4.0"
tracing = "0.1.36"
once_cell = "1.19.0"
once_cell = "1.19.0"
metrics = { workspace = true }

View File

@ -1,4 +1,5 @@
mod merkle_tree;
mod metrics;
mod proof;
mod sha3;
@ -7,6 +8,7 @@ use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::time::Instant;
use tracing::{trace, warn};
pub use crate::merkle_tree::{
@ -138,15 +140,18 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
pub fn append(&mut self, new_leaf: E) {
let start_time = Instant::now();
if new_leaf == E::null() {
// appending null is not allowed.
return;
}
self.layers[0].push(new_leaf);
self.recompute_after_append_leaves(self.leaves() - 1);
metrics::APPEND.update_since(start_time);
}
pub fn append_list(&mut self, mut leaf_list: Vec<E>) {
let start_time = Instant::now();
if leaf_list.contains(&E::null()) {
// appending null is not allowed.
return;
@ -154,6 +159,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
let start_index = self.leaves();
self.layers[0].append(&mut leaf_list);
self.recompute_after_append_leaves(start_index);
metrics::APPEND_LIST.update_since(start_time);
}
/// Append a leaf list by providing their intermediate node hash.
@ -162,6 +169,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
/// Other nodes in the subtree will be set to `null` nodes.
/// TODO: Optimize to avoid storing the `null` nodes?
pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> {
let start_time = Instant::now();
if subtree_root == E::null() {
// appending null is not allowed.
bail!("subtree_root is null");
@ -169,10 +177,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
let start_index = self.leaves();
self.append_subtree_inner(subtree_depth, subtree_root)?;
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
metrics::APPEND_SUBTREE.update_since(start_time);
Ok(())
}
pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> {
let start_time = Instant::now();
if subtree_list.iter().any(|(_, root)| root == &E::null()) {
// appending null is not allowed.
bail!("subtree_list contains null");
@ -182,12 +193,14 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.append_subtree_inner(subtree_depth, subtree_root)?;
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
}
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
Ok(())
}
/// Change the value of the last leaf and return the new merkle root.
/// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves.
pub fn update_last(&mut self, updated_leaf: E) {
let start_time = Instant::now();
if updated_leaf == E::null() {
// updating to null is not allowed.
return;
@ -199,6 +212,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
*self.layers[0].last_mut().unwrap() = updated_leaf;
}
self.recompute_after_append_leaves(self.leaves() - 1);
metrics::UPDATE_LAST.update_since(start_time);
}
/// Fill an unknown `null` leaf with its real value.

View File

@ -0,0 +1,11 @@
use std::sync::Arc;
use metrics::{register_timer, Timer};
lazy_static::lazy_static! {
pub static ref APPEND: Arc<dyn Timer> = register_timer("append_merkle_append");
pub static ref APPEND_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_list");
pub static ref APPEND_SUBTREE: Arc<dyn Timer> = register_timer("append_merkle_append_subtree");
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_subtree_list");
pub static ref UPDATE_LAST: Arc<dyn Timer> = register_timer("append_merkle_update_last");
}

View File

@ -31,6 +31,8 @@ parking_lot = "0.12.3"
serde_json = "1.0.127"
tokio = { version = "1.38.0", features = ["full"] }
task_executor = { path = "../../common/task_executor" }
lazy_static = "1.4.0"
metrics = { workspace = true }
[dev-dependencies]
rand = "0.8.5"

View File

@ -1,13 +1,14 @@
use super::load_chunk::EntryBatch;
use super::seal_task_manager::SealTaskManager;
use super::{MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig;
use crate::error::Error;
use crate::log_store::load_chunk::EntryBatch;
use crate::log_store::log_manager::{
bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
};
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
use crate::log_store::seal_task_manager::SealTaskManager;
use crate::log_store::{
metrics, FlowRead, FlowSeal, FlowWrite, MineLoadChunk, SealAnswer, SealTask,
};
use crate::{try_option, ZgsKeyValueDB};
use anyhow::{anyhow, bail, Result};
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
@ -20,6 +21,7 @@ use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Instant;
use std::{cmp, mem};
use tracing::{debug, error, trace};
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
@ -40,7 +42,11 @@ impl FlowStore {
}
pub fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
self.db.put_batch_root_list(root_map)
let start_time = Instant::now();
let res = self.db.put_batch_root_list(root_map);
metrics::PUT_BATCH_ROOT_LIST.update_since(start_time);
res
}
pub fn insert_subtree_list_for_batch(
@ -48,6 +54,7 @@ impl FlowStore {
batch_index: usize,
subtree_list: Vec<(usize, usize, DataRoot)>,
) -> Result<()> {
let start_time = Instant::now();
let mut batch = self
.db
.get_entry_batch(batch_index as u64)?
@ -55,6 +62,8 @@ impl FlowStore {
batch.set_subtree_list(subtree_list);
self.db.put_entry_raw(vec![(batch_index as u64, batch)])?;
metrics::INSERT_SUBTREE_LIST.update_since(start_time);
Ok(())
}
@ -73,7 +82,10 @@ impl FlowStore {
}
pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
self.db.put_mpt_node_list(node_list)
let start_time = Instant::now();
let res = self.db.put_mpt_node_list(node_list);
metrics::PUT_MPT_NODE.update_since(start_time);
res
}
pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
@ -222,6 +234,7 @@ impl FlowWrite for FlowStore {
/// Return the roots of completed chunks. The order is guaranteed to be increasing
/// by chunk index.
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
let start_time = Instant::now();
let mut to_seal_set = self.seal_manager.to_seal_set.write();
trace!("append_entries: {} {}", data.start_index, data.data.len());
if data.data.len() % BYTES_PER_SECTOR != 0 {
@ -264,6 +277,8 @@ impl FlowWrite for FlowStore {
batch_list.push((chunk_index, batch));
}
metrics::APPEND_ENTRIES.update_since(start_time);
self.db.put_entry_batch_list(batch_list)
}
@ -373,6 +388,7 @@ impl FlowDBStore {
&self,
batch_list: Vec<(u64, EntryBatch)>,
) -> Result<Vec<(u64, DataRoot)>> {
let start_time = Instant::now();
let mut completed_batches = Vec::new();
let mut tx = self.kvdb.transaction();
for (batch_index, batch) in batch_list {
@ -393,6 +409,7 @@ impl FlowDBStore {
}
}
self.kvdb.write(tx)?;
metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time);
Ok(completed_batches)
}

View File

@ -23,10 +23,12 @@ use std::collections::BTreeMap;
use std::path::Path;
use std::sync::mpsc;
use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, error, info, instrument, trace, warn};
use super::tx_store::BlockHashAndSubmissionIndex;
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
use crate::log_store::metrics;
use crate::log_store::tx_store::BlockHashAndSubmissionIndex;
use crate::log_store::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
/// 256 Bytes
pub const ENTRY_SIZE: usize = 256;
@ -880,6 +882,7 @@ impl LogManager {
if merkle_list.is_empty() {
return Ok(());
}
let start_time = Instant::now();
self.pad_tx(tx_start_index, &mut *merkle)?;
@ -925,12 +928,15 @@ impl LogManager {
}
}
self.flow_store.put_batch_root_list(batch_root_map)?;
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
Ok(())
}
#[instrument(skip(self, merkle))]
fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
// Check if we need to pad the flow.
let start_time = Instant::now();
let mut tx_start_flow_index =
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
let pad_size = tx_start_index - tx_start_flow_index;
@ -1020,6 +1026,8 @@ impl LogManager {
merkle.pora_chunks_merkle.leaves(),
merkle.last_chunk_merkle.leaves()
);
metrics::PAD_TX.update_since(start_time);
Ok(())
}
@ -1148,6 +1156,8 @@ impl LogManager {
}
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
let start_time = Instant::now();
let mut merkle = self.merkle.write();
let shard_config = self.flow_store.get_shard_config();
// We have all the data need for this tx, so just copy them.
@ -1196,6 +1206,8 @@ impl LogManager {
for (seq, _) in to_tx_offset_list {
self.tx_store.finalize_tx(seq)?;
}
metrics::COPY_TX_AND_FINALIZE.update_since(start_time);
Ok(())
}
@ -1268,6 +1280,7 @@ pub fn sub_merkle_tree(leaf_data: &[u8]) -> Result<FileMerkleTree> {
}
pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
let start_time = Instant::now();
if leaf_data.len() % ENTRY_SIZE != 0 {
bail!("merkle_tree: mismatched data size");
}
@ -1283,6 +1296,8 @@ pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
.map(Sha3Algorithm::leaf)
.collect()
};
metrics::DATA_TO_MERKLE_LEAVES.update_since(start_time);
Ok(r)
}

View File

@ -0,0 +1,33 @@
use std::sync::Arc;
use metrics::{register_timer, Timer};
lazy_static::lazy_static! {
pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");
pub static ref CHECK_TX_COMPLETED: Arc<dyn Timer> =
register_timer("log_store_log_manager_check_tx_completed");
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> =
register_timer("log_store_log_manager_append_subtree_list");
pub static ref DATA_TO_MERKLE_LEAVES: Arc<dyn Timer> =
register_timer("log_store_log_manager_data_to_merkle_leaves");
pub static ref COPY_TX_AND_FINALIZE: Arc<dyn Timer> =
register_timer("log_store_log_manager_copy_tx_and_finalize");
pub static ref PAD_TX: Arc<dyn Timer> = register_timer("log_store_log_manager_pad_tx");
pub static ref PUT_BATCH_ROOT_LIST: Arc<dyn Timer> = register_timer("log_store_flow_store_put_batch_root_list");
pub static ref INSERT_SUBTREE_LIST: Arc<dyn Timer> =
register_timer("log_store_flow_store_insert_subtree_list");
pub static ref PUT_MPT_NODE: Arc<dyn Timer> = register_timer("log_store_flow_store_put_mpt_node");
pub static ref PUT_ENTRY_BATCH_LIST: Arc<dyn Timer> =
register_timer("log_store_flow_store_put_entry_batch_list");
pub static ref APPEND_ENTRIES: Arc<dyn Timer> = register_timer("log_store_flow_store_append_entries");
}

View File

@ -15,6 +15,7 @@ pub mod config;
mod flow_store;
mod load_chunk;
pub mod log_manager;
mod metrics;
mod seal_task_manager;
#[cfg(test)]
mod tests;

View File

@ -3,6 +3,7 @@ use crate::log_store::log_manager::{
data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
};
use crate::log_store::metrics;
use crate::{try_option, LogManager, ZgsKeyValueDB};
use anyhow::{anyhow, Result};
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
@ -15,6 +16,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tracing::{error, instrument};
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
@ -51,6 +53,8 @@ impl TransactionStore {
#[instrument(skip(self))]
/// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root.
pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> {
let start_time = Instant::now();
let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) {
// The last tx is inserted again, so no need to process it.
@ -86,6 +90,7 @@ impl TransactionStore {
);
self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
self.kvdb.write(db_tx)?;
metrics::TX_STORE_PUT.update_since(start_time);
Ok(old_tx_seq_list)
}
@ -175,8 +180,12 @@ impl TransactionStore {
}
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
== Some(vec![TX_STATUS_FINALIZED]))
let start_time = Instant::now();
let res = self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
== Some(vec![TX_STATUS_FINALIZED]);
metrics::CHECK_TX_COMPLETED.update_since(start_time);
Ok(res)
}
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {