This commit is contained in:
peilun-conflux 2024-10-12 11:04:11 +02:00 committed by GitHub
commit 8d60bb2f8f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 443 additions and 127 deletions

34
Cargo.lock generated
View File

@ -223,7 +223,9 @@ dependencies = [
"eth2_ssz",
"eth2_ssz_derive",
"ethereum-types 0.14.1",
"itertools 0.13.0",
"lazy_static",
"lru 0.12.5",
"once_cell",
"serde",
"tiny-keccak",
@ -1673,7 +1675,7 @@ dependencies = [
"hkdf",
"lazy_static",
"libp2p-core 0.30.2",
"lru",
"lru 0.7.8",
"parking_lot 0.11.2",
"rand 0.8.5",
"rlp",
@ -2514,6 +2516,12 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foldhash"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2"
[[package]]
name = "foreign-types"
version = "0.3.2"
@ -2946,6 +2954,17 @@ dependencies = [
"allocator-api2",
]
[[package]]
name = "hashbrown"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash",
]
[[package]]
name = "hashers"
version = "1.0.1"
@ -4117,7 +4136,7 @@ dependencies = [
"libp2p-core 0.33.0",
"libp2p-swarm",
"log",
"lru",
"lru 0.7.8",
"prost 0.10.4",
"prost-build 0.10.4",
"prost-codec",
@ -4650,6 +4669,15 @@ dependencies = [
"hashbrown 0.12.3",
]
[[package]]
name = "lru"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
dependencies = [
"hashbrown 0.15.0",
]
[[package]]
name = "lru-cache"
version = "0.1.2"
@ -5018,7 +5046,7 @@ dependencies = [
"lazy_static",
"libp2p",
"lighthouse_metrics",
"lru",
"lru 0.7.8",
"parking_lot 0.12.3",
"rand 0.8.5",
"regex",

View File

@ -36,4 +36,8 @@ eth2_ssz = { path = "version-meld/eth2_ssz" }
enr = { path = "version-meld/enr" }
[profile.bench.package.'storage']
debug = true
debug = true
[profile.dev]
# enabling debug_assertions will make node fail to start because of checks in `clap`.
debug-assertions = false

View File

@ -12,4 +12,6 @@ eth2_ssz_derive = "0.3.0"
serde = { version = "1.0.137", features = ["derive"] }
lazy_static = "1.4.0"
tracing = "0.1.36"
once_cell = "1.19.0"
once_cell = "1.19.0"
itertools = "0.13.0"
lru = "0.12.5"

View File

@ -1,23 +1,28 @@
mod merkle_tree;
mod node_manager;
mod proof;
mod sha3;
use anyhow::{anyhow, bail, Result};
use itertools::Itertools;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use tracing::{trace, warn};
use crate::merkle_tree::MerkleTreeWrite;
pub use crate::merkle_tree::{
Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES,
};
pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager};
pub use proof::{Proof, RangeProof};
pub use sha3::Sha3Algorithm;
pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
/// Keep all the nodes in the latest version. `layers[0]` is the layer of leaves.
layers: Vec<Vec<E>>,
node_manager: NodeManager<E>,
/// Keep the delta nodes that can be used to construct a history tree.
/// The key is the root node of that version.
delta_nodes_map: BTreeMap<u64, DeltaNodes<E>>,
@ -35,13 +40,15 @@ pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
pub fn new(leaves: Vec<E>, leaf_height: usize, start_tx_seq: Option<u64>) -> Self {
let mut merkle = Self {
layers: vec![leaves],
node_manager: NodeManager::new_dummy(),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
min_depth: None,
leaf_height,
_a: Default::default(),
};
merkle.node_manager.add_layer();
merkle.node_manager.append_nodes(0, &leaves);
if merkle.leaves() == 0 {
if let Some(seq) = start_tx_seq {
merkle.delta_nodes_map.insert(
@ -62,18 +69,21 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
pub fn new_with_subtrees(
node_db: Arc<dyn NodeDatabase<E>>,
node_cache_capacity: usize,
initial_data: MerkleTreeInitialData<E>,
leaf_height: usize,
start_tx_seq: Option<u64>,
) -> Result<Self> {
let mut merkle = Self {
layers: vec![vec![]],
node_manager: NodeManager::new(node_db, node_cache_capacity),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
min_depth: None,
leaf_height,
_a: Default::default(),
};
merkle.node_manager.add_layer();
if initial_data.subtree_list.is_empty() {
if let Some(seq) = start_tx_seq {
merkle.delta_nodes_map.insert(
@ -92,7 +102,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
for (layer_index, position, h) in initial_data.extra_mpt_nodes {
// TODO: Delete duplicate nodes from DB.
merkle.layers[layer_index][position] = h;
merkle.update_node(layer_index, position, h);
}
Ok(merkle)
}
@ -102,13 +112,17 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
if leaves.is_empty() {
// Create an empty merkle tree with `depth`.
let mut merkle = Self {
layers: vec![vec![]; depth],
// dummy node manager for the last chunk.
node_manager: NodeManager::new_dummy(),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
min_depth: Some(depth),
leaf_height: 0,
_a: Default::default(),
};
for _ in 0..depth {
merkle.node_manager.add_layer();
}
if let Some(seq) = start_tx_seq {
merkle.delta_nodes_map.insert(
seq,
@ -119,16 +133,20 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
merkle
} else {
let mut layers = vec![vec![]; depth];
layers[0] = leaves;
let mut merkle = Self {
layers,
// dummy node manager for the last chunk.
node_manager: NodeManager::new_dummy(),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
min_depth: Some(depth),
leaf_height: 0,
_a: Default::default(),
};
merkle.node_manager.add_layer();
merkle.append_nodes(0, &leaves);
for _ in 1..depth {
merkle.node_manager.add_layer();
}
// Reconstruct the whole tree.
merkle.recompute(0, 0, None);
// Commit the first version in memory.
@ -142,17 +160,17 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
// appending null is not allowed.
return;
}
self.layers[0].push(new_leaf);
self.node_manager.push_node(0, new_leaf);
self.recompute_after_append_leaves(self.leaves() - 1);
}
pub fn append_list(&mut self, mut leaf_list: Vec<E>) {
pub fn append_list(&mut self, leaf_list: Vec<E>) {
if leaf_list.contains(&E::null()) {
// appending null is not allowed.
return;
}
let start_index = self.leaves();
self.layers[0].append(&mut leaf_list);
self.node_manager.append_nodes(0, &leaf_list);
self.recompute_after_append_leaves(start_index);
}
@ -192,11 +210,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
// updating to null is not allowed.
return;
}
if self.layers[0].is_empty() {
if self.layer_len(0) == 0 {
// Special case for the first data.
self.layers[0].push(updated_leaf);
self.push_node(0, updated_leaf);
} else {
*self.layers[0].last_mut().unwrap() = updated_leaf;
self.update_node(0, self.layer_len(0) - 1, updated_leaf);
}
self.recompute_after_append_leaves(self.leaves() - 1);
}
@ -207,13 +225,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
pub fn fill_leaf(&mut self, index: usize, leaf: E) {
if leaf == E::null() {
// fill leaf with null is not allowed.
} else if self.layers[0][index] == E::null() {
self.layers[0][index] = leaf;
} else if self.node(0, index) == E::null() {
self.update_node(0, index, leaf);
self.recompute_after_fill_leaves(index, index + 1);
} else if self.layers[0][index] != leaf {
} else if self.node(0, index) != leaf {
panic!(
"Fill with invalid leaf, index={} was={:?} get={:?}",
index, self.layers[0][index], leaf
index,
self.node(0, index),
leaf
);
}
}
@ -280,28 +300,27 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
let mut updated_nodes = Vec::new();
// A valid proof should not fail the following checks.
for (i, (position, data)) in position_and_data.into_iter().enumerate() {
let layer = &mut self.layers[i];
if position > layer.len() {
if position > self.layer_len(i) {
bail!(
"proof position out of range, position={} layer.len()={}",
position,
layer.len()
self.layer_len(i)
);
}
if position == layer.len() {
if position == self.layer_len(i) {
// skip padding node.
continue;
}
if layer[position] == E::null() {
layer[position] = data.clone();
if self.node(i, position) == E::null() {
self.update_node(i, position, data.clone());
updated_nodes.push((i, position, data))
} else if layer[position] != data {
} else if self.node(i, position) != data {
// The last node in each layer may have changed in the tree.
trace!(
"conflict data layer={} position={} tree_data={:?} proof_data={:?}",
i,
position,
layer[position],
self.node(i, position),
data
);
}
@ -317,8 +336,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
if position >= self.leaves() {
bail!("Out of bound: position={} end={}", position, self.leaves());
}
if self.layers[0][position] != E::null() {
Ok(Some(self.layers[0][position].clone()))
if self.node(0, position) != E::null() {
Ok(Some(self.node(0, position)))
} else {
// The leaf hash is unknown.
Ok(None)
@ -366,10 +385,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
return;
}
let mut right_most_nodes = Vec::new();
for layer in &self.layers {
right_most_nodes.push((layer.len() - 1, layer.last().unwrap().clone()));
for height in 0..self.height() {
let pos = self.layer_len(height) - 1;
right_most_nodes.push((pos, self.node(height, pos)));
}
let root = self.root().clone();
let root = self.root();
self.delta_nodes_map
.insert(tx_seq, DeltaNodes::new(right_most_nodes));
self.root_to_tx_seq_map.insert(root, tx_seq);
@ -377,8 +397,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
fn before_extend_layer(&mut self, height: usize) {
if height == self.layers.len() {
self.layers.push(Vec::new());
if height == self.height() {
self.node_manager.add_layer()
}
}
@ -395,7 +415,6 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
/// Given a range of changed leaf nodes and recompute the tree.
/// Since this tree is append-only, we always compute to the end.
fn recompute(
&mut self,
mut start_index: usize,
@ -405,42 +424,51 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
start_index >>= height;
maybe_end_index = maybe_end_index.map(|end| end >> height);
// Loop until we compute the new root and reach `tree_depth`.
while self.layers[height].len() > 1 || height < self.layers.len() - 1 {
while self.layer_len(height) > 1 || height < self.height() - 1 {
let next_layer_start_index = start_index >> 1;
if start_index % 2 == 1 {
start_index -= 1;
}
let mut end_index = maybe_end_index.unwrap_or(self.layers[height].len());
if end_index % 2 == 1 && end_index != self.layers[height].len() {
let mut end_index = maybe_end_index.unwrap_or(self.layer_len(height));
if end_index % 2 == 1 && end_index != self.layer_len(height) {
end_index += 1;
}
let mut i = 0;
let mut iter = self.layers[height][start_index..end_index].chunks_exact(2);
let iter = self
.node_manager
.get_nodes(height, start_index, end_index)
.chunks(2);
// We cannot modify the parent layer while iterating the child layer,
// so just keep the changes and update them later.
let mut parent_update = Vec::new();
while let Some([left, right]) = iter.next() {
// If either left or right is null (unknown), we cannot compute the parent hash.
// Note that if we are recompute a range of an existing tree,
// we do not need to keep these possibly null parent. This is only saved
// for the case of constructing a new tree from the leaves.
let parent = if *left == E::null() || *right == E::null() {
E::null()
for chunk_iter in &iter {
let chunk: Vec<_> = chunk_iter.collect();
if chunk.len() == 2 {
let left = &chunk[0];
let right = &chunk[1];
// If either left or right is null (unknown), we cannot compute the parent hash.
// Note that if we are recompute a range of an existing tree,
// we do not need to keep these possibly null parent. This is only saved
// for the case of constructing a new tree from the leaves.
let parent = if *left == E::null() || *right == E::null() {
E::null()
} else {
A::parent(left, right)
};
parent_update.push((next_layer_start_index + i, parent));
i += 1;
} else {
A::parent(left, right)
};
parent_update.push((next_layer_start_index + i, parent));
i += 1;
}
if let [r] = iter.remainder() {
// Same as above.
let parent = if *r == E::null() {
E::null()
} else {
A::parent_single(r, height + self.leaf_height)
};
parent_update.push((next_layer_start_index + i, parent));
assert_eq!(chunk.len(), 1);
let r = &chunk[0];
// Same as above.
let parent = if *r == E::null() {
E::null()
} else {
A::parent_single(r, height + self.leaf_height)
};
parent_update.push((next_layer_start_index + i, parent));
}
}
if !parent_update.is_empty() {
self.before_extend_layer(height + 1);
@ -449,27 +477,27 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
// we can just overwrite `last_changed_parent_index` with new values.
let mut last_changed_parent_index = None;
for (parent_index, parent) in parent_update {
match parent_index.cmp(&self.layers[height + 1].len()) {
match parent_index.cmp(&self.layer_len(height + 1)) {
Ordering::Less => {
// We do not overwrite with null.
if parent != E::null() {
if self.layers[height + 1][parent_index] == E::null()
if self.node(height + 1, parent_index) == E::null()
// The last node in a layer can be updated.
|| (self.layers[height + 1][parent_index] != parent
&& parent_index == self.layers[height + 1].len() - 1)
|| (self.node(height + 1, parent_index) != parent
&& parent_index == self.layer_len(height + 1) - 1)
{
self.layers[height + 1][parent_index] = parent;
self.update_node(height + 1, parent_index, parent);
last_changed_parent_index = Some(parent_index);
} else if self.layers[height + 1][parent_index] != parent {
} else if self.node(height + 1, parent_index) != parent {
// Recompute changes a node in the middle. This should be impossible
// if the inputs are valid.
panic!("Invalid append merkle tree! height={} index={} expected={:?} get={:?}",
height + 1, parent_index, self.layers[height + 1][parent_index], parent);
height + 1, parent_index, self.node(height + 1, parent_index), parent);
}
}
}
Ordering::Equal => {
self.layers[height + 1].push(parent);
self.push_node(height + 1, parent);
last_changed_parent_index = Some(parent_index);
}
Ordering::Greater => {
@ -500,10 +528,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
for height in 0..(subtree_depth - 1) {
self.before_extend_layer(height);
let subtree_layer_size = 1 << (subtree_depth - 1 - height);
self.layers[height].append(&mut vec![E::null(); subtree_layer_size]);
self.append_nodes(height, &vec![E::null(); subtree_layer_size]);
}
self.before_extend_layer(subtree_depth - 1);
self.layers[subtree_depth - 1].push(subtree_root);
self.push_node(subtree_depth - 1, subtree_root);
Ok(())
}
@ -514,21 +542,24 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
pub fn revert_to(&mut self, tx_seq: u64) -> Result<()> {
if self.layers[0].is_empty() {
if self.layer_len(0) == 0 {
// Any previous state of an empty tree is always empty.
return Ok(());
}
let delta_nodes = self
.delta_nodes_map
.get(&tx_seq)
.ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?;
.ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?
.clone();
// Dropping the upper layers that are not in the old merkle tree.
self.layers.truncate(delta_nodes.right_most_nodes.len());
for height in (delta_nodes.right_most_nodes.len()..(self.height() - 1)).rev() {
self.node_manager.truncate_layer(height);
}
for (height, (last_index, right_most_node)) in
delta_nodes.right_most_nodes.iter().enumerate()
{
self.layers[height].truncate(*last_index + 1);
self.layers[height][*last_index] = right_most_node.clone();
self.node_manager.truncate_nodes(height, *last_index + 1);
self.update_node(height, *last_index, right_most_node.clone())
}
self.clear_after(tx_seq);
Ok(())
@ -550,17 +581,23 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
bail!("empty tree");
}
Ok(HistoryTree {
layers: &self.layers,
node_manager: &self.node_manager,
delta_nodes,
leaf_height: self.leaf_height,
})
}
pub fn reset(&mut self) {
self.layers = match self.min_depth {
None => vec![vec![]],
Some(depth) => vec![vec![]; depth],
};
for height in (0..self.height()).rev() {
self.node_manager.truncate_layer(height);
}
if let Some(depth) = self.min_depth {
for _ in 0..depth {
self.node_manager.add_layer();
}
} else {
self.node_manager.add_layer();
}
}
fn clear_after(&mut self, tx_seq: u64) {
@ -580,10 +617,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
fn first_known_root_at(&self, index: usize) -> (usize, E) {
let mut height = 0;
let mut index_in_layer = index;
while height < self.layers.len() {
while height < self.height() {
let node = self.node(height, index_in_layer);
if !node.is_null() {
return (height + 1, node.clone());
return (height + 1, node);
}
height += 1;
index_in_layer /= 2;
@ -628,7 +665,7 @@ impl<E: HashElement> DeltaNodes<E> {
pub struct HistoryTree<'m, E: HashElement> {
/// A reference to the global tree nodes.
layers: &'m Vec<Vec<E>>,
node_manager: &'m NodeManager<E>,
/// The delta nodes that are difference from `layers`.
/// This could be a reference, we just take ownership for convenience.
delta_nodes: &'m DeltaNodes<E>,
@ -639,16 +676,18 @@ pub struct HistoryTree<'m, E: HashElement> {
impl<E: HashElement, A: Algorithm<E>> MerkleTreeRead for AppendMerkleTree<E, A> {
type E = E;
fn node(&self, layer: usize, index: usize) -> &Self::E {
&self.layers[layer][index]
fn node(&self, layer: usize, index: usize) -> Self::E {
self.node_manager
.get_node(layer, index)
.expect("index checked")
}
fn height(&self) -> usize {
self.layers.len()
self.node_manager.num_layers()
}
fn layer_len(&self, layer_height: usize) -> usize {
self.layers[layer_height].len()
self.node_manager.layer_size(layer_height)
}
fn padding_node(&self, height: usize) -> Self::E {
@ -658,10 +697,13 @@ impl<E: HashElement, A: Algorithm<E>> MerkleTreeRead for AppendMerkleTree<E, A>
impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
type E = E;
fn node(&self, layer: usize, index: usize) -> &Self::E {
fn node(&self, layer: usize, index: usize) -> Self::E {
match self.delta_nodes.get(layer, index).expect("range checked") {
Some(node) if *node != E::null() => node,
_ => &self.layers[layer][index],
Some(node) if *node != E::null() => node.clone(),
_ => self
.node_manager
.get_node(layer, index)
.expect("index checked"),
}
}
@ -678,6 +720,22 @@ impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
}
}
/// Write access to the tree nodes; every operation is forwarded to the `node_manager`.
impl<E: HashElement, A: Algorithm<E>> MerkleTreeWrite for AppendMerkleTree<E, A> {
type E = E;
/// Appends `node` at the end of `layer` via `NodeManager::push_node`.
fn push_node(&mut self, layer: usize, node: Self::E) {
self.node_manager.push_node(layer, node);
}
/// Appends all of `nodes` at the end of `layer` via `NodeManager::append_nodes`.
fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]) {
self.node_manager.append_nodes(layer, nodes);
}
/// Writes `node` at position `pos` of `layer`.
/// Note that this maps to `NodeManager::add_node`, not to a method named `update_node`.
fn update_node(&mut self, layer: usize, pos: usize, node: Self::E) {
self.node_manager.add_node(layer, pos, node);
}
}
#[macro_export]
macro_rules! ensure_eq {
($given:expr, $expected:expr) => {
@ -699,9 +757,11 @@ macro_rules! ensure_eq {
#[cfg(test)]
mod tests {
use crate::merkle_tree::MerkleTreeRead;
use crate::node_manager::EmptyNodeDatabase;
use crate::sha3::Sha3Algorithm;
use crate::AppendMerkleTree;
use ethereum_types::H256;
use std::sync::Arc;
#[test]
fn test_proof() {

View File

@ -49,7 +49,7 @@ pub trait Algorithm<E: HashElement> {
pub trait MerkleTreeRead {
type E: HashElement;
fn node(&self, layer: usize, index: usize) -> &Self::E;
fn node(&self, layer: usize, index: usize) -> Self::E;
fn height(&self) -> usize;
fn layer_len(&self, layer_height: usize) -> usize;
fn padding_node(&self, height: usize) -> Self::E;
@ -58,7 +58,7 @@ pub trait MerkleTreeRead {
self.layer_len(0)
}
fn root(&self) -> &Self::E {
fn root(&self) -> Self::E {
self.node(self.height() - 1, 0)
}
@ -70,16 +70,16 @@ pub trait MerkleTreeRead {
self.leaves()
);
}
if self.node(0, leaf_index) == &Self::E::null() {
if self.node(0, leaf_index) == Self::E::null() {
bail!("Not ready to generate proof for leaf_index={}", leaf_index);
}
if self.height() == 1 {
return Proof::new(vec![self.root().clone(), self.root().clone()], vec![]);
return Proof::new(vec![self.root(), self.root().clone()], vec![]);
}
let mut lemma: Vec<Self::E> = Vec::with_capacity(self.height()); // path + root
let mut path: Vec<bool> = Vec::with_capacity(self.height() - 2); // path - 1
let mut index_in_layer = leaf_index;
lemma.push(self.node(0, leaf_index).clone());
lemma.push(self.node(0, leaf_index));
for height in 0..(self.height() - 1) {
trace!(
"gen_proof: height={} index={} hash={:?}",
@ -93,15 +93,15 @@ pub trait MerkleTreeRead {
// TODO: This can be skipped if the tree size is available in validation.
lemma.push(self.padding_node(height));
} else {
lemma.push(self.node(height, index_in_layer + 1).clone());
lemma.push(self.node(height, index_in_layer + 1));
}
} else {
path.push(false);
lemma.push(self.node(height, index_in_layer - 1).clone());
lemma.push(self.node(height, index_in_layer - 1));
}
index_in_layer >>= 1;
}
lemma.push(self.root().clone());
lemma.push(self.root());
if lemma.contains(&Self::E::null()) {
bail!(
"Not enough data to generate proof, lemma={:?} path={:?}",
@ -130,6 +130,13 @@ pub trait MerkleTreeRead {
}
}
/// Mutating operations on the nodes of a merkle tree, addressed by layer and position.
pub trait MerkleTreeWrite {
/// The node (hash) type stored in the tree.
type E: HashElement;
/// Adds a single `node` to `layer`.
/// NOTE(review): presumably appended at the end of the layer — confirm against implementors.
fn push_node(&mut self, layer: usize, node: Self::E);
/// Adds every element of `nodes` to `layer`.
/// NOTE(review): presumably appended in order at the end of the layer — confirm against implementors.
fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]);
/// Sets the node at position `pos` of `layer` to `node`.
fn update_node(&mut self, layer: usize, pos: usize, node: Self::E);
}
/// This includes the data to reconstruct an `AppendMerkleTree` root where some nodes
/// are `null`. Other intermediate nodes will be computed based on these known nodes.
pub struct MerkleTreeInitialData<E: HashElement> {

View File

@ -0,0 +1,154 @@
use crate::HashElement;
use anyhow::Result;
use lru::LruCache;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tracing::error;
/// Stores merkle tree nodes addressed by `(layer, position)`, combining an LRU cache with a
/// [`NodeDatabase`] backend.
pub struct NodeManager<E: HashElement> {
/// Cached nodes keyed by `(layer, position)`.
cache: LruCache<(usize, usize), E>,
/// Number of nodes tracked for each layer; the vector length is the number of layers.
layer_size: Vec<usize>,
/// Backend that receives every write and is consulted when a node is not in the cache.
db: Arc<dyn NodeDatabase<E>>,
}
impl<E: HashElement> NodeManager<E> {
    /// Creates a manager that writes nodes to `db` and caches at most `capacity` of them.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    pub fn new(db: Arc<dyn NodeDatabase<E>>, capacity: usize) -> Self {
        Self {
            cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
            layer_size: vec![],
            db,
        }
    }

    /// Creates an in-memory manager: the cache is unbounded and the backend is an
    /// [`EmptyNodeDatabase`].
    pub fn new_dummy() -> Self {
        Self {
            cache: LruCache::unbounded(),
            layer_size: vec![],
            db: Arc::new(EmptyNodeDatabase {}),
        }
    }

    /// Appends `node` at the end of `layer` and grows the layer size by one.
    ///
    /// # Panics
    ///
    /// Panics if `layer` has not been created with [`NodeManager::add_layer`].
    pub fn push_node(&mut self, layer: usize, node: E) {
        let pos = self.layer_size[layer];
        self.add_node(layer, pos, node);
        self.layer_size[layer] = pos + 1;
    }

    /// Appends all of `nodes` at the end of `layer`, saving them to the database in one batch.
    ///
    /// A failed database write is logged; the cache and the layer size are still updated.
    ///
    /// # Panics
    ///
    /// Panics if `layer` has not been created with [`NodeManager::add_layer`].
    pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) {
        // Index first so that an invalid `layer` still panics for an empty `nodes`.
        let start_pos = self.layer_size[layer];
        if nodes.is_empty() {
            // Nothing to store; avoid issuing an empty database write.
            return;
        }
        let mut saved_nodes = Vec::with_capacity(nodes.len());
        for (offset, node) in nodes.iter().enumerate() {
            let pos = start_pos + offset;
            self.cache.put((layer, pos), node.clone());
            saved_nodes.push((layer, pos, node));
        }
        self.layer_size[layer] = start_pos + nodes.len();
        if let Err(e) = self.db.save_node_list(&saved_nodes) {
            error!("Failed to save node list: {:?}", e);
        }
    }

    /// Returns the node at `(layer, pos)` from the cache, falling back to the database.
    ///
    /// The cache is read with `peek`, so a lookup does not refresh the entry's LRU position.
    /// A database error is logged and reported as `None`.
    pub fn get_node(&self, layer: usize, pos: usize) -> Option<E> {
        match self.cache.peek(&(layer, pos)) {
            Some(node) => Some(node.clone()),
            None => self.db.get_node(layer, pos).unwrap_or_else(|e| {
                error!("Failed to get node: {}", e);
                None
            }),
        }
    }

    /// Returns an iterator over the nodes of `layer` in positions `[start_pos, end_pos)`.
    pub fn get_nodes(&self, layer: usize, start_pos: usize, end_pos: usize) -> NodeIterator<E> {
        NodeIterator {
            node_manager: self,
            layer,
            start_pos,
            end_pos,
        }
    }

    /// Writes `node` at `(layer, pos)` to the database and the cache.
    ///
    /// The layer size is left unchanged. A failed database write is logged and the cache is
    /// still updated.
    pub fn add_node(&mut self, layer: usize, pos: usize, node: E) {
        if let Err(e) = self.db.save_node(layer, pos, &node) {
            error!("Failed to save node: {}", e);
        }
        self.cache.put((layer, pos), node);
    }

    /// Adds a new empty layer on top of the existing ones.
    pub fn add_layer(&mut self) {
        self.layer_size.push(0);
    }

    /// Returns the number of nodes in `layer`.
    ///
    /// # Panics
    ///
    /// Panics if `layer` does not exist.
    pub fn layer_size(&self, layer: usize) -> usize {
        self.layer_size[layer]
    }

    /// Returns the number of layers.
    pub fn num_layers(&self) -> usize {
        self.layer_size.len()
    }

    /// Shrinks `layer` to `pos_end` nodes, removing the nodes at positions
    /// `[pos_end, layer_size)` from the cache and the database.
    ///
    /// If `pos_end` is not smaller than the current size, this is a no-op: truncation never
    /// grows a layer. A failed database removal is logged and the layer is still shrunk.
    ///
    /// # Panics
    ///
    /// Panics if `layer` does not exist.
    pub fn truncate_nodes(&mut self, layer: usize, pos_end: usize) {
        let current_size = self.layer_size[layer];
        if pos_end >= current_size {
            // Nothing to remove. Returning here keeps `layer_size` from being raised to a
            // size with no backing nodes and avoids an empty database write.
            return;
        }
        let mut removed_nodes = Vec::with_capacity(current_size - pos_end);
        for pos in pos_end..current_size {
            self.cache.pop(&(layer, pos));
            removed_nodes.push((layer, pos));
        }
        if let Err(e) = self.db.remove_node_list(&removed_nodes) {
            error!("Failed to remove node list: {:?}", e);
        }
        self.layer_size[layer] = pos_end;
    }

    /// Removes every node of `layer`.
    ///
    /// The layer itself is dropped only when it is the top-most one, so removing several
    /// layers has to proceed from the top down.
    ///
    /// # Panics
    ///
    /// Panics if `layer` does not exist.
    pub fn truncate_layer(&mut self, layer: usize) {
        self.truncate_nodes(layer, 0);
        if layer + 1 == self.num_layers() {
            self.layer_size.pop();
        }
    }
}
/// Iterator over the nodes of one layer in the half-open position range `[start_pos, end_pos)`.
pub struct NodeIterator<'a, E: HashElement> {
/// Manager the nodes are read from.
node_manager: &'a NodeManager<E>,
/// Layer being iterated.
layer: usize,
/// Position of the next node to yield; advanced on every call to `next`.
start_pos: usize,
/// Exclusive upper bound of the iteration.
end_pos: usize,
}
impl<'a, E: HashElement> Iterator for NodeIterator<'a, E> {
    type Item = E;

    /// Yields the node at the current position and advances by one.
    ///
    /// Iteration stops once `end_pos` is reached. Because the lookup result is returned
    /// as-is, a node that cannot be found also ends the iteration from the caller's view.
    fn next(&mut self) -> Option<Self::Item> {
        if self.start_pos >= self.end_pos {
            return None;
        }
        let current = self.start_pos;
        let item = self.node_manager.get_node(self.layer, current);
        self.start_pos = current + 1;
        item
    }
}
/// Storage backend for merkle tree nodes addressed by `(layer, pos)`.
pub trait NodeDatabase<E: HashElement>: Send + Sync {
/// Returns the node stored at `(layer, pos)`, or `None` if the backend has no such node.
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<E>>;
/// Stores `node` at `(layer, pos)`.
fn save_node(&self, layer: usize, pos: usize, node: &E) -> Result<()>;
/// `nodes` are a list of tuples `(layer, pos, node)`.
fn save_node_list(&self, nodes: &[(usize, usize, &E)]) -> Result<()>;
/// Removes the nodes at the given `(layer, pos)` positions.
fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()>;
}
/// A dummy database structure for in-memory merkle tree that will not read/write db.
/// Its `NodeDatabase` implementation returns `None` for every read and accepts every write
/// without storing anything.
pub struct EmptyNodeDatabase {}
/// No-op backend: nothing is ever stored, so reads find nothing and writes always succeed.
impl<E: HashElement> NodeDatabase<E> for EmptyNodeDatabase {
/// Always returns `Ok(None)`.
fn get_node(&self, _layer: usize, _pos: usize) -> Result<Option<E>> {
Ok(None)
}
/// Discards the node and returns `Ok(())`.
fn save_node(&self, _layer: usize, _pos: usize, _node: &E) -> Result<()> {
Ok(())
}
/// Discards the nodes and returns `Ok(())`.
fn save_node_list(&self, _nodes: &[(usize, usize, &E)]) -> Result<()> {
Ok(())
}
/// Does nothing and returns `Ok(())`.
fn remove_node_list(&self, _nodes: &[(usize, usize)]) -> Result<()> {
Ok(())
}
}

View File

@ -112,7 +112,7 @@ impl ClientBuilder {
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
let executor = require!("sync", self, runtime_context).clone().executor;
let store = Arc::new(
LogManager::rocksdb(LogConfig::default(), &config.db_dir, executor)
LogManager::rocksdb(config.log_config.clone(), &config.db_dir, executor)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
);

View File

@ -11,6 +11,7 @@ use shared_types::{NetworkIdentity, ProtocolVersion};
use std::net::IpAddr;
use std::time::Duration;
use storage::config::ShardConfig;
use storage::log_store::log_manager::LogConfig;
use storage::StorageConfig;
impl ZgsConfig {
@ -101,8 +102,11 @@ impl ZgsConfig {
}
pub fn storage_config(&self) -> Result<StorageConfig, String> {
let mut log_config = LogConfig::default();
log_config.flow.merkle_node_cache_capacity = self.merkle_node_cache_capacity;
Ok(StorageConfig {
db_dir: self.db_dir.clone().into(),
log_config,
})
}

View File

@ -60,6 +60,7 @@ build_config! {
(prune_check_time_s, (u64), 60)
(prune_batch_size, (usize), 16 * 1024)
(prune_batch_wait_time_ms, (u64), 1000)
(merkle_node_cache_capacity, (usize), 32 * 1024 * 1024)
// misc
(log_config_file, (String), "log_config".to_string())

View File

@ -1,3 +1,4 @@
use crate::log_store::log_manager::LogConfig;
use serde::{Deserialize, Serialize};
use ssz_derive::{Decode, Encode};
use std::{cell::RefCell, path::PathBuf, rc::Rc, str::FromStr};
@ -7,6 +8,7 @@ pub const SHARD_CONFIG_KEY: &str = "shard_config";
#[derive(Clone)]
pub struct Config {
pub db_dir: PathBuf,
pub log_config: LogConfig,
}
#[derive(Clone, Copy, Debug, Decode, Encode, Serialize, Deserialize, Eq, PartialEq)]

View File

@ -10,7 +10,7 @@ use crate::log_store::log_manager::{
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
use crate::{try_option, ZgsKeyValueDB};
use anyhow::{anyhow, bail, Result};
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase};
use itertools::Itertools;
use parking_lot::RwLock;
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
@ -25,15 +25,15 @@ use tracing::{debug, error, trace};
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
pub struct FlowStore {
db: FlowDBStore,
db: Arc<FlowDBStore>,
seal_manager: SealTaskManager,
config: FlowConfig,
}
impl FlowStore {
pub fn new(db: Arc<dyn ZgsKeyValueDB>, config: FlowConfig) -> Self {
pub fn new(db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
Self {
db: FlowDBStore::new(db),
db,
seal_manager: Default::default(),
config,
}
@ -93,6 +93,7 @@ impl FlowStore {
#[derive(Clone, Debug)]
pub struct FlowConfig {
pub batch_size: usize,
pub merkle_node_cache_capacity: usize,
pub shard_config: Arc<RwLock<ShardConfig>>,
}
@ -100,6 +101,8 @@ impl Default for FlowConfig {
fn default() -> Self {
Self {
batch_size: SECTORS_PER_LOAD,
// Each node takes (8+8+32=)48 Bytes, so the default value is 1.5 GB memory size.
merkle_node_cache_capacity: 32 * 1024 * 1024,
shard_config: Default::default(),
}
}
@ -436,7 +439,7 @@ impl FlowDBStore {
let mut expected_index = 0;
let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
let empty_root = *Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
let empty_root = Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
let (index_bytes, root_bytes) = r?;
@ -666,3 +669,45 @@ fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
let position = try_decode_usize(&data[mem::size_of::<u64>()..])?;
Ok((layer_index, position))
}
/// Stores merkle nodes in the `COL_FLOW_MPT_NODES` column, keyed by
/// `encode_mpt_node_key(layer, pos)`.
impl NodeDatabase<DataRoot> for FlowDBStore {
    /// Reads the node at `(layer, pos)`; `None` when the key is absent.
    fn get_node(&self, layer: usize, pos: usize) -> Result<Option<DataRoot>> {
        let key = encode_mpt_node_key(layer, pos);
        let stored = self.kvdb.get(COL_FLOW_MPT_NODES, &key)?;
        Ok(stored.map(|bytes| DataRoot::from_slice(&bytes)))
    }

    /// Writes a single node in its own transaction.
    fn save_node(&self, layer: usize, pos: usize, node: &DataRoot) -> Result<()> {
        let key = encode_mpt_node_key(layer, pos);
        let mut batch = self.kvdb.transaction();
        batch.put(COL_FLOW_MPT_NODES, &key, node.as_bytes());
        self.kvdb.write(batch)?;
        Ok(())
    }

    /// Writes all `(layer, pos, node)` entries in one transaction.
    fn save_node_list(&self, nodes: &[(usize, usize, &DataRoot)]) -> Result<()> {
        let mut batch = self.kvdb.transaction();
        for &(layer, pos, node) in nodes {
            let key = encode_mpt_node_key(layer, pos);
            batch.put(COL_FLOW_MPT_NODES, &key, node.as_bytes());
        }
        self.kvdb.write(batch)?;
        Ok(())
    }

    /// Deletes all `(layer, pos)` entries in one transaction.
    fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()> {
        let mut batch = self.kvdb.transaction();
        for &(layer, pos) in nodes {
            let key = encode_mpt_node_key(layer, pos);
            batch.delete(COL_FLOW_MPT_NODES, &key);
        }
        self.kvdb.write(batch)?;
        Ok(())
    }
}

View File

@ -4,11 +4,10 @@ mod seal;
mod serde;
use ::serde::{Deserialize, Serialize};
use std::cmp::min;
use anyhow::Result;
use ethereum_types::H256;
use ssz_derive::{Decode, Encode};
use std::cmp::min;
use crate::log_store::log_manager::data_to_merkle_leaves;
use crate::try_option;
@ -206,7 +205,7 @@ impl EntryBatch {
}
}
Ok(Some(
*try_option!(self.to_merkle_tree(is_first_chunk)?).root(),
try_option!(self.to_merkle_tree(is_first_chunk)?).root(),
))
}

View File

@ -1,5 +1,5 @@
use crate::config::ShardConfig;
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowStore};
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
use crate::log_store::tx_store::TransactionStore;
use crate::log_store::{
FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
@ -94,6 +94,7 @@ impl MerkleManager {
}
fn revert_merkle_tree(&mut self, tx_seq: u64, tx_store: &TransactionStore) -> Result<()> {
debug!("revert merkle tree {}", tx_seq);
// Special case for reverting tx_seq == 0
if tx_seq == u64::MAX {
self.pora_chunks_merkle.reset();
@ -116,7 +117,7 @@ impl MerkleManager {
if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 {
self.last_chunk_merkle.append(H256::zero());
self.pora_chunks_merkle
.update_last(*self.last_chunk_merkle.root());
.update_last(self.last_chunk_merkle.root());
} else if self.last_chunk_merkle.leaves() != 0 {
let last_chunk_start_index = self.last_chunk_start_index();
let last_chunk_data = flow_store.get_available_entries(
@ -355,7 +356,7 @@ impl LogStoreWrite for LogManager {
merkle.revert_merkle_tree(tx_seq, &self.tx_store)?;
merkle.try_initialize(&self.flow_store)?;
assert_eq!(
Some(*merkle.last_chunk_merkle.root()),
Some(merkle.last_chunk_merkle.root()),
merkle
.pora_chunks_merkle
.leaf_at(merkle.pora_chunks_merkle.leaves() - 1)?
@ -577,7 +578,7 @@ impl LogStoreRead for LogManager {
fn get_context(&self) -> crate::error::Result<(DataRoot, u64)> {
let merkle = self.merkle.read_recursive();
Ok((
*merkle.pora_chunks_merkle.root(),
merkle.pora_chunks_merkle.root(),
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
))
}
@ -626,7 +627,8 @@ impl LogManager {
executor: task_executor::TaskExecutor,
) -> Result<Self> {
let tx_store = TransactionStore::new(db.clone())?;
let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow));
let flow_db = Arc::new(FlowDBStore::new(db.clone()));
let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
let mut initial_data = flow_store.get_chunk_root_list()?;
// If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
// first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
@ -705,8 +707,13 @@ impl LogManager {
}
}
let mut pora_chunks_merkle =
Merkle::new_with_subtrees(initial_data, log2_pow2(PORA_CHUNK_SIZE), start_tx_seq)?;
let mut pora_chunks_merkle = Merkle::new_with_subtrees(
flow_db,
config.flow.merkle_node_cache_capacity,
initial_data,
log2_pow2(PORA_CHUNK_SIZE),
start_tx_seq,
)?;
let last_chunk_merkle = match start_tx_seq {
Some(tx_seq) => {
tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
@ -722,7 +729,7 @@ impl LogManager {
last_chunk_merkle.leaves(),
);
if last_chunk_merkle.leaves() != 0 {
pora_chunks_merkle.append(*last_chunk_merkle.root());
pora_chunks_merkle.append(last_chunk_merkle.root());
// update the merkle root
pora_chunks_merkle.commit(start_tx_seq);
}
@ -893,16 +900,16 @@ impl LogManager {
// `last_chunk_merkle` was empty, so this is a new leaf in the top_tree.
merkle
.pora_chunks_merkle
.append_subtree(1, *merkle.last_chunk_merkle.root())?;
.append_subtree(1, merkle.last_chunk_merkle.root())?;
} else {
merkle
.pora_chunks_merkle
.update_last(*merkle.last_chunk_merkle.root());
.update_last(merkle.last_chunk_merkle.root());
}
if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
batch_root_map.insert(
merkle.pora_chunks_merkle.leaves() - 1,
(*merkle.last_chunk_merkle.root(), 1),
(merkle.last_chunk_merkle.root(), 1),
);
self.complete_last_chunk_merkle(
merkle.pora_chunks_merkle.leaves() - 1,
@ -958,7 +965,7 @@ impl LogManager {
.append_list(data_to_merkle_leaves(&pad_data)?);
merkle
.pora_chunks_merkle
.update_last(*merkle.last_chunk_merkle.root());
.update_last(merkle.last_chunk_merkle.root());
} else {
if last_chunk_pad != 0 {
is_full_empty = false;
@ -968,10 +975,10 @@ impl LogManager {
.append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?);
merkle
.pora_chunks_merkle
.update_last(*merkle.last_chunk_merkle.root());
.update_last(merkle.last_chunk_merkle.root());
root_map.insert(
merkle.pora_chunks_merkle.leaves() - 1,
(*merkle.last_chunk_merkle.root(), 1),
(merkle.last_chunk_merkle.root(), 1),
);
completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1);
}
@ -982,7 +989,7 @@ impl LogManager {
let data = pad_data[start_index * ENTRY_SIZE
..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE]
.to_vec();
let root = *Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
let root = Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
merkle.pora_chunks_merkle.append(root);
root_map.insert(merkle.pora_chunks_merkle.leaves() - 1, (root, 1));
start_index += PORA_CHUNK_SIZE;
@ -1060,7 +1067,7 @@ impl LogManager {
}
merkle
.pora_chunks_merkle
.update_last(*merkle.last_chunk_merkle.root());
.update_last(merkle.last_chunk_merkle.root());
}
let chunk_roots = self.flow_store.append_entries(flow_entry_array)?;
for (chunk_index, chunk_root) in chunk_roots {

View File

@ -3,11 +3,14 @@ use crate::log_store::log_manager::{
PORA_CHUNK_SIZE,
};
use crate::log_store::{LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite};
use append_merkle::{Algorithm, AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
use append_merkle::{
Algorithm, AppendMerkleTree, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm,
};
use ethereum_types::H256;
use rand::random;
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
use std::cmp;
use std::sync::Arc;
use task_executor::test_utils::TestRuntime;
#[test]