mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Compare commits
2 Commits
16e70bde68
...
f4d5228234
Author | SHA1 | Date | |
---|---|---|---|
![]() |
f4d5228234 | ||
![]() |
d93f453d50 |
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -226,6 +226,7 @@ dependencies = [
|
||||
"itertools 0.13.0",
|
||||
"lazy_static",
|
||||
"lru 0.12.5",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"serde",
|
||||
"tiny-keccak",
|
||||
@ -7305,8 +7306,10 @@ dependencies = [
|
||||
"kvdb",
|
||||
"kvdb-memorydb",
|
||||
"kvdb-rocksdb",
|
||||
"lazy_static",
|
||||
"merkle_light",
|
||||
"merkle_tree",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.3",
|
||||
"rand 0.8.5",
|
||||
@ -7329,6 +7332,7 @@ name = "storage-async"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"backtrace",
|
||||
"eth2_ssz",
|
||||
"shared_types",
|
||||
"storage",
|
||||
|
@ -13,5 +13,8 @@ serde = { version = "1.0.137", features = ["derive"] }
|
||||
lazy_static = "1.4.0"
|
||||
tracing = "0.1.36"
|
||||
once_cell = "1.19.0"
|
||||
|
||||
metrics = { workspace = true }
|
||||
|
||||
itertools = "0.13.0"
|
||||
lru = "0.12.5"
|
@ -1,4 +1,5 @@
|
||||
mod merkle_tree;
|
||||
mod metrics;
|
||||
mod node_manager;
|
||||
mod proof;
|
||||
mod sha3;
|
||||
@ -10,6 +11,7 @@ use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt::Debug;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tracing::{trace, warn};
|
||||
|
||||
use crate::merkle_tree::MerkleTreeWrite;
|
||||
@ -145,6 +147,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
}
|
||||
|
||||
pub fn append(&mut self, new_leaf: E) {
|
||||
let start_time = Instant::now();
|
||||
if new_leaf == E::null() {
|
||||
// appending null is not allowed.
|
||||
return;
|
||||
@ -152,10 +155,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
self.node_manager.start_transaction();
|
||||
self.node_manager.push_node(0, new_leaf);
|
||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||
|
||||
self.node_manager.commit();
|
||||
metrics::APPEND.update_since(start_time);
|
||||
}
|
||||
|
||||
pub fn append_list(&mut self, leaf_list: Vec<E>) {
|
||||
let start_time = Instant::now();
|
||||
if leaf_list.contains(&E::null()) {
|
||||
// appending null is not allowed.
|
||||
return;
|
||||
@ -165,6 +171,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
self.node_manager.append_nodes(0, &leaf_list);
|
||||
self.recompute_after_append_leaves(start_index);
|
||||
self.node_manager.commit();
|
||||
metrics::APPEND_LIST.update_since(start_time);
|
||||
}
|
||||
|
||||
/// Append a leaf list by providing their intermediate node hash.
|
||||
@ -173,6 +180,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
/// Other nodes in the subtree will be set to `null` nodes.
|
||||
/// TODO: Optimize to avoid storing the `null` nodes?
|
||||
pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> {
|
||||
let start_time = Instant::now();
|
||||
if subtree_root == E::null() {
|
||||
// appending null is not allowed.
|
||||
bail!("subtree_root is null");
|
||||
@ -182,10 +190,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
||||
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
||||
self.node_manager.commit();
|
||||
metrics::APPEND_SUBTREE.update_since(start_time);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> {
|
||||
let start_time = Instant::now();
|
||||
if subtree_list.iter().any(|(_, root)| root == &E::null()) {
|
||||
// appending null is not allowed.
|
||||
bail!("subtree_list contains null");
|
||||
@ -197,12 +208,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
||||
}
|
||||
self.node_manager.commit();
|
||||
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Change the value of the last leaf and return the new merkle root.
|
||||
/// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves.
|
||||
pub fn update_last(&mut self, updated_leaf: E) {
|
||||
let start_time = Instant::now();
|
||||
if updated_leaf == E::null() {
|
||||
// updating to null is not allowed.
|
||||
return;
|
||||
@ -216,6 +230,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
}
|
||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||
self.node_manager.commit();
|
||||
metrics::UPDATE_LAST.update_since(start_time);
|
||||
}
|
||||
|
||||
/// Fill an unknown `null` leaf with its real value.
|
||||
|
11
common/append_merkle/src/metrics.rs
Normal file
11
common/append_merkle/src/metrics.rs
Normal file
@ -0,0 +1,11 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use metrics::{register_timer, Timer};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref APPEND: Arc<dyn Timer> = register_timer("append_merkle_append");
|
||||
pub static ref APPEND_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_list");
|
||||
pub static ref APPEND_SUBTREE: Arc<dyn Timer> = register_timer("append_merkle_append_subtree");
|
||||
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_subtree_list");
|
||||
pub static ref UPDATE_LAST: Arc<dyn Timer> = register_timer("append_merkle_update_last");
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
use crate::sync_manager::log_query::LogQuery;
|
||||
use crate::sync_manager::RETRY_WAIT_MS;
|
||||
use crate::sync_manager::{metrics, RETRY_WAIT_MS};
|
||||
use crate::{ContractAddress, LogSyncConfig};
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use append_merkle::{Algorithm, Sha3Algorithm};
|
||||
@ -13,15 +13,12 @@ use jsonrpsee::tracing::{debug, error, info, warn};
|
||||
use shared_types::{DataRoot, Transaction};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::{
|
||||
sync::{
|
||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
RwLock,
|
||||
},
|
||||
time::Instant,
|
||||
use tokio::sync::{
|
||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
RwLock,
|
||||
};
|
||||
|
||||
pub struct LogEntryFetcher {
|
||||
@ -242,6 +239,7 @@ impl LogEntryFetcher {
|
||||
);
|
||||
let (mut block_hash_sent, mut block_number_sent) = (None, None);
|
||||
while let Some(maybe_log) = stream.next().await {
|
||||
let start_time = Instant::now();
|
||||
match maybe_log {
|
||||
Ok(log) => {
|
||||
let sync_progress =
|
||||
@ -301,6 +299,7 @@ impl LogEntryFetcher {
|
||||
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
|
||||
}
|
||||
}
|
||||
metrics::RECOVER_LOG.update_since(start_time);
|
||||
}
|
||||
|
||||
info!("log recover end");
|
||||
|
@ -1,7 +1,13 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use metrics::{register_timer, Timer};
|
||||
use metrics::{register_timer, Gauge, GaugeUsize, Timer};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_store_put_tx");
|
||||
pub static ref LOG_MANAGER_HANDLE_DATA_TRANSACTION: Arc<dyn Timer> = register_timer("log_manager_handle_data_transaction");
|
||||
|
||||
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_manager_put_tx_inner");
|
||||
|
||||
pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes");
|
||||
|
||||
pub static ref RECOVER_LOG: Arc<dyn Timer> = register_timer("log_entry_sync_manager_recover_log");
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ const RETRY_WAIT_MS: u64 = 500;
|
||||
// Each tx has less than 10KB, so the cache size should be acceptable.
|
||||
const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
|
||||
const CATCH_UP_END_GAP: u64 = 10;
|
||||
const CHECK_ROOT_INTERVAL: u64 = 500;
|
||||
|
||||
/// Errors while handle data
|
||||
#[derive(Error, Debug)]
|
||||
@ -402,6 +403,7 @@ impl LogSyncManager {
|
||||
}
|
||||
LogFetchProgress::Transaction((tx, block_number)) => {
|
||||
let mut stop = false;
|
||||
let start_time = Instant::now();
|
||||
match self.put_tx(tx.clone()).await {
|
||||
Some(false) => stop = true,
|
||||
Some(true) => {
|
||||
@ -435,6 +437,8 @@ impl LogSyncManager {
|
||||
// no receivers will be created.
|
||||
warn!("log sync broadcast error, error={:?}", e);
|
||||
}
|
||||
|
||||
metrics::LOG_MANAGER_HANDLE_DATA_TRANSACTION.update_since(start_time);
|
||||
}
|
||||
LogFetchProgress::Reverted(reverted) => {
|
||||
self.process_reverted(reverted).await;
|
||||
@ -447,7 +451,6 @@ impl LogSyncManager {
|
||||
async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
|
||||
let start_time = Instant::now();
|
||||
let result = self.store.put_tx(tx.clone());
|
||||
metrics::STORE_PUT_TX.update_since(start_time);
|
||||
|
||||
if let Err(e) = result {
|
||||
error!("put_tx error: e={:?}", e);
|
||||
@ -508,38 +511,45 @@ impl LogSyncManager {
|
||||
|
||||
// Check if the computed data root matches on-chain state.
|
||||
// If the call fails, we won't check the root here and return `true` directly.
|
||||
let flow_contract = self.log_fetcher.flow_contract();
|
||||
match flow_contract
|
||||
.get_flow_root_by_tx_seq(tx.seq.into())
|
||||
.call()
|
||||
.await
|
||||
{
|
||||
Ok(contract_root_bytes) => {
|
||||
let contract_root = H256::from_slice(&contract_root_bytes);
|
||||
// contract_root is zero for tx submitted before upgrading.
|
||||
if !contract_root.is_zero() {
|
||||
match self.store.get_context() {
|
||||
Ok((local_root, _)) => {
|
||||
if contract_root != local_root {
|
||||
error!(
|
||||
?contract_root,
|
||||
?local_root,
|
||||
"local flow root and on-chain flow root mismatch"
|
||||
);
|
||||
return false;
|
||||
if self.next_tx_seq % CHECK_ROOT_INTERVAL == 0 {
|
||||
let flow_contract = self.log_fetcher.flow_contract();
|
||||
|
||||
match flow_contract
|
||||
.get_flow_root_by_tx_seq(tx.seq.into())
|
||||
.call()
|
||||
.await
|
||||
{
|
||||
Ok(contract_root_bytes) => {
|
||||
let contract_root = H256::from_slice(&contract_root_bytes);
|
||||
// contract_root is zero for tx submitted before upgrading.
|
||||
if !contract_root.is_zero() {
|
||||
match self.store.get_context() {
|
||||
Ok((local_root, _)) => {
|
||||
if contract_root != local_root {
|
||||
error!(
|
||||
?contract_root,
|
||||
?local_root,
|
||||
"local flow root and on-chain flow root mismatch"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the local flow root");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the local flow root");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the on-chain flow root");
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the on-chain flow root");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metrics::STORE_PUT_TX_SPEED_IN_BYTES
|
||||
.update((tx.size * 1000 / start_time.elapsed().as_micros() as u64) as usize);
|
||||
metrics::STORE_PUT_TX.update_since(start_time);
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
|
@ -53,6 +53,8 @@ pub struct FileInfo {
|
||||
pub finalized: bool,
|
||||
pub is_cached: bool,
|
||||
pub uploaded_seg_num: usize,
|
||||
/// Whether file is pruned, in which case `finalized` will be `false`.
|
||||
pub pruned: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
@ -8,6 +8,7 @@ use jsonrpsee::core::RpcResult;
|
||||
use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
|
||||
use std::fmt::{Debug, Formatter, Result};
|
||||
use storage::config::ShardConfig;
|
||||
use storage::log_store::tx_store::TxStatus;
|
||||
use storage::{try_option, H256};
|
||||
|
||||
pub struct RpcServerImpl {
|
||||
@ -245,7 +246,17 @@ impl RpcServerImpl {
|
||||
}
|
||||
|
||||
async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult<FileInfo> {
|
||||
let finalized = self.ctx.log_store.check_tx_completed(tx.seq).await?;
|
||||
let (finalized, pruned) = match self
|
||||
.ctx
|
||||
.log_store
|
||||
.get_store()
|
||||
.get_tx_status(TxSeqOrRoot::TxSeq(tx.seq))?
|
||||
{
|
||||
Some(TxStatus::Finalized) => (true, false),
|
||||
Some(TxStatus::Pruned) => (false, true),
|
||||
None => (false, false),
|
||||
};
|
||||
|
||||
let (uploaded_seg_num, is_cached) = match self
|
||||
.ctx
|
||||
.chunk_pool
|
||||
@ -254,7 +265,7 @@ impl RpcServerImpl {
|
||||
{
|
||||
Some(v) => v,
|
||||
_ => (
|
||||
if finalized {
|
||||
if finalized || pruned {
|
||||
let chunks_per_segment = self.ctx.config.chunks_per_segment;
|
||||
let (num_segments, _) = SegmentWithProof::split_file_into_segments(
|
||||
tx.size as usize,
|
||||
@ -273,6 +284,7 @@ impl RpcServerImpl {
|
||||
finalized,
|
||||
is_cached,
|
||||
uploaded_seg_num,
|
||||
pruned,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -10,4 +10,5 @@ storage = { path = "../storage" }
|
||||
task_executor = { path = "../../common/task_executor" }
|
||||
tokio = { version = "1.19.2", features = ["sync"] }
|
||||
tracing = "0.1.35"
|
||||
eth2_ssz = "0.4.0"
|
||||
eth2_ssz = "0.4.0"
|
||||
backtrace = "0.3"
|
@ -2,6 +2,7 @@
|
||||
extern crate tracing;
|
||||
|
||||
use anyhow::bail;
|
||||
use backtrace::Backtrace;
|
||||
use shared_types::{
|
||||
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
|
||||
};
|
||||
@ -139,6 +140,9 @@ impl Store {
|
||||
{
|
||||
let store = self.store.clone();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let mut backtrace = Backtrace::new();
|
||||
let frames = backtrace.frames().to_vec();
|
||||
backtrace = frames.into();
|
||||
|
||||
self.executor.spawn_blocking(
|
||||
move || {
|
||||
@ -146,6 +150,7 @@ impl Store {
|
||||
let res = f(&*store);
|
||||
|
||||
if tx.send(res).is_err() {
|
||||
warn!("Backtrace: {:?}", backtrace);
|
||||
error!("Unable to complete async storage operation: the receiver dropped");
|
||||
}
|
||||
},
|
||||
|
@ -31,6 +31,8 @@ parking_lot = "0.12.3"
|
||||
serde_json = "1.0.127"
|
||||
tokio = { version = "1.38.0", features = ["full"] }
|
||||
task_executor = { path = "../../common/task_executor" }
|
||||
lazy_static = "1.4.0"
|
||||
metrics = { workspace = true }
|
||||
once_cell = { version = "1.19.0", features = [] }
|
||||
|
||||
[dev-dependencies]
|
||||
|
@ -1,13 +1,14 @@
|
||||
use super::load_chunk::EntryBatch;
|
||||
use super::log_manager::{COL_PAD_DATA_LIST, COL_PAD_DATA_SYNC_HEIGH};
|
||||
use super::seal_task_manager::SealTaskManager;
|
||||
use super::{MineLoadChunk, SealAnswer, SealTask};
|
||||
use crate::config::ShardConfig;
|
||||
use crate::error::Error;
|
||||
use crate::log_store::load_chunk::EntryBatch;
|
||||
use crate::log_store::log_manager::{
|
||||
bytes_to_entries, COL_ENTRY_BATCH, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE,
|
||||
bytes_to_entries, COL_ENTRY_BATCH, COL_FLOW_MPT_NODES, COL_PAD_DATA_LIST,
|
||||
COL_PAD_DATA_SYNC_HEIGH, PORA_CHUNK_SIZE,
|
||||
};
|
||||
use crate::log_store::seal_task_manager::SealTaskManager;
|
||||
use crate::log_store::{
|
||||
metrics, FlowRead, FlowSeal, FlowWrite, MineLoadChunk, SealAnswer, SealTask,
|
||||
};
|
||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
||||
use crate::{try_option, ZgsKeyValueDB};
|
||||
use any::Any;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
@ -21,6 +22,7 @@ use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use std::{any, cmp};
|
||||
use tracing::{debug, error, trace};
|
||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
||||
@ -47,6 +49,7 @@ impl FlowStore {
|
||||
batch_index: usize,
|
||||
subtree_list: Vec<(usize, usize, DataRoot)>,
|
||||
) -> Result<()> {
|
||||
let start_time = Instant::now();
|
||||
let mut batch = self
|
||||
.data_db
|
||||
.get_entry_batch(batch_index as u64)?
|
||||
@ -54,7 +57,7 @@ impl FlowStore {
|
||||
batch.set_subtree_list(subtree_list);
|
||||
self.data_db
|
||||
.put_entry_raw(vec![(batch_index as u64, batch)])?;
|
||||
|
||||
metrics::INSERT_SUBTREE_LIST.update_since(start_time);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -216,6 +219,7 @@ impl FlowWrite for FlowStore {
|
||||
/// Return the roots of completed chunks. The order is guaranteed to be increasing
|
||||
/// by chunk index.
|
||||
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
|
||||
let start_time = Instant::now();
|
||||
let mut to_seal_set = self.seal_manager.to_seal_set.write();
|
||||
trace!("append_entries: {} {}", data.start_index, data.data.len());
|
||||
if data.data.len() % BYTES_PER_SECTOR != 0 {
|
||||
@ -258,6 +262,8 @@ impl FlowWrite for FlowStore {
|
||||
|
||||
batch_list.push((chunk_index, batch));
|
||||
}
|
||||
|
||||
metrics::APPEND_ENTRIES.update_since(start_time);
|
||||
self.data_db.put_entry_batch_list(batch_list)
|
||||
}
|
||||
|
||||
@ -381,6 +387,7 @@ impl FlowDBStore {
|
||||
&self,
|
||||
batch_list: Vec<(u64, EntryBatch)>,
|
||||
) -> Result<Vec<(u64, DataRoot)>> {
|
||||
let start_time = Instant::now();
|
||||
let mut completed_batches = Vec::new();
|
||||
let mut tx = self.kvdb.transaction();
|
||||
for (batch_index, batch) in batch_list {
|
||||
@ -395,6 +402,7 @@ impl FlowDBStore {
|
||||
}
|
||||
}
|
||||
self.kvdb.write(tx)?;
|
||||
metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time);
|
||||
Ok(completed_batches)
|
||||
}
|
||||
|
||||
|
@ -2,7 +2,7 @@ use crate::config::ShardConfig;
|
||||
use crate::log_store::flow_store::{
|
||||
batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair,
|
||||
};
|
||||
use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore};
|
||||
use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore, TxStatus};
|
||||
use crate::log_store::{
|
||||
FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead,
|
||||
LogStoreWrite, MineLoadChunk, SealAnswer, SealTask,
|
||||
@ -21,14 +21,18 @@ use rayon::prelude::ParallelSlice;
|
||||
use shared_types::{
|
||||
bytes_to_chunks, compute_padded_chunk_size, compute_segment_size, Chunk, ChunkArray,
|
||||
ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
|
||||
TxSeqOrRoot,
|
||||
};
|
||||
use std::cmp::Ordering;
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use tracing::{debug, error, info, instrument, trace, warn};
|
||||
|
||||
use crate::log_store::metrics;
|
||||
|
||||
/// 256 Bytes
|
||||
pub const ENTRY_SIZE: usize = 256;
|
||||
/// 1024 Entries.
|
||||
@ -193,6 +197,7 @@ impl LogStoreChunkWrite for LogManager {
|
||||
chunks: ChunkArray,
|
||||
maybe_file_proof: Option<FlowProof>,
|
||||
) -> Result<bool> {
|
||||
let start_time = Instant::now();
|
||||
let mut merkle = self.merkle.write();
|
||||
let tx = self
|
||||
.tx_store
|
||||
@ -224,6 +229,7 @@ impl LogStoreChunkWrite for LogManager {
|
||||
tx.start_entry_index,
|
||||
)?;
|
||||
}
|
||||
metrics::PUT_CHUNKS.update_since(start_time);
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
@ -254,6 +260,7 @@ impl LogStoreWrite for LogManager {
|
||||
/// `put_tx` for the last tx when we restart the node to ensure that it succeeds.
|
||||
///
|
||||
fn put_tx(&self, tx: Transaction) -> Result<()> {
|
||||
let start_time = Instant::now();
|
||||
let mut merkle = self.merkle.write();
|
||||
debug!("put_tx: tx={:?}", tx);
|
||||
let expected_seq = self.tx_store.next_tx_seq();
|
||||
@ -288,6 +295,7 @@ impl LogStoreWrite for LogManager {
|
||||
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
|
||||
}
|
||||
}
|
||||
metrics::PUT_TX.update_since(start_time);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -318,6 +326,7 @@ impl LogStoreWrite for LogManager {
|
||||
}
|
||||
|
||||
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> crate::error::Result<bool> {
|
||||
let start_time = Instant::now();
|
||||
trace!(
|
||||
"finalize_tx_with_hash: tx_seq={} tx_hash={:?}",
|
||||
tx_seq,
|
||||
@ -346,6 +355,7 @@ impl LogStoreWrite for LogManager {
|
||||
if same_root_seq_list.first() == Some(&tx_seq) {
|
||||
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
|
||||
}
|
||||
metrics::FINALIZE_TX_WITH_HASH.update_since(start_time);
|
||||
Ok(true)
|
||||
} else {
|
||||
bail!("finalize tx hash with data missing: tx_seq={}", tx_seq)
|
||||
@ -572,6 +582,17 @@ impl LogStoreRead for LogManager {
|
||||
}))
|
||||
}
|
||||
|
||||
fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>> {
|
||||
let tx_seq = match tx_seq_or_data_root {
|
||||
TxSeqOrRoot::TxSeq(v) => v,
|
||||
TxSeqOrRoot::Root(root) => {
|
||||
try_option!(self.tx_store.get_first_tx_seq_by_data_root(&root)?)
|
||||
}
|
||||
};
|
||||
|
||||
self.tx_store.get_tx_status(tx_seq)
|
||||
}
|
||||
|
||||
fn check_tx_completed(&self, tx_seq: u64) -> crate::error::Result<bool> {
|
||||
self.tx_store.check_tx_completed(tx_seq)
|
||||
}
|
||||
@ -872,6 +893,7 @@ impl LogManager {
|
||||
if merkle_list.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let start_time = Instant::now();
|
||||
|
||||
self.pad_tx(tx_seq, tx_start_index, &mut *merkle)?;
|
||||
|
||||
@ -907,12 +929,15 @@ impl LogManager {
|
||||
.append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?;
|
||||
}
|
||||
}
|
||||
|
||||
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self, merkle))]
|
||||
fn pad_tx(&self, tx_seq: u64, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
|
||||
// Check if we need to pad the flow.
|
||||
let start_time = Instant::now();
|
||||
let mut tx_start_flow_index =
|
||||
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
|
||||
let pad_size = tx_start_index - tx_start_flow_index;
|
||||
@ -993,6 +1018,8 @@ impl LogManager {
|
||||
);
|
||||
|
||||
self.flow_store.put_pad_data(&pad_list, tx_seq)?;
|
||||
|
||||
metrics::PAD_TX.update_since(start_time);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -1121,6 +1148,8 @@ impl LogManager {
|
||||
}
|
||||
|
||||
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
let mut merkle = self.merkle.write();
|
||||
let shard_config = self.flow_store.get_shard_config();
|
||||
// We have all the data need for this tx, so just copy them.
|
||||
@ -1169,6 +1198,8 @@ impl LogManager {
|
||||
for (seq, _) in to_tx_offset_list {
|
||||
self.tx_store.finalize_tx(seq)?;
|
||||
}
|
||||
|
||||
metrics::COPY_TX_AND_FINALIZE.update_since(start_time);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -1241,6 +1272,7 @@ pub fn sub_merkle_tree(leaf_data: &[u8]) -> Result<FileMerkleTree> {
|
||||
}
|
||||
|
||||
pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
|
||||
let start_time = Instant::now();
|
||||
if leaf_data.len() % ENTRY_SIZE != 0 {
|
||||
bail!("merkle_tree: mismatched data size");
|
||||
}
|
||||
@ -1256,6 +1288,9 @@ pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
|
||||
.map(Sha3Algorithm::leaf)
|
||||
.collect()
|
||||
};
|
||||
|
||||
metrics::DATA_TO_MERKLE_LEAVES_SIZE.update(leaf_data.len());
|
||||
metrics::DATA_TO_MERKLE_LEAVES.update_since(start_time);
|
||||
Ok(r)
|
||||
}
|
||||
|
||||
|
41
node/storage/src/log_store/metrics.rs
Normal file
41
node/storage/src/log_store/metrics.rs
Normal file
@ -0,0 +1,41 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use metrics::{register_timer, Gauge, GaugeUsize, Timer};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref PUT_TX: Arc<dyn Timer> = register_timer("log_store_put_tx");
|
||||
|
||||
pub static ref PUT_CHUNKS: Arc<dyn Timer> = register_timer("log_store_put_chunks");
|
||||
|
||||
pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");
|
||||
|
||||
pub static ref CHECK_TX_COMPLETED: Arc<dyn Timer> =
|
||||
register_timer("log_store_log_manager_check_tx_completed");
|
||||
|
||||
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> =
|
||||
register_timer("log_store_log_manager_append_subtree_list");
|
||||
|
||||
pub static ref DATA_TO_MERKLE_LEAVES: Arc<dyn Timer> =
|
||||
register_timer("log_store_log_manager_data_to_merkle_leaves");
|
||||
|
||||
pub static ref COPY_TX_AND_FINALIZE: Arc<dyn Timer> =
|
||||
register_timer("log_store_log_manager_copy_tx_and_finalize");
|
||||
|
||||
pub static ref PAD_TX: Arc<dyn Timer> = register_timer("log_store_log_manager_pad_tx");
|
||||
|
||||
pub static ref PUT_BATCH_ROOT_LIST: Arc<dyn Timer> = register_timer("log_store_flow_store_put_batch_root_list");
|
||||
|
||||
pub static ref INSERT_SUBTREE_LIST: Arc<dyn Timer> =
|
||||
register_timer("log_store_flow_store_insert_subtree_list");
|
||||
|
||||
pub static ref PUT_MPT_NODE: Arc<dyn Timer> = register_timer("log_store_flow_store_put_mpt_node");
|
||||
|
||||
pub static ref PUT_ENTRY_BATCH_LIST: Arc<dyn Timer> =
|
||||
register_timer("log_store_flow_store_put_entry_batch_list");
|
||||
|
||||
pub static ref APPEND_ENTRIES: Arc<dyn Timer> = register_timer("log_store_flow_store_append_entries");
|
||||
|
||||
pub static ref FINALIZE_TX_WITH_HASH: Arc<dyn Timer> = register_timer("log_store_log_manager_finalize_tx_with_hash");
|
||||
|
||||
pub static ref DATA_TO_MERKLE_LEAVES_SIZE: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_store_data_to_merkle_leaves_size");
|
||||
}
|
@ -4,18 +4,19 @@ use ethereum_types::H256;
|
||||
use flow_store::PadPair;
|
||||
use shared_types::{
|
||||
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
|
||||
Transaction,
|
||||
Transaction, TxSeqOrRoot,
|
||||
};
|
||||
use zgs_spec::{BYTES_PER_SEAL, SEALS_PER_LOAD};
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
use self::tx_store::BlockHashAndSubmissionIndex;
|
||||
use self::tx_store::{BlockHashAndSubmissionIndex, TxStatus};
|
||||
|
||||
pub mod config;
|
||||
mod flow_store;
|
||||
mod load_chunk;
|
||||
pub mod log_manager;
|
||||
mod metrics;
|
||||
mod seal_task_manager;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
@ -57,6 +58,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
|
||||
|
||||
fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
|
||||
|
||||
fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>>;
|
||||
|
||||
fn next_tx_seq(&self) -> u64;
|
||||
|
||||
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
|
||||
|
@ -3,6 +3,7 @@ use crate::log_store::log_manager::{
|
||||
data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
|
||||
COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
|
||||
};
|
||||
use crate::log_store::metrics;
|
||||
use crate::{try_option, LogManager, ZgsKeyValueDB};
|
||||
use anyhow::{anyhow, Result};
|
||||
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
|
||||
@ -15,14 +16,38 @@ use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tracing::{error, instrument};
|
||||
|
||||
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
|
||||
const NEXT_TX_KEY: &str = "next_tx_seq";
|
||||
const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key";
|
||||
|
||||
const TX_STATUS_FINALIZED: u8 = 0;
|
||||
const TX_STATUS_PRUNED: u8 = 1;
|
||||
pub enum TxStatus {
|
||||
Finalized,
|
||||
Pruned,
|
||||
}
|
||||
|
||||
impl From<TxStatus> for u8 {
|
||||
fn from(value: TxStatus) -> Self {
|
||||
match value {
|
||||
TxStatus::Finalized => 0,
|
||||
TxStatus::Pruned => 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for TxStatus {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
|
||||
match value {
|
||||
0 => Ok(TxStatus::Finalized),
|
||||
1 => Ok(TxStatus::Pruned),
|
||||
_ => Err(anyhow!("invalid value for tx status {}", value)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BlockHashAndSubmissionIndex {
|
||||
@ -51,6 +76,8 @@ impl TransactionStore {
|
||||
#[instrument(skip(self))]
|
||||
/// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root.
|
||||
pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
||||
if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) {
|
||||
// The last tx is inserted again, so no need to process it.
|
||||
@ -86,6 +113,7 @@ impl TransactionStore {
|
||||
);
|
||||
self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
|
||||
self.kvdb.write(db_tx)?;
|
||||
metrics::TX_STORE_PUT.update_since(start_time);
|
||||
Ok(old_tx_seq_list)
|
||||
}
|
||||
|
||||
@ -163,24 +191,38 @@ impl TransactionStore {
|
||||
Ok(self.kvdb.put(
|
||||
COL_TX_COMPLETED,
|
||||
&tx_seq.to_be_bytes(),
|
||||
&[TX_STATUS_FINALIZED],
|
||||
&[TxStatus::Finalized.into()],
|
||||
)?)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
|
||||
Ok(self
|
||||
.kvdb
|
||||
.put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TX_STATUS_PRUNED])?)
|
||||
Ok(self.kvdb.put(
|
||||
COL_TX_COMPLETED,
|
||||
&tx_seq.to_be_bytes(),
|
||||
&[TxStatus::Pruned.into()],
|
||||
)?)
|
||||
}
|
||||
|
||||
pub fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> {
|
||||
let value = try_option!(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?);
|
||||
match value.first() {
|
||||
Some(v) => Ok(Some(TxStatus::try_from(*v)?)),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
|
||||
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
|
||||
== Some(vec![TX_STATUS_FINALIZED]))
|
||||
let start_time = Instant::now();
|
||||
let status = self.get_tx_status(tx_seq)?;
|
||||
|
||||
metrics::CHECK_TX_COMPLETED.update_since(start_time);
|
||||
Ok(matches!(status, Some(TxStatus::Finalized)))
|
||||
}
|
||||
|
||||
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
|
||||
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? == Some(vec![TX_STATUS_PRUNED]))
|
||||
let status = self.get_tx_status(tx_seq)?;
|
||||
Ok(matches!(status, Some(TxStatus::Pruned)))
|
||||
}
|
||||
|
||||
pub fn next_tx_seq(&self) -> u64 {
|
||||
|
Loading…
Reference in New Issue
Block a user