mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
add detailed metrics in slow operations
This commit is contained in:
parent
9eea71e97d
commit
f0c7728c38
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -7300,8 +7300,10 @@ dependencies = [
|
|||||||
"kvdb",
|
"kvdb",
|
||||||
"kvdb-memorydb",
|
"kvdb-memorydb",
|
||||||
"kvdb-rocksdb",
|
"kvdb-rocksdb",
|
||||||
|
"lazy_static",
|
||||||
"merkle_light",
|
"merkle_light",
|
||||||
"merkle_tree",
|
"merkle_tree",
|
||||||
|
"metrics",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"parking_lot 0.12.3",
|
"parking_lot 0.12.3",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
|
@ -31,6 +31,8 @@ parking_lot = "0.12.3"
|
|||||||
serde_json = "1.0.127"
|
serde_json = "1.0.127"
|
||||||
tokio = { version = "1.38.0", features = ["full"] }
|
tokio = { version = "1.38.0", features = ["full"] }
|
||||||
task_executor = { path = "../../common/task_executor" }
|
task_executor = { path = "../../common/task_executor" }
|
||||||
|
lazy_static = "1.4.0"
|
||||||
|
metrics = { workspace = true }
|
||||||
once_cell = { version = "1.19.0", features = [] }
|
once_cell = { version = "1.19.0", features = [] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
@ -26,6 +26,7 @@ use std::cmp::Ordering;
|
|||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
use tracing::{debug, error, info, instrument, trace, warn};
|
use tracing::{debug, error, info, instrument, trace, warn};
|
||||||
|
|
||||||
/// 256 Bytes
|
/// 256 Bytes
|
||||||
@ -861,6 +862,7 @@ impl LogManager {
|
|||||||
if merkle_list.is_empty() {
|
if merkle_list.is_empty() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
let start_time = Instant::now();
|
||||||
|
|
||||||
self.pad_tx(tx_start_index, &mut *merkle)?;
|
self.pad_tx(tx_start_index, &mut *merkle)?;
|
||||||
|
|
||||||
@ -896,6 +898,8 @@ impl LogManager {
|
|||||||
.append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?;
|
.append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1107,6 +1111,8 @@ impl LogManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
|
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
|
||||||
|
let start_time = Instant::now();
|
||||||
|
|
||||||
let mut merkle = self.merkle.write();
|
let mut merkle = self.merkle.write();
|
||||||
let shard_config = self.flow_store.get_shard_config();
|
let shard_config = self.flow_store.get_shard_config();
|
||||||
// We have all the data need for this tx, so just copy them.
|
// We have all the data need for this tx, so just copy them.
|
||||||
@ -1155,6 +1161,8 @@ impl LogManager {
|
|||||||
for (seq, _) in to_tx_offset_list {
|
for (seq, _) in to_tx_offset_list {
|
||||||
self.tx_store.finalize_tx(seq)?;
|
self.tx_store.finalize_tx(seq)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics::COPY_TX_AND_FINALIZE.update_since(start_time);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
14
node/storage/src/log_store/metrics.rs
Normal file
14
node/storage/src/log_store/metrics.rs
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use metrics::{register_timer, Timer};
|
||||||
|
|
||||||
|
lazy_static::lazy_static! {
|
||||||
|
pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");
|
||||||
|
|
||||||
|
|
||||||
|
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> =
|
||||||
|
register_timer("log_store_log_manager_append_subtree_list");
|
||||||
|
|
||||||
|
pub static ref COPY_TX_AND_FINALIZE: Arc<dyn Timer> =
|
||||||
|
register_timer("log_store_log_manager_copy_tx_and_finalize");
|
||||||
|
}
|
@ -16,6 +16,7 @@ mod flow_store;
|
|||||||
mod load_chunk;
|
mod load_chunk;
|
||||||
pub mod log_manager;
|
pub mod log_manager;
|
||||||
mod seal_task_manager;
|
mod seal_task_manager;
|
||||||
|
mod metrics;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
pub mod tx_store;
|
pub mod tx_store;
|
||||||
|
@ -3,6 +3,7 @@ use crate::log_store::log_manager::{
|
|||||||
data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
|
data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
|
||||||
COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
|
COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
|
||||||
};
|
};
|
||||||
|
use crate::log_store::metrics;
|
||||||
use crate::{try_option, LogManager, ZgsKeyValueDB};
|
use crate::{try_option, LogManager, ZgsKeyValueDB};
|
||||||
use anyhow::{anyhow, Result};
|
use anyhow::{anyhow, Result};
|
||||||
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
|
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
|
||||||
@ -15,6 +16,7 @@ use std::collections::hash_map::Entry;
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
use tracing::{error, instrument};
|
use tracing::{error, instrument};
|
||||||
|
|
||||||
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
|
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
|
||||||
@ -51,6 +53,8 @@ impl TransactionStore {
|
|||||||
#[instrument(skip(self))]
|
#[instrument(skip(self))]
|
||||||
/// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root.
|
/// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root.
|
||||||
pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> {
|
pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> {
|
||||||
|
let start_time = Instant::now();
|
||||||
|
|
||||||
let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
||||||
if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) {
|
if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) {
|
||||||
// The last tx is inserted again, so no need to process it.
|
// The last tx is inserted again, so no need to process it.
|
||||||
@ -86,6 +90,7 @@ impl TransactionStore {
|
|||||||
);
|
);
|
||||||
self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
|
self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
|
||||||
self.kvdb.write(db_tx)?;
|
self.kvdb.write(db_tx)?;
|
||||||
|
metrics::TX_STORE_PUT.update_since(start_time);
|
||||||
Ok(old_tx_seq_list)
|
Ok(old_tx_seq_list)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user