mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
add detailed metrics for storage layer
This commit is contained in:
parent
05381ab347
commit
63f0f1a833
@ -1,7 +1,11 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use metrics::{register_timer, Timer};
|
use metrics::{register_timer, Gauge, GaugeUsize, Timer};
|
||||||
|
|
||||||
lazy_static::lazy_static! {
|
lazy_static::lazy_static! {
|
||||||
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_store_put_tx");
|
pub static ref LOG_MANAGER_HANDLE_DATA_TRANSACTION: Arc<dyn Timer> = register_timer("log_manager_handle_data_transaction");
|
||||||
|
|
||||||
|
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_manager_put_tx_inner");
|
||||||
|
|
||||||
|
pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes");
|
||||||
}
|
}
|
||||||
|
@ -408,6 +408,7 @@ impl LogSyncManager {
|
|||||||
}
|
}
|
||||||
LogFetchProgress::Transaction((tx, block_number)) => {
|
LogFetchProgress::Transaction((tx, block_number)) => {
|
||||||
let mut stop = false;
|
let mut stop = false;
|
||||||
|
let start_time = Instant::now();
|
||||||
match self.put_tx(tx.clone()).await {
|
match self.put_tx(tx.clone()).await {
|
||||||
Some(false) => stop = true,
|
Some(false) => stop = true,
|
||||||
Some(true) => {
|
Some(true) => {
|
||||||
@ -441,6 +442,8 @@ impl LogSyncManager {
|
|||||||
// no receivers will be created.
|
// no receivers will be created.
|
||||||
warn!("log sync broadcast error, error={:?}", e);
|
warn!("log sync broadcast error, error={:?}", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics::LOG_MANAGER_HANDLE_DATA_TRANSACTION.update_since(start_time);
|
||||||
}
|
}
|
||||||
LogFetchProgress::Reverted(reverted) => {
|
LogFetchProgress::Reverted(reverted) => {
|
||||||
self.process_reverted(reverted).await;
|
self.process_reverted(reverted).await;
|
||||||
@ -453,7 +456,6 @@ impl LogSyncManager {
|
|||||||
async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
|
async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
|
||||||
let start_time = Instant::now();
|
let start_time = Instant::now();
|
||||||
let result = self.store.put_tx(tx.clone());
|
let result = self.store.put_tx(tx.clone());
|
||||||
metrics::STORE_PUT_TX.update_since(start_time);
|
|
||||||
|
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
error!("put_tx error: e={:?}", e);
|
error!("put_tx error: e={:?}", e);
|
||||||
@ -514,6 +516,7 @@ impl LogSyncManager {
|
|||||||
// Check if the computed data root matches on-chain state.
|
// Check if the computed data root matches on-chain state.
|
||||||
// If the call fails, we won't check the root here and return `true` directly.
|
// If the call fails, we won't check the root here and return `true` directly.
|
||||||
let flow_contract = self.log_fetcher.flow_contract();
|
let flow_contract = self.log_fetcher.flow_contract();
|
||||||
|
|
||||||
match flow_contract
|
match flow_contract
|
||||||
.get_flow_root_by_tx_seq(tx.seq.into())
|
.get_flow_root_by_tx_seq(tx.seq.into())
|
||||||
.call()
|
.call()
|
||||||
@ -545,6 +548,10 @@ impl LogSyncManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics::STORE_PUT_TX_SPEED_IN_BYTES
|
||||||
|
.update((tx.size / start_time.elapsed().as_secs()) as usize);
|
||||||
|
metrics::STORE_PUT_TX.update_since(start_time);
|
||||||
|
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -252,6 +252,7 @@ impl LogStoreWrite for LogManager {
|
|||||||
/// `put_tx` for the last tx when we restart the node to ensure that it succeeds.
|
/// `put_tx` for the last tx when we restart the node to ensure that it succeeds.
|
||||||
///
|
///
|
||||||
fn put_tx(&self, tx: Transaction) -> Result<()> {
|
fn put_tx(&self, tx: Transaction) -> Result<()> {
|
||||||
|
let start_time = Instant::now();
|
||||||
let mut merkle = self.merkle.write();
|
let mut merkle = self.merkle.write();
|
||||||
debug!("put_tx: tx={:?}", tx);
|
debug!("put_tx: tx={:?}", tx);
|
||||||
let expected_seq = self.tx_store.next_tx_seq();
|
let expected_seq = self.tx_store.next_tx_seq();
|
||||||
@ -281,6 +282,7 @@ impl LogStoreWrite for LogManager {
|
|||||||
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
|
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
metrics::PUT_TX.update_since(start_time);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,6 +3,8 @@ use std::sync::Arc;
|
|||||||
use metrics::{register_timer, Timer};
|
use metrics::{register_timer, Timer};
|
||||||
|
|
||||||
lazy_static::lazy_static! {
|
lazy_static::lazy_static! {
|
||||||
|
pub static ref PUT_TX: Arc<dyn Timer> = register_timer("log_store_put_tx");
|
||||||
|
|
||||||
pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");
|
pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");
|
||||||
|
|
||||||
pub static ref CHECK_TX_COMPLETED: Arc<dyn Timer> =
|
pub static ref CHECK_TX_COMPLETED: Arc<dyn Timer> =
|
||||||
|
Loading…
Reference in New Issue
Block a user