From 5e14db64224da5a81db97ac76a307a7d2caf44ab Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Wed, 9 Oct 2024 10:36:13 +0800 Subject: [PATCH] Use NodeManager. --- Cargo.lock | 1 + common/append_merkle/Cargo.toml | 3 +- common/append_merkle/src/lib.rs | 193 ++++++++++++++--------- common/append_merkle/src/merkle_tree.rs | 7 + common/append_merkle/src/node_manager.rs | 87 +++++++++- node/storage/src/log_store/flow_store.rs | 23 +++ 6 files changed, 230 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 64ec1d3..ed02af5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -223,6 +223,7 @@ dependencies = [ "eth2_ssz", "eth2_ssz_derive", "ethereum-types 0.14.1", + "itertools 0.13.0", "lazy_static", "once_cell", "serde", diff --git a/common/append_merkle/Cargo.toml b/common/append_merkle/Cargo.toml index 21543b2..19f4750 100644 --- a/common/append_merkle/Cargo.toml +++ b/common/append_merkle/Cargo.toml @@ -12,4 +12,5 @@ eth2_ssz_derive = "0.3.0" serde = { version = "1.0.137", features = ["derive"] } lazy_static = "1.4.0" tracing = "0.1.36" -once_cell = "1.19.0" \ No newline at end of file +once_cell = "1.19.0" +itertools = "0.13.0" \ No newline at end of file diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index edae9f0..80e2f1a 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -4,6 +4,7 @@ mod proof; mod sha3; use anyhow::{anyhow, bail, Result}; +use itertools::Itertools; use std::cmp::Ordering; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; @@ -11,6 +12,7 @@ use std::marker::PhantomData; use std::sync::Arc; use tracing::{trace, warn}; +use crate::merkle_tree::MerkleTreeWrite; pub use crate::merkle_tree::{ Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES, }; @@ -20,7 +22,6 @@ pub use sha3::Sha3Algorithm; pub struct AppendMerkleTree> { /// Keep all the nodes in the latest version. `layers[0]` is the layer of leaves. - layers: Vec>, node_manager: NodeManager, /// Keep the delta nodes that can be used to construct a history tree. /// The key is the root node of that version. @@ -44,7 +45,6 @@ impl> AppendMerkleTree { start_tx_seq: Option, ) -> Self { let mut merkle = Self { - layers: vec![leaves], node_manager: NodeManager::new(node_db), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), @@ -52,6 +52,8 @@ impl> AppendMerkleTree { leaf_height, _a: Default::default(), }; + merkle.node_manager.add_layer(); + merkle.node_manager.append_nodes(0, &leaves); if merkle.leaves() == 0 { if let Some(seq) = start_tx_seq { merkle.delta_nodes_map.insert( @@ -78,7 +80,6 @@ impl> AppendMerkleTree { start_tx_seq: Option, ) -> Result { let mut merkle = Self { - layers: vec![vec![]], node_manager: NodeManager::new(node_db), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), @@ -86,6 +87,7 @@ impl> AppendMerkleTree { leaf_height, _a: Default::default(), }; + merkle.node_manager.add_layer(); if initial_data.subtree_list.is_empty() { if let Some(seq) = start_tx_seq { merkle.delta_nodes_map.insert( @@ -104,7 +106,7 @@ impl> AppendMerkleTree { } for (layer_index, position, h) in initial_data.extra_mpt_nodes { // TODO: Delete duplicate nodes from DB. - merkle.node_manager.add_node(layer_index, position, h); + merkle.update_node(layer_index, position, h); } Ok(merkle) } @@ -114,7 +116,6 @@ impl> AppendMerkleTree { if leaves.is_empty() { // Create an empty merkle tree with `depth`. let mut merkle = Self { - layers: vec![vec![]; depth], // dummy node manager for the last chunk. node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})), delta_nodes_map: BTreeMap::new(), @@ -123,6 +124,9 @@ impl> AppendMerkleTree { leaf_height: 0, _a: Default::default(), }; + for _ in 0..depth { + merkle.node_manager.add_layer(); + } if let Some(seq) = start_tx_seq { merkle.delta_nodes_map.insert( seq, @@ -133,10 +137,7 @@ impl> AppendMerkleTree { } merkle } else { - let mut layers = vec![vec![]; depth]; - layers[0] = leaves; let mut merkle = Self { - layers, // dummy node manager for the last chunk. node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})), delta_nodes_map: BTreeMap::new(), @@ -145,6 +146,11 @@ impl> AppendMerkleTree { leaf_height: 0, _a: Default::default(), }; + merkle.node_manager.add_layer(); + merkle.append_nodes(0, &leaves); + for _ in 1..depth { + merkle.node_manager.add_layer(); + } // Reconstruct the whole tree. merkle.recompute(0, 0, None); // Commit the first version in memory. @@ -158,17 +164,17 @@ impl> AppendMerkleTree { // appending null is not allowed. return; } - self.layers[0].push(new_leaf); + self.node_manager.push_node(0, new_leaf); self.recompute_after_append_leaves(self.leaves() - 1); } - pub fn append_list(&mut self, mut leaf_list: Vec) { + pub fn append_list(&mut self, leaf_list: Vec) { if leaf_list.contains(&E::null()) { // appending null is not allowed. return; } let start_index = self.leaves(); - self.layers[0].append(&mut leaf_list); + self.node_manager.append_nodes(0, &leaf_list); self.recompute_after_append_leaves(start_index); } @@ -208,11 +214,11 @@ impl> AppendMerkleTree { // updating to null is not allowed. return; } - if self.layers[0].is_empty() { + if self.layer_len(0) == 0 { // Special case for the first data. - self.layers[0].push(updated_leaf); + self.push_node(0, updated_leaf); } else { - *self.layers[0].last_mut().unwrap() = updated_leaf; + self.update_node(0, self.layer_len(0) - 1, updated_leaf); } self.recompute_after_append_leaves(self.leaves() - 1); } @@ -223,13 +229,15 @@ impl> AppendMerkleTree { pub fn fill_leaf(&mut self, index: usize, leaf: E) { if leaf == E::null() { // fill leaf with null is not allowed. - } else if self.layers[0][index] == E::null() { - self.layers[0][index] = leaf; + } else if self.node(0, index) == E::null() { + self.update_node(0, index, leaf); self.recompute_after_fill_leaves(index, index + 1); - } else if self.layers[0][index] != leaf { + } else if self.node(0, index) != leaf { panic!( "Fill with invalid leaf, index={} was={:?} get={:?}", - index, self.layers[0][index], leaf + index, + self.node(0, index), + leaf ); } } @@ -296,28 +304,27 @@ impl> AppendMerkleTree { let mut updated_nodes = Vec::new(); // A valid proof should not fail the following checks. for (i, (position, data)) in position_and_data.into_iter().enumerate() { - let layer = &mut self.layers[i]; - if position > layer.len() { + if position > self.layer_len(i) { bail!( "proof position out of range, position={} layer.len()={}", position, - layer.len() + self.layer_len(i) ); } - if position == layer.len() { + if position == self.layer_len(i) { // skip padding node. continue; } - if layer[position] == E::null() { - layer[position] = data.clone(); + if self.node(i, position) == E::null() { + self.update_node(i, position, data.clone()); updated_nodes.push((i, position, data)) - } else if layer[position] != data { + } else if self.node(i, position) != data { // The last node in each layer may have changed in the tree. trace!( "conflict data layer={} position={} tree_data={:?} proof_data={:?}", i, position, - layer[position], + self.node(i, position), data ); } @@ -333,8 +340,8 @@ impl> AppendMerkleTree { if position >= self.leaves() { bail!("Out of bound: position={} end={}", position, self.leaves()); } - if self.layers[0][position] != E::null() { - Ok(Some(self.layers[0][position].clone())) + if self.node(0, position) != E::null() { + Ok(Some(self.node(0, position))) } else { // The leaf hash is unknown. Ok(None) @@ -382,8 +389,9 @@ impl> AppendMerkleTree { return; } let mut right_most_nodes = Vec::new(); - for layer in &self.layers { - right_most_nodes.push((layer.len() - 1, layer.last().unwrap().clone())); + for height in 0..self.height() { + let pos = self.layer_len(height) - 1; + right_most_nodes.push((pos, self.node(height, pos))); } let root = self.root(); self.delta_nodes_map @@ -393,8 +401,8 @@ impl> AppendMerkleTree { } fn before_extend_layer(&mut self, height: usize) { - if height == self.layers.len() { - self.layers.push(Vec::new()); + if height == self.height() { + self.node_manager.add_layer() } } @@ -411,7 +419,6 @@ impl> AppendMerkleTree { } /// Given a range of changed leaf nodes and recompute the tree. - /// Since this tree is append-only, we always compute to the end. fn recompute( &mut self, mut start_index: usize, @@ -421,42 +428,51 @@ impl> AppendMerkleTree { start_index >>= height; maybe_end_index = maybe_end_index.map(|end| end >> height); // Loop until we compute the new root and reach `tree_depth`. - while self.layers[height].len() > 1 || height < self.layers.len() - 1 { + while self.layer_len(height) > 1 || height < self.height() - 1 { let next_layer_start_index = start_index >> 1; if start_index % 2 == 1 { start_index -= 1; } - let mut end_index = maybe_end_index.unwrap_or(self.layers[height].len()); - if end_index % 2 == 1 && end_index != self.layers[height].len() { + let mut end_index = maybe_end_index.unwrap_or(self.layer_len(height)); + if end_index % 2 == 1 && end_index != self.layer_len(height) { end_index += 1; } let mut i = 0; - let mut iter = self.layers[height][start_index..end_index].chunks_exact(2); + let iter = self + .node_manager + .get_nodes(height, start_index, end_index) + .chunks(2); // We cannot modify the parent layer while iterating the child layer, // so just keep the changes and update them later. let mut parent_update = Vec::new(); - while let Some([left, right]) = iter.next() { - // If either left or right is null (unknown), we cannot compute the parent hash. - // Note that if we are recompute a range of an existing tree, - // we do not need to keep these possibly null parent. This is only saved - // for the case of constructing a new tree from the leaves. - let parent = if *left == E::null() || *right == E::null() { - E::null() + for chunk_iter in &iter { + let chunk: Vec<_> = chunk_iter.collect(); + if chunk.len() == 2 { + let left = &chunk[0]; + let right = &chunk[1]; + // If either left or right is null (unknown), we cannot compute the parent hash. + // Note that if we are recompute a range of an existing tree, + // we do not need to keep these possibly null parent. This is only saved + // for the case of constructing a new tree from the leaves. + let parent = if *left == E::null() || *right == E::null() { + E::null() + } else { + A::parent(left, right) + }; + parent_update.push((next_layer_start_index + i, parent)); + i += 1; } else { - A::parent(left, right) - }; - parent_update.push((next_layer_start_index + i, parent)); - i += 1; - } - if let [r] = iter.remainder() { - // Same as above. - let parent = if *r == E::null() { - E::null() - } else { - A::parent_single(r, height + self.leaf_height) - }; - parent_update.push((next_layer_start_index + i, parent)); + assert_eq!(chunk.len(), 1); + let r = &chunk[0]; + // Same as above. + let parent = if *r == E::null() { + E::null() + } else { + A::parent_single(r, height + self.leaf_height) + }; + parent_update.push((next_layer_start_index + i, parent)); + } } if !parent_update.is_empty() { self.before_extend_layer(height + 1); @@ -465,27 +481,27 @@ impl> AppendMerkleTree { // we can just overwrite `last_changed_parent_index` with new values. let mut last_changed_parent_index = None; for (parent_index, parent) in parent_update { - match parent_index.cmp(&self.layers[height + 1].len()) { + match parent_index.cmp(&self.layer_len(height + 1)) { Ordering::Less => { // We do not overwrite with null. if parent != E::null() { - if self.layers[height + 1][parent_index] == E::null() + if self.node(height + 1, parent_index) == E::null() // The last node in a layer can be updated. - || (self.layers[height + 1][parent_index] != parent - && parent_index == self.layers[height + 1].len() - 1) + || (self.node(height + 1, parent_index) != parent + && parent_index == self.layer_len(height + 1) - 1) { - self.layers[height + 1][parent_index] = parent; + self.update_node(height + 1, parent_index, parent); last_changed_parent_index = Some(parent_index); - } else if self.layers[height + 1][parent_index] != parent { + } else if self.node(height + 1, parent_index) != parent { // Recompute changes a node in the middle. This should be impossible // if the inputs are valid. panic!("Invalid append merkle tree! height={} index={} expected={:?} get={:?}", - height + 1, parent_index, self.layers[height + 1][parent_index], parent); + height + 1, parent_index, self.node(height + 1, parent_index), parent); } } } Ordering::Equal => { - self.layers[height + 1].push(parent); + self.push_node(height + 1, parent); last_changed_parent_index = Some(parent_index); } Ordering::Greater => { @@ -516,10 +532,10 @@ impl> AppendMerkleTree { for height in 0..(subtree_depth - 1) { self.before_extend_layer(height); let subtree_layer_size = 1 << (subtree_depth - 1 - height); - self.layers[height].append(&mut vec![E::null(); subtree_layer_size]); + self.append_nodes(height, &vec![E::null(); subtree_layer_size]); } self.before_extend_layer(subtree_depth - 1); - self.layers[subtree_depth - 1].push(subtree_root); + self.push_node(subtree_depth - 1, subtree_root); Ok(()) } @@ -530,21 +546,24 @@ impl> AppendMerkleTree { } pub fn revert_to(&mut self, tx_seq: u64) -> Result<()> { - if self.layers[0].is_empty() { + if self.layer_len(0) == 0 { // Any previous state of an empty tree is always empty. return Ok(()); } let delta_nodes = self .delta_nodes_map .get(&tx_seq) - .ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?; + .ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))? + .clone(); // Dropping the upper layers that are not in the old merkle tree. - self.layers.truncate(delta_nodes.right_most_nodes.len()); + for height in (self.height() - 1)..=delta_nodes.right_most_nodes.len() { + self.node_manager.truncate_layer(height); + } for (height, (last_index, right_most_node)) in delta_nodes.right_most_nodes.iter().enumerate() { - self.layers[height].truncate(*last_index + 1); - self.layers[height][*last_index] = right_most_node.clone(); + self.node_manager.truncate_nodes(height, *last_index + 1); + self.update_node(height, *last_index, right_most_node.clone()) } self.clear_after(tx_seq); Ok(()) @@ -573,10 +592,14 @@ impl> AppendMerkleTree { } pub fn reset(&mut self) { - self.layers = match self.min_depth { - None => vec![vec![]], - Some(depth) => vec![vec![]; depth], - }; + for height in (self.height() - 1)..=0 { + self.node_manager.truncate_layer(height); + } + if let Some(depth) = self.min_depth { + for _ in 0..depth { + self.node_manager.add_layer(); + } + } } fn clear_after(&mut self, tx_seq: u64) { @@ -596,7 +619,7 @@ impl> AppendMerkleTree { fn first_known_root_at(&self, index: usize) -> (usize, E) { let mut height = 0; let mut index_in_layer = index; - while height < self.node_manager.num_layers() { + while height < self.height() { let node = self.node(height, index_in_layer); if !node.is_null() { return (height + 1, node); @@ -699,6 +722,22 @@ impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> { } } +impl> MerkleTreeWrite for AppendMerkleTree { + type E = E; + + fn push_node(&mut self, layer: usize, node: Self::E) { + self.node_manager.push_node(layer, node); + } + + fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]) { + self.node_manager.append_nodes(layer, nodes); + } + + fn update_node(&mut self, layer: usize, pos: usize, node: Self::E) { + self.node_manager.add_node(layer, pos, node); + } +} + #[macro_export] macro_rules! ensure_eq { ($given:expr, $expected:expr) => { diff --git a/common/append_merkle/src/merkle_tree.rs b/common/append_merkle/src/merkle_tree.rs index 8e26286..eac083f 100644 --- a/common/append_merkle/src/merkle_tree.rs +++ b/common/append_merkle/src/merkle_tree.rs @@ -130,6 +130,13 @@ pub trait MerkleTreeRead { } } +pub trait MerkleTreeWrite { + type E: HashElement; + fn push_node(&mut self, layer: usize, node: Self::E); + fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]); + fn update_node(&mut self, layer: usize, pos: usize, node: Self::E); +} + /// This includes the data to reconstruct an `AppendMerkleTree` root where some nodes /// are `null`. Other intermediate nodes will be computed based on these known nodes. pub struct MerkleTreeInitialData { diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs index 05615d8..8fa87ed 100644 --- a/common/append_merkle/src/node_manager.rs +++ b/common/append_merkle/src/node_manager.rs @@ -1,11 +1,11 @@ use crate::HashElement; use anyhow::Result; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::sync::Arc; use tracing::error; pub struct NodeManager { - cache: HashMap<(usize, usize), E>, + cache: BTreeMap<(usize, usize), E>, layer_size: Vec, db: Arc>, } @@ -13,12 +13,30 @@ pub struct NodeManager { impl NodeManager { pub fn new(db: Arc>) -> Self { Self { - cache: HashMap::new(), + cache: BTreeMap::new(), layer_size: vec![], db, } } + pub fn push_node(&mut self, layer: usize, node: E) { + self.add_node(layer, self.layer_size[layer], node); + self.layer_size[layer] += 1; + } + + pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) { + let pos = &mut self.layer_size[layer]; + let mut saved_nodes = Vec::with_capacity(nodes.len()); + for node in nodes { + self.cache.insert((layer, *pos), node.clone()); + saved_nodes.push((layer, *pos, node)); + *pos += 1; + } + if let Err(e) = self.db.save_node_list(&saved_nodes) { + error!("Failed to save node list: {:?}", e); + } + } + pub fn get_node(&self, layer: usize, pos: usize) -> Option { match self.cache.get(&(layer, pos)) { Some(node) => Some(node.clone()), @@ -29,14 +47,20 @@ impl NodeManager { } } + pub fn get_nodes(&self, layer: usize, start_pos: usize, end_pos: usize) -> NodeIterator { + NodeIterator { + node_manager: &self, + layer, + start_pos, + end_pos, + } + } + pub fn add_node(&mut self, layer: usize, pos: usize, node: E) { if let Err(e) = self.db.save_node(layer, pos, &node) { error!("Failed to save node: {}", e); } self.cache.insert((layer, pos), node); - if pos + 1 > self.layer_size[layer] { - self.layer_size[layer] = pos + 1; - } } pub fn add_layer(&mut self) { @@ -50,11 +74,54 @@ impl NodeManager { pub fn num_layers(&self) -> usize { self.layer_size.len() } + + pub fn truncate_nodes(&mut self, layer: usize, pos_end: usize) { + let mut removed_nodes = Vec::new(); + for pos in pos_end..self.layer_size[layer] { + self.cache.remove(&(layer, pos)); + removed_nodes.push((layer, pos)); + } + if let Err(e) = self.db.remove_node_list(&removed_nodes) { + error!("Failed to remove node list: {:?}", e); + } + self.layer_size[layer] = pos_end; + } + + pub fn truncate_layer(&mut self, layer: usize) { + self.truncate_nodes(layer, 0); + if layer == self.num_layers() - 1 { + self.layer_size.pop(); + } + } +} + +pub struct NodeIterator<'a, E: HashElement> { + node_manager: &'a NodeManager, + layer: usize, + start_pos: usize, + end_pos: usize, +} + +impl<'a, E: HashElement> Iterator for NodeIterator<'a, E> { + type Item = E; + + fn next(&mut self) -> Option { + if self.start_pos < self.end_pos { + let r = self.node_manager.get_node(self.layer, self.start_pos); + self.start_pos += 1; + r + } else { + None + } + } } pub trait NodeDatabase: Send + Sync { fn get_node(&self, layer: usize, pos: usize) -> Result>; fn save_node(&self, layer: usize, pos: usize, node: &E) -> Result<()>; + /// `nodes` are a list of tuples `(layer, pos, node)`. + fn save_node_list(&self, nodes: &[(usize, usize, &E)]) -> Result<()>; + fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()>; } /// A dummy database structure for in-memory merkle tree that will not read/write db. @@ -67,4 +134,12 @@ impl NodeDatabase for EmptyNodeDatabase { fn save_node(&self, _layer: usize, _pos: usize, _node: &E) -> Result<()> { Ok(()) } + + fn save_node_list(&self, _nodes: &[(usize, usize, &E)]) -> Result<()> { + Ok(()) + } + + fn remove_node_list(&self, _nodes: &[(usize, usize)]) -> Result<()> { + Ok(()) + } } diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index 06d77f9..204f595 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -690,4 +690,27 @@ impl NodeDatabase for FlowDBStore { ); Ok(self.kvdb.write(tx)?) } + + fn save_node_list(&self, nodes: &[(usize, usize, &DataRoot)]) -> Result<()> { + let mut tx = self.kvdb.transaction(); + for (layer_index, position, data) in nodes { + tx.put( + COL_FLOW_MPT_NODES, + &encode_mpt_node_key(*layer_index, *position), + data.as_bytes(), + ); + } + Ok(self.kvdb.write(tx)?) + } + + fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()> { + let mut tx = self.kvdb.transaction(); + for (layer_index, position) in nodes { + tx.delete( + COL_FLOW_MPT_NODES, + &encode_mpt_node_key(*layer_index, *position), + ); + } + Ok(self.kvdb.write(tx)?) + } }