mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Merge 0443547147
into 9b68a8b7d7
This commit is contained in:
commit
ab7bdf908a
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -226,6 +226,7 @@ dependencies = [
|
|||||||
"itertools 0.13.0",
|
"itertools 0.13.0",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"lru 0.12.5",
|
"lru 0.12.5",
|
||||||
|
"metrics",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"serde",
|
"serde",
|
||||||
"tiny-keccak",
|
"tiny-keccak",
|
||||||
@ -7300,8 +7301,10 @@ dependencies = [
|
|||||||
"kvdb",
|
"kvdb",
|
||||||
"kvdb-memorydb",
|
"kvdb-memorydb",
|
||||||
"kvdb-rocksdb",
|
"kvdb-rocksdb",
|
||||||
|
"lazy_static",
|
||||||
"merkle_light",
|
"merkle_light",
|
||||||
"merkle_tree",
|
"merkle_tree",
|
||||||
|
"metrics",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"parking_lot 0.12.3",
|
"parking_lot 0.12.3",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
|
@ -13,5 +13,8 @@ serde = { version = "1.0.137", features = ["derive"] }
|
|||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
tracing = "0.1.36"
|
tracing = "0.1.36"
|
||||||
once_cell = "1.19.0"
|
once_cell = "1.19.0"
|
||||||
|
|
||||||
|
metrics = { workspace = true }
|
||||||
|
|
||||||
itertools = "0.13.0"
|
itertools = "0.13.0"
|
||||||
lru = "0.12.5"
|
lru = "0.12.5"
|
@ -1,4 +1,5 @@
|
|||||||
mod merkle_tree;
|
mod merkle_tree;
|
||||||
|
mod metrics;
|
||||||
mod node_manager;
|
mod node_manager;
|
||||||
mod proof;
|
mod proof;
|
||||||
mod sha3;
|
mod sha3;
|
||||||
@ -10,6 +11,7 @@ use std::collections::{BTreeMap, HashMap};
|
|||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
use tracing::{trace, warn};
|
use tracing::{trace, warn};
|
||||||
|
|
||||||
use crate::merkle_tree::MerkleTreeWrite;
|
use crate::merkle_tree::MerkleTreeWrite;
|
||||||
@ -145,6 +147,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn append(&mut self, new_leaf: E) {
|
pub fn append(&mut self, new_leaf: E) {
|
||||||
|
let start_time = Instant::now();
|
||||||
if new_leaf == E::null() {
|
if new_leaf == E::null() {
|
||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
return;
|
return;
|
||||||
@ -152,10 +155,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
self.node_manager.start_transaction();
|
self.node_manager.start_transaction();
|
||||||
self.node_manager.push_node(0, new_leaf);
|
self.node_manager.push_node(0, new_leaf);
|
||||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||||
|
|
||||||
self.node_manager.commit();
|
self.node_manager.commit();
|
||||||
|
metrics::APPEND.update_since(start_time);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn append_list(&mut self, leaf_list: Vec<E>) {
|
pub fn append_list(&mut self, leaf_list: Vec<E>) {
|
||||||
|
let start_time = Instant::now();
|
||||||
if leaf_list.contains(&E::null()) {
|
if leaf_list.contains(&E::null()) {
|
||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
return;
|
return;
|
||||||
@ -165,6 +171,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
self.node_manager.append_nodes(0, &leaf_list);
|
self.node_manager.append_nodes(0, &leaf_list);
|
||||||
self.recompute_after_append_leaves(start_index);
|
self.recompute_after_append_leaves(start_index);
|
||||||
self.node_manager.commit();
|
self.node_manager.commit();
|
||||||
|
metrics::APPEND_LIST.update_since(start_time);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Append a leaf list by providing their intermediate node hash.
|
/// Append a leaf list by providing their intermediate node hash.
|
||||||
@ -173,6 +180,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
/// Other nodes in the subtree will be set to `null` nodes.
|
/// Other nodes in the subtree will be set to `null` nodes.
|
||||||
/// TODO: Optimize to avoid storing the `null` nodes?
|
/// TODO: Optimize to avoid storing the `null` nodes?
|
||||||
pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> {
|
pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> {
|
||||||
|
let start_time = Instant::now();
|
||||||
if subtree_root == E::null() {
|
if subtree_root == E::null() {
|
||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
bail!("subtree_root is null");
|
bail!("subtree_root is null");
|
||||||
@ -182,10 +190,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
||||||
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
||||||
self.node_manager.commit();
|
self.node_manager.commit();
|
||||||
|
metrics::APPEND_SUBTREE.update_since(start_time);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> {
|
pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> {
|
||||||
|
let start_time = Instant::now();
|
||||||
if subtree_list.iter().any(|(_, root)| root == &E::null()) {
|
if subtree_list.iter().any(|(_, root)| root == &E::null()) {
|
||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
bail!("subtree_list contains null");
|
bail!("subtree_list contains null");
|
||||||
@ -197,12 +208,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
||||||
}
|
}
|
||||||
self.node_manager.commit();
|
self.node_manager.commit();
|
||||||
|
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Change the value of the last leaf and return the new merkle root.
|
/// Change the value of the last leaf and return the new merkle root.
|
||||||
/// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves.
|
/// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves.
|
||||||
pub fn update_last(&mut self, updated_leaf: E) {
|
pub fn update_last(&mut self, updated_leaf: E) {
|
||||||
|
let start_time = Instant::now();
|
||||||
if updated_leaf == E::null() {
|
if updated_leaf == E::null() {
|
||||||
// updating to null is not allowed.
|
// updating to null is not allowed.
|
||||||
return;
|
return;
|
||||||
@ -216,6 +230,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
}
|
}
|
||||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||||
self.node_manager.commit();
|
self.node_manager.commit();
|
||||||
|
metrics::UPDATE_LAST.update_since(start_time);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fill an unknown `null` leaf with its real value.
|
/// Fill an unknown `null` leaf with its real value.
|
||||||
|
11
common/append_merkle/src/metrics.rs
Normal file
11
common/append_merkle/src/metrics.rs
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use metrics::{register_timer, Timer};
|
||||||
|
|
||||||
|
lazy_static::lazy_static! {
|
||||||
|
pub static ref APPEND: Arc<dyn Timer> = register_timer("append_merkle_append");
|
||||||
|
pub static ref APPEND_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_list");
|
||||||
|
pub static ref APPEND_SUBTREE: Arc<dyn Timer> = register_timer("append_merkle_append_subtree");
|
||||||
|
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_subtree_list");
|
||||||
|
pub static ref UPDATE_LAST: Arc<dyn Timer> = register_timer("append_merkle_update_last");
|
||||||
|
}
|
@ -1,7 +1,13 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use metrics::{register_timer, Timer};
|
use metrics::{register_timer, Gauge, GaugeUsize, Timer};
|
||||||
|
|
||||||
lazy_static::lazy_static! {
|
lazy_static::lazy_static! {
|
||||||
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_store_put_tx");
|
pub static ref LOG_MANAGER_HANDLE_DATA_TRANSACTION: Arc<dyn Timer> = register_timer("log_manager_handle_data_transaction");
|
||||||
|
|
||||||
|
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_manager_put_tx_inner");
|
||||||
|
|
||||||
|
pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes");
|
||||||
|
|
||||||
|
pub static ref FlOW_CONTRACT_ROOT: Arc<dyn Timer> = register_timer("log_manager_flow_contract_root");
|
||||||
}
|
}
|
||||||
|
@ -408,6 +408,7 @@ impl LogSyncManager {
|
|||||||
}
|
}
|
||||||
LogFetchProgress::Transaction((tx, block_number)) => {
|
LogFetchProgress::Transaction((tx, block_number)) => {
|
||||||
let mut stop = false;
|
let mut stop = false;
|
||||||
|
let start_time = Instant::now();
|
||||||
match self.put_tx(tx.clone()).await {
|
match self.put_tx(tx.clone()).await {
|
||||||
Some(false) => stop = true,
|
Some(false) => stop = true,
|
||||||
Some(true) => {
|
Some(true) => {
|
||||||
@ -441,6 +442,8 @@ impl LogSyncManager {
|
|||||||
// no receivers will be created.
|
// no receivers will be created.
|
||||||
warn!("log sync broadcast error, error={:?}", e);
|
warn!("log sync broadcast error, error={:?}", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics::LOG_MANAGER_HANDLE_DATA_TRANSACTION.update_since(start_time);
|
||||||
}
|
}
|
||||||
LogFetchProgress::Reverted(reverted) => {
|
LogFetchProgress::Reverted(reverted) => {
|
||||||
self.process_reverted(reverted).await;
|
self.process_reverted(reverted).await;
|
||||||
@ -453,7 +456,6 @@ impl LogSyncManager {
|
|||||||
async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
|
async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
|
||||||
let start_time = Instant::now();
|
let start_time = Instant::now();
|
||||||
let result = self.store.put_tx(tx.clone());
|
let result = self.store.put_tx(tx.clone());
|
||||||
metrics::STORE_PUT_TX.update_since(start_time);
|
|
||||||
|
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
error!("put_tx error: e={:?}", e);
|
error!("put_tx error: e={:?}", e);
|
||||||
@ -514,6 +516,8 @@ impl LogSyncManager {
|
|||||||
// Check if the computed data root matches on-chain state.
|
// Check if the computed data root matches on-chain state.
|
||||||
// If the call fails, we won't check the root here and return `true` directly.
|
// If the call fails, we won't check the root here and return `true` directly.
|
||||||
let flow_contract = self.log_fetcher.flow_contract();
|
let flow_contract = self.log_fetcher.flow_contract();
|
||||||
|
|
||||||
|
let flow_time = Instant::now();
|
||||||
match flow_contract
|
match flow_contract
|
||||||
.get_flow_root_by_tx_seq(tx.seq.into())
|
.get_flow_root_by_tx_seq(tx.seq.into())
|
||||||
.call()
|
.call()
|
||||||
@ -544,6 +548,11 @@ impl LogSyncManager {
|
|||||||
warn!(?e, "fail to read the on-chain flow root");
|
warn!(?e, "fail to read the on-chain flow root");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
metrics::FlOW_CONTRACT_ROOT.update_since(flow_time);
|
||||||
|
|
||||||
|
metrics::STORE_PUT_TX_SPEED_IN_BYTES
|
||||||
|
.update((tx.size / start_time.elapsed().as_millis() as u64) as usize);
|
||||||
|
metrics::STORE_PUT_TX.update_since(start_time);
|
||||||
|
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,8 @@ parking_lot = "0.12.3"
|
|||||||
serde_json = "1.0.127"
|
serde_json = "1.0.127"
|
||||||
tokio = { version = "1.38.0", features = ["full"] }
|
tokio = { version = "1.38.0", features = ["full"] }
|
||||||
task_executor = { path = "../../common/task_executor" }
|
task_executor = { path = "../../common/task_executor" }
|
||||||
|
lazy_static = "1.4.0"
|
||||||
|
metrics = { workspace = true }
|
||||||
once_cell = { version = "1.19.0", features = [] }
|
once_cell = { version = "1.19.0", features = [] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
@ -1,13 +1,14 @@
|
|||||||
use super::load_chunk::EntryBatch;
|
|
||||||
use super::seal_task_manager::SealTaskManager;
|
|
||||||
use super::{MineLoadChunk, SealAnswer, SealTask};
|
|
||||||
use crate::config::ShardConfig;
|
use crate::config::ShardConfig;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
|
use crate::log_store::load_chunk::EntryBatch;
|
||||||
use crate::log_store::log_manager::{
|
use crate::log_store::log_manager::{
|
||||||
bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
|
bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
|
||||||
COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
|
COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
|
||||||
};
|
};
|
||||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
use crate::log_store::seal_task_manager::SealTaskManager;
|
||||||
|
use crate::log_store::{
|
||||||
|
metrics, FlowRead, FlowSeal, FlowWrite, MineLoadChunk, SealAnswer, SealTask,
|
||||||
|
};
|
||||||
use crate::{try_option, ZgsKeyValueDB};
|
use crate::{try_option, ZgsKeyValueDB};
|
||||||
use any::Any;
|
use any::Any;
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
@ -22,6 +23,7 @@ use std::cmp::Ordering;
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
use std::{any, cmp, mem};
|
use std::{any, cmp, mem};
|
||||||
use tracing::{debug, error, trace};
|
use tracing::{debug, error, trace};
|
||||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
||||||
@ -42,7 +44,11 @@ impl FlowStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
|
pub fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
|
||||||
self.db.put_batch_root_list(root_map)
|
let start_time = Instant::now();
|
||||||
|
let res = self.db.put_batch_root_list(root_map);
|
||||||
|
|
||||||
|
metrics::PUT_BATCH_ROOT_LIST.update_since(start_time);
|
||||||
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert_subtree_list_for_batch(
|
pub fn insert_subtree_list_for_batch(
|
||||||
@ -50,6 +56,7 @@ impl FlowStore {
|
|||||||
batch_index: usize,
|
batch_index: usize,
|
||||||
subtree_list: Vec<(usize, usize, DataRoot)>,
|
subtree_list: Vec<(usize, usize, DataRoot)>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
let start_time = Instant::now();
|
||||||
let mut batch = self
|
let mut batch = self
|
||||||
.db
|
.db
|
||||||
.get_entry_batch(batch_index as u64)?
|
.get_entry_batch(batch_index as u64)?
|
||||||
@ -57,6 +64,8 @@ impl FlowStore {
|
|||||||
batch.set_subtree_list(subtree_list);
|
batch.set_subtree_list(subtree_list);
|
||||||
self.db.put_entry_raw(vec![(batch_index as u64, batch)])?;
|
self.db.put_entry_raw(vec![(batch_index as u64, batch)])?;
|
||||||
|
|
||||||
|
metrics::INSERT_SUBTREE_LIST.update_since(start_time);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,7 +84,10 @@ impl FlowStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
|
pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
|
||||||
self.db.put_mpt_node_list(node_list)
|
let start_time = Instant::now();
|
||||||
|
let res = self.db.put_mpt_node_list(node_list);
|
||||||
|
metrics::PUT_MPT_NODE.update_since(start_time);
|
||||||
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
|
pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
|
||||||
@ -227,6 +239,7 @@ impl FlowWrite for FlowStore {
|
|||||||
/// Return the roots of completed chunks. The order is guaranteed to be increasing
|
/// Return the roots of completed chunks. The order is guaranteed to be increasing
|
||||||
/// by chunk index.
|
/// by chunk index.
|
||||||
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
|
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
|
||||||
|
let start_time = Instant::now();
|
||||||
let mut to_seal_set = self.seal_manager.to_seal_set.write();
|
let mut to_seal_set = self.seal_manager.to_seal_set.write();
|
||||||
trace!("append_entries: {} {}", data.start_index, data.data.len());
|
trace!("append_entries: {} {}", data.start_index, data.data.len());
|
||||||
if data.data.len() % BYTES_PER_SECTOR != 0 {
|
if data.data.len() % BYTES_PER_SECTOR != 0 {
|
||||||
@ -269,6 +282,8 @@ impl FlowWrite for FlowStore {
|
|||||||
|
|
||||||
batch_list.push((chunk_index, batch));
|
batch_list.push((chunk_index, batch));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics::APPEND_ENTRIES.update_since(start_time);
|
||||||
self.db.put_entry_batch_list(batch_list)
|
self.db.put_entry_batch_list(batch_list)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -378,6 +393,7 @@ impl FlowDBStore {
|
|||||||
&self,
|
&self,
|
||||||
batch_list: Vec<(u64, EntryBatch)>,
|
batch_list: Vec<(u64, EntryBatch)>,
|
||||||
) -> Result<Vec<(u64, DataRoot)>> {
|
) -> Result<Vec<(u64, DataRoot)>> {
|
||||||
|
let start_time = Instant::now();
|
||||||
let mut completed_batches = Vec::new();
|
let mut completed_batches = Vec::new();
|
||||||
let mut tx = self.kvdb.transaction();
|
let mut tx = self.kvdb.transaction();
|
||||||
for (batch_index, batch) in batch_list {
|
for (batch_index, batch) in batch_list {
|
||||||
@ -398,6 +414,7 @@ impl FlowDBStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.kvdb.write(tx)?;
|
self.kvdb.write(tx)?;
|
||||||
|
metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time);
|
||||||
Ok(completed_batches)
|
Ok(completed_batches)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,3 @@
|
|||||||
use super::tx_store::BlockHashAndSubmissionIndex;
|
|
||||||
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
|
|
||||||
use crate::config::ShardConfig;
|
use crate::config::ShardConfig;
|
||||||
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
|
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
|
||||||
use crate::log_store::tx_store::TransactionStore;
|
use crate::log_store::tx_store::TransactionStore;
|
||||||
@ -26,8 +24,13 @@ use std::collections::BTreeMap;
|
|||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
use tracing::{debug, error, info, instrument, trace, warn};
|
use tracing::{debug, error, info, instrument, trace, warn};
|
||||||
|
|
||||||
|
use crate::log_store::metrics;
|
||||||
|
use crate::log_store::tx_store::BlockHashAndSubmissionIndex;
|
||||||
|
use crate::log_store::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
|
||||||
|
|
||||||
/// 256 Bytes
|
/// 256 Bytes
|
||||||
pub const ENTRY_SIZE: usize = 256;
|
pub const ENTRY_SIZE: usize = 256;
|
||||||
/// 1024 Entries.
|
/// 1024 Entries.
|
||||||
@ -189,6 +192,7 @@ impl LogStoreChunkWrite for LogManager {
|
|||||||
chunks: ChunkArray,
|
chunks: ChunkArray,
|
||||||
maybe_file_proof: Option<FlowProof>,
|
maybe_file_proof: Option<FlowProof>,
|
||||||
) -> Result<bool> {
|
) -> Result<bool> {
|
||||||
|
let start_time = Instant::now();
|
||||||
let mut merkle = self.merkle.write();
|
let mut merkle = self.merkle.write();
|
||||||
let tx = self
|
let tx = self
|
||||||
.tx_store
|
.tx_store
|
||||||
@ -221,6 +225,7 @@ impl LogStoreChunkWrite for LogManager {
|
|||||||
)?;
|
)?;
|
||||||
self.flow_store.put_mpt_node_list(updated_node_list)?;
|
self.flow_store.put_mpt_node_list(updated_node_list)?;
|
||||||
}
|
}
|
||||||
|
metrics::PUT_CHUNKS.update_since(start_time);
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,6 +256,7 @@ impl LogStoreWrite for LogManager {
|
|||||||
/// `put_tx` for the last tx when we restart the node to ensure that it succeeds.
|
/// `put_tx` for the last tx when we restart the node to ensure that it succeeds.
|
||||||
///
|
///
|
||||||
fn put_tx(&self, tx: Transaction) -> Result<()> {
|
fn put_tx(&self, tx: Transaction) -> Result<()> {
|
||||||
|
let start_time = Instant::now();
|
||||||
let mut merkle = self.merkle.write();
|
let mut merkle = self.merkle.write();
|
||||||
debug!("put_tx: tx={:?}", tx);
|
debug!("put_tx: tx={:?}", tx);
|
||||||
let expected_seq = self.tx_store.next_tx_seq();
|
let expected_seq = self.tx_store.next_tx_seq();
|
||||||
@ -280,6 +286,7 @@ impl LogStoreWrite for LogManager {
|
|||||||
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
|
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
metrics::PUT_TX.update_since(start_time);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -310,6 +317,7 @@ impl LogStoreWrite for LogManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> crate::error::Result<bool> {
|
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> crate::error::Result<bool> {
|
||||||
|
let start_time = Instant::now();
|
||||||
trace!(
|
trace!(
|
||||||
"finalize_tx_with_hash: tx_seq={} tx_hash={:?}",
|
"finalize_tx_with_hash: tx_seq={} tx_hash={:?}",
|
||||||
tx_seq,
|
tx_seq,
|
||||||
@ -338,6 +346,7 @@ impl LogStoreWrite for LogManager {
|
|||||||
if same_root_seq_list.first() == Some(&tx_seq) {
|
if same_root_seq_list.first() == Some(&tx_seq) {
|
||||||
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
|
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
|
||||||
}
|
}
|
||||||
|
metrics::FINALIZE_TX_WITH_HASH.update_since(start_time);
|
||||||
Ok(true)
|
Ok(true)
|
||||||
} else {
|
} else {
|
||||||
bail!("finalize tx hash with data missing: tx_seq={}", tx_seq)
|
bail!("finalize tx hash with data missing: tx_seq={}", tx_seq)
|
||||||
@ -875,6 +884,7 @@ impl LogManager {
|
|||||||
if merkle_list.is_empty() {
|
if merkle_list.is_empty() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
let start_time = Instant::now();
|
||||||
|
|
||||||
self.pad_tx(tx_start_index, &mut *merkle)?;
|
self.pad_tx(tx_start_index, &mut *merkle)?;
|
||||||
|
|
||||||
@ -920,12 +930,15 @@ impl LogManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.flow_store.put_batch_root_list(batch_root_map)?;
|
self.flow_store.put_batch_root_list(batch_root_map)?;
|
||||||
|
|
||||||
|
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(skip(self, merkle))]
|
#[instrument(skip(self, merkle))]
|
||||||
fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
|
fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
|
||||||
// Check if we need to pad the flow.
|
// Check if we need to pad the flow.
|
||||||
|
let start_time = Instant::now();
|
||||||
let mut tx_start_flow_index =
|
let mut tx_start_flow_index =
|
||||||
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
|
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
|
||||||
let pad_size = tx_start_index - tx_start_flow_index;
|
let pad_size = tx_start_index - tx_start_flow_index;
|
||||||
@ -1014,6 +1027,8 @@ impl LogManager {
|
|||||||
merkle.pora_chunks_merkle.leaves(),
|
merkle.pora_chunks_merkle.leaves(),
|
||||||
merkle.last_chunk_merkle.leaves()
|
merkle.last_chunk_merkle.leaves()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
metrics::PAD_TX.update_since(start_time);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1142,6 +1157,8 @@ impl LogManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
|
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
|
||||||
|
let start_time = Instant::now();
|
||||||
|
|
||||||
let mut merkle = self.merkle.write();
|
let mut merkle = self.merkle.write();
|
||||||
let shard_config = self.flow_store.get_shard_config();
|
let shard_config = self.flow_store.get_shard_config();
|
||||||
// We have all the data need for this tx, so just copy them.
|
// We have all the data need for this tx, so just copy them.
|
||||||
@ -1190,6 +1207,8 @@ impl LogManager {
|
|||||||
for (seq, _) in to_tx_offset_list {
|
for (seq, _) in to_tx_offset_list {
|
||||||
self.tx_store.finalize_tx(seq)?;
|
self.tx_store.finalize_tx(seq)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics::COPY_TX_AND_FINALIZE.update_since(start_time);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1262,6 +1281,7 @@ pub fn sub_merkle_tree(leaf_data: &[u8]) -> Result<FileMerkleTree> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
|
pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
|
||||||
|
let start_time = Instant::now();
|
||||||
if leaf_data.len() % ENTRY_SIZE != 0 {
|
if leaf_data.len() % ENTRY_SIZE != 0 {
|
||||||
bail!("merkle_tree: mismatched data size");
|
bail!("merkle_tree: mismatched data size");
|
||||||
}
|
}
|
||||||
@ -1277,6 +1297,8 @@ pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
|
|||||||
.map(Sha3Algorithm::leaf)
|
.map(Sha3Algorithm::leaf)
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
metrics::DATA_TO_MERKLE_LEAVES.update_since(start_time);
|
||||||
Ok(r)
|
Ok(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
39
node/storage/src/log_store/metrics.rs
Normal file
39
node/storage/src/log_store/metrics.rs
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use metrics::{register_timer, Timer};
|
||||||
|
|
||||||
|
lazy_static::lazy_static! {
|
||||||
|
pub static ref PUT_TX: Arc<dyn Timer> = register_timer("log_store_put_tx");
|
||||||
|
|
||||||
|
pub static ref PUT_CHUNKS: Arc<dyn Timer> = register_timer("log_store_put_chunks");
|
||||||
|
|
||||||
|
pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");
|
||||||
|
|
||||||
|
pub static ref CHECK_TX_COMPLETED: Arc<dyn Timer> =
|
||||||
|
register_timer("log_store_log_manager_check_tx_completed");
|
||||||
|
|
||||||
|
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> =
|
||||||
|
register_timer("log_store_log_manager_append_subtree_list");
|
||||||
|
|
||||||
|
pub static ref DATA_TO_MERKLE_LEAVES: Arc<dyn Timer> =
|
||||||
|
register_timer("log_store_log_manager_data_to_merkle_leaves");
|
||||||
|
|
||||||
|
pub static ref COPY_TX_AND_FINALIZE: Arc<dyn Timer> =
|
||||||
|
register_timer("log_store_log_manager_copy_tx_and_finalize");
|
||||||
|
|
||||||
|
pub static ref PAD_TX: Arc<dyn Timer> = register_timer("log_store_log_manager_pad_tx");
|
||||||
|
|
||||||
|
pub static ref PUT_BATCH_ROOT_LIST: Arc<dyn Timer> = register_timer("log_store_flow_store_put_batch_root_list");
|
||||||
|
|
||||||
|
pub static ref INSERT_SUBTREE_LIST: Arc<dyn Timer> =
|
||||||
|
register_timer("log_store_flow_store_insert_subtree_list");
|
||||||
|
|
||||||
|
pub static ref PUT_MPT_NODE: Arc<dyn Timer> = register_timer("log_store_flow_store_put_mpt_node");
|
||||||
|
|
||||||
|
pub static ref PUT_ENTRY_BATCH_LIST: Arc<dyn Timer> =
|
||||||
|
register_timer("log_store_flow_store_put_entry_batch_list");
|
||||||
|
|
||||||
|
pub static ref APPEND_ENTRIES: Arc<dyn Timer> = register_timer("log_store_flow_store_append_entries");
|
||||||
|
|
||||||
|
pub static ref FINALIZE_TX_WITH_HASH: Arc<dyn Timer> = register_timer("log_store_log_manager_finalize_tx_with_hash");
|
||||||
|
}
|
@ -15,6 +15,7 @@ pub mod config;
|
|||||||
mod flow_store;
|
mod flow_store;
|
||||||
mod load_chunk;
|
mod load_chunk;
|
||||||
pub mod log_manager;
|
pub mod log_manager;
|
||||||
|
mod metrics;
|
||||||
mod seal_task_manager;
|
mod seal_task_manager;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
@ -3,6 +3,7 @@ use crate::log_store::log_manager::{
|
|||||||
data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
|
data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
|
||||||
COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
|
COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
|
||||||
};
|
};
|
||||||
|
use crate::log_store::metrics;
|
||||||
use crate::{try_option, LogManager, ZgsKeyValueDB};
|
use crate::{try_option, LogManager, ZgsKeyValueDB};
|
||||||
use anyhow::{anyhow, Result};
|
use anyhow::{anyhow, Result};
|
||||||
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
|
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
|
||||||
@ -15,6 +16,7 @@ use std::collections::hash_map::Entry;
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
use tracing::{error, instrument};
|
use tracing::{error, instrument};
|
||||||
|
|
||||||
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
|
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
|
||||||
@ -51,6 +53,8 @@ impl TransactionStore {
|
|||||||
#[instrument(skip(self))]
|
#[instrument(skip(self))]
|
||||||
/// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root.
|
/// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root.
|
||||||
pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> {
|
pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> {
|
||||||
|
let start_time = Instant::now();
|
||||||
|
|
||||||
let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
||||||
if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) {
|
if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) {
|
||||||
// The last tx is inserted again, so no need to process it.
|
// The last tx is inserted again, so no need to process it.
|
||||||
@ -86,6 +90,7 @@ impl TransactionStore {
|
|||||||
);
|
);
|
||||||
self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
|
self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
|
||||||
self.kvdb.write(db_tx)?;
|
self.kvdb.write(db_tx)?;
|
||||||
|
metrics::TX_STORE_PUT.update_since(start_time);
|
||||||
Ok(old_tx_seq_list)
|
Ok(old_tx_seq_list)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -175,8 +180,12 @@ impl TransactionStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
|
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
|
||||||
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
|
let start_time = Instant::now();
|
||||||
== Some(vec![TX_STATUS_FINALIZED]))
|
let res = self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
|
||||||
|
== Some(vec![TX_STATUS_FINALIZED]);
|
||||||
|
|
||||||
|
metrics::CHECK_TX_COMPLETED.update_since(start_time);
|
||||||
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
|
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
from test_framework.test_framework import TestFramework
|
from test_framework.test_framework import TestFramework
|
||||||
from utility.utils import wait_until
|
from utility.utils import wait_until
|
||||||
|
|
||||||
|
|
||||||
class AutoRandomSyncV2Test(TestFramework):
|
class AutoRandomSyncV2Test(TestFramework):
|
||||||
def setup_params(self):
|
def setup_params(self):
|
||||||
self.num_nodes = 4
|
self.num_nodes = 4
|
||||||
@ -30,5 +31,6 @@ class AutoRandomSyncV2Test(TestFramework):
|
|||||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
|
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
|
||||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
|
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
AutoRandomSyncV2Test().main()
|
AutoRandomSyncV2Test().main()
|
||||||
|
Loading…
Reference in New Issue
Block a user