Use NodeManager.

This commit is contained in:
Peilun Li 2024-10-09 10:36:13 +08:00
parent 7589bdf4bb
commit 5e14db6422
6 changed files with 230 additions and 84 deletions

1
Cargo.lock generated
View File

@ -223,6 +223,7 @@ dependencies = [
"eth2_ssz",
"eth2_ssz_derive",
"ethereum-types 0.14.1",
"itertools 0.13.0",
"lazy_static",
"once_cell",
"serde",

View File

@ -13,3 +13,4 @@ serde = { version = "1.0.137", features = ["derive"] }
lazy_static = "1.4.0"
tracing = "0.1.36"
once_cell = "1.19.0"
itertools = "0.13.0"

View File

@ -4,6 +4,7 @@ mod proof;
mod sha3;
use anyhow::{anyhow, bail, Result};
use itertools::Itertools;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
@ -11,6 +12,7 @@ use std::marker::PhantomData;
use std::sync::Arc;
use tracing::{trace, warn};
use crate::merkle_tree::MerkleTreeWrite;
pub use crate::merkle_tree::{
Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES,
};
@ -20,7 +22,6 @@ pub use sha3::Sha3Algorithm;
pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
/// Keep all the nodes in the latest version. `layers[0]` is the layer of leaves.
layers: Vec<Vec<E>>,
node_manager: NodeManager<E>,
/// Keep the delta nodes that can be used to construct a history tree.
/// The key is the root node of that version.
@ -44,7 +45,6 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
start_tx_seq: Option<u64>,
) -> Self {
let mut merkle = Self {
layers: vec![leaves],
node_manager: NodeManager::new(node_db),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
@ -52,6 +52,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
leaf_height,
_a: Default::default(),
};
merkle.node_manager.add_layer();
merkle.node_manager.append_nodes(0, &leaves);
if merkle.leaves() == 0 {
if let Some(seq) = start_tx_seq {
merkle.delta_nodes_map.insert(
@ -78,7 +80,6 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
start_tx_seq: Option<u64>,
) -> Result<Self> {
let mut merkle = Self {
layers: vec![vec![]],
node_manager: NodeManager::new(node_db),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
@ -86,6 +87,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
leaf_height,
_a: Default::default(),
};
merkle.node_manager.add_layer();
if initial_data.subtree_list.is_empty() {
if let Some(seq) = start_tx_seq {
merkle.delta_nodes_map.insert(
@ -104,7 +106,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
for (layer_index, position, h) in initial_data.extra_mpt_nodes {
// TODO: Delete duplicate nodes from DB.
merkle.node_manager.add_node(layer_index, position, h);
merkle.update_node(layer_index, position, h);
}
Ok(merkle)
}
@ -114,7 +116,6 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
if leaves.is_empty() {
// Create an empty merkle tree with `depth`.
let mut merkle = Self {
layers: vec![vec![]; depth],
// dummy node manager for the last chunk.
node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})),
delta_nodes_map: BTreeMap::new(),
@ -123,6 +124,9 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
leaf_height: 0,
_a: Default::default(),
};
for _ in 0..depth {
merkle.node_manager.add_layer();
}
if let Some(seq) = start_tx_seq {
merkle.delta_nodes_map.insert(
seq,
@ -133,10 +137,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
merkle
} else {
let mut layers = vec![vec![]; depth];
layers[0] = leaves;
let mut merkle = Self {
layers,
// dummy node manager for the last chunk.
node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})),
delta_nodes_map: BTreeMap::new(),
@ -145,6 +146,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
leaf_height: 0,
_a: Default::default(),
};
merkle.node_manager.add_layer();
merkle.append_nodes(0, &leaves);
for _ in 1..depth {
merkle.node_manager.add_layer();
}
// Reconstruct the whole tree.
merkle.recompute(0, 0, None);
// Commit the first version in memory.
@ -158,17 +164,17 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
// appending null is not allowed.
return;
}
self.layers[0].push(new_leaf);
self.node_manager.push_node(0, new_leaf);
self.recompute_after_append_leaves(self.leaves() - 1);
}
pub fn append_list(&mut self, mut leaf_list: Vec<E>) {
pub fn append_list(&mut self, leaf_list: Vec<E>) {
if leaf_list.contains(&E::null()) {
// appending null is not allowed.
return;
}
let start_index = self.leaves();
self.layers[0].append(&mut leaf_list);
self.node_manager.append_nodes(0, &leaf_list);
self.recompute_after_append_leaves(start_index);
}
@ -208,11 +214,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
// updating to null is not allowed.
return;
}
if self.layers[0].is_empty() {
if self.layer_len(0) == 0 {
// Special case for the first data.
self.layers[0].push(updated_leaf);
self.push_node(0, updated_leaf);
} else {
*self.layers[0].last_mut().unwrap() = updated_leaf;
self.update_node(0, self.layer_len(0) - 1, updated_leaf);
}
self.recompute_after_append_leaves(self.leaves() - 1);
}
@ -223,13 +229,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
pub fn fill_leaf(&mut self, index: usize, leaf: E) {
if leaf == E::null() {
// fill leaf with null is not allowed.
} else if self.layers[0][index] == E::null() {
self.layers[0][index] = leaf;
} else if self.node(0, index) == E::null() {
self.update_node(0, index, leaf);
self.recompute_after_fill_leaves(index, index + 1);
} else if self.layers[0][index] != leaf {
} else if self.node(0, index) != leaf {
panic!(
"Fill with invalid leaf, index={} was={:?} get={:?}",
index, self.layers[0][index], leaf
index,
self.node(0, index),
leaf
);
}
}
@ -296,28 +304,27 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
let mut updated_nodes = Vec::new();
// A valid proof should not fail the following checks.
for (i, (position, data)) in position_and_data.into_iter().enumerate() {
let layer = &mut self.layers[i];
if position > layer.len() {
if position > self.layer_len(i) {
bail!(
"proof position out of range, position={} layer.len()={}",
position,
layer.len()
self.layer_len(i)
);
}
if position == layer.len() {
if position == self.layer_len(i) {
// skip padding node.
continue;
}
if layer[position] == E::null() {
layer[position] = data.clone();
if self.node(i, position) == E::null() {
self.update_node(i, position, data.clone());
updated_nodes.push((i, position, data))
} else if layer[position] != data {
} else if self.node(i, position) != data {
// The last node in each layer may have changed in the tree.
trace!(
"conflict data layer={} position={} tree_data={:?} proof_data={:?}",
i,
position,
layer[position],
self.node(i, position),
data
);
}
@ -333,8 +340,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
if position >= self.leaves() {
bail!("Out of bound: position={} end={}", position, self.leaves());
}
if self.layers[0][position] != E::null() {
Ok(Some(self.layers[0][position].clone()))
if self.node(0, position) != E::null() {
Ok(Some(self.node(0, position)))
} else {
// The leaf hash is unknown.
Ok(None)
@ -382,8 +389,9 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
return;
}
let mut right_most_nodes = Vec::new();
for layer in &self.layers {
right_most_nodes.push((layer.len() - 1, layer.last().unwrap().clone()));
for height in 0..self.height() {
let pos = self.layer_len(height) - 1;
right_most_nodes.push((pos, self.node(height, pos)));
}
let root = self.root();
self.delta_nodes_map
@ -393,8 +401,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
fn before_extend_layer(&mut self, height: usize) {
if height == self.layers.len() {
self.layers.push(Vec::new());
if height == self.height() {
self.node_manager.add_layer()
}
}
@ -411,7 +419,6 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
/// Given a range of changed leaf nodes and recompute the tree.
/// Since this tree is append-only, we always compute to the end.
fn recompute(
&mut self,
mut start_index: usize,
@ -421,42 +428,51 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
start_index >>= height;
maybe_end_index = maybe_end_index.map(|end| end >> height);
// Loop until we compute the new root and reach `tree_depth`.
while self.layers[height].len() > 1 || height < self.layers.len() - 1 {
while self.layer_len(height) > 1 || height < self.height() - 1 {
let next_layer_start_index = start_index >> 1;
if start_index % 2 == 1 {
start_index -= 1;
}
let mut end_index = maybe_end_index.unwrap_or(self.layers[height].len());
if end_index % 2 == 1 && end_index != self.layers[height].len() {
let mut end_index = maybe_end_index.unwrap_or(self.layer_len(height));
if end_index % 2 == 1 && end_index != self.layer_len(height) {
end_index += 1;
}
let mut i = 0;
let mut iter = self.layers[height][start_index..end_index].chunks_exact(2);
let iter = self
.node_manager
.get_nodes(height, start_index, end_index)
.chunks(2);
// We cannot modify the parent layer while iterating the child layer,
// so just keep the changes and update them later.
let mut parent_update = Vec::new();
while let Some([left, right]) = iter.next() {
// If either left or right is null (unknown), we cannot compute the parent hash.
// Note that if we are recompute a range of an existing tree,
// we do not need to keep these possibly null parent. This is only saved
// for the case of constructing a new tree from the leaves.
let parent = if *left == E::null() || *right == E::null() {
E::null()
for chunk_iter in &iter {
let chunk: Vec<_> = chunk_iter.collect();
if chunk.len() == 2 {
let left = &chunk[0];
let right = &chunk[1];
// If either left or right is null (unknown), we cannot compute the parent hash.
// Note that if we are recompute a range of an existing tree,
// we do not need to keep these possibly null parent. This is only saved
// for the case of constructing a new tree from the leaves.
let parent = if *left == E::null() || *right == E::null() {
E::null()
} else {
A::parent(left, right)
};
parent_update.push((next_layer_start_index + i, parent));
i += 1;
} else {
A::parent(left, right)
};
parent_update.push((next_layer_start_index + i, parent));
i += 1;
}
if let [r] = iter.remainder() {
// Same as above.
let parent = if *r == E::null() {
E::null()
} else {
A::parent_single(r, height + self.leaf_height)
};
parent_update.push((next_layer_start_index + i, parent));
assert_eq!(chunk.len(), 1);
let r = &chunk[0];
// Same as above.
let parent = if *r == E::null() {
E::null()
} else {
A::parent_single(r, height + self.leaf_height)
};
parent_update.push((next_layer_start_index + i, parent));
}
}
if !parent_update.is_empty() {
self.before_extend_layer(height + 1);
@ -465,27 +481,27 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
// we can just overwrite `last_changed_parent_index` with new values.
let mut last_changed_parent_index = None;
for (parent_index, parent) in parent_update {
match parent_index.cmp(&self.layers[height + 1].len()) {
match parent_index.cmp(&self.layer_len(height + 1)) {
Ordering::Less => {
// We do not overwrite with null.
if parent != E::null() {
if self.layers[height + 1][parent_index] == E::null()
if self.node(height + 1, parent_index) == E::null()
// The last node in a layer can be updated.
|| (self.layers[height + 1][parent_index] != parent
&& parent_index == self.layers[height + 1].len() - 1)
|| (self.node(height + 1, parent_index) != parent
&& parent_index == self.layer_len(height + 1) - 1)
{
self.layers[height + 1][parent_index] = parent;
self.update_node(height + 1, parent_index, parent);
last_changed_parent_index = Some(parent_index);
} else if self.layers[height + 1][parent_index] != parent {
} else if self.node(height + 1, parent_index) != parent {
// Recompute changes a node in the middle. This should be impossible
// if the inputs are valid.
panic!("Invalid append merkle tree! height={} index={} expected={:?} get={:?}",
height + 1, parent_index, self.layers[height + 1][parent_index], parent);
height + 1, parent_index, self.node(height + 1, parent_index), parent);
}
}
}
Ordering::Equal => {
self.layers[height + 1].push(parent);
self.push_node(height + 1, parent);
last_changed_parent_index = Some(parent_index);
}
Ordering::Greater => {
@ -516,10 +532,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
for height in 0..(subtree_depth - 1) {
self.before_extend_layer(height);
let subtree_layer_size = 1 << (subtree_depth - 1 - height);
self.layers[height].append(&mut vec![E::null(); subtree_layer_size]);
self.append_nodes(height, &vec![E::null(); subtree_layer_size]);
}
self.before_extend_layer(subtree_depth - 1);
self.layers[subtree_depth - 1].push(subtree_root);
self.push_node(subtree_depth - 1, subtree_root);
Ok(())
}
@ -530,21 +546,24 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
pub fn revert_to(&mut self, tx_seq: u64) -> Result<()> {
if self.layers[0].is_empty() {
if self.layer_len(0) == 0 {
// Any previous state of an empty tree is always empty.
return Ok(());
}
let delta_nodes = self
.delta_nodes_map
.get(&tx_seq)
.ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?;
.ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?
.clone();
// Dropping the upper layers that are not in the old merkle tree.
self.layers.truncate(delta_nodes.right_most_nodes.len());
for height in (self.height() - 1)..=delta_nodes.right_most_nodes.len() {
self.node_manager.truncate_layer(height);
}
for (height, (last_index, right_most_node)) in
delta_nodes.right_most_nodes.iter().enumerate()
{
self.layers[height].truncate(*last_index + 1);
self.layers[height][*last_index] = right_most_node.clone();
self.node_manager.truncate_nodes(height, *last_index + 1);
self.update_node(height, *last_index, right_most_node.clone())
}
self.clear_after(tx_seq);
Ok(())
@ -573,10 +592,14 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
/// Drop every node and every layer of the tree, then restore the empty layout:
/// `min_depth` empty layers when a minimum depth is configured, otherwise a single
/// empty leaf layer.
pub fn reset(&mut self) {
    // Walk from the top layer down to the leaves. `NodeManager::truncate_layer` only
    // pops a layer when it is the current top one, and an ascending range such as
    // `(height - 1)..=0` is empty for any tree taller than one layer, which would
    // leave all the old nodes in place.
    for height in (0..self.height()).rev() {
        self.node_manager.truncate_layer(height);
    }
    // All layers are gone at this point. Layer 0 must always exist because the rest
    // of the tree indexes it unconditionally, so fall back to one layer when no
    // minimum depth is set.
    let empty_layers = self.min_depth.unwrap_or(1);
    for _ in 0..empty_layers {
        self.node_manager.add_layer();
    }
}
fn clear_after(&mut self, tx_seq: u64) {
@ -596,7 +619,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
fn first_known_root_at(&self, index: usize) -> (usize, E) {
let mut height = 0;
let mut index_in_layer = index;
while height < self.node_manager.num_layers() {
while height < self.height() {
let node = self.node(height, index_in_layer);
if !node.is_null() {
return (height + 1, node);
@ -699,6 +722,22 @@ impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
}
}
impl<E: HashElement, A: Algorithm<E>> MerkleTreeWrite for AppendMerkleTree<E, A> {
type E = E;
fn push_node(&mut self, layer: usize, node: Self::E) {
self.node_manager.push_node(layer, node);
}
fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]) {
self.node_manager.append_nodes(layer, nodes);
}
fn update_node(&mut self, layer: usize, pos: usize, node: Self::E) {
self.node_manager.add_node(layer, pos, node);
}
}
#[macro_export]
macro_rules! ensure_eq {
($given:expr, $expected:expr) => {

View File

@ -130,6 +130,13 @@ pub trait MerkleTreeRead {
}
}
/// Write-side primitives for a layered merkle tree: adding and overwriting individual
/// nodes addressed by `(layer, position)`.
pub trait MerkleTreeWrite {
/// Type of the nodes (hash elements) stored in the tree.
type E: HashElement;
/// Append `node` at the end of `layer`.
fn push_node(&mut self, layer: usize, node: Self::E);
/// Append every element of `nodes`, in order, at the end of `layer`.
fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]);
/// Store `node` at position `pos` of `layer`.
// NOTE(review): whether `pos` may lie past the current end of the layer is up to the
// implementor — confirm before relying on it.
fn update_node(&mut self, layer: usize, pos: usize, node: Self::E);
}
/// This includes the data to reconstruct an `AppendMerkleTree` root where some nodes
/// are `null`. Other intermediate nodes will be computed based on these known nodes.
pub struct MerkleTreeInitialData<E: HashElement> {

View File

@ -1,11 +1,11 @@
use crate::HashElement;
use anyhow::Result;
use std::collections::HashMap;
use std::collections::BTreeMap;
use std::sync::Arc;
use tracing::error;
pub struct NodeManager<E: HashElement> {
cache: HashMap<(usize, usize), E>,
cache: BTreeMap<(usize, usize), E>,
layer_size: Vec<usize>,
db: Arc<dyn NodeDatabase<E>>,
}
@ -13,12 +13,30 @@ pub struct NodeManager<E: HashElement> {
impl<E: HashElement> NodeManager<E> {
pub fn new(db: Arc<dyn NodeDatabase<E>>) -> Self {
Self {
cache: HashMap::new(),
cache: BTreeMap::new(),
layer_size: vec![],
db,
}
}
/// Append `node` as the new last node of `layer`.
///
/// The node is written through `add_node` at the current end position of the layer,
/// after which the recorded layer size grows by one. Panics if `layer` does not exist.
pub fn push_node(&mut self, layer: usize, node: E) {
    let next_pos = self.layer_size[layer];
    self.add_node(layer, next_pos, node);
    self.layer_size[layer] += 1;
}
/// Append all of `nodes`, in order, to the end of `layer`.
///
/// Each node is inserted into the cache, and the whole batch is then handed to the
/// database in a single `save_node_list` call. A database failure is logged and not
/// propagated. Panics if `layer` does not exist.
pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) {
    let first_pos = self.layer_size[layer];
    let mut saved_nodes = Vec::with_capacity(nodes.len());
    for (offset, node) in nodes.iter().enumerate() {
        let pos = first_pos + offset;
        self.cache.insert((layer, pos), node.clone());
        saved_nodes.push((layer, pos, node));
    }
    // The layer grew by exactly the number of appended nodes.
    self.layer_size[layer] += nodes.len();

    if let Err(e) = self.db.save_node_list(&saved_nodes) {
        error!("Failed to save node list: {:?}", e);
    }
}
pub fn get_node(&self, layer: usize, pos: usize) -> Option<E> {
match self.cache.get(&(layer, pos)) {
Some(node) => Some(node.clone()),
@ -29,14 +47,20 @@ impl<E: HashElement> NodeManager<E> {
}
}
/// Return an iterator over the nodes of `layer` at positions `start_pos..end_pos`
/// (end exclusive).
///
/// Nothing is read up front: each node is fetched through `get_node` when the
/// iterator is advanced. The returned iterator borrows this manager.
pub fn get_nodes(&self, layer: usize, start_pos: usize, end_pos: usize) -> NodeIterator<'_, E> {
    NodeIterator {
        // `self` is already a `&NodeManager<E>`; borrowing it again would only create
        // a `&&NodeManager<E>` that has to be auto-dereferenced.
        node_manager: self,
        layer,
        start_pos,
        end_pos,
    }
}
pub fn add_node(&mut self, layer: usize, pos: usize, node: E) {
if let Err(e) = self.db.save_node(layer, pos, &node) {
error!("Failed to save node: {}", e);
}
self.cache.insert((layer, pos), node);
if pos + 1 > self.layer_size[layer] {
self.layer_size[layer] = pos + 1;
}
}
pub fn add_layer(&mut self) {
@ -50,11 +74,54 @@ impl<E: HashElement> NodeManager<E> {
pub fn num_layers(&self) -> usize {
self.layer_size.len()
}
/// Shrink `layer` so that it ends at `pos_end` (exclusive).
///
/// Every position in `pos_end..layer_size[layer]` is evicted from the cache and
/// removed from the database in one `remove_node_list` call. A database failure is
/// logged and not propagated. The recorded layer size is set to `pos_end`
/// afterwards. Panics if `layer` does not exist.
pub fn truncate_nodes(&mut self, layer: usize, pos_end: usize) {
    let old_len = self.layer_size[layer];
    let removed_nodes: Vec<(usize, usize)> = (pos_end..old_len).map(|pos| (layer, pos)).collect();
    for key in &removed_nodes {
        self.cache.remove(key);
    }

    if let Err(e) = self.db.remove_node_list(&removed_nodes) {
        error!("Failed to remove node list: {:?}", e);
    }
    self.layer_size[layer] = pos_end;
}
/// Remove every node of `layer`; if `layer` is the current top layer, drop the layer
/// itself as well.
///
/// A layer below the top is only emptied, so callers that want to remove several
/// layers have to go from the top layer downwards.
pub fn truncate_layer(&mut self, layer: usize) {
    self.truncate_nodes(layer, 0);
    let is_top_layer = layer + 1 == self.layer_size.len();
    if is_top_layer {
        self.layer_size.pop();
    }
}
}
/// Iterator over the nodes of a single layer of a `NodeManager`, covering the
/// positions `start_pos..end_pos` (end exclusive). Created by `NodeManager::get_nodes`.
pub struct NodeIterator<'a, E: HashElement> {
/// Manager the nodes are read from.
node_manager: &'a NodeManager<E>,
/// Layer whose nodes are yielded.
layer: usize,
/// Position of the next node to yield; advanced on every call to `next`.
start_pos: usize,
/// Exclusive upper bound of the positions to yield.
end_pos: usize,
}
impl<'a, E: HashElement> Iterator for NodeIterator<'a, E> {
    type Item = E;

    /// Yield the node at the current position and advance by one.
    ///
    /// Returns `None` once the end position is reached. The result of `get_node` is
    /// passed through unchanged, so a node that cannot be found also yields `None`.
    fn next(&mut self) -> Option<Self::Item> {
        if self.start_pos >= self.end_pos {
            return None;
        }
        let node = self.node_manager.get_node(self.layer, self.start_pos);
        self.start_pos += 1;
        node
    }
}
/// Persistence backend behind a `NodeManager`. Nodes are addressed by
/// `(layer, pos)`.
pub trait NodeDatabase<E: HashElement>: Send + Sync {
/// Load the node stored at (`layer`, `pos`).
// NOTE(review): `Ok(None)` presumably means "no node stored at this position" — confirm
// against the implementors.
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<E>>;
/// Persist a single `node` at (`layer`, `pos`).
fn save_node(&self, layer: usize, pos: usize, node: &E) -> Result<()>;
/// Persist a batch of nodes.
/// `nodes` are a list of tuples `(layer, pos, node)`.
fn save_node_list(&self, nodes: &[(usize, usize, &E)]) -> Result<()>;
/// Delete a batch of nodes. `nodes` is a list of tuples `(layer, pos)`.
fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()>;
}
/// A dummy database structure for in-memory merkle tree that will not read/write db.
@ -67,4 +134,12 @@ impl<E: HashElement> NodeDatabase<E> for EmptyNodeDatabase {
/// No-op: the dummy database stores nothing and always reports success.
fn save_node(&self, _layer: usize, _pos: usize, _node: &E) -> Result<()> {
Ok(())
}
/// No-op: the batch is discarded and success is always reported.
fn save_node_list(&self, _nodes: &[(usize, usize, &E)]) -> Result<()> {
Ok(())
}
/// No-op: there is nothing stored to remove, so success is always reported.
fn remove_node_list(&self, _nodes: &[(usize, usize)]) -> Result<()> {
Ok(())
}
}

View File

@ -690,4 +690,27 @@ impl NodeDatabase<DataRoot> for FlowDBStore {
);
Ok(self.kvdb.write(tx)?)
}
/// Write every `(layer, position, node)` entry of `nodes` into the
/// `COL_FLOW_MPT_NODES` column, using one kvdb transaction for the whole batch.
///
/// Returns the error of the underlying write if the transaction fails.
fn save_node_list(&self, nodes: &[(usize, usize, &DataRoot)]) -> Result<()> {
    let mut batch = self.kvdb.transaction();
    for &(layer_index, position, data) in nodes {
        let key = encode_mpt_node_key(layer_index, position);
        batch.put(COL_FLOW_MPT_NODES, &key, data.as_bytes());
    }
    self.kvdb.write(batch)?;
    Ok(())
}
/// Delete every `(layer, position)` entry of `nodes` from the `COL_FLOW_MPT_NODES`
/// column, using one kvdb transaction for the whole batch.
///
/// Returns the error of the underlying write if the transaction fails.
fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()> {
    let mut batch = self.kvdb.transaction();
    for &(layer_index, position) in nodes {
        let key = encode_mpt_node_key(layer_index, position);
        batch.delete(COL_FLOW_MPT_NODES, &key);
    }
    self.kvdb.write(batch)?;
    Ok(())
}
}