mirror of
				https://github.com/0glabs/0g-storage-node.git
				synced 2025-11-04 00:27:39 +00:00 
			
		
		
		
	Merge daf261de8d into 9b68a8b7d7
				
					
				
			This commit is contained in:
		
						commit
						f47ac0e03a
					
				
							
								
								
									
										3
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										3
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							@ -226,6 +226,7 @@ dependencies = [
 | 
			
		||||
 "itertools 0.13.0",
 | 
			
		||||
 "lazy_static",
 | 
			
		||||
 "lru 0.12.5",
 | 
			
		||||
 "metrics",
 | 
			
		||||
 "once_cell",
 | 
			
		||||
 "serde",
 | 
			
		||||
 "tiny-keccak",
 | 
			
		||||
@ -7300,8 +7301,10 @@ dependencies = [
 | 
			
		||||
 "kvdb",
 | 
			
		||||
 "kvdb-memorydb",
 | 
			
		||||
 "kvdb-rocksdb",
 | 
			
		||||
 "lazy_static",
 | 
			
		||||
 "merkle_light",
 | 
			
		||||
 "merkle_tree",
 | 
			
		||||
 "metrics",
 | 
			
		||||
 "once_cell",
 | 
			
		||||
 "parking_lot 0.12.3",
 | 
			
		||||
 "rand 0.8.5",
 | 
			
		||||
 | 
			
		||||
@ -13,5 +13,8 @@ serde = { version = "1.0.137", features = ["derive"] }
 | 
			
		||||
lazy_static = "1.4.0"
 | 
			
		||||
tracing = "0.1.36"
 | 
			
		||||
once_cell = "1.19.0"
 | 
			
		||||
 | 
			
		||||
metrics = { workspace = true }
 | 
			
		||||
 | 
			
		||||
itertools = "0.13.0"
 | 
			
		||||
lru = "0.12.5"
 | 
			
		||||
@ -1,4 +1,5 @@
 | 
			
		||||
mod merkle_tree;
 | 
			
		||||
mod metrics;
 | 
			
		||||
mod node_manager;
 | 
			
		||||
mod proof;
 | 
			
		||||
mod sha3;
 | 
			
		||||
@ -10,6 +11,7 @@ use std::collections::{BTreeMap, HashMap};
 | 
			
		||||
use std::fmt::Debug;
 | 
			
		||||
use std::marker::PhantomData;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
use tracing::{trace, warn};
 | 
			
		||||
 | 
			
		||||
use crate::merkle_tree::MerkleTreeWrite;
 | 
			
		||||
@ -145,6 +147,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn append(&mut self, new_leaf: E) {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        if new_leaf == E::null() {
 | 
			
		||||
            // appending null is not allowed.
 | 
			
		||||
            return;
 | 
			
		||||
@ -152,10 +155,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
        self.node_manager.start_transaction();
 | 
			
		||||
        self.node_manager.push_node(0, new_leaf);
 | 
			
		||||
        self.recompute_after_append_leaves(self.leaves() - 1);
 | 
			
		||||
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        metrics::APPEND.update_since(start_time);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn append_list(&mut self, leaf_list: Vec<E>) {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        if leaf_list.contains(&E::null()) {
 | 
			
		||||
            // appending null is not allowed.
 | 
			
		||||
            return;
 | 
			
		||||
@ -165,6 +171,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
        self.node_manager.append_nodes(0, &leaf_list);
 | 
			
		||||
        self.recompute_after_append_leaves(start_index);
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        metrics::APPEND_LIST.update_since(start_time);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Append a leaf list by providing their intermediate node hash.
 | 
			
		||||
@ -173,6 +180,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
    /// Other nodes in the subtree will be set to `null` nodes.
 | 
			
		||||
    /// TODO: Optimize to avoid storing the `null` nodes?
 | 
			
		||||
    pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        if subtree_root == E::null() {
 | 
			
		||||
            // appending null is not allowed.
 | 
			
		||||
            bail!("subtree_root is null");
 | 
			
		||||
@ -182,10 +190,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
        self.append_subtree_inner(subtree_depth, subtree_root)?;
 | 
			
		||||
        self.recompute_after_append_subtree(start_index, subtree_depth - 1);
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        metrics::APPEND_SUBTREE.update_since(start_time);
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        if subtree_list.iter().any(|(_, root)| root == &E::null()) {
 | 
			
		||||
            // appending null is not allowed.
 | 
			
		||||
            bail!("subtree_list contains null");
 | 
			
		||||
@ -197,12 +208,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
            self.recompute_after_append_subtree(start_index, subtree_depth - 1);
 | 
			
		||||
        }
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        metrics::APPEND_SUBTREE_LIST.update_since(start_time);
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Change the value of the last leaf and return the new merkle root.
 | 
			
		||||
    /// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves.
 | 
			
		||||
    pub fn update_last(&mut self, updated_leaf: E) {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        if updated_leaf == E::null() {
 | 
			
		||||
            // updating to null is not allowed.
 | 
			
		||||
            return;
 | 
			
		||||
@ -216,6 +230,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
        }
 | 
			
		||||
        self.recompute_after_append_leaves(self.leaves() - 1);
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        metrics::UPDATE_LAST.update_since(start_time);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Fill an unknown `null` leaf with its real value.
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										11
									
								
								common/append_merkle/src/metrics.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										11
									
								
								common/append_merkle/src/metrics.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,11 @@
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
 | 
			
		||||
use metrics::{register_timer, Timer};
 | 
			
		||||
 | 
			
		||||
lazy_static::lazy_static! {
 | 
			
		||||
    pub static ref APPEND: Arc<dyn Timer> = register_timer("append_merkle_append");
 | 
			
		||||
    pub static ref APPEND_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_list");
 | 
			
		||||
    pub static ref APPEND_SUBTREE: Arc<dyn Timer> = register_timer("append_merkle_append_subtree");
 | 
			
		||||
    pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_subtree_list");
 | 
			
		||||
    pub static ref UPDATE_LAST: Arc<dyn Timer> = register_timer("append_merkle_update_last");
 | 
			
		||||
}
 | 
			
		||||
@ -31,6 +31,8 @@ parking_lot = "0.12.3"
 | 
			
		||||
serde_json = "1.0.127"
 | 
			
		||||
tokio = { version = "1.38.0", features = ["full"] }
 | 
			
		||||
task_executor = { path = "../../common/task_executor" }
 | 
			
		||||
lazy_static = "1.4.0"
 | 
			
		||||
metrics = { workspace = true }
 | 
			
		||||
once_cell = { version = "1.19.0", features = [] }
 | 
			
		||||
 | 
			
		||||
[dev-dependencies]
 | 
			
		||||
 | 
			
		||||
@ -1,13 +1,14 @@
 | 
			
		||||
use super::load_chunk::EntryBatch;
 | 
			
		||||
use super::seal_task_manager::SealTaskManager;
 | 
			
		||||
use super::{MineLoadChunk, SealAnswer, SealTask};
 | 
			
		||||
use crate::config::ShardConfig;
 | 
			
		||||
use crate::error::Error;
 | 
			
		||||
use crate::log_store::load_chunk::EntryBatch;
 | 
			
		||||
use crate::log_store::log_manager::{
 | 
			
		||||
    bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
 | 
			
		||||
    COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
 | 
			
		||||
};
 | 
			
		||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
 | 
			
		||||
use crate::log_store::seal_task_manager::SealTaskManager;
 | 
			
		||||
use crate::log_store::{
 | 
			
		||||
    metrics, FlowRead, FlowSeal, FlowWrite, MineLoadChunk, SealAnswer, SealTask,
 | 
			
		||||
};
 | 
			
		||||
use crate::{try_option, ZgsKeyValueDB};
 | 
			
		||||
use any::Any;
 | 
			
		||||
use anyhow::{anyhow, bail, Result};
 | 
			
		||||
@ -22,6 +23,7 @@ use std::cmp::Ordering;
 | 
			
		||||
use std::collections::BTreeMap;
 | 
			
		||||
use std::fmt::Debug;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
use std::{any, cmp, mem};
 | 
			
		||||
use tracing::{debug, error, trace};
 | 
			
		||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
 | 
			
		||||
@ -42,7 +44,11 @@ impl FlowStore {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
 | 
			
		||||
        self.db.put_batch_root_list(root_map)
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let res = self.db.put_batch_root_list(root_map);
 | 
			
		||||
 | 
			
		||||
        metrics::PUT_BATCH_ROOT_LIST.update_since(start_time);
 | 
			
		||||
        res
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn insert_subtree_list_for_batch(
 | 
			
		||||
@ -50,6 +56,7 @@ impl FlowStore {
 | 
			
		||||
        batch_index: usize,
 | 
			
		||||
        subtree_list: Vec<(usize, usize, DataRoot)>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let mut batch = self
 | 
			
		||||
            .db
 | 
			
		||||
            .get_entry_batch(batch_index as u64)?
 | 
			
		||||
@ -57,6 +64,8 @@ impl FlowStore {
 | 
			
		||||
        batch.set_subtree_list(subtree_list);
 | 
			
		||||
        self.db.put_entry_raw(vec![(batch_index as u64, batch)])?;
 | 
			
		||||
 | 
			
		||||
        metrics::INSERT_SUBTREE_LIST.update_since(start_time);
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -75,7 +84,10 @@ impl FlowStore {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
 | 
			
		||||
        self.db.put_mpt_node_list(node_list)
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let res = self.db.put_mpt_node_list(node_list);
 | 
			
		||||
        metrics::PUT_MPT_NODE.update_since(start_time);
 | 
			
		||||
        res
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
 | 
			
		||||
@ -227,6 +239,7 @@ impl FlowWrite for FlowStore {
 | 
			
		||||
    /// Return the roots of completed chunks. The order is guaranteed to be increasing
 | 
			
		||||
    /// by chunk index.
 | 
			
		||||
    fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let mut to_seal_set = self.seal_manager.to_seal_set.write();
 | 
			
		||||
        trace!("append_entries: {} {}", data.start_index, data.data.len());
 | 
			
		||||
        if data.data.len() % BYTES_PER_SECTOR != 0 {
 | 
			
		||||
@ -269,6 +282,8 @@ impl FlowWrite for FlowStore {
 | 
			
		||||
 | 
			
		||||
            batch_list.push((chunk_index, batch));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        metrics::APPEND_ENTRIES.update_since(start_time);
 | 
			
		||||
        self.db.put_entry_batch_list(batch_list)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -378,6 +393,7 @@ impl FlowDBStore {
 | 
			
		||||
        &self,
 | 
			
		||||
        batch_list: Vec<(u64, EntryBatch)>,
 | 
			
		||||
    ) -> Result<Vec<(u64, DataRoot)>> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let mut completed_batches = Vec::new();
 | 
			
		||||
        let mut tx = self.kvdb.transaction();
 | 
			
		||||
        for (batch_index, batch) in batch_list {
 | 
			
		||||
@ -398,6 +414,7 @@ impl FlowDBStore {
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        self.kvdb.write(tx)?;
 | 
			
		||||
        metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time);
 | 
			
		||||
        Ok(completed_batches)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1,5 +1,3 @@
 | 
			
		||||
use super::tx_store::BlockHashAndSubmissionIndex;
 | 
			
		||||
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
 | 
			
		||||
use crate::config::ShardConfig;
 | 
			
		||||
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
 | 
			
		||||
use crate::log_store::tx_store::TransactionStore;
 | 
			
		||||
@ -26,8 +24,13 @@ use std::collections::BTreeMap;
 | 
			
		||||
use std::path::Path;
 | 
			
		||||
use std::sync::mpsc;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
use tracing::{debug, error, info, instrument, trace, warn};
 | 
			
		||||
 | 
			
		||||
use crate::log_store::metrics;
 | 
			
		||||
use crate::log_store::tx_store::BlockHashAndSubmissionIndex;
 | 
			
		||||
use crate::log_store::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
 | 
			
		||||
 | 
			
		||||
/// 256 Bytes
 | 
			
		||||
pub const ENTRY_SIZE: usize = 256;
 | 
			
		||||
/// 1024 Entries.
 | 
			
		||||
@ -875,6 +878,7 @@ impl LogManager {
 | 
			
		||||
        if merkle_list.is_empty() {
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        }
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
 | 
			
		||||
        self.pad_tx(tx_start_index, &mut *merkle)?;
 | 
			
		||||
 | 
			
		||||
@ -920,12 +924,15 @@ impl LogManager {
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        self.flow_store.put_batch_root_list(batch_root_map)?;
 | 
			
		||||
 | 
			
		||||
        metrics::APPEND_SUBTREE_LIST.update_since(start_time);
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[instrument(skip(self, merkle))]
 | 
			
		||||
    fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
 | 
			
		||||
        // Check if we need to pad the flow.
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let mut tx_start_flow_index =
 | 
			
		||||
            merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
 | 
			
		||||
        let pad_size = tx_start_index - tx_start_flow_index;
 | 
			
		||||
@ -1014,6 +1021,8 @@ impl LogManager {
 | 
			
		||||
            merkle.pora_chunks_merkle.leaves(),
 | 
			
		||||
            merkle.last_chunk_merkle.leaves()
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        metrics::PAD_TX.update_since(start_time);
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -1142,6 +1151,8 @@ impl LogManager {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
 | 
			
		||||
        let mut merkle = self.merkle.write();
 | 
			
		||||
        let shard_config = self.flow_store.get_shard_config();
 | 
			
		||||
        // We have all the data need for this tx, so just copy them.
 | 
			
		||||
@ -1190,6 +1201,8 @@ impl LogManager {
 | 
			
		||||
        for (seq, _) in to_tx_offset_list {
 | 
			
		||||
            self.tx_store.finalize_tx(seq)?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        metrics::COPY_TX_AND_FINALIZE.update_since(start_time);
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -1262,6 +1275,7 @@ pub fn sub_merkle_tree(leaf_data: &[u8]) -> Result<FileMerkleTree> {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
 | 
			
		||||
    let start_time = Instant::now();
 | 
			
		||||
    if leaf_data.len() % ENTRY_SIZE != 0 {
 | 
			
		||||
        bail!("merkle_tree: mismatched data size");
 | 
			
		||||
    }
 | 
			
		||||
@ -1277,6 +1291,8 @@ pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
 | 
			
		||||
            .map(Sha3Algorithm::leaf)
 | 
			
		||||
            .collect()
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    metrics::DATA_TO_MERKLE_LEAVES.update_since(start_time);
 | 
			
		||||
    Ok(r)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										33
									
								
								node/storage/src/log_store/metrics.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								node/storage/src/log_store/metrics.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,33 @@
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
 | 
			
		||||
use metrics::{register_timer, Timer};
 | 
			
		||||
 | 
			
		||||
lazy_static::lazy_static! {
 | 
			
		||||
    pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");
 | 
			
		||||
 | 
			
		||||
    pub static ref CHECK_TX_COMPLETED: Arc<dyn Timer> =
 | 
			
		||||
        register_timer("log_store_log_manager_check_tx_completed");
 | 
			
		||||
 | 
			
		||||
    pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> =
 | 
			
		||||
        register_timer("log_store_log_manager_append_subtree_list");
 | 
			
		||||
 | 
			
		||||
    pub static ref DATA_TO_MERKLE_LEAVES: Arc<dyn Timer> =
 | 
			
		||||
        register_timer("log_store_log_manager_data_to_merkle_leaves");
 | 
			
		||||
 | 
			
		||||
    pub static ref COPY_TX_AND_FINALIZE: Arc<dyn Timer> =
 | 
			
		||||
        register_timer("log_store_log_manager_copy_tx_and_finalize");
 | 
			
		||||
 | 
			
		||||
    pub static ref PAD_TX: Arc<dyn Timer> = register_timer("log_store_log_manager_pad_tx");
 | 
			
		||||
 | 
			
		||||
    pub static ref PUT_BATCH_ROOT_LIST: Arc<dyn Timer> = register_timer("log_store_flow_store_put_batch_root_list");
 | 
			
		||||
 | 
			
		||||
    pub static ref INSERT_SUBTREE_LIST: Arc<dyn Timer> =
 | 
			
		||||
        register_timer("log_store_flow_store_insert_subtree_list");
 | 
			
		||||
 | 
			
		||||
    pub static ref PUT_MPT_NODE: Arc<dyn Timer> = register_timer("log_store_flow_store_put_mpt_node");
 | 
			
		||||
 | 
			
		||||
    pub static ref PUT_ENTRY_BATCH_LIST: Arc<dyn Timer> =
 | 
			
		||||
        register_timer("log_store_flow_store_put_entry_batch_list");
 | 
			
		||||
 | 
			
		||||
    pub static ref APPEND_ENTRIES: Arc<dyn Timer> = register_timer("log_store_flow_store_append_entries");
 | 
			
		||||
}
 | 
			
		||||
@ -15,6 +15,7 @@ pub mod config;
 | 
			
		||||
mod flow_store;
 | 
			
		||||
mod load_chunk;
 | 
			
		||||
pub mod log_manager;
 | 
			
		||||
mod metrics;
 | 
			
		||||
mod seal_task_manager;
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod tests;
 | 
			
		||||
 | 
			
		||||
@ -3,6 +3,7 @@ use crate::log_store::log_manager::{
 | 
			
		||||
    data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
 | 
			
		||||
    COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
 | 
			
		||||
};
 | 
			
		||||
use crate::log_store::metrics;
 | 
			
		||||
use crate::{try_option, LogManager, ZgsKeyValueDB};
 | 
			
		||||
use anyhow::{anyhow, Result};
 | 
			
		||||
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
 | 
			
		||||
@ -15,6 +16,7 @@ use std::collections::hash_map::Entry;
 | 
			
		||||
use std::collections::HashMap;
 | 
			
		||||
use std::sync::atomic::{AtomicU64, Ordering};
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
use tracing::{error, instrument};
 | 
			
		||||
 | 
			
		||||
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
 | 
			
		||||
@ -51,6 +53,8 @@ impl TransactionStore {
 | 
			
		||||
    #[instrument(skip(self))]
 | 
			
		||||
    /// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root.
 | 
			
		||||
    pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
 | 
			
		||||
        let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
 | 
			
		||||
        if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) {
 | 
			
		||||
            // The last tx is inserted again, so no need to process it.
 | 
			
		||||
@ -86,6 +90,7 @@ impl TransactionStore {
 | 
			
		||||
        );
 | 
			
		||||
        self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
 | 
			
		||||
        self.kvdb.write(db_tx)?;
 | 
			
		||||
        metrics::TX_STORE_PUT.update_since(start_time);
 | 
			
		||||
        Ok(old_tx_seq_list)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -175,8 +180,12 @@ impl TransactionStore {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
 | 
			
		||||
        Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
 | 
			
		||||
            == Some(vec![TX_STATUS_FINALIZED]))
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let res = self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
 | 
			
		||||
            == Some(vec![TX_STATUS_FINALIZED]);
 | 
			
		||||
 | 
			
		||||
        metrics::CHECK_TX_COMPLETED.update_since(start_time);
 | 
			
		||||
        Ok(res)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
 | 
			
		||||
 | 
			
		||||
@ -3,6 +3,7 @@
 | 
			
		||||
from test_framework.test_framework import TestFramework
 | 
			
		||||
from utility.utils import wait_until
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class AutoRandomSyncV2Test(TestFramework):
 | 
			
		||||
    def setup_params(self):
 | 
			
		||||
        self.num_nodes = 4
 | 
			
		||||
@ -30,5 +31,6 @@ class AutoRandomSyncV2Test(TestFramework):
 | 
			
		||||
            wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
 | 
			
		||||
            wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    AutoRandomSyncV2Test().main()
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user