From 5100c22933d5ca537f3c606ced24da5da98bddcf Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Tue, 8 Oct 2024 15:40:47 +0800 Subject: [PATCH 01/11] Add trait. --- common/append_merkle/src/lib.rs | 43 +++++++++++++--- common/append_merkle/src/node_manager.rs | 53 ++++++++++++++++++++ node/storage/src/log_store/flow_store.rs | 35 +++++++++++-- node/storage/src/log_store/load_chunk/mod.rs | 8 +-- node/storage/src/log_store/log_manager.rs | 23 ++++++--- node/storage/src/log_store/tests.rs | 13 ++++- 6 files changed, 152 insertions(+), 23 deletions(-) create mode 100644 common/append_merkle/src/node_manager.rs diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index 55fe806..32677d3 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -1,4 +1,5 @@ mod merkle_tree; +mod node_manager; mod proof; mod sha3; @@ -7,17 +8,20 @@ use std::cmp::Ordering; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; use std::marker::PhantomData; +use std::sync::Arc; use tracing::{trace, warn}; pub use crate::merkle_tree::{ Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES, }; +pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager}; pub use proof::{Proof, RangeProof}; pub use sha3::Sha3Algorithm; pub struct AppendMerkleTree> { /// Keep all the nodes in the latest version. `layers[0]` is the layer of leaves. layers: Vec>, + node_manager: NodeManager, /// Keep the delta nodes that can be used to construct a history tree. /// The key is the root node of that version. delta_nodes_map: BTreeMap>, @@ -33,9 +37,15 @@ pub struct AppendMerkleTree> { } impl> AppendMerkleTree { - pub fn new(leaves: Vec, leaf_height: usize, start_tx_seq: Option) -> Self { + pub fn new( + node_db: Arc>, + leaves: Vec, + leaf_height: usize, + start_tx_seq: Option, + ) -> Self { let mut merkle = Self { layers: vec![leaves], + node_manager: NodeManager::new(node_db), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: None, @@ -62,12 +72,14 @@ impl> AppendMerkleTree { } pub fn new_with_subtrees( + node_db: Arc>, initial_data: MerkleTreeInitialData, leaf_height: usize, start_tx_seq: Option, ) -> Result { let mut merkle = Self { layers: vec![vec![]], + node_manager: NodeManager::new(node_db), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: None, @@ -103,6 +115,8 @@ impl> AppendMerkleTree { // Create an empty merkle tree with `depth`. let mut merkle = Self { layers: vec![vec![]; depth], + // dummy node manager for the last chunk. + node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: Some(depth), @@ -123,6 +137,8 @@ impl> AppendMerkleTree { layers[0] = leaves; let mut merkle = Self { layers, + // dummy node manager for the last chunk. + node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: Some(depth), @@ -699,9 +715,11 @@ macro_rules! ensure_eq { #[cfg(test)] mod tests { use crate::merkle_tree::MerkleTreeRead; + use crate::node_manager::EmptyNodeDatabase; use crate::sha3::Sha3Algorithm; use crate::AppendMerkleTree; use ethereum_types::H256; + use std::sync::Arc; #[test] fn test_proof() { @@ -711,8 +729,12 @@ mod tests { for _ in 0..entry_len { data.push(H256::random()); } - let mut merkle = - AppendMerkleTree::::new(vec![H256::zero()], 0, None); + let mut merkle = AppendMerkleTree::::new( + Arc::new(EmptyNodeDatabase {}), + vec![H256::zero()], + 0, + None, + ); merkle.append_list(data.clone()); merkle.commit(Some(0)); verify(&data, &mut merkle); @@ -739,8 +761,12 @@ mod tests { for _ in 0..entry_len { data.push(H256::random()); } - let mut merkle = - AppendMerkleTree::::new(vec![H256::zero()], 0, None); + let mut merkle = AppendMerkleTree::::new( + Arc::new(EmptyNodeDatabase {}), + vec![H256::zero()], + 0, + None, + ); merkle.append_list(data.clone()); merkle.commit(Some(0)); @@ -764,7 +790,12 @@ mod tests { #[test] fn test_proof_at_version() { let n = [2, 255, 256, 257]; - let mut merkle = AppendMerkleTree::::new(vec![H256::zero()], 0, None); + let mut merkle = AppendMerkleTree::::new( + Arc::new(EmptyNodeDatabase {}), + vec![H256::zero()], + 0, + None, + ); let mut start_pos = 0; for (tx_seq, &entry_len) in n.iter().enumerate() { diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs new file mode 100644 index 0000000..dbef6b7 --- /dev/null +++ b/common/append_merkle/src/node_manager.rs @@ -0,0 +1,53 @@ +use crate::HashElement; +use anyhow::Result; +use std::collections::HashMap; +use std::sync::Arc; +use tracing::error; + +pub struct NodeManager { + cache: HashMap<(usize, usize), E>, + db: Arc>, +} + +impl NodeManager { + pub fn new(db: Arc>) -> Self { + Self { + cache: HashMap::new(), + db, + } + } + + pub fn get_node(&self, layer: usize, pos: usize) -> Option { + match self.cache.get(&(layer, pos)) { + Some(node) => Some(node.clone()), + None => self.db.get_node(layer, pos).unwrap_or_else(|e| { + error!("Failed to get node: {}", e); + None + }), + } + } + + pub fn add_node(&mut self, layer: usize, pos: usize, node: E) { + if let Err(e) = self.db.save_node(layer, pos, &node) { + error!("Failed to save node: {}", e); + } + self.cache.insert((layer, pos), node); + } +} + +pub trait NodeDatabase: Send + Sync { + fn get_node(&self, layer: usize, pos: usize) -> Result>; + fn save_node(&self, layer: usize, pos: usize, node: &E) -> Result<()>; +} + +/// A dummy database structure for in-memory merkle tree that will not read/write db. +pub struct EmptyNodeDatabase {} +impl NodeDatabase for EmptyNodeDatabase { + fn get_node(&self, _layer: usize, _pos: usize) -> Result> { + Ok(None) + } + + fn save_node(&self, _layer: usize, _pos: usize, _node: &E) -> Result<()> { + Ok(()) + } +} diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index 37ee1e2..b99aa58 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -10,7 +10,7 @@ use crate::log_store::log_manager::{ use crate::log_store::{FlowRead, FlowSeal, FlowWrite}; use crate::{try_option, ZgsKeyValueDB}; use anyhow::{anyhow, bail, Result}; -use append_merkle::{MerkleTreeInitialData, MerkleTreeRead}; +use append_merkle::{EmptyNodeDatabase, MerkleTreeInitialData, MerkleTreeRead, NodeDatabase}; use itertools::Itertools; use parking_lot::RwLock; use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle}; @@ -25,15 +25,15 @@ use tracing::{debug, error, trace}; use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL}; pub struct FlowStore { - db: FlowDBStore, + db: Arc, seal_manager: SealTaskManager, config: FlowConfig, } impl FlowStore { - pub fn new(db: Arc, config: FlowConfig) -> Self { + pub fn new(db: Arc, config: FlowConfig) -> Self { Self { - db: FlowDBStore::new(db), + db, seal_manager: Default::default(), config, } @@ -436,7 +436,13 @@ impl FlowDBStore { let mut expected_index = 0; let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE]; - let empty_root = *Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root(); + let empty_root = *Merkle::new( + Arc::new(EmptyNodeDatabase {}), + data_to_merkle_leaves(&empty_data)?, + 0, + None, + ) + .root(); for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) { let (index_bytes, root_bytes) = r?; @@ -666,3 +672,22 @@ fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> { let position = try_decode_usize(&data[mem::size_of::()..])?; Ok((layer_index, position)) } + +impl NodeDatabase for FlowDBStore { + fn get_node(&self, layer: usize, pos: usize) -> Result> { + Ok(self + .kvdb + .get(COL_FLOW_MPT_NODES, &encode_mpt_node_key(layer, pos))? + .map(|v| DataRoot::from_slice(&v))) + } + + fn save_node(&self, layer: usize, pos: usize, node: &DataRoot) -> Result<()> { + let mut tx = self.kvdb.transaction(); + tx.put( + COL_FLOW_MPT_NODES, + &encode_mpt_node_key(layer, pos), + node.as_bytes(), + ); + Ok(self.kvdb.write(tx)?) + } +} diff --git a/node/storage/src/log_store/load_chunk/mod.rs b/node/storage/src/log_store/load_chunk/mod.rs index 3daff69..c27f13c 100644 --- a/node/storage/src/log_store/load_chunk/mod.rs +++ b/node/storage/src/log_store/load_chunk/mod.rs @@ -4,15 +4,15 @@ mod seal; mod serde; use ::serde::{Deserialize, Serialize}; -use std::cmp::min; - use anyhow::Result; use ethereum_types::H256; use ssz_derive::{Decode, Encode}; +use std::cmp::min; +use std::sync::Arc; use crate::log_store::log_manager::data_to_merkle_leaves; use crate::try_option; -use append_merkle::{Algorithm, MerkleTreeRead, Sha3Algorithm}; +use append_merkle::{Algorithm, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm}; use shared_types::{ChunkArray, DataRoot, Merkle}; use tracing::trace; use zgs_spec::{ @@ -248,7 +248,7 @@ impl EntryBatch { } else { vec![] }; - let mut merkle = Merkle::new(initial_leaves, 0, None); + let mut merkle = Merkle::new(Arc::new(EmptyNodeDatabase {}), initial_leaves, 0, None); for subtree in self.data.get_subtree_list() { trace!(?subtree, "get subtree, leaves={}", merkle.leaves()); if subtree.start_sector != merkle.leaves() { diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 20b72ec..4fe4bbb 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -1,12 +1,12 @@ use crate::config::ShardConfig; -use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowStore}; +use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore}; use crate::log_store::tx_store::TransactionStore; use crate::log_store::{ FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite, }; use crate::{try_option, ZgsKeyValueDB}; use anyhow::{anyhow, bail, Result}; -use append_merkle::{Algorithm, MerkleTreeRead, Sha3Algorithm}; +use append_merkle::{Algorithm, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm}; use ethereum_types::H256; use kvdb_rocksdb::{Database, DatabaseConfig}; use merkle_light::merkle::{log2_pow2, MerkleTree}; @@ -626,7 +626,8 @@ impl LogManager { executor: task_executor::TaskExecutor, ) -> Result { let tx_store = TransactionStore::new(db.clone())?; - let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow)); + let flow_db = Arc::new(FlowDBStore::new(db.clone())); + let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow)); let mut initial_data = flow_store.get_chunk_root_list()?; // If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list` // first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves` @@ -705,8 +706,12 @@ impl LogManager { } } - let mut pora_chunks_merkle = - Merkle::new_with_subtrees(initial_data, log2_pow2(PORA_CHUNK_SIZE), start_tx_seq)?; + let mut pora_chunks_merkle = Merkle::new_with_subtrees( + flow_db, + initial_data, + log2_pow2(PORA_CHUNK_SIZE), + start_tx_seq, + )?; let last_chunk_merkle = match start_tx_seq { Some(tx_seq) => { tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)? @@ -977,7 +982,13 @@ impl LogManager { let data = pad_data[start_index * ENTRY_SIZE ..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE] .to_vec(); - let root = *Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root(); + let root = *Merkle::new( + Arc::new(EmptyNodeDatabase {}), + data_to_merkle_leaves(&data)?, + 0, + None, + ) + .root(); merkle.pora_chunks_merkle.append(root); root_map.insert(merkle.pora_chunks_merkle.leaves() - 1, (root, 1)); start_index += PORA_CHUNK_SIZE; diff --git a/node/storage/src/log_store/tests.rs b/node/storage/src/log_store/tests.rs index d56613d..aef7d55 100644 --- a/node/storage/src/log_store/tests.rs +++ b/node/storage/src/log_store/tests.rs @@ -3,11 +3,14 @@ use crate::log_store::log_manager::{ PORA_CHUNK_SIZE, }; use crate::log_store::{LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite}; -use append_merkle::{Algorithm, AppendMerkleTree, MerkleTreeRead, Sha3Algorithm}; +use append_merkle::{ + Algorithm, AppendMerkleTree, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm, +}; use ethereum_types::H256; use rand::random; use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE}; use std::cmp; +use std::sync::Arc; use task_executor::test_utils::TestRuntime; #[test] @@ -26,7 +29,12 @@ fn test_put_get() { data[i * CHUNK_SIZE] = random(); } let (padded_chunks, _) = compute_padded_chunk_size(data_size); - let mut merkle = AppendMerkleTree::::new(vec![H256::zero()], 0, None); + let mut merkle = AppendMerkleTree::::new( + Arc::new(EmptyNodeDatabase {}), + vec![H256::zero()], + 0, + None, + ); merkle.append_list(data_to_merkle_leaves(&LogManager::padding_raw(start_offset - 1)).unwrap()); let mut data_padded = data.clone(); data_padded.append(&mut vec![0u8; CHUNK_SIZE]); @@ -124,6 +132,7 @@ fn test_root() { let mt = sub_merkle_tree(&data).unwrap(); println!("{:?} {}", mt.root(), hex::encode(mt.root())); let append_mt = AppendMerkleTree::::new( + Arc::new(EmptyNodeDatabase {}), data_to_merkle_leaves(&data).unwrap(), 0, None, From 7589bdf4bbe4c3dcdc317df9c974207c5448070b Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Tue, 8 Oct 2024 16:19:06 +0800 Subject: [PATCH 02/11] Update merkle tree trait. --- common/append_merkle/src/lib.rs | 31 ++++++++++++-------- common/append_merkle/src/merkle_tree.rs | 16 +++++----- common/append_merkle/src/node_manager.rs | 17 +++++++++++ node/storage/src/log_store/flow_store.rs | 2 +- node/storage/src/log_store/load_chunk/mod.rs | 2 +- node/storage/src/log_store/log_manager.rs | 24 +++++++-------- 6 files changed, 57 insertions(+), 35 deletions(-) diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index 32677d3..edae9f0 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -104,7 +104,7 @@ impl> AppendMerkleTree { } for (layer_index, position, h) in initial_data.extra_mpt_nodes { // TODO: Delete duplicate nodes from DB. - merkle.layers[layer_index][position] = h; + merkle.node_manager.add_node(layer_index, position, h); } Ok(merkle) } @@ -385,7 +385,7 @@ impl> AppendMerkleTree { for layer in &self.layers { right_most_nodes.push((layer.len() - 1, layer.last().unwrap().clone())); } - let root = self.root().clone(); + let root = self.root(); self.delta_nodes_map .insert(tx_seq, DeltaNodes::new(right_most_nodes)); self.root_to_tx_seq_map.insert(root, tx_seq); @@ -566,7 +566,7 @@ impl> AppendMerkleTree { bail!("empty tree"); } Ok(HistoryTree { - layers: &self.layers, + node_manager: &self.node_manager, delta_nodes, leaf_height: self.leaf_height, }) @@ -596,10 +596,10 @@ impl> AppendMerkleTree { fn first_known_root_at(&self, index: usize) -> (usize, E) { let mut height = 0; let mut index_in_layer = index; - while height < self.layers.len() { + while height < self.node_manager.num_layers() { let node = self.node(height, index_in_layer); if !node.is_null() { - return (height + 1, node.clone()); + return (height + 1, node); } height += 1; index_in_layer /= 2; @@ -644,7 +644,7 @@ impl DeltaNodes { pub struct HistoryTree<'m, E: HashElement> { /// A reference to the global tree nodes. - layers: &'m Vec>, + node_manager: &'m NodeManager, /// The delta nodes that are difference from `layers`. /// This could be a reference, we just take ownership for convenience. delta_nodes: &'m DeltaNodes, @@ -655,16 +655,18 @@ pub struct HistoryTree<'m, E: HashElement> { impl> MerkleTreeRead for AppendMerkleTree { type E = E; - fn node(&self, layer: usize, index: usize) -> &Self::E { - &self.layers[layer][index] + fn node(&self, layer: usize, index: usize) -> Self::E { + self.node_manager + .get_node(layer, index) + .expect("index checked") } fn height(&self) -> usize { - self.layers.len() + self.node_manager.num_layers() } fn layer_len(&self, layer_height: usize) -> usize { - self.layers[layer_height].len() + self.node_manager.layer_size(layer_height) } fn padding_node(&self, height: usize) -> Self::E { @@ -674,10 +676,13 @@ impl> MerkleTreeRead for AppendMerkleTree impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> { type E = E; - fn node(&self, layer: usize, index: usize) -> &Self::E { + fn node(&self, layer: usize, index: usize) -> Self::E { match self.delta_nodes.get(layer, index).expect("range checked") { - Some(node) if *node != E::null() => node, - _ => &self.layers[layer][index], + Some(node) if *node != E::null() => node.clone(), + _ => self + .node_manager + .get_node(layer, index) + .expect("index checked"), } } diff --git a/common/append_merkle/src/merkle_tree.rs b/common/append_merkle/src/merkle_tree.rs index 975cbb3..8e26286 100644 --- a/common/append_merkle/src/merkle_tree.rs +++ b/common/append_merkle/src/merkle_tree.rs @@ -49,7 +49,7 @@ pub trait Algorithm { pub trait MerkleTreeRead { type E: HashElement; - fn node(&self, layer: usize, index: usize) -> &Self::E; + fn node(&self, layer: usize, index: usize) -> Self::E; fn height(&self) -> usize; fn layer_len(&self, layer_height: usize) -> usize; fn padding_node(&self, height: usize) -> Self::E; @@ -58,7 +58,7 @@ pub trait MerkleTreeRead { self.layer_len(0) } - fn root(&self) -> &Self::E { + fn root(&self) -> Self::E { self.node(self.height() - 1, 0) } @@ -70,16 +70,16 @@ pub trait MerkleTreeRead { self.leaves() ); } - if self.node(0, leaf_index) == &Self::E::null() { + if self.node(0, leaf_index) == Self::E::null() { bail!("Not ready to generate proof for leaf_index={}", leaf_index); } if self.height() == 1 { - return Proof::new(vec![self.root().clone(), self.root().clone()], vec![]); + return Proof::new(vec![self.root(), self.root().clone()], vec![]); } let mut lemma: Vec = Vec::with_capacity(self.height()); // path + root let mut path: Vec = Vec::with_capacity(self.height() - 2); // path - 1 let mut index_in_layer = leaf_index; - lemma.push(self.node(0, leaf_index).clone()); + lemma.push(self.node(0, leaf_index)); for height in 0..(self.height() - 1) { trace!( "gen_proof: height={} index={} hash={:?}", @@ -93,15 +93,15 @@ pub trait MerkleTreeRead { // TODO: This can be skipped if the tree size is available in validation. lemma.push(self.padding_node(height)); } else { - lemma.push(self.node(height, index_in_layer + 1).clone()); + lemma.push(self.node(height, index_in_layer + 1)); } } else { path.push(false); - lemma.push(self.node(height, index_in_layer - 1).clone()); + lemma.push(self.node(height, index_in_layer - 1)); } index_in_layer >>= 1; } - lemma.push(self.root().clone()); + lemma.push(self.root()); if lemma.contains(&Self::E::null()) { bail!( "Not enough data to generate proof, lemma={:?} path={:?}", diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs index dbef6b7..05615d8 100644 --- a/common/append_merkle/src/node_manager.rs +++ b/common/append_merkle/src/node_manager.rs @@ -6,6 +6,7 @@ use tracing::error; pub struct NodeManager { cache: HashMap<(usize, usize), E>, + layer_size: Vec, db: Arc>, } @@ -13,6 +14,7 @@ impl NodeManager { pub fn new(db: Arc>) -> Self { Self { cache: HashMap::new(), + layer_size: vec![], db, } } @@ -32,6 +34,21 @@ impl NodeManager { error!("Failed to save node: {}", e); } self.cache.insert((layer, pos), node); + if pos + 1 > self.layer_size[layer] { + self.layer_size[layer] = pos + 1; + } + } + + pub fn add_layer(&mut self) { + self.layer_size.push(0); + } + + pub fn layer_size(&self, layer: usize) -> usize { + self.layer_size[layer] + } + + pub fn num_layers(&self) -> usize { + self.layer_size.len() } } diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index b99aa58..06d77f9 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -436,7 +436,7 @@ impl FlowDBStore { let mut expected_index = 0; let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE]; - let empty_root = *Merkle::new( + let empty_root = Merkle::new( Arc::new(EmptyNodeDatabase {}), data_to_merkle_leaves(&empty_data)?, 0, diff --git a/node/storage/src/log_store/load_chunk/mod.rs b/node/storage/src/log_store/load_chunk/mod.rs index c27f13c..5fa88ee 100644 --- a/node/storage/src/log_store/load_chunk/mod.rs +++ b/node/storage/src/log_store/load_chunk/mod.rs @@ -206,7 +206,7 @@ impl EntryBatch { } } Ok(Some( - *try_option!(self.to_merkle_tree(is_first_chunk)?).root(), + try_option!(self.to_merkle_tree(is_first_chunk)?).root(), )) } diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 4fe4bbb..32c6d3e 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -116,7 +116,7 @@ impl MerkleManager { if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 { self.last_chunk_merkle.append(H256::zero()); self.pora_chunks_merkle - .update_last(*self.last_chunk_merkle.root()); + .update_last(self.last_chunk_merkle.root()); } else if self.last_chunk_merkle.leaves() != 0 { let last_chunk_start_index = self.last_chunk_start_index(); let last_chunk_data = flow_store.get_available_entries( @@ -355,7 +355,7 @@ impl LogStoreWrite for LogManager { merkle.revert_merkle_tree(tx_seq, &self.tx_store)?; merkle.try_initialize(&self.flow_store)?; assert_eq!( - Some(*merkle.last_chunk_merkle.root()), + Some(merkle.last_chunk_merkle.root()), merkle .pora_chunks_merkle .leaf_at(merkle.pora_chunks_merkle.leaves() - 1)? @@ -577,7 +577,7 @@ impl LogStoreRead for LogManager { fn get_context(&self) -> crate::error::Result<(DataRoot, u64)> { let merkle = self.merkle.read_recursive(); Ok(( - *merkle.pora_chunks_merkle.root(), + merkle.pora_chunks_merkle.root(), merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64, )) } @@ -727,7 +727,7 @@ impl LogManager { last_chunk_merkle.leaves(), ); if last_chunk_merkle.leaves() != 0 { - pora_chunks_merkle.append(*last_chunk_merkle.root()); + pora_chunks_merkle.append(last_chunk_merkle.root()); // update the merkle root pora_chunks_merkle.commit(start_tx_seq); } @@ -893,16 +893,16 @@ impl LogManager { // `last_chunk_merkle` was empty, so this is a new leaf in the top_tree. merkle .pora_chunks_merkle - .append_subtree(1, *merkle.last_chunk_merkle.root())?; + .append_subtree(1, merkle.last_chunk_merkle.root())?; } else { merkle .pora_chunks_merkle - .update_last(*merkle.last_chunk_merkle.root()); + .update_last(merkle.last_chunk_merkle.root()); } if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE { batch_root_map.insert( merkle.pora_chunks_merkle.leaves() - 1, - (*merkle.last_chunk_merkle.root(), 1), + (merkle.last_chunk_merkle.root(), 1), ); self.complete_last_chunk_merkle( merkle.pora_chunks_merkle.leaves() - 1, @@ -958,7 +958,7 @@ impl LogManager { .append_list(data_to_merkle_leaves(&pad_data)?); merkle .pora_chunks_merkle - .update_last(*merkle.last_chunk_merkle.root()); + .update_last(merkle.last_chunk_merkle.root()); } else { if last_chunk_pad != 0 { is_full_empty = false; @@ -968,10 +968,10 @@ impl LogManager { .append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?); merkle .pora_chunks_merkle - .update_last(*merkle.last_chunk_merkle.root()); + .update_last(merkle.last_chunk_merkle.root()); root_map.insert( merkle.pora_chunks_merkle.leaves() - 1, - (*merkle.last_chunk_merkle.root(), 1), + (merkle.last_chunk_merkle.root(), 1), ); completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1); } @@ -982,7 +982,7 @@ impl LogManager { let data = pad_data[start_index * ENTRY_SIZE ..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE] .to_vec(); - let root = *Merkle::new( + let root = Merkle::new( Arc::new(EmptyNodeDatabase {}), data_to_merkle_leaves(&data)?, 0, @@ -1068,7 +1068,7 @@ impl LogManager { } merkle .pora_chunks_merkle - .update_last(*merkle.last_chunk_merkle.root()); + .update_last(merkle.last_chunk_merkle.root()); } let chunk_roots = self.flow_store.append_entries(flow_entry_array)?; for (chunk_index, chunk_root) in chunk_roots { From 5e14db64224da5a81db97ac76a307a7d2caf44ab Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Wed, 9 Oct 2024 10:36:13 +0800 Subject: [PATCH 03/11] Use NodeManager. --- Cargo.lock | 1 + common/append_merkle/Cargo.toml | 3 +- common/append_merkle/src/lib.rs | 193 ++++++++++++++--------- common/append_merkle/src/merkle_tree.rs | 7 + common/append_merkle/src/node_manager.rs | 87 +++++++++- node/storage/src/log_store/flow_store.rs | 23 +++ 6 files changed, 230 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 64ec1d3..ed02af5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -223,6 +223,7 @@ dependencies = [ "eth2_ssz", "eth2_ssz_derive", "ethereum-types 0.14.1", + "itertools 0.13.0", "lazy_static", "once_cell", "serde", diff --git a/common/append_merkle/Cargo.toml b/common/append_merkle/Cargo.toml index 21543b2..19f4750 100644 --- a/common/append_merkle/Cargo.toml +++ b/common/append_merkle/Cargo.toml @@ -12,4 +12,5 @@ eth2_ssz_derive = "0.3.0" serde = { version = "1.0.137", features = ["derive"] } lazy_static = "1.4.0" tracing = "0.1.36" -once_cell = "1.19.0" \ No newline at end of file +once_cell = "1.19.0" +itertools = "0.13.0" \ No newline at end of file diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index edae9f0..80e2f1a 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -4,6 +4,7 @@ mod proof; mod sha3; use anyhow::{anyhow, bail, Result}; +use itertools::Itertools; use std::cmp::Ordering; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; @@ -11,6 +12,7 @@ use std::marker::PhantomData; use std::sync::Arc; use tracing::{trace, warn}; +use crate::merkle_tree::MerkleTreeWrite; pub use crate::merkle_tree::{ Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES, }; @@ -20,7 +22,6 @@ pub use sha3::Sha3Algorithm; pub struct AppendMerkleTree> { /// Keep all the nodes in the latest version. `layers[0]` is the layer of leaves. - layers: Vec>, node_manager: NodeManager, /// Keep the delta nodes that can be used to construct a history tree. /// The key is the root node of that version. @@ -44,7 +45,6 @@ impl> AppendMerkleTree { start_tx_seq: Option, ) -> Self { let mut merkle = Self { - layers: vec![leaves], node_manager: NodeManager::new(node_db), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), @@ -52,6 +52,8 @@ impl> AppendMerkleTree { leaf_height, _a: Default::default(), }; + merkle.node_manager.add_layer(); + merkle.node_manager.append_nodes(0, &leaves); if merkle.leaves() == 0 { if let Some(seq) = start_tx_seq { merkle.delta_nodes_map.insert( @@ -78,7 +80,6 @@ impl> AppendMerkleTree { start_tx_seq: Option, ) -> Result { let mut merkle = Self { - layers: vec![vec![]], node_manager: NodeManager::new(node_db), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), @@ -86,6 +87,7 @@ impl> AppendMerkleTree { leaf_height, _a: Default::default(), }; + merkle.node_manager.add_layer(); if initial_data.subtree_list.is_empty() { if let Some(seq) = start_tx_seq { merkle.delta_nodes_map.insert( @@ -104,7 +106,7 @@ impl> AppendMerkleTree { } for (layer_index, position, h) in initial_data.extra_mpt_nodes { // TODO: Delete duplicate nodes from DB. - merkle.node_manager.add_node(layer_index, position, h); + merkle.update_node(layer_index, position, h); } Ok(merkle) } @@ -114,7 +116,6 @@ impl> AppendMerkleTree { if leaves.is_empty() { // Create an empty merkle tree with `depth`. let mut merkle = Self { - layers: vec![vec![]; depth], // dummy node manager for the last chunk. node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})), delta_nodes_map: BTreeMap::new(), @@ -123,6 +124,9 @@ impl> AppendMerkleTree { leaf_height: 0, _a: Default::default(), }; + for _ in 0..depth { + merkle.node_manager.add_layer(); + } if let Some(seq) = start_tx_seq { merkle.delta_nodes_map.insert( seq, @@ -133,10 +137,7 @@ impl> AppendMerkleTree { } merkle } else { - let mut layers = vec![vec![]; depth]; - layers[0] = leaves; let mut merkle = Self { - layers, // dummy node manager for the last chunk. node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})), delta_nodes_map: BTreeMap::new(), @@ -145,6 +146,11 @@ impl> AppendMerkleTree { leaf_height: 0, _a: Default::default(), }; + merkle.node_manager.add_layer(); + merkle.append_nodes(0, &leaves); + for _ in 1..depth { + merkle.node_manager.add_layer(); + } // Reconstruct the whole tree. merkle.recompute(0, 0, None); // Commit the first version in memory. @@ -158,17 +164,17 @@ impl> AppendMerkleTree { // appending null is not allowed. return; } - self.layers[0].push(new_leaf); + self.node_manager.push_node(0, new_leaf); self.recompute_after_append_leaves(self.leaves() - 1); } - pub fn append_list(&mut self, mut leaf_list: Vec) { + pub fn append_list(&mut self, leaf_list: Vec) { if leaf_list.contains(&E::null()) { // appending null is not allowed. return; } let start_index = self.leaves(); - self.layers[0].append(&mut leaf_list); + self.node_manager.append_nodes(0, &leaf_list); self.recompute_after_append_leaves(start_index); } @@ -208,11 +214,11 @@ impl> AppendMerkleTree { // updating to null is not allowed. return; } - if self.layers[0].is_empty() { + if self.layer_len(0) == 0 { // Special case for the first data. - self.layers[0].push(updated_leaf); + self.push_node(0, updated_leaf); } else { - *self.layers[0].last_mut().unwrap() = updated_leaf; + self.update_node(0, self.layer_len(0) - 1, updated_leaf); } self.recompute_after_append_leaves(self.leaves() - 1); } @@ -223,13 +229,15 @@ impl> AppendMerkleTree { pub fn fill_leaf(&mut self, index: usize, leaf: E) { if leaf == E::null() { // fill leaf with null is not allowed. - } else if self.layers[0][index] == E::null() { - self.layers[0][index] = leaf; + } else if self.node(0, index) == E::null() { + self.update_node(0, index, leaf); self.recompute_after_fill_leaves(index, index + 1); - } else if self.layers[0][index] != leaf { + } else if self.node(0, index) != leaf { panic!( "Fill with invalid leaf, index={} was={:?} get={:?}", - index, self.layers[0][index], leaf + index, + self.node(0, index), + leaf ); } } @@ -296,28 +304,27 @@ impl> AppendMerkleTree { let mut updated_nodes = Vec::new(); // A valid proof should not fail the following checks. for (i, (position, data)) in position_and_data.into_iter().enumerate() { - let layer = &mut self.layers[i]; - if position > layer.len() { + if position > self.layer_len(i) { bail!( "proof position out of range, position={} layer.len()={}", position, - layer.len() + self.layer_len(i) ); } - if position == layer.len() { + if position == self.layer_len(i) { // skip padding node. continue; } - if layer[position] == E::null() { - layer[position] = data.clone(); + if self.node(i, position) == E::null() { + self.update_node(i, position, data.clone()); updated_nodes.push((i, position, data)) - } else if layer[position] != data { + } else if self.node(i, position) != data { // The last node in each layer may have changed in the tree. trace!( "conflict data layer={} position={} tree_data={:?} proof_data={:?}", i, position, - layer[position], + self.node(i, position), data ); } @@ -333,8 +340,8 @@ impl> AppendMerkleTree { if position >= self.leaves() { bail!("Out of bound: position={} end={}", position, self.leaves()); } - if self.layers[0][position] != E::null() { - Ok(Some(self.layers[0][position].clone())) + if self.node(0, position) != E::null() { + Ok(Some(self.node(0, position))) } else { // The leaf hash is unknown. Ok(None) @@ -382,8 +389,9 @@ impl> AppendMerkleTree { return; } let mut right_most_nodes = Vec::new(); - for layer in &self.layers { - right_most_nodes.push((layer.len() - 1, layer.last().unwrap().clone())); + for height in 0..self.height() { + let pos = self.layer_len(height) - 1; + right_most_nodes.push((pos, self.node(height, pos))); } let root = self.root(); self.delta_nodes_map @@ -393,8 +401,8 @@ impl> AppendMerkleTree { } fn before_extend_layer(&mut self, height: usize) { - if height == self.layers.len() { - self.layers.push(Vec::new()); + if height == self.height() { + self.node_manager.add_layer() } } @@ -411,7 +419,6 @@ impl> AppendMerkleTree { } /// Given a range of changed leaf nodes and recompute the tree. - /// Since this tree is append-only, we always compute to the end. fn recompute( &mut self, mut start_index: usize, @@ -421,42 +428,51 @@ impl> AppendMerkleTree { start_index >>= height; maybe_end_index = maybe_end_index.map(|end| end >> height); // Loop until we compute the new root and reach `tree_depth`. - while self.layers[height].len() > 1 || height < self.layers.len() - 1 { + while self.layer_len(height) > 1 || height < self.height() - 1 { let next_layer_start_index = start_index >> 1; if start_index % 2 == 1 { start_index -= 1; } - let mut end_index = maybe_end_index.unwrap_or(self.layers[height].len()); - if end_index % 2 == 1 && end_index != self.layers[height].len() { + let mut end_index = maybe_end_index.unwrap_or(self.layer_len(height)); + if end_index % 2 == 1 && end_index != self.layer_len(height) { end_index += 1; } let mut i = 0; - let mut iter = self.layers[height][start_index..end_index].chunks_exact(2); + let iter = self + .node_manager + .get_nodes(height, start_index, end_index) + .chunks(2); // We cannot modify the parent layer while iterating the child layer, // so just keep the changes and update them later. let mut parent_update = Vec::new(); - while let Some([left, right]) = iter.next() { - // If either left or right is null (unknown), we cannot compute the parent hash. - // Note that if we are recompute a range of an existing tree, - // we do not need to keep these possibly null parent. This is only saved - // for the case of constructing a new tree from the leaves. - let parent = if *left == E::null() || *right == E::null() { - E::null() + for chunk_iter in &iter { + let chunk: Vec<_> = chunk_iter.collect(); + if chunk.len() == 2 { + let left = &chunk[0]; + let right = &chunk[1]; + // If either left or right is null (unknown), we cannot compute the parent hash. + // Note that if we are recompute a range of an existing tree, + // we do not need to keep these possibly null parent. This is only saved + // for the case of constructing a new tree from the leaves. + let parent = if *left == E::null() || *right == E::null() { + E::null() + } else { + A::parent(left, right) + }; + parent_update.push((next_layer_start_index + i, parent)); + i += 1; } else { - A::parent(left, right) - }; - parent_update.push((next_layer_start_index + i, parent)); - i += 1; - } - if let [r] = iter.remainder() { - // Same as above. - let parent = if *r == E::null() { - E::null() - } else { - A::parent_single(r, height + self.leaf_height) - }; - parent_update.push((next_layer_start_index + i, parent)); + assert_eq!(chunk.len(), 1); + let r = &chunk[0]; + // Same as above. + let parent = if *r == E::null() { + E::null() + } else { + A::parent_single(r, height + self.leaf_height) + }; + parent_update.push((next_layer_start_index + i, parent)); + } } if !parent_update.is_empty() { self.before_extend_layer(height + 1); @@ -465,27 +481,27 @@ impl> AppendMerkleTree { // we can just overwrite `last_changed_parent_index` with new values. let mut last_changed_parent_index = None; for (parent_index, parent) in parent_update { - match parent_index.cmp(&self.layers[height + 1].len()) { + match parent_index.cmp(&self.layer_len(height + 1)) { Ordering::Less => { // We do not overwrite with null. if parent != E::null() { - if self.layers[height + 1][parent_index] == E::null() + if self.node(height + 1, parent_index) == E::null() // The last node in a layer can be updated. - || (self.layers[height + 1][parent_index] != parent - && parent_index == self.layers[height + 1].len() - 1) + || (self.node(height + 1, parent_index) != parent + && parent_index == self.layer_len(height + 1) - 1) { - self.layers[height + 1][parent_index] = parent; + self.update_node(height + 1, parent_index, parent); last_changed_parent_index = Some(parent_index); - } else if self.layers[height + 1][parent_index] != parent { + } else if self.node(height + 1, parent_index) != parent { // Recompute changes a node in the middle. This should be impossible // if the inputs are valid. panic!("Invalid append merkle tree! height={} index={} expected={:?} get={:?}", - height + 1, parent_index, self.layers[height + 1][parent_index], parent); + height + 1, parent_index, self.node(height + 1, parent_index), parent); } } } Ordering::Equal => { - self.layers[height + 1].push(parent); + self.push_node(height + 1, parent); last_changed_parent_index = Some(parent_index); } Ordering::Greater => { @@ -516,10 +532,10 @@ impl> AppendMerkleTree { for height in 0..(subtree_depth - 1) { self.before_extend_layer(height); let subtree_layer_size = 1 << (subtree_depth - 1 - height); - self.layers[height].append(&mut vec![E::null(); subtree_layer_size]); + self.append_nodes(height, &vec![E::null(); subtree_layer_size]); } self.before_extend_layer(subtree_depth - 1); - self.layers[subtree_depth - 1].push(subtree_root); + self.push_node(subtree_depth - 1, subtree_root); Ok(()) } @@ -530,21 +546,24 @@ impl> AppendMerkleTree { } pub fn revert_to(&mut self, tx_seq: u64) -> Result<()> { - if self.layers[0].is_empty() { + if self.layer_len(0) == 0 { // Any previous state of an empty tree is always empty. return Ok(()); } let delta_nodes = self .delta_nodes_map .get(&tx_seq) - .ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?; + .ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))? + .clone(); // Dropping the upper layers that are not in the old merkle tree. - self.layers.truncate(delta_nodes.right_most_nodes.len()); + for height in (self.height() - 1)..=delta_nodes.right_most_nodes.len() { + self.node_manager.truncate_layer(height); + } for (height, (last_index, right_most_node)) in delta_nodes.right_most_nodes.iter().enumerate() { - self.layers[height].truncate(*last_index + 1); - self.layers[height][*last_index] = right_most_node.clone(); + self.node_manager.truncate_nodes(height, *last_index + 1); + self.update_node(height, *last_index, right_most_node.clone()) } self.clear_after(tx_seq); Ok(()) @@ -573,10 +592,14 @@ impl> AppendMerkleTree { } pub fn reset(&mut self) { - self.layers = match self.min_depth { - None => vec![vec![]], - Some(depth) => vec![vec![]; depth], - }; + for height in (self.height() - 1)..=0 { + self.node_manager.truncate_layer(height); + } + if let Some(depth) = self.min_depth { + for _ in 0..depth { + self.node_manager.add_layer(); + } + } } fn clear_after(&mut self, tx_seq: u64) { @@ -596,7 +619,7 @@ impl> AppendMerkleTree { fn first_known_root_at(&self, index: usize) -> (usize, E) { let mut height = 0; let mut index_in_layer = index; - while height < self.node_manager.num_layers() { + while height < self.height() { let node = self.node(height, index_in_layer); if !node.is_null() { return (height + 1, node); @@ -699,6 +722,22 @@ impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> { } } +impl> MerkleTreeWrite for AppendMerkleTree { + type E = E; + + fn push_node(&mut self, layer: usize, node: Self::E) { + self.node_manager.push_node(layer, node); + } + + fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]) { + self.node_manager.append_nodes(layer, nodes); + } + + fn update_node(&mut self, layer: usize, pos: usize, node: Self::E) { + self.node_manager.add_node(layer, pos, node); + } +} + #[macro_export] macro_rules! ensure_eq { ($given:expr, $expected:expr) => { diff --git a/common/append_merkle/src/merkle_tree.rs b/common/append_merkle/src/merkle_tree.rs index 8e26286..eac083f 100644 --- a/common/append_merkle/src/merkle_tree.rs +++ b/common/append_merkle/src/merkle_tree.rs @@ -130,6 +130,13 @@ pub trait MerkleTreeRead { } } +pub trait MerkleTreeWrite { + type E: HashElement; + fn push_node(&mut self, layer: usize, node: Self::E); + fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]); + fn update_node(&mut self, layer: usize, pos: usize, node: Self::E); +} + /// This includes the data to reconstruct an `AppendMerkleTree` root where some nodes /// are `null`. Other intermediate nodes will be computed based on these known nodes. pub struct MerkleTreeInitialData { diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs index 05615d8..8fa87ed 100644 --- a/common/append_merkle/src/node_manager.rs +++ b/common/append_merkle/src/node_manager.rs @@ -1,11 +1,11 @@ use crate::HashElement; use anyhow::Result; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::sync::Arc; use tracing::error; pub struct NodeManager { - cache: HashMap<(usize, usize), E>, + cache: BTreeMap<(usize, usize), E>, layer_size: Vec, db: Arc>, } @@ -13,12 +13,30 @@ pub struct NodeManager { impl NodeManager { pub fn new(db: Arc>) -> Self { Self { - cache: HashMap::new(), + cache: BTreeMap::new(), layer_size: vec![], db, } } + pub fn push_node(&mut self, layer: usize, node: E) { + self.add_node(layer, self.layer_size[layer], node); + self.layer_size[layer] += 1; + } + + pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) { + let pos = &mut self.layer_size[layer]; + let mut saved_nodes = Vec::with_capacity(nodes.len()); + for node in nodes { + self.cache.insert((layer, *pos), node.clone()); + saved_nodes.push((layer, *pos, node)); + *pos += 1; + } + if let Err(e) = self.db.save_node_list(&saved_nodes) { + error!("Failed to save node list: {:?}", e); + } + } + pub fn get_node(&self, layer: usize, pos: usize) -> Option { match self.cache.get(&(layer, pos)) { Some(node) => Some(node.clone()), @@ -29,14 +47,20 @@ impl NodeManager { } } + pub fn get_nodes(&self, layer: usize, start_pos: usize, end_pos: usize) -> NodeIterator { + NodeIterator { + node_manager: &self, + layer, + start_pos, + end_pos, + } + } + pub fn add_node(&mut self, layer: usize, pos: usize, node: E) { if let Err(e) = self.db.save_node(layer, pos, &node) { error!("Failed to save node: {}", e); } self.cache.insert((layer, pos), node); - if pos + 1 > self.layer_size[layer] { - self.layer_size[layer] = pos + 1; - } } pub fn add_layer(&mut self) { @@ -50,11 +74,54 @@ impl NodeManager { pub fn num_layers(&self) -> usize { self.layer_size.len() } + + pub fn truncate_nodes(&mut self, layer: usize, pos_end: usize) { + let mut removed_nodes = Vec::new(); + for pos in pos_end..self.layer_size[layer] { + self.cache.remove(&(layer, pos)); + removed_nodes.push((layer, pos)); + } + if let Err(e) = self.db.remove_node_list(&removed_nodes) { + error!("Failed to remove node list: {:?}", e); + } + self.layer_size[layer] = pos_end; + } + + pub fn truncate_layer(&mut self, layer: usize) { + self.truncate_nodes(layer, 0); + if layer == self.num_layers() - 1 { + self.layer_size.pop(); + } + } +} + +pub struct NodeIterator<'a, E: HashElement> { + node_manager: &'a NodeManager, + layer: usize, + start_pos: usize, + end_pos: usize, +} + +impl<'a, E: HashElement> Iterator for NodeIterator<'a, E> { + type Item = E; + + fn next(&mut self) -> Option { + if self.start_pos < self.end_pos { + let r = self.node_manager.get_node(self.layer, self.start_pos); + self.start_pos += 1; + r + } else { + None + } + } } pub trait NodeDatabase: Send + Sync { fn get_node(&self, layer: usize, pos: usize) -> Result>; fn save_node(&self, layer: usize, pos: usize, node: &E) -> Result<()>; + /// `nodes` are a list of tuples `(layer, pos, node)`. + fn save_node_list(&self, nodes: &[(usize, usize, &E)]) -> Result<()>; + fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()>; } /// A dummy database structure for in-memory merkle tree that will not read/write db. @@ -67,4 +134,12 @@ impl NodeDatabase for EmptyNodeDatabase { fn save_node(&self, _layer: usize, _pos: usize, _node: &E) -> Result<()> { Ok(()) } + + fn save_node_list(&self, _nodes: &[(usize, usize, &E)]) -> Result<()> { + Ok(()) + } + + fn remove_node_list(&self, _nodes: &[(usize, usize)]) -> Result<()> { + Ok(()) + } } diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index 06d77f9..204f595 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -690,4 +690,27 @@ impl NodeDatabase for FlowDBStore { ); Ok(self.kvdb.write(tx)?) } + + fn save_node_list(&self, nodes: &[(usize, usize, &DataRoot)]) -> Result<()> { + let mut tx = self.kvdb.transaction(); + for (layer_index, position, data) in nodes { + tx.put( + COL_FLOW_MPT_NODES, + &encode_mpt_node_key(*layer_index, *position), + data.as_bytes(), + ); + } + Ok(self.kvdb.write(tx)?) + } + + fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()> { + let mut tx = self.kvdb.transaction(); + for (layer_index, position) in nodes { + tx.delete( + COL_FLOW_MPT_NODES, + &encode_mpt_node_key(*layer_index, *position), + ); + } + Ok(self.kvdb.write(tx)?) + } } From b16abe23035445406fefe4c5a978ac737c8edd63 Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Thu, 10 Oct 2024 19:01:20 +0800 Subject: [PATCH 04/11] fix. --- Cargo.toml | 6 +++++- common/append_merkle/src/lib.rs | 6 ++++-- node/storage/src/log_store/log_manager.rs | 1 + 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 270c70d..9a8a679 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,4 +36,8 @@ eth2_ssz = { path = "version-meld/eth2_ssz" } enr = { path = "version-meld/enr" } [profile.bench.package.'storage'] -debug = true \ No newline at end of file +debug = true + +[profile.dev] +# enabling debug_assertions will make node fail to start because of checks in `clap`. +debug-assertions = false \ No newline at end of file diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index 80e2f1a..bfbf1b5 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -556,7 +556,7 @@ impl> AppendMerkleTree { .ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))? .clone(); // Dropping the upper layers that are not in the old merkle tree. - for height in (self.height() - 1)..=delta_nodes.right_most_nodes.len() { + for height in (delta_nodes.right_most_nodes.len()..(self.height() - 1)).rev() { self.node_manager.truncate_layer(height); } for (height, (last_index, right_most_node)) in @@ -592,13 +592,15 @@ impl> AppendMerkleTree { } pub fn reset(&mut self) { - for height in (self.height() - 1)..=0 { + for height in (0..self.height()).rev() { self.node_manager.truncate_layer(height); } if let Some(depth) = self.min_depth { for _ in 0..depth { self.node_manager.add_layer(); } + } else { + self.node_manager.add_layer(); } } diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 32c6d3e..2d07ef9 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -94,6 +94,7 @@ impl MerkleManager { } fn revert_merkle_tree(&mut self, tx_seq: u64, tx_store: &TransactionStore) -> Result<()> { + debug!("revert merkle tree {}", tx_seq); // Special case for reverting tx_seq == 0 if tx_seq == u64::MAX { self.pora_chunks_merkle.reset(); From fd96737c6dbc5e4cd389d01d07d68a4f31473a53 Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Thu, 10 Oct 2024 20:38:02 +0800 Subject: [PATCH 05/11] Use LRU for cache. --- Cargo.lock | 33 +++++++++++++++-- common/append_merkle/Cargo.toml | 3 +- common/append_merkle/src/lib.rs | 39 ++++++-------------- common/append_merkle/src/node_manager.rs | 25 +++++++++---- node/src/client/builder.rs | 2 +- node/src/config/convert.rs | 4 ++ node/src/config/mod.rs | 1 + node/storage/src/config.rs | 2 + node/storage/src/log_store/flow_store.rs | 13 +++---- node/storage/src/log_store/load_chunk/mod.rs | 5 +-- node/storage/src/log_store/log_manager.rs | 13 ++----- node/storage/src/log_store/tests.rs | 8 +--- 12 files changed, 80 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ed02af5..0bdf505 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,6 +225,7 @@ dependencies = [ "ethereum-types 0.14.1", "itertools 0.13.0", "lazy_static", + "lru 0.12.5", "once_cell", "serde", "tiny-keccak", @@ -1674,7 +1675,7 @@ dependencies = [ "hkdf", "lazy_static", "libp2p-core 0.30.2", - "lru", + "lru 0.7.8", "parking_lot 0.11.2", "rand 0.8.5", "rlp", @@ -2515,6 +2516,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -2947,6 +2954,17 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] + [[package]] name = "hashers" version = "1.0.1" @@ -4118,7 +4136,7 @@ dependencies = [ "libp2p-core 0.33.0", "libp2p-swarm", "log", - "lru", + "lru 0.7.8", "prost 0.10.4", "prost-build 0.10.4", "prost-codec", @@ -4651,6 +4669,15 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.0", +] + [[package]] name = "lru-cache" version = "0.1.2" @@ -5019,7 +5046,7 @@ dependencies = [ "lazy_static", "libp2p", "lighthouse_metrics", - "lru", + "lru 0.7.8", "parking_lot 0.12.3", "rand 0.8.5", "regex", diff --git a/common/append_merkle/Cargo.toml b/common/append_merkle/Cargo.toml index 19f4750..2b1b8d6 100644 --- a/common/append_merkle/Cargo.toml +++ b/common/append_merkle/Cargo.toml @@ -13,4 +13,5 @@ serde = { version = "1.0.137", features = ["derive"] } lazy_static = "1.4.0" tracing = "0.1.36" once_cell = "1.19.0" -itertools = "0.13.0" \ No newline at end of file +itertools = "0.13.0" +lru = "0.12.5" \ No newline at end of file diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index bfbf1b5..eb02147 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -38,14 +38,9 @@ pub struct AppendMerkleTree> { } impl> AppendMerkleTree { - pub fn new( - node_db: Arc>, - leaves: Vec, - leaf_height: usize, - start_tx_seq: Option, - ) -> Self { + pub fn new(leaves: Vec, leaf_height: usize, start_tx_seq: Option) -> Self { let mut merkle = Self { - node_manager: NodeManager::new(node_db), + node_manager: NodeManager::new_dummy(), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: None, @@ -75,12 +70,13 @@ impl> AppendMerkleTree { pub fn new_with_subtrees( node_db: Arc>, + node_cache_capacity: usize, initial_data: MerkleTreeInitialData, leaf_height: usize, start_tx_seq: Option, ) -> Result { let mut merkle = Self { - node_manager: NodeManager::new(node_db), + node_manager: NodeManager::new(node_db, node_cache_capacity), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: None, @@ -117,7 +113,7 @@ impl> AppendMerkleTree { // Create an empty merkle tree with `depth`. let mut merkle = Self { // dummy node manager for the last chunk. - node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})), + node_manager: NodeManager::new_dummy(), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: Some(depth), @@ -139,7 +135,7 @@ impl> AppendMerkleTree { } else { let mut merkle = Self { // dummy node manager for the last chunk. - node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})), + node_manager: NodeManager::new_dummy(), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: Some(depth), @@ -775,12 +771,8 @@ mod tests { for _ in 0..entry_len { data.push(H256::random()); } - let mut merkle = AppendMerkleTree::::new( - Arc::new(EmptyNodeDatabase {}), - vec![H256::zero()], - 0, - None, - ); + let mut merkle = + AppendMerkleTree::::new(vec![H256::zero()], 0, None); merkle.append_list(data.clone()); merkle.commit(Some(0)); verify(&data, &mut merkle); @@ -807,12 +799,8 @@ mod tests { for _ in 0..entry_len { data.push(H256::random()); } - let mut merkle = AppendMerkleTree::::new( - Arc::new(EmptyNodeDatabase {}), - vec![H256::zero()], - 0, - None, - ); + let mut merkle = + AppendMerkleTree::::new(vec![H256::zero()], 0, None); merkle.append_list(data.clone()); merkle.commit(Some(0)); @@ -836,12 +824,7 @@ mod tests { #[test] fn test_proof_at_version() { let n = [2, 255, 256, 257]; - let mut merkle = AppendMerkleTree::::new( - Arc::new(EmptyNodeDatabase {}), - vec![H256::zero()], - 0, - None, - ); + let mut merkle = AppendMerkleTree::::new(vec![H256::zero()], 0, None); let mut start_pos = 0; for (tx_seq, &entry_len) in n.iter().enumerate() { diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs index 8fa87ed..369fc2f 100644 --- a/common/append_merkle/src/node_manager.rs +++ b/common/append_merkle/src/node_manager.rs @@ -1,24 +1,33 @@ use crate::HashElement; use anyhow::Result; -use std::collections::BTreeMap; +use lru::LruCache; +use std::num::NonZeroUsize; use std::sync::Arc; use tracing::error; pub struct NodeManager { - cache: BTreeMap<(usize, usize), E>, + cache: LruCache<(usize, usize), E>, layer_size: Vec, db: Arc>, } impl NodeManager { - pub fn new(db: Arc>) -> Self { + pub fn new(db: Arc>, capacity: usize) -> Self { Self { - cache: BTreeMap::new(), + cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")), layer_size: vec![], db, } } + pub fn new_dummy() -> Self { + Self { + cache: LruCache::unbounded(), + layer_size: vec![], + db: Arc::new(EmptyNodeDatabase {}), + } + } + pub fn push_node(&mut self, layer: usize, node: E) { self.add_node(layer, self.layer_size[layer], node); self.layer_size[layer] += 1; @@ -28,7 +37,7 @@ impl NodeManager { let pos = &mut self.layer_size[layer]; let mut saved_nodes = Vec::with_capacity(nodes.len()); for node in nodes { - self.cache.insert((layer, *pos), node.clone()); + self.cache.put((layer, *pos), node.clone()); saved_nodes.push((layer, *pos, node)); *pos += 1; } @@ -38,7 +47,7 @@ impl NodeManager { } pub fn get_node(&self, layer: usize, pos: usize) -> Option { - match self.cache.get(&(layer, pos)) { + match self.cache.peek(&(layer, pos)) { Some(node) => Some(node.clone()), None => self.db.get_node(layer, pos).unwrap_or_else(|e| { error!("Failed to get node: {}", e); @@ -60,7 +69,7 @@ impl NodeManager { if let Err(e) = self.db.save_node(layer, pos, &node) { error!("Failed to save node: {}", e); } - self.cache.insert((layer, pos), node); + self.cache.put((layer, pos), node); } pub fn add_layer(&mut self) { @@ -78,7 +87,7 @@ impl NodeManager { pub fn truncate_nodes(&mut self, layer: usize, pos_end: usize) { let mut removed_nodes = Vec::new(); for pos in pos_end..self.layer_size[layer] { - self.cache.remove(&(layer, pos)); + self.cache.pop(&(layer, pos)); removed_nodes.push((layer, pos)); } if let Err(e) = self.db.remove_node_list(&removed_nodes) { diff --git a/node/src/client/builder.rs b/node/src/client/builder.rs index 4d742e1..0cb284d 100644 --- a/node/src/client/builder.rs +++ b/node/src/client/builder.rs @@ -112,7 +112,7 @@ impl ClientBuilder { pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result { let executor = require!("sync", self, runtime_context).clone().executor; let store = Arc::new( - LogManager::rocksdb(LogConfig::default(), &config.db_dir, executor) + LogManager::rocksdb(config.log_config.clone(), &config.db_dir, executor) .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?, ); diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index e871b84..43ca653 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -11,6 +11,7 @@ use shared_types::{NetworkIdentity, ProtocolVersion}; use std::net::IpAddr; use std::time::Duration; use storage::config::ShardConfig; +use storage::log_store::log_manager::LogConfig; use storage::StorageConfig; impl ZgsConfig { @@ -101,8 +102,11 @@ impl ZgsConfig { } pub fn storage_config(&self) -> Result { + let mut log_config = LogConfig::default(); + log_config.flow.merkle_node_cache_capacity = self.merkle_node_cache_capacity; Ok(StorageConfig { db_dir: self.db_dir.clone().into(), + log_config, }) } diff --git a/node/src/config/mod.rs b/node/src/config/mod.rs index bad72b9..6d30745 100644 --- a/node/src/config/mod.rs +++ b/node/src/config/mod.rs @@ -60,6 +60,7 @@ build_config! { (prune_check_time_s, (u64), 60) (prune_batch_size, (usize), 16 * 1024) (prune_batch_wait_time_ms, (u64), 1000) + (merkle_node_cache_capacity, (usize), 32 * 1024 * 1024) // misc (log_config_file, (String), "log_config".to_string()) diff --git a/node/storage/src/config.rs b/node/storage/src/config.rs index b5fde0c..2b7160e 100644 --- a/node/storage/src/config.rs +++ b/node/storage/src/config.rs @@ -1,3 +1,4 @@ +use crate::log_store::log_manager::LogConfig; use serde::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; use std::{cell::RefCell, path::PathBuf, rc::Rc, str::FromStr}; @@ -7,6 +8,7 @@ pub const SHARD_CONFIG_KEY: &str = "shard_config"; #[derive(Clone)] pub struct Config { pub db_dir: PathBuf, + pub log_config: LogConfig, } #[derive(Clone, Copy, Debug, Decode, Encode, Serialize, Deserialize, Eq, PartialEq)] diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index 204f595..e6c454a 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -10,7 +10,7 @@ use crate::log_store::log_manager::{ use crate::log_store::{FlowRead, FlowSeal, FlowWrite}; use crate::{try_option, ZgsKeyValueDB}; use anyhow::{anyhow, bail, Result}; -use append_merkle::{EmptyNodeDatabase, MerkleTreeInitialData, MerkleTreeRead, NodeDatabase}; +use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase}; use itertools::Itertools; use parking_lot::RwLock; use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle}; @@ -93,6 +93,7 @@ impl FlowStore { #[derive(Clone, Debug)] pub struct FlowConfig { pub batch_size: usize, + pub merkle_node_cache_capacity: usize, pub shard_config: Arc>, } @@ -100,6 +101,8 @@ impl Default for FlowConfig { fn default() -> Self { Self { batch_size: SECTORS_PER_LOAD, + // Each node takes (8+8+32=)48 Bytes, so the default value is 1.5 GB memory size. + merkle_node_cache_capacity: 32 * 1024 * 1024, shard_config: Default::default(), } } @@ -436,13 +439,7 @@ impl FlowDBStore { let mut expected_index = 0; let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE]; - let empty_root = Merkle::new( - Arc::new(EmptyNodeDatabase {}), - data_to_merkle_leaves(&empty_data)?, - 0, - None, - ) - .root(); + let empty_root = Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root(); for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) { let (index_bytes, root_bytes) = r?; diff --git a/node/storage/src/log_store/load_chunk/mod.rs b/node/storage/src/log_store/load_chunk/mod.rs index 5fa88ee..0a68dbc 100644 --- a/node/storage/src/log_store/load_chunk/mod.rs +++ b/node/storage/src/log_store/load_chunk/mod.rs @@ -8,11 +8,10 @@ use anyhow::Result; use ethereum_types::H256; use ssz_derive::{Decode, Encode}; use std::cmp::min; -use std::sync::Arc; use crate::log_store::log_manager::data_to_merkle_leaves; use crate::try_option; -use append_merkle::{Algorithm, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm}; +use append_merkle::{Algorithm, MerkleTreeRead, Sha3Algorithm}; use shared_types::{ChunkArray, DataRoot, Merkle}; use tracing::trace; use zgs_spec::{ @@ -248,7 +247,7 @@ impl EntryBatch { } else { vec![] }; - let mut merkle = Merkle::new(Arc::new(EmptyNodeDatabase {}), initial_leaves, 0, None); + let mut merkle = Merkle::new(initial_leaves, 0, None); for subtree in self.data.get_subtree_list() { trace!(?subtree, "get subtree, leaves={}", merkle.leaves()); if subtree.start_sector != merkle.leaves() { diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 2d07ef9..d81f500 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -6,7 +6,7 @@ use crate::log_store::{ }; use crate::{try_option, ZgsKeyValueDB}; use anyhow::{anyhow, bail, Result}; -use append_merkle::{Algorithm, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm}; +use append_merkle::{Algorithm, MerkleTreeRead, Sha3Algorithm}; use ethereum_types::H256; use kvdb_rocksdb::{Database, DatabaseConfig}; use merkle_light::merkle::{log2_pow2, MerkleTree}; @@ -628,7 +628,7 @@ impl LogManager { ) -> Result { let tx_store = TransactionStore::new(db.clone())?; let flow_db = Arc::new(FlowDBStore::new(db.clone())); - let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow)); + let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone())); let mut initial_data = flow_store.get_chunk_root_list()?; // If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list` // first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves` @@ -709,6 +709,7 @@ impl LogManager { let mut pora_chunks_merkle = Merkle::new_with_subtrees( flow_db, + config.flow.merkle_node_cache_capacity, initial_data, log2_pow2(PORA_CHUNK_SIZE), start_tx_seq, @@ -983,13 +984,7 @@ impl LogManager { let data = pad_data[start_index * ENTRY_SIZE ..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE] .to_vec(); - let root = Merkle::new( - Arc::new(EmptyNodeDatabase {}), - data_to_merkle_leaves(&data)?, - 0, - None, - ) - .root(); + let root = Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root(); merkle.pora_chunks_merkle.append(root); root_map.insert(merkle.pora_chunks_merkle.leaves() - 1, (root, 1)); start_index += PORA_CHUNK_SIZE; diff --git a/node/storage/src/log_store/tests.rs b/node/storage/src/log_store/tests.rs index aef7d55..d77b5f5 100644 --- a/node/storage/src/log_store/tests.rs +++ b/node/storage/src/log_store/tests.rs @@ -29,12 +29,7 @@ fn test_put_get() { data[i * CHUNK_SIZE] = random(); } let (padded_chunks, _) = compute_padded_chunk_size(data_size); - let mut merkle = AppendMerkleTree::::new( - Arc::new(EmptyNodeDatabase {}), - vec![H256::zero()], - 0, - None, - ); + let mut merkle = AppendMerkleTree::::new(vec![H256::zero()], 0, None); merkle.append_list(data_to_merkle_leaves(&LogManager::padding_raw(start_offset - 1)).unwrap()); let mut data_padded = data.clone(); data_padded.append(&mut vec![0u8; CHUNK_SIZE]); @@ -132,7 +127,6 @@ fn test_root() { let mt = sub_merkle_tree(&data).unwrap(); println!("{:?} {}", mt.root(), hex::encode(mt.root())); let append_mt = AppendMerkleTree::::new( - Arc::new(EmptyNodeDatabase {}), data_to_merkle_leaves(&data).unwrap(), 0, None, From 90650294afd260a1081f43151f18cc91fc93b1a4 Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Thu, 10 Oct 2024 20:50:30 +0800 Subject: [PATCH 06/11] fix clippy. --- common/append_merkle/src/node_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs index 369fc2f..259e812 100644 --- a/common/append_merkle/src/node_manager.rs +++ b/common/append_merkle/src/node_manager.rs @@ -58,7 +58,7 @@ impl NodeManager { pub fn get_nodes(&self, layer: usize, start_pos: usize, end_pos: usize) -> NodeIterator { NodeIterator { - node_manager: &self, + node_manager: self, layer, start_pos, end_pos, From 19829f1def24b8b124dad6a4b460094a723f2323 Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Mon, 14 Oct 2024 20:41:32 +0800 Subject: [PATCH 07/11] Save layer size. --- common/append_merkle/src/lib.rs | 54 ++++++++++----- common/append_merkle/src/node_manager.rs | 88 ++++++++++++++++++------ node/storage/src/log_store/flow_store.rs | 64 +++++++++++++---- node/storage/src/log_store/tests.rs | 6 +- tests/example_test.py | 21 +++++- 5 files changed, 178 insertions(+), 55 deletions(-) diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index eb02147..70faf2e 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -16,7 +16,7 @@ use crate::merkle_tree::MerkleTreeWrite; pub use crate::merkle_tree::{ Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES, }; -pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager}; +pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager, NodeTransaction}; pub use proof::{Proof, RangeProof}; pub use sha3::Sha3Algorithm; @@ -160,8 +160,10 @@ impl> AppendMerkleTree { // appending null is not allowed. return; } + self.node_manager.start_transaction(); self.node_manager.push_node(0, new_leaf); self.recompute_after_append_leaves(self.leaves() - 1); + self.node_manager.commit(); } pub fn append_list(&mut self, leaf_list: Vec) { @@ -169,9 +171,11 @@ impl> AppendMerkleTree { // appending null is not allowed. return; } + self.node_manager.start_transaction(); let start_index = self.leaves(); self.node_manager.append_nodes(0, &leaf_list); self.recompute_after_append_leaves(start_index); + self.node_manager.commit(); } /// Append a leaf list by providing their intermediate node hash. @@ -184,9 +188,11 @@ impl> AppendMerkleTree { // appending null is not allowed. bail!("subtree_root is null"); } + self.node_manager.start_transaction(); let start_index = self.leaves(); self.append_subtree_inner(subtree_depth, subtree_root)?; self.recompute_after_append_subtree(start_index, subtree_depth - 1); + self.node_manager.commit(); Ok(()) } @@ -195,11 +201,13 @@ impl> AppendMerkleTree { // appending null is not allowed. bail!("subtree_list contains null"); } + self.node_manager.start_transaction(); for (subtree_depth, subtree_root) in subtree_list { let start_index = self.leaves(); self.append_subtree_inner(subtree_depth, subtree_root)?; self.recompute_after_append_subtree(start_index, subtree_depth - 1); } + self.node_manager.commit(); Ok(()) } @@ -210,6 +218,7 @@ impl> AppendMerkleTree { // updating to null is not allowed. return; } + self.node_manager.start_transaction(); if self.layer_len(0) == 0 { // Special case for the first data. self.push_node(0, updated_leaf); @@ -217,6 +226,7 @@ impl> AppendMerkleTree { self.update_node(0, self.layer_len(0) - 1, updated_leaf); } self.recompute_after_append_leaves(self.leaves() - 1); + self.node_manager.commit(); } /// Fill an unknown `null` leaf with its real value. @@ -226,8 +236,10 @@ impl> AppendMerkleTree { if leaf == E::null() { // fill leaf with null is not allowed. } else if self.node(0, index) == E::null() { + self.node_manager.start_transaction(); self.update_node(0, index, leaf); self.recompute_after_fill_leaves(index, index + 1); + self.node_manager.commit(); } else if self.node(0, index) != leaf { panic!( "Fill with invalid leaf, index={} was={:?} get={:?}", @@ -246,18 +258,26 @@ impl> AppendMerkleTree { &mut self, proof: RangeProof, ) -> Result> { - self.fill_with_proof( - proof - .left_proof - .proof_nodes_in_tree() - .split_off(self.leaf_height), - )?; - self.fill_with_proof( - proof - .right_proof - .proof_nodes_in_tree() - .split_off(self.leaf_height), - ) + self.node_manager.start_transaction(); + let mut updated_nodes = Vec::new(); + updated_nodes.append( + &mut self.fill_with_proof( + proof + .left_proof + .proof_nodes_in_tree() + .split_off(self.leaf_height), + )?, + ); + updated_nodes.append( + &mut self.fill_with_proof( + proof + .right_proof + .proof_nodes_in_tree() + .split_off(self.leaf_height), + )?, + ); + self.node_manager.commit(); + Ok(updated_nodes) } pub fn fill_with_file_proof( @@ -282,13 +302,16 @@ impl> AppendMerkleTree { if tx_merkle_nodes.is_empty() { return Ok(Vec::new()); } + self.node_manager.start_transaction(); let mut position_and_data = proof.file_proof_nodes_in_tree(tx_merkle_nodes, tx_merkle_nodes_size); let start_index = (start_index >> self.leaf_height) as usize; for (i, (position, _)) in position_and_data.iter_mut().enumerate() { *position += start_index >> i; } - self.fill_with_proof(position_and_data) + let updated_nodes = self.fill_with_proof(position_and_data)?; + self.node_manager.commit(); + Ok(updated_nodes) } /// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof. @@ -757,11 +780,10 @@ macro_rules! ensure_eq { #[cfg(test)] mod tests { use crate::merkle_tree::MerkleTreeRead; - use crate::node_manager::EmptyNodeDatabase; + use crate::sha3::Sha3Algorithm; use crate::AppendMerkleTree; use ethereum_types::H256; - use std::sync::Arc; #[test] fn test_proof() { diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs index 259e812..a583467 100644 --- a/common/append_merkle/src/node_manager.rs +++ b/common/append_merkle/src/node_manager.rs @@ -9,6 +9,7 @@ pub struct NodeManager { cache: LruCache<(usize, usize), E>, layer_size: Vec, db: Arc>, + db_tx: Option>>, } impl NodeManager { @@ -17,6 +18,7 @@ impl NodeManager { cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")), layer_size: vec![], db, + db_tx: None, } } @@ -25,6 +27,7 @@ impl NodeManager { cache: LruCache::unbounded(), layer_size: vec![], db: Arc::new(EmptyNodeDatabase {}), + db_tx: None, } } @@ -41,9 +44,9 @@ impl NodeManager { saved_nodes.push((layer, *pos, node)); *pos += 1; } - if let Err(e) = self.db.save_node_list(&saved_nodes) { - error!("Failed to save node list: {:?}", e); - } + let size = *pos; + self.db_tx().save_layer_size(layer, size); + self.db_tx().save_node_list(&saved_nodes); } pub fn get_node(&self, layer: usize, pos: usize) -> Option { @@ -66,14 +69,17 @@ impl NodeManager { } pub fn add_node(&mut self, layer: usize, pos: usize, node: E) { - if let Err(e) = self.db.save_node(layer, pos, &node) { - error!("Failed to save node: {}", e); + // No need to insert if the value is unchanged. + if self.cache.get(&(layer, pos)) != Some(&node) { + self.db_tx().save_node(layer, pos, &node); + self.cache.put((layer, pos), node); } - self.cache.put((layer, pos), node); } pub fn add_layer(&mut self) { self.layer_size.push(0); + let layer = self.layer_size.len() - 1; + self.db_tx().save_layer_size(layer, 0); } pub fn layer_size(&self, layer: usize) -> usize { @@ -90,18 +96,42 @@ impl NodeManager { self.cache.pop(&(layer, pos)); removed_nodes.push((layer, pos)); } - if let Err(e) = self.db.remove_node_list(&removed_nodes) { - error!("Failed to remove node list: {:?}", e); - } + self.db_tx().remove_node_list(&removed_nodes); self.layer_size[layer] = pos_end; + self.db_tx().save_layer_size(layer, pos_end); } pub fn truncate_layer(&mut self, layer: usize) { self.truncate_nodes(layer, 0); if layer == self.num_layers() - 1 { self.layer_size.pop(); + self.db_tx().remove_layer_size(layer); } } + + pub fn start_transaction(&mut self) { + if self.db_tx.is_none() { + error!("start new tx before commit"); + } + self.db_tx = Some(self.db.start_transaction()); + } + + pub fn commit(&mut self) { + let tx = match self.db_tx.take() { + Some(tx) => tx, + None => { + error!("db_tx is None"); + return; + } + }; + if let Err(e) = self.db.commit(tx) { + error!("Failed to commit db transaction: {}", e); + } + } + + fn db_tx(&mut self) -> &mut dyn NodeTransaction { + (*self.db_tx.as_mut().expect("tx checked")).as_mut() + } } pub struct NodeIterator<'a, E: HashElement> { @@ -127,28 +157,46 @@ impl<'a, E: HashElement> Iterator for NodeIterator<'a, E> { pub trait NodeDatabase: Send + Sync { fn get_node(&self, layer: usize, pos: usize) -> Result>; - fn save_node(&self, layer: usize, pos: usize, node: &E) -> Result<()>; + fn get_layer_size(&self, layer: usize) -> Result>; + fn start_transaction(&self) -> Box>; + fn commit(&self, tx: Box>) -> Result<()>; +} + +pub trait NodeTransaction: Send + Sync { + fn save_node(&mut self, layer: usize, pos: usize, node: &E); /// `nodes` are a list of tuples `(layer, pos, node)`. - fn save_node_list(&self, nodes: &[(usize, usize, &E)]) -> Result<()>; - fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()>; + fn save_node_list(&mut self, nodes: &[(usize, usize, &E)]); + fn remove_node_list(&mut self, nodes: &[(usize, usize)]); + fn save_layer_size(&mut self, layer: usize, size: usize); + fn remove_layer_size(&mut self, layer: usize); } /// A dummy database structure for in-memory merkle tree that will not read/write db. pub struct EmptyNodeDatabase {} +pub struct EmptyNodeTransaction {} impl NodeDatabase for EmptyNodeDatabase { fn get_node(&self, _layer: usize, _pos: usize) -> Result> { Ok(None) } - - fn save_node(&self, _layer: usize, _pos: usize, _node: &E) -> Result<()> { - Ok(()) + fn get_layer_size(&self, _layer: usize) -> Result> { + Ok(None) } - - fn save_node_list(&self, _nodes: &[(usize, usize, &E)]) -> Result<()> { - Ok(()) + fn start_transaction(&self) -> Box> { + Box::new(EmptyNodeTransaction {}) } - - fn remove_node_list(&self, _nodes: &[(usize, usize)]) -> Result<()> { + fn commit(&self, _tx: Box>) -> Result<()> { Ok(()) } } + +impl NodeTransaction for EmptyNodeTransaction { + fn save_node(&mut self, _layer: usize, _pos: usize, _node: &E) {} + + fn save_node_list(&mut self, _nodes: &[(usize, usize, &E)]) {} + + fn remove_node_list(&mut self, _nodes: &[(usize, usize)]) {} + + fn save_layer_size(&mut self, _layer: usize, _size: usize) {} + + fn remove_layer_size(&mut self, _layer: usize) {} +} diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index e6c454a..dda1688 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -9,9 +9,11 @@ use crate::log_store::log_manager::{ }; use crate::log_store::{FlowRead, FlowSeal, FlowWrite}; use crate::{try_option, ZgsKeyValueDB}; +use any::Any; use anyhow::{anyhow, bail, Result}; -use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase}; +use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase, NodeTransaction}; use itertools::Itertools; +use kvdb::DBTransaction; use parking_lot::RwLock; use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle}; use ssz::{Decode, Encode}; @@ -20,7 +22,7 @@ use std::cmp::Ordering; use std::collections::BTreeMap; use std::fmt::Debug; use std::sync::Arc; -use std::{cmp, mem}; +use std::{any, cmp, mem}; use tracing::{debug, error, trace}; use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL}; @@ -670,6 +672,14 @@ fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> { Ok((layer_index, position)) } +fn layer_size_key(layer: usize) -> Vec { + let mut key = "layer_size".as_bytes().to_vec(); + key.extend_from_slice(&layer.to_be_bytes()); + key +} + +pub struct NodeDBTransaction(DBTransaction); + impl NodeDatabase for FlowDBStore { fn get_node(&self, layer: usize, pos: usize) -> Result> { Ok(self @@ -678,36 +688,62 @@ impl NodeDatabase for FlowDBStore { .map(|v| DataRoot::from_slice(&v))) } - fn save_node(&self, layer: usize, pos: usize, node: &DataRoot) -> Result<()> { - let mut tx = self.kvdb.transaction(); - tx.put( + fn get_layer_size(&self, layer: usize) -> Result> { + match self.kvdb.get(COL_FLOW_MPT_NODES, &layer_size_key(layer))? { + Some(v) => Ok(Some(try_decode_usize(&v)?)), + None => Ok(None), + } + } + + fn start_transaction(&self) -> Box> { + Box::new(NodeDBTransaction(self.kvdb.transaction())) + } + + fn commit(&self, tx: Box>) -> Result<()> { + let db_tx: &NodeDBTransaction = (&tx as &dyn Any) + .downcast_ref() + .ok_or(anyhow!("downcast failed"))?; + self.kvdb.write(db_tx.0.clone()).map_err(Into::into) + } +} + +impl NodeTransaction for NodeDBTransaction { + fn save_node(&mut self, layer: usize, pos: usize, node: &DataRoot) { + self.0.put( COL_FLOW_MPT_NODES, &encode_mpt_node_key(layer, pos), node.as_bytes(), ); - Ok(self.kvdb.write(tx)?) } - fn save_node_list(&self, nodes: &[(usize, usize, &DataRoot)]) -> Result<()> { - let mut tx = self.kvdb.transaction(); + fn save_node_list(&mut self, nodes: &[(usize, usize, &DataRoot)]) { for (layer_index, position, data) in nodes { - tx.put( + self.0.put( COL_FLOW_MPT_NODES, &encode_mpt_node_key(*layer_index, *position), data.as_bytes(), ); } - Ok(self.kvdb.write(tx)?) } - fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()> { - let mut tx = self.kvdb.transaction(); + fn remove_node_list(&mut self, nodes: &[(usize, usize)]) { for (layer_index, position) in nodes { - tx.delete( + self.0.delete( COL_FLOW_MPT_NODES, &encode_mpt_node_key(*layer_index, *position), ); } - Ok(self.kvdb.write(tx)?) + } + + fn save_layer_size(&mut self, layer: usize, size: usize) { + self.0.put( + COL_FLOW_MPT_NODES, + &layer_size_key(layer), + &size.to_be_bytes(), + ); + } + + fn remove_layer_size(&mut self, layer: usize) { + self.0.delete(COL_FLOW_MPT_NODES, &layer_size_key(layer)); } } diff --git a/node/storage/src/log_store/tests.rs b/node/storage/src/log_store/tests.rs index d77b5f5..50b4cc5 100644 --- a/node/storage/src/log_store/tests.rs +++ b/node/storage/src/log_store/tests.rs @@ -3,14 +3,12 @@ use crate::log_store::log_manager::{ PORA_CHUNK_SIZE, }; use crate::log_store::{LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite}; -use append_merkle::{ - Algorithm, AppendMerkleTree, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm, -}; +use append_merkle::{Algorithm, AppendMerkleTree, MerkleTreeRead, Sha3Algorithm}; use ethereum_types::H256; use rand::random; use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE}; use std::cmp; -use std::sync::Arc; + use task_executor::test_utils::TestRuntime; #[test] diff --git a/tests/example_test.py b/tests/example_test.py index 19ff56c..9fd6bf4 100755 --- a/tests/example_test.py +++ b/tests/example_test.py @@ -6,10 +6,15 @@ from utility.utils import wait_until class ExampleTest(TestFramework): + def setup_params(self): + self.zgs_node_configs[0] = { + "merkle_node_cache_capacity": 1024, + } + def run_test(self): client = self.nodes[0] - chunk_data = b"\x02" * 5253123 + chunk_data = b"\x02" * 256 * 1024 * 1024 * 3 submissions, data_root = create_submission(chunk_data) self.contract.submit(submissions) wait_until(lambda: self.contract.num_submissions() == 1) @@ -19,6 +24,20 @@ class ExampleTest(TestFramework): self.log.info("segment: %s", len(segment)) wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) + self.stop_storage_node(0) + self.start_storage_node(0) + self.nodes[0].wait_for_rpc_connection() + + chunk_data = b"\x03" * 256 * (1024 * 765 + 5) + submissions, data_root = create_submission(chunk_data) + self.contract.submit(submissions) + wait_until(lambda: self.contract.num_submissions() == 2) + wait_until(lambda: client.zgs_get_file_info(data_root) is not None) + + segment = submit_data(client, chunk_data) + self.log.info("segment: %s", len(segment)) + wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) + if __name__ == "__main__": ExampleTest().main() From 9c2f6e9d7d2c59da9b3ff6ec8f7b33a912b8f551 Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Mon, 14 Oct 2024 23:35:59 +0800 Subject: [PATCH 08/11] Initialize LogManager with NodeManager. --- common/append_merkle/src/lib.rs | 55 +++++++++++---------- common/append_merkle/src/node_manager.rs | 14 ++++-- node/storage/src/log_store/log_manager.rs | 58 +++++------------------ 3 files changed, 51 insertions(+), 76 deletions(-) diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index 70faf2e..12f8fcf 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -71,49 +71,33 @@ impl> AppendMerkleTree { pub fn new_with_subtrees( node_db: Arc>, node_cache_capacity: usize, - initial_data: MerkleTreeInitialData, leaf_height: usize, - start_tx_seq: Option, ) -> Result { let mut merkle = Self { - node_manager: NodeManager::new(node_db, node_cache_capacity), + node_manager: NodeManager::new(node_db, node_cache_capacity)?, delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: None, leaf_height, _a: Default::default(), }; - merkle.node_manager.add_layer(); - if initial_data.subtree_list.is_empty() { - if let Some(seq) = start_tx_seq { - merkle.delta_nodes_map.insert( - seq, - DeltaNodes { - right_most_nodes: vec![], - }, - ); - } - return Ok(merkle); - } - merkle.append_subtree_list(initial_data.subtree_list)?; - merkle.commit(start_tx_seq); - for (index, h) in initial_data.known_leaves { - merkle.fill_leaf(index, h); - } - for (layer_index, position, h) in initial_data.extra_mpt_nodes { - // TODO: Delete duplicate nodes from DB. - merkle.update_node(layer_index, position, h); + if merkle.height() == 0 { + merkle.node_manager.start_transaction(); + merkle.node_manager.add_layer(); + merkle.node_manager.commit(); } Ok(merkle) } /// This is only used for the last chunk, so `leaf_height` is always 0 so far. pub fn new_with_depth(leaves: Vec, depth: usize, start_tx_seq: Option) -> Self { + let mut node_manager = NodeManager::new_dummy(); + node_manager.start_transaction(); if leaves.is_empty() { // Create an empty merkle tree with `depth`. let mut merkle = Self { // dummy node manager for the last chunk. - node_manager: NodeManager::new_dummy(), + node_manager, delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: Some(depth), @@ -135,7 +119,7 @@ impl> AppendMerkleTree { } else { let mut merkle = Self { // dummy node manager for the last chunk. - node_manager: NodeManager::new_dummy(), + node_manager, delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: Some(depth), @@ -569,6 +553,7 @@ impl> AppendMerkleTree { // Any previous state of an empty tree is always empty. return Ok(()); } + self.node_manager.start_transaction(); let delta_nodes = self .delta_nodes_map .get(&tx_seq) @@ -585,6 +570,24 @@ impl> AppendMerkleTree { self.update_node(height, *last_index, right_most_node.clone()) } self.clear_after(tx_seq); + self.node_manager.commit(); + Ok(()) + } + + // Revert to a tx_seq not in `delta_nodes_map`. + // This is needed to revert the last unfinished tx after restart. + pub fn revert_to_leaves(&mut self, leaves: usize) -> Result<()> { + self.node_manager.start_transaction(); + for height in (0..self.height()).rev() { + let kept_nodes = leaves >> height; + if kept_nodes == 0 { + self.node_manager.truncate_layer(height); + } else { + self.node_manager.truncate_nodes(height, kept_nodes + 1); + } + } + self.recompute_after_append_leaves(leaves); + self.node_manager.commit(); Ok(()) } @@ -611,6 +614,7 @@ impl> AppendMerkleTree { } pub fn reset(&mut self) { + self.node_manager.start_transaction(); for height in (0..self.height()).rev() { self.node_manager.truncate_layer(height); } @@ -621,6 +625,7 @@ impl> AppendMerkleTree { } else { self.node_manager.add_layer(); } + self.node_manager.commit(); } fn clear_after(&mut self, tx_seq: u64) { diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs index a583467..50c30f1 100644 --- a/common/append_merkle/src/node_manager.rs +++ b/common/append_merkle/src/node_manager.rs @@ -13,13 +13,19 @@ pub struct NodeManager { } impl NodeManager { - pub fn new(db: Arc>, capacity: usize) -> Self { - Self { + pub fn new(db: Arc>, capacity: usize) -> Result { + let mut layer = 0; + let mut layer_size = Vec::new(); + while let Some(size) = db.get_layer_size(layer)? { + layer_size.push(size); + layer += 1; + } + Ok(Self { cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")), - layer_size: vec![], + layer_size, db, db_tx: None, - } + }) } pub fn new_dummy() -> Self { diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index d81f500..a33ad45 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -629,12 +629,8 @@ impl LogManager { let tx_store = TransactionStore::new(db.clone())?; let flow_db = Arc::new(FlowDBStore::new(db.clone())); let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone())); - let mut initial_data = flow_store.get_chunk_root_list()?; - // If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list` - // first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves` - // and inserted later. - let mut extra_leaves = Vec::new(); - + // If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle` + // first and call `put_tx` later. let next_tx_seq = tx_store.next_tx_seq(); let mut start_tx_seq = if next_tx_seq > 0 { Some(next_tx_seq - 1) @@ -642,13 +638,19 @@ impl LogManager { None }; let mut last_tx_to_insert = None; + + let mut pora_chunks_merkle = Merkle::new_with_subtrees( + flow_db, + config.flow.merkle_node_cache_capacity, + log2_pow2(PORA_CHUNK_SIZE), + )?; if let Some(last_tx_seq) = start_tx_seq { if !tx_store.check_tx_completed(last_tx_seq)? { // Last tx not finalized, we need to check if its `put_tx` is completed. let last_tx = tx_store .get_tx_by_seq_number(last_tx_seq)? .expect("tx missing"); - let mut current_len = initial_data.leaves(); + let current_len = pora_chunks_merkle.leaves(); let expected_len = sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64); match expected_len.cmp(&(current_len)) { @@ -678,42 +680,15 @@ impl LogManager { previous_tx.start_entry_index + previous_tx.num_entries() as u64, ); if current_len > expected_len { - while let Some((subtree_depth, _)) = initial_data.subtree_list.pop() - { - current_len -= 1 << (subtree_depth - 1); - if current_len == expected_len { - break; - } - } - } else { - warn!( - "revert last tx with no-op: {} {}", - current_len, expected_len - ); + pora_chunks_merkle.revert_to_leaves(expected_len)?; } - assert_eq!(current_len, expected_len); - while let Some((index, h)) = initial_data.known_leaves.pop() { - if index < current_len { - initial_data.known_leaves.push((index, h)); - break; - } else { - extra_leaves.push((index, h)); - } - } - start_tx_seq = Some(last_tx_seq - 1); + start_tx_seq = Some(previous_tx.seq); }; } } } } - let mut pora_chunks_merkle = Merkle::new_with_subtrees( - flow_db, - config.flow.merkle_node_cache_capacity, - initial_data, - log2_pow2(PORA_CHUNK_SIZE), - start_tx_seq, - )?; let last_chunk_merkle = match start_tx_seq { Some(tx_seq) => { tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)? @@ -751,18 +726,7 @@ impl LogManager { log_manager.start_receiver(receiver, executor); if let Some(tx) = last_tx_to_insert { - log_manager.revert_to(tx.seq - 1)?; log_manager.put_tx(tx)?; - let mut merkle = log_manager.merkle.write(); - for (index, h) in extra_leaves { - if index < merkle.pora_chunks_merkle.leaves() { - merkle.pora_chunks_merkle.fill_leaf(index, h); - } else { - error!("out of range extra leaf: index={} hash={:?}", index, h); - } - } - } else { - assert!(extra_leaves.is_empty()); } log_manager .merkle From 1c4257f6925bacdaa92d6323bf60cbaea19a08bf Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Tue, 15 Oct 2024 00:59:25 +0800 Subject: [PATCH 09/11] Fix. --- common/append_merkle/src/lib.rs | 1 + common/append_merkle/src/node_manager.rs | 28 ++++++++++----- node/storage/src/log_store/flow_store.rs | 13 ++++--- tests/example_test.py | 21 +----------- tests/node_cache_test.py | 43 ++++++++++++++++++++++++ 5 files changed, 73 insertions(+), 33 deletions(-) create mode 100755 tests/node_cache_test.py diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index 12f8fcf..6623342 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -47,6 +47,7 @@ impl> AppendMerkleTree { leaf_height, _a: Default::default(), }; + merkle.node_manager.start_transaction(); merkle.node_manager.add_layer(); merkle.node_manager.append_nodes(0, &leaves); if merkle.leaves() == 0 { diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs index 50c30f1..005787e 100644 --- a/common/append_merkle/src/node_manager.rs +++ b/common/append_merkle/src/node_manager.rs @@ -1,6 +1,7 @@ use crate::HashElement; use anyhow::Result; use lru::LruCache; +use std::any::Any; use std::num::NonZeroUsize; use std::sync::Arc; use tracing::error; @@ -39,19 +40,18 @@ impl NodeManager { pub fn push_node(&mut self, layer: usize, node: E) { self.add_node(layer, self.layer_size[layer], node); - self.layer_size[layer] += 1; + self.set_layer_size(layer, self.layer_size[layer] + 1); } pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) { - let pos = &mut self.layer_size[layer]; + let mut pos = self.layer_size[layer]; let mut saved_nodes = Vec::with_capacity(nodes.len()); for node in nodes { - self.cache.put((layer, *pos), node.clone()); - saved_nodes.push((layer, *pos, node)); - *pos += 1; + self.cache.put((layer, pos), node.clone()); + saved_nodes.push((layer, pos, node)); + pos += 1; } - let size = *pos; - self.db_tx().save_layer_size(layer, size); + self.set_layer_size(layer, pos); self.db_tx().save_node_list(&saved_nodes); } @@ -103,8 +103,7 @@ impl NodeManager { removed_nodes.push((layer, pos)); } self.db_tx().remove_node_list(&removed_nodes); - self.layer_size[layer] = pos_end; - self.db_tx().save_layer_size(layer, pos_end); + self.set_layer_size(layer, pos_end); } pub fn truncate_layer(&mut self, layer: usize) { @@ -138,6 +137,11 @@ impl NodeManager { fn db_tx(&mut self) -> &mut dyn NodeTransaction { (*self.db_tx.as_mut().expect("tx checked")).as_mut() } + + fn set_layer_size(&mut self, layer: usize, size: usize) { + self.layer_size[layer] = size; + self.db_tx().save_layer_size(layer, size); + } } pub struct NodeIterator<'a, E: HashElement> { @@ -175,6 +179,8 @@ pub trait NodeTransaction: Send + Sync { fn remove_node_list(&mut self, nodes: &[(usize, usize)]); fn save_layer_size(&mut self, layer: usize, size: usize); fn remove_layer_size(&mut self, layer: usize); + + fn into_any(self: Box) -> Box; } /// A dummy database structure for in-memory merkle tree that will not read/write db. @@ -205,4 +211,8 @@ impl NodeTransaction for EmptyNodeTransaction { fn save_layer_size(&mut self, _layer: usize, _size: usize) {} fn remove_layer_size(&mut self, _layer: usize) {} + + fn into_any(self: Box) -> Box { + self + } } diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index dda1688..88b11e9 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -700,10 +700,11 @@ impl NodeDatabase for FlowDBStore { } fn commit(&self, tx: Box>) -> Result<()> { - let db_tx: &NodeDBTransaction = (&tx as &dyn Any) - .downcast_ref() - .ok_or(anyhow!("downcast failed"))?; - self.kvdb.write(db_tx.0.clone()).map_err(Into::into) + let db_tx: Box = tx + .into_any() + .downcast() + .map_err(|e| anyhow!("downcast failed, e={:?}", e))?; + self.kvdb.write(db_tx.0).map_err(Into::into) } } @@ -746,4 +747,8 @@ impl NodeTransaction for NodeDBTransaction { fn remove_layer_size(&mut self, layer: usize) { self.0.delete(COL_FLOW_MPT_NODES, &layer_size_key(layer)); } + + fn into_any(self: Box) -> Box { + self + } } diff --git a/tests/example_test.py b/tests/example_test.py index 9fd6bf4..19ff56c 100755 --- a/tests/example_test.py +++ b/tests/example_test.py @@ -6,15 +6,10 @@ from utility.utils import wait_until class ExampleTest(TestFramework): - def setup_params(self): - self.zgs_node_configs[0] = { - "merkle_node_cache_capacity": 1024, - } - def run_test(self): client = self.nodes[0] - chunk_data = b"\x02" * 256 * 1024 * 1024 * 3 + chunk_data = b"\x02" * 5253123 submissions, data_root = create_submission(chunk_data) self.contract.submit(submissions) wait_until(lambda: self.contract.num_submissions() == 1) @@ -24,20 +19,6 @@ class ExampleTest(TestFramework): self.log.info("segment: %s", len(segment)) wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) - self.stop_storage_node(0) - self.start_storage_node(0) - self.nodes[0].wait_for_rpc_connection() - - chunk_data = b"\x03" * 256 * (1024 * 765 + 5) - submissions, data_root = create_submission(chunk_data) - self.contract.submit(submissions) - wait_until(lambda: self.contract.num_submissions() == 2) - wait_until(lambda: client.zgs_get_file_info(data_root) is not None) - - segment = submit_data(client, chunk_data) - self.log.info("segment: %s", len(segment)) - wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) - if __name__ == "__main__": ExampleTest().main() diff --git a/tests/node_cache_test.py b/tests/node_cache_test.py new file mode 100755 index 0000000..354cc01 --- /dev/null +++ b/tests/node_cache_test.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 + +from test_framework.test_framework import TestFramework +from utility.submission import create_submission, submit_data +from utility.utils import wait_until + + +class NodeCacheTest(TestFramework): + def setup_params(self): + self.zgs_node_configs[0] = { + "merkle_node_cache_capacity": 1024, + } + + def run_test(self): + client = self.nodes[0] + + chunk_data = b"\x02" * 256 * 1024 * 1024 * 3 + submissions, data_root = create_submission(chunk_data) + self.contract.submit(submissions) + wait_until(lambda: self.contract.num_submissions() == 1) + wait_until(lambda: client.zgs_get_file_info(data_root) is not None) + + segment = submit_data(client, chunk_data) + self.log.info("segment: %s", len(segment)) + wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) + + self.stop_storage_node(0) + self.start_storage_node(0) + self.nodes[0].wait_for_rpc_connection() + + chunk_data = b"\x03" * 256 * (1024 * 765 + 5) + submissions, data_root = create_submission(chunk_data) + self.contract.submit(submissions) + wait_until(lambda: self.contract.num_submissions() == 2) + wait_until(lambda: client.zgs_get_file_info(data_root) is not None) + + segment = submit_data(client, chunk_data) + self.log.info("segment: %s", len(segment)) + wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) + + +if __name__ == "__main__": + NodeCacheTest().main() From 703d926a235b489f64630e003e13cfff04f8e09c Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Tue, 15 Oct 2024 01:52:42 +0800 Subject: [PATCH 10/11] Fix test. --- common/append_merkle/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index 9468394..d4292c1 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -555,7 +555,7 @@ impl> AppendMerkleTree { .ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))? .clone(); // Dropping the upper layers that are not in the old merkle tree. - for height in (delta_nodes.right_most_nodes.len()..(self.height() - 1)).rev() { + for height in (delta_nodes.right_most_nodes.len()..self.height()).rev() { self.node_manager.truncate_layer(height); } for (height, (last_index, right_most_node)) in From 005e28266ae8b4f9ee38aaf7c0aba5fd45ad5a06 Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Tue, 15 Oct 2024 12:42:17 +0800 Subject: [PATCH 11/11] fix. --- common/append_merkle/src/lib.rs | 4 +++ common/append_merkle/src/node_manager.rs | 3 ++- node/storage/src/log_store/log_manager.rs | 30 +++++++++++++++++------ node/storage/src/log_store/tx_store.rs | 3 +++ tests/crash_test.py | 0 5 files changed, 32 insertions(+), 8 deletions(-) mode change 100644 => 100755 tests/crash_test.py diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index d4292c1..86b550d 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -59,10 +59,12 @@ impl> AppendMerkleTree { }, ); } + merkle.node_manager.commit(); return merkle; } // Reconstruct the whole tree. merkle.recompute(0, 0, None); + merkle.node_manager.commit(); // Commit the first version in memory. // TODO(zz): Check when the roots become available. merkle.commit(start_tx_seq); @@ -116,6 +118,7 @@ impl> AppendMerkleTree { }, ); } + merkle.node_manager.commit(); merkle } else { let mut merkle = Self { @@ -134,6 +137,7 @@ impl> AppendMerkleTree { } // Reconstruct the whole tree. merkle.recompute(0, 0, None); + merkle.node_manager.commit(); // Commit the first version in memory. merkle.commit(start_tx_seq); merkle diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs index 005787e..3cdc54a 100644 --- a/common/append_merkle/src/node_manager.rs +++ b/common/append_merkle/src/node_manager.rs @@ -115,8 +115,9 @@ impl NodeManager { } pub fn start_transaction(&mut self) { - if self.db_tx.is_none() { + if self.db_tx.is_some() { error!("start new tx before commit"); + panic!("start new tx before commit"); } self.db_tx = Some(self.db.start_transaction()); } diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index ee694da..e83ccb0 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -651,8 +651,12 @@ impl LogManager { .get_tx_by_seq_number(last_tx_seq)? .expect("tx missing"); let current_len = pora_chunks_merkle.leaves(); - let expected_len = - sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64); + let expected_len = sector_to_segment( + last_tx.start_entry_index + + last_tx.num_entries() as u64 + + PORA_CHUNK_SIZE as u64 + - 1, + ); match expected_len.cmp(&(current_len)) { Ordering::Less => { bail!( @@ -681,6 +685,8 @@ impl LogManager { ); if current_len > expected_len { pora_chunks_merkle.revert_to_leaves(expected_len)?; + } else { + assert_eq!(current_len, expected_len); } start_tx_seq = Some(previous_tx.seq); }; @@ -691,10 +697,20 @@ impl LogManager { let last_chunk_merkle = match start_tx_seq { Some(tx_seq) => { - tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)? + let tx = tx_store.get_tx_by_seq_number(tx_seq)?.expect("tx missing"); + if (tx.start_entry_index() + tx.num_entries() as u64) % PORA_CHUNK_SIZE as u64 == 0 + { + // The last chunk should be aligned, so it's empty. + Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None) + } else { + tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves() - 1, tx_seq)? + } } // Initialize - None => Merkle::new_with_depth(vec![], 1, None), + None => { + pora_chunks_merkle.reset(); + Merkle::new_with_depth(vec![], 1, None) + } }; debug!( @@ -704,10 +720,10 @@ impl LogManager { last_chunk_merkle.leaves(), ); if last_chunk_merkle.leaves() != 0 { - pora_chunks_merkle.append(last_chunk_merkle.root()); - // update the merkle root - pora_chunks_merkle.commit(start_tx_seq); + pora_chunks_merkle.update_last(last_chunk_merkle.root()); } + // update the merkle root + pora_chunks_merkle.commit(start_tx_seq); let merkle = RwLock::new(MerkleManager { pora_chunks_merkle, last_chunk_merkle, diff --git a/node/storage/src/log_store/tx_store.rs b/node/storage/src/log_store/tx_store.rs index 39fb893..37d4b45 100644 --- a/node/storage/src/log_store/tx_store.rs +++ b/node/storage/src/log_store/tx_store.rs @@ -292,6 +292,9 @@ impl TransactionStore { match tx.start_entry_index.cmp(&last_chunk_start_index) { cmp::Ordering::Greater => { tx_list.push((tx_seq, tx.merkle_nodes)); + if tx.start_entry_index >= last_chunk_start_index + PORA_CHUNK_SIZE as u64 { + break; + } } cmp::Ordering::Equal => { tx_list.push((tx_seq, tx.merkle_nodes)); diff --git a/tests/crash_test.py b/tests/crash_test.py old mode 100644 new mode 100755