mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Compare commits
9 Commits
3d7121ebb3
...
8d60bb2f8f
Author | SHA1 | Date | |
---|---|---|---|
![]() |
8d60bb2f8f | ||
![]() |
82fd29968b | ||
![]() |
45fa344564 | ||
![]() |
90650294af | ||
![]() |
fd96737c6d | ||
![]() |
b16abe2303 | ||
![]() |
5e14db6422 | ||
![]() |
7589bdf4bb | ||
![]() |
5100c22933 |
34
Cargo.lock
generated
34
Cargo.lock
generated
@ -223,7 +223,9 @@ dependencies = [
|
||||
"eth2_ssz",
|
||||
"eth2_ssz_derive",
|
||||
"ethereum-types 0.14.1",
|
||||
"itertools 0.13.0",
|
||||
"lazy_static",
|
||||
"lru 0.12.5",
|
||||
"once_cell",
|
||||
"serde",
|
||||
"tiny-keccak",
|
||||
@ -1673,7 +1675,7 @@ dependencies = [
|
||||
"hkdf",
|
||||
"lazy_static",
|
||||
"libp2p-core 0.30.2",
|
||||
"lru",
|
||||
"lru 0.7.8",
|
||||
"parking_lot 0.11.2",
|
||||
"rand 0.8.5",
|
||||
"rlp",
|
||||
@ -2514,6 +2516,12 @@ version = "1.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
||||
|
||||
[[package]]
|
||||
name = "foldhash"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2"
|
||||
|
||||
[[package]]
|
||||
name = "foreign-types"
|
||||
version = "0.3.2"
|
||||
@ -2946,6 +2954,17 @@ dependencies = [
|
||||
"allocator-api2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.15.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
|
||||
dependencies = [
|
||||
"allocator-api2",
|
||||
"equivalent",
|
||||
"foldhash",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashers"
|
||||
version = "1.0.1"
|
||||
@ -4117,7 +4136,7 @@ dependencies = [
|
||||
"libp2p-core 0.33.0",
|
||||
"libp2p-swarm",
|
||||
"log",
|
||||
"lru",
|
||||
"lru 0.7.8",
|
||||
"prost 0.10.4",
|
||||
"prost-build 0.10.4",
|
||||
"prost-codec",
|
||||
@ -4650,6 +4669,15 @@ dependencies = [
|
||||
"hashbrown 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
version = "0.12.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
|
||||
dependencies = [
|
||||
"hashbrown 0.15.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lru-cache"
|
||||
version = "0.1.2"
|
||||
@ -5018,7 +5046,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"libp2p",
|
||||
"lighthouse_metrics",
|
||||
"lru",
|
||||
"lru 0.7.8",
|
||||
"parking_lot 0.12.3",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
|
@ -36,4 +36,8 @@ eth2_ssz = { path = "version-meld/eth2_ssz" }
|
||||
enr = { path = "version-meld/enr" }
|
||||
|
||||
[profile.bench.package.'storage']
|
||||
debug = true
|
||||
debug = true
|
||||
|
||||
[profile.dev]
|
||||
# enabling debug_assertions will make node fail to start because of checks in `clap`.
|
||||
debug-assertions = false
|
@ -12,4 +12,6 @@ eth2_ssz_derive = "0.3.0"
|
||||
serde = { version = "1.0.137", features = ["derive"] }
|
||||
lazy_static = "1.4.0"
|
||||
tracing = "0.1.36"
|
||||
once_cell = "1.19.0"
|
||||
once_cell = "1.19.0"
|
||||
itertools = "0.13.0"
|
||||
lru = "0.12.5"
|
@ -1,23 +1,28 @@
|
||||
mod merkle_tree;
|
||||
mod node_manager;
|
||||
mod proof;
|
||||
mod sha3;
|
||||
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use itertools::Itertools;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt::Debug;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use tracing::{trace, warn};
|
||||
|
||||
use crate::merkle_tree::MerkleTreeWrite;
|
||||
pub use crate::merkle_tree::{
|
||||
Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES,
|
||||
};
|
||||
pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager};
|
||||
pub use proof::{Proof, RangeProof};
|
||||
pub use sha3::Sha3Algorithm;
|
||||
|
||||
pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
|
||||
/// Keep all the nodes in the latest version. `layers[0]` is the layer of leaves.
|
||||
layers: Vec<Vec<E>>,
|
||||
node_manager: NodeManager<E>,
|
||||
/// Keep the delta nodes that can be used to construct a history tree.
|
||||
/// The key is the root node of that version.
|
||||
delta_nodes_map: BTreeMap<u64, DeltaNodes<E>>,
|
||||
@ -35,13 +40,15 @@ pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
|
||||
impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
pub fn new(leaves: Vec<E>, leaf_height: usize, start_tx_seq: Option<u64>) -> Self {
|
||||
let mut merkle = Self {
|
||||
layers: vec![leaves],
|
||||
node_manager: NodeManager::new_dummy(),
|
||||
delta_nodes_map: BTreeMap::new(),
|
||||
root_to_tx_seq_map: HashMap::new(),
|
||||
min_depth: None,
|
||||
leaf_height,
|
||||
_a: Default::default(),
|
||||
};
|
||||
merkle.node_manager.add_layer();
|
||||
merkle.node_manager.append_nodes(0, &leaves);
|
||||
if merkle.leaves() == 0 {
|
||||
if let Some(seq) = start_tx_seq {
|
||||
merkle.delta_nodes_map.insert(
|
||||
@ -62,18 +69,21 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
}
|
||||
|
||||
pub fn new_with_subtrees(
|
||||
node_db: Arc<dyn NodeDatabase<E>>,
|
||||
node_cache_capacity: usize,
|
||||
initial_data: MerkleTreeInitialData<E>,
|
||||
leaf_height: usize,
|
||||
start_tx_seq: Option<u64>,
|
||||
) -> Result<Self> {
|
||||
let mut merkle = Self {
|
||||
layers: vec![vec![]],
|
||||
node_manager: NodeManager::new(node_db, node_cache_capacity),
|
||||
delta_nodes_map: BTreeMap::new(),
|
||||
root_to_tx_seq_map: HashMap::new(),
|
||||
min_depth: None,
|
||||
leaf_height,
|
||||
_a: Default::default(),
|
||||
};
|
||||
merkle.node_manager.add_layer();
|
||||
if initial_data.subtree_list.is_empty() {
|
||||
if let Some(seq) = start_tx_seq {
|
||||
merkle.delta_nodes_map.insert(
|
||||
@ -92,7 +102,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
}
|
||||
for (layer_index, position, h) in initial_data.extra_mpt_nodes {
|
||||
// TODO: Delete duplicate nodes from DB.
|
||||
merkle.layers[layer_index][position] = h;
|
||||
merkle.update_node(layer_index, position, h);
|
||||
}
|
||||
Ok(merkle)
|
||||
}
|
||||
@ -102,13 +112,17 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
if leaves.is_empty() {
|
||||
// Create an empty merkle tree with `depth`.
|
||||
let mut merkle = Self {
|
||||
layers: vec![vec![]; depth],
|
||||
// dummy node manager for the last chunk.
|
||||
node_manager: NodeManager::new_dummy(),
|
||||
delta_nodes_map: BTreeMap::new(),
|
||||
root_to_tx_seq_map: HashMap::new(),
|
||||
min_depth: Some(depth),
|
||||
leaf_height: 0,
|
||||
_a: Default::default(),
|
||||
};
|
||||
for _ in 0..depth {
|
||||
merkle.node_manager.add_layer();
|
||||
}
|
||||
if let Some(seq) = start_tx_seq {
|
||||
merkle.delta_nodes_map.insert(
|
||||
seq,
|
||||
@ -119,16 +133,20 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
}
|
||||
merkle
|
||||
} else {
|
||||
let mut layers = vec![vec![]; depth];
|
||||
layers[0] = leaves;
|
||||
let mut merkle = Self {
|
||||
layers,
|
||||
// dummy node manager for the last chunk.
|
||||
node_manager: NodeManager::new_dummy(),
|
||||
delta_nodes_map: BTreeMap::new(),
|
||||
root_to_tx_seq_map: HashMap::new(),
|
||||
min_depth: Some(depth),
|
||||
leaf_height: 0,
|
||||
_a: Default::default(),
|
||||
};
|
||||
merkle.node_manager.add_layer();
|
||||
merkle.append_nodes(0, &leaves);
|
||||
for _ in 1..depth {
|
||||
merkle.node_manager.add_layer();
|
||||
}
|
||||
// Reconstruct the whole tree.
|
||||
merkle.recompute(0, 0, None);
|
||||
// Commit the first version in memory.
|
||||
@ -142,17 +160,17 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
// appending null is not allowed.
|
||||
return;
|
||||
}
|
||||
self.layers[0].push(new_leaf);
|
||||
self.node_manager.push_node(0, new_leaf);
|
||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||
}
|
||||
|
||||
pub fn append_list(&mut self, mut leaf_list: Vec<E>) {
|
||||
pub fn append_list(&mut self, leaf_list: Vec<E>) {
|
||||
if leaf_list.contains(&E::null()) {
|
||||
// appending null is not allowed.
|
||||
return;
|
||||
}
|
||||
let start_index = self.leaves();
|
||||
self.layers[0].append(&mut leaf_list);
|
||||
self.node_manager.append_nodes(0, &leaf_list);
|
||||
self.recompute_after_append_leaves(start_index);
|
||||
}
|
||||
|
||||
@ -192,11 +210,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
// updating to null is not allowed.
|
||||
return;
|
||||
}
|
||||
if self.layers[0].is_empty() {
|
||||
if self.layer_len(0) == 0 {
|
||||
// Special case for the first data.
|
||||
self.layers[0].push(updated_leaf);
|
||||
self.push_node(0, updated_leaf);
|
||||
} else {
|
||||
*self.layers[0].last_mut().unwrap() = updated_leaf;
|
||||
self.update_node(0, self.layer_len(0) - 1, updated_leaf);
|
||||
}
|
||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||
}
|
||||
@ -207,13 +225,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
pub fn fill_leaf(&mut self, index: usize, leaf: E) {
|
||||
if leaf == E::null() {
|
||||
// fill leaf with null is not allowed.
|
||||
} else if self.layers[0][index] == E::null() {
|
||||
self.layers[0][index] = leaf;
|
||||
} else if self.node(0, index) == E::null() {
|
||||
self.update_node(0, index, leaf);
|
||||
self.recompute_after_fill_leaves(index, index + 1);
|
||||
} else if self.layers[0][index] != leaf {
|
||||
} else if self.node(0, index) != leaf {
|
||||
panic!(
|
||||
"Fill with invalid leaf, index={} was={:?} get={:?}",
|
||||
index, self.layers[0][index], leaf
|
||||
index,
|
||||
self.node(0, index),
|
||||
leaf
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -226,18 +246,18 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
&mut self,
|
||||
proof: RangeProof<E>,
|
||||
) -> Result<Vec<(usize, usize, E)>> {
|
||||
self.fill_with_proof(
|
||||
proof
|
||||
.left_proof
|
||||
.proof_nodes_in_tree()
|
||||
.split_off(self.leaf_height),
|
||||
)?;
|
||||
self.fill_with_proof(
|
||||
proof
|
||||
.right_proof
|
||||
.proof_nodes_in_tree()
|
||||
.split_off(self.leaf_height),
|
||||
)
|
||||
let mut updated_nodes = Vec::new();
|
||||
let mut left_nodes = proof.left_proof.proof_nodes_in_tree();
|
||||
if left_nodes.len() >= self.leaf_height {
|
||||
updated_nodes
|
||||
.append(&mut self.fill_with_proof(left_nodes.split_off(self.leaf_height))?);
|
||||
}
|
||||
let mut right_nodes = proof.right_proof.proof_nodes_in_tree();
|
||||
if right_nodes.len() >= self.leaf_height {
|
||||
updated_nodes
|
||||
.append(&mut self.fill_with_proof(right_nodes.split_off(self.leaf_height))?);
|
||||
}
|
||||
Ok(updated_nodes)
|
||||
}
|
||||
|
||||
pub fn fill_with_file_proof(
|
||||
@ -280,28 +300,27 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
let mut updated_nodes = Vec::new();
|
||||
// A valid proof should not fail the following checks.
|
||||
for (i, (position, data)) in position_and_data.into_iter().enumerate() {
|
||||
let layer = &mut self.layers[i];
|
||||
if position > layer.len() {
|
||||
if position > self.layer_len(i) {
|
||||
bail!(
|
||||
"proof position out of range, position={} layer.len()={}",
|
||||
position,
|
||||
layer.len()
|
||||
self.layer_len(i)
|
||||
);
|
||||
}
|
||||
if position == layer.len() {
|
||||
if position == self.layer_len(i) {
|
||||
// skip padding node.
|
||||
continue;
|
||||
}
|
||||
if layer[position] == E::null() {
|
||||
layer[position] = data.clone();
|
||||
if self.node(i, position) == E::null() {
|
||||
self.update_node(i, position, data.clone());
|
||||
updated_nodes.push((i, position, data))
|
||||
} else if layer[position] != data {
|
||||
} else if self.node(i, position) != data {
|
||||
// The last node in each layer may have changed in the tree.
|
||||
trace!(
|
||||
"conflict data layer={} position={} tree_data={:?} proof_data={:?}",
|
||||
i,
|
||||
position,
|
||||
layer[position],
|
||||
self.node(i, position),
|
||||
data
|
||||
);
|
||||
}
|
||||
@ -317,8 +336,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
if position >= self.leaves() {
|
||||
bail!("Out of bound: position={} end={}", position, self.leaves());
|
||||
}
|
||||
if self.layers[0][position] != E::null() {
|
||||
Ok(Some(self.layers[0][position].clone()))
|
||||
if self.node(0, position) != E::null() {
|
||||
Ok(Some(self.node(0, position)))
|
||||
} else {
|
||||
// The leaf hash is unknown.
|
||||
Ok(None)
|
||||
@ -366,10 +385,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
return;
|
||||
}
|
||||
let mut right_most_nodes = Vec::new();
|
||||
for layer in &self.layers {
|
||||
right_most_nodes.push((layer.len() - 1, layer.last().unwrap().clone()));
|
||||
for height in 0..self.height() {
|
||||
let pos = self.layer_len(height) - 1;
|
||||
right_most_nodes.push((pos, self.node(height, pos)));
|
||||
}
|
||||
let root = self.root().clone();
|
||||
let root = self.root();
|
||||
self.delta_nodes_map
|
||||
.insert(tx_seq, DeltaNodes::new(right_most_nodes));
|
||||
self.root_to_tx_seq_map.insert(root, tx_seq);
|
||||
@ -377,8 +397,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
}
|
||||
|
||||
fn before_extend_layer(&mut self, height: usize) {
|
||||
if height == self.layers.len() {
|
||||
self.layers.push(Vec::new());
|
||||
if height == self.height() {
|
||||
self.node_manager.add_layer()
|
||||
}
|
||||
}
|
||||
|
||||
@ -395,7 +415,6 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
}
|
||||
|
||||
/// Given a range of changed leaf nodes and recompute the tree.
|
||||
/// Since this tree is append-only, we always compute to the end.
|
||||
fn recompute(
|
||||
&mut self,
|
||||
mut start_index: usize,
|
||||
@ -405,42 +424,51 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
start_index >>= height;
|
||||
maybe_end_index = maybe_end_index.map(|end| end >> height);
|
||||
// Loop until we compute the new root and reach `tree_depth`.
|
||||
while self.layers[height].len() > 1 || height < self.layers.len() - 1 {
|
||||
while self.layer_len(height) > 1 || height < self.height() - 1 {
|
||||
let next_layer_start_index = start_index >> 1;
|
||||
if start_index % 2 == 1 {
|
||||
start_index -= 1;
|
||||
}
|
||||
|
||||
let mut end_index = maybe_end_index.unwrap_or(self.layers[height].len());
|
||||
if end_index % 2 == 1 && end_index != self.layers[height].len() {
|
||||
let mut end_index = maybe_end_index.unwrap_or(self.layer_len(height));
|
||||
if end_index % 2 == 1 && end_index != self.layer_len(height) {
|
||||
end_index += 1;
|
||||
}
|
||||
let mut i = 0;
|
||||
let mut iter = self.layers[height][start_index..end_index].chunks_exact(2);
|
||||
let iter = self
|
||||
.node_manager
|
||||
.get_nodes(height, start_index, end_index)
|
||||
.chunks(2);
|
||||
// We cannot modify the parent layer while iterating the child layer,
|
||||
// so just keep the changes and update them later.
|
||||
let mut parent_update = Vec::new();
|
||||
while let Some([left, right]) = iter.next() {
|
||||
// If either left or right is null (unknown), we cannot compute the parent hash.
|
||||
// Note that if we are recompute a range of an existing tree,
|
||||
// we do not need to keep these possibly null parent. This is only saved
|
||||
// for the case of constructing a new tree from the leaves.
|
||||
let parent = if *left == E::null() || *right == E::null() {
|
||||
E::null()
|
||||
for chunk_iter in &iter {
|
||||
let chunk: Vec<_> = chunk_iter.collect();
|
||||
if chunk.len() == 2 {
|
||||
let left = &chunk[0];
|
||||
let right = &chunk[1];
|
||||
// If either left or right is null (unknown), we cannot compute the parent hash.
|
||||
// Note that if we are recompute a range of an existing tree,
|
||||
// we do not need to keep these possibly null parent. This is only saved
|
||||
// for the case of constructing a new tree from the leaves.
|
||||
let parent = if *left == E::null() || *right == E::null() {
|
||||
E::null()
|
||||
} else {
|
||||
A::parent(left, right)
|
||||
};
|
||||
parent_update.push((next_layer_start_index + i, parent));
|
||||
i += 1;
|
||||
} else {
|
||||
A::parent(left, right)
|
||||
};
|
||||
parent_update.push((next_layer_start_index + i, parent));
|
||||
i += 1;
|
||||
}
|
||||
if let [r] = iter.remainder() {
|
||||
// Same as above.
|
||||
let parent = if *r == E::null() {
|
||||
E::null()
|
||||
} else {
|
||||
A::parent_single(r, height + self.leaf_height)
|
||||
};
|
||||
parent_update.push((next_layer_start_index + i, parent));
|
||||
assert_eq!(chunk.len(), 1);
|
||||
let r = &chunk[0];
|
||||
// Same as above.
|
||||
let parent = if *r == E::null() {
|
||||
E::null()
|
||||
} else {
|
||||
A::parent_single(r, height + self.leaf_height)
|
||||
};
|
||||
parent_update.push((next_layer_start_index + i, parent));
|
||||
}
|
||||
}
|
||||
if !parent_update.is_empty() {
|
||||
self.before_extend_layer(height + 1);
|
||||
@ -449,27 +477,27 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
// we can just overwrite `last_changed_parent_index` with new values.
|
||||
let mut last_changed_parent_index = None;
|
||||
for (parent_index, parent) in parent_update {
|
||||
match parent_index.cmp(&self.layers[height + 1].len()) {
|
||||
match parent_index.cmp(&self.layer_len(height + 1)) {
|
||||
Ordering::Less => {
|
||||
// We do not overwrite with null.
|
||||
if parent != E::null() {
|
||||
if self.layers[height + 1][parent_index] == E::null()
|
||||
if self.node(height + 1, parent_index) == E::null()
|
||||
// The last node in a layer can be updated.
|
||||
|| (self.layers[height + 1][parent_index] != parent
|
||||
&& parent_index == self.layers[height + 1].len() - 1)
|
||||
|| (self.node(height + 1, parent_index) != parent
|
||||
&& parent_index == self.layer_len(height + 1) - 1)
|
||||
{
|
||||
self.layers[height + 1][parent_index] = parent;
|
||||
self.update_node(height + 1, parent_index, parent);
|
||||
last_changed_parent_index = Some(parent_index);
|
||||
} else if self.layers[height + 1][parent_index] != parent {
|
||||
} else if self.node(height + 1, parent_index) != parent {
|
||||
// Recompute changes a node in the middle. This should be impossible
|
||||
// if the inputs are valid.
|
||||
panic!("Invalid append merkle tree! height={} index={} expected={:?} get={:?}",
|
||||
height + 1, parent_index, self.layers[height + 1][parent_index], parent);
|
||||
height + 1, parent_index, self.node(height + 1, parent_index), parent);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ordering::Equal => {
|
||||
self.layers[height + 1].push(parent);
|
||||
self.push_node(height + 1, parent);
|
||||
last_changed_parent_index = Some(parent_index);
|
||||
}
|
||||
Ordering::Greater => {
|
||||
@ -500,10 +528,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
for height in 0..(subtree_depth - 1) {
|
||||
self.before_extend_layer(height);
|
||||
let subtree_layer_size = 1 << (subtree_depth - 1 - height);
|
||||
self.layers[height].append(&mut vec![E::null(); subtree_layer_size]);
|
||||
self.append_nodes(height, &vec![E::null(); subtree_layer_size]);
|
||||
}
|
||||
self.before_extend_layer(subtree_depth - 1);
|
||||
self.layers[subtree_depth - 1].push(subtree_root);
|
||||
self.push_node(subtree_depth - 1, subtree_root);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -514,21 +542,24 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
}
|
||||
|
||||
pub fn revert_to(&mut self, tx_seq: u64) -> Result<()> {
|
||||
if self.layers[0].is_empty() {
|
||||
if self.layer_len(0) == 0 {
|
||||
// Any previous state of an empty tree is always empty.
|
||||
return Ok(());
|
||||
}
|
||||
let delta_nodes = self
|
||||
.delta_nodes_map
|
||||
.get(&tx_seq)
|
||||
.ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?;
|
||||
.ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?
|
||||
.clone();
|
||||
// Dropping the upper layers that are not in the old merkle tree.
|
||||
self.layers.truncate(delta_nodes.right_most_nodes.len());
|
||||
for height in (delta_nodes.right_most_nodes.len()..(self.height() - 1)).rev() {
|
||||
self.node_manager.truncate_layer(height);
|
||||
}
|
||||
for (height, (last_index, right_most_node)) in
|
||||
delta_nodes.right_most_nodes.iter().enumerate()
|
||||
{
|
||||
self.layers[height].truncate(*last_index + 1);
|
||||
self.layers[height][*last_index] = right_most_node.clone();
|
||||
self.node_manager.truncate_nodes(height, *last_index + 1);
|
||||
self.update_node(height, *last_index, right_most_node.clone())
|
||||
}
|
||||
self.clear_after(tx_seq);
|
||||
Ok(())
|
||||
@ -550,17 +581,23 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
bail!("empty tree");
|
||||
}
|
||||
Ok(HistoryTree {
|
||||
layers: &self.layers,
|
||||
node_manager: &self.node_manager,
|
||||
delta_nodes,
|
||||
leaf_height: self.leaf_height,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn reset(&mut self) {
|
||||
self.layers = match self.min_depth {
|
||||
None => vec![vec![]],
|
||||
Some(depth) => vec![vec![]; depth],
|
||||
};
|
||||
for height in (0..self.height()).rev() {
|
||||
self.node_manager.truncate_layer(height);
|
||||
}
|
||||
if let Some(depth) = self.min_depth {
|
||||
for _ in 0..depth {
|
||||
self.node_manager.add_layer();
|
||||
}
|
||||
} else {
|
||||
self.node_manager.add_layer();
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_after(&mut self, tx_seq: u64) {
|
||||
@ -580,10 +617,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
fn first_known_root_at(&self, index: usize) -> (usize, E) {
|
||||
let mut height = 0;
|
||||
let mut index_in_layer = index;
|
||||
while height < self.layers.len() {
|
||||
while height < self.height() {
|
||||
let node = self.node(height, index_in_layer);
|
||||
if !node.is_null() {
|
||||
return (height + 1, node.clone());
|
||||
return (height + 1, node);
|
||||
}
|
||||
height += 1;
|
||||
index_in_layer /= 2;
|
||||
@ -628,7 +665,7 @@ impl<E: HashElement> DeltaNodes<E> {
|
||||
|
||||
pub struct HistoryTree<'m, E: HashElement> {
|
||||
/// A reference to the global tree nodes.
|
||||
layers: &'m Vec<Vec<E>>,
|
||||
node_manager: &'m NodeManager<E>,
|
||||
/// The delta nodes that are difference from `layers`.
|
||||
/// This could be a reference, we just take ownership for convenience.
|
||||
delta_nodes: &'m DeltaNodes<E>,
|
||||
@ -639,16 +676,18 @@ pub struct HistoryTree<'m, E: HashElement> {
|
||||
impl<E: HashElement, A: Algorithm<E>> MerkleTreeRead for AppendMerkleTree<E, A> {
|
||||
type E = E;
|
||||
|
||||
fn node(&self, layer: usize, index: usize) -> &Self::E {
|
||||
&self.layers[layer][index]
|
||||
fn node(&self, layer: usize, index: usize) -> Self::E {
|
||||
self.node_manager
|
||||
.get_node(layer, index)
|
||||
.expect("index checked")
|
||||
}
|
||||
|
||||
fn height(&self) -> usize {
|
||||
self.layers.len()
|
||||
self.node_manager.num_layers()
|
||||
}
|
||||
|
||||
fn layer_len(&self, layer_height: usize) -> usize {
|
||||
self.layers[layer_height].len()
|
||||
self.node_manager.layer_size(layer_height)
|
||||
}
|
||||
|
||||
fn padding_node(&self, height: usize) -> Self::E {
|
||||
@ -658,10 +697,13 @@ impl<E: HashElement, A: Algorithm<E>> MerkleTreeRead for AppendMerkleTree<E, A>
|
||||
|
||||
impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
|
||||
type E = E;
|
||||
fn node(&self, layer: usize, index: usize) -> &Self::E {
|
||||
fn node(&self, layer: usize, index: usize) -> Self::E {
|
||||
match self.delta_nodes.get(layer, index).expect("range checked") {
|
||||
Some(node) if *node != E::null() => node,
|
||||
_ => &self.layers[layer][index],
|
||||
Some(node) if *node != E::null() => node.clone(),
|
||||
_ => self
|
||||
.node_manager
|
||||
.get_node(layer, index)
|
||||
.expect("index checked"),
|
||||
}
|
||||
}
|
||||
|
||||
@ -678,6 +720,22 @@ impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: HashElement, A: Algorithm<E>> MerkleTreeWrite for AppendMerkleTree<E, A> {
|
||||
type E = E;
|
||||
|
||||
fn push_node(&mut self, layer: usize, node: Self::E) {
|
||||
self.node_manager.push_node(layer, node);
|
||||
}
|
||||
|
||||
fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]) {
|
||||
self.node_manager.append_nodes(layer, nodes);
|
||||
}
|
||||
|
||||
fn update_node(&mut self, layer: usize, pos: usize, node: Self::E) {
|
||||
self.node_manager.add_node(layer, pos, node);
|
||||
}
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! ensure_eq {
|
||||
($given:expr, $expected:expr) => {
|
||||
@ -699,9 +757,11 @@ macro_rules! ensure_eq {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::merkle_tree::MerkleTreeRead;
|
||||
use crate::node_manager::EmptyNodeDatabase;
|
||||
use crate::sha3::Sha3Algorithm;
|
||||
use crate::AppendMerkleTree;
|
||||
use ethereum_types::H256;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[test]
|
||||
fn test_proof() {
|
||||
|
@ -49,7 +49,7 @@ pub trait Algorithm<E: HashElement> {
|
||||
|
||||
pub trait MerkleTreeRead {
|
||||
type E: HashElement;
|
||||
fn node(&self, layer: usize, index: usize) -> &Self::E;
|
||||
fn node(&self, layer: usize, index: usize) -> Self::E;
|
||||
fn height(&self) -> usize;
|
||||
fn layer_len(&self, layer_height: usize) -> usize;
|
||||
fn padding_node(&self, height: usize) -> Self::E;
|
||||
@ -58,7 +58,7 @@ pub trait MerkleTreeRead {
|
||||
self.layer_len(0)
|
||||
}
|
||||
|
||||
fn root(&self) -> &Self::E {
|
||||
fn root(&self) -> Self::E {
|
||||
self.node(self.height() - 1, 0)
|
||||
}
|
||||
|
||||
@ -70,16 +70,16 @@ pub trait MerkleTreeRead {
|
||||
self.leaves()
|
||||
);
|
||||
}
|
||||
if self.node(0, leaf_index) == &Self::E::null() {
|
||||
if self.node(0, leaf_index) == Self::E::null() {
|
||||
bail!("Not ready to generate proof for leaf_index={}", leaf_index);
|
||||
}
|
||||
if self.height() == 1 {
|
||||
return Proof::new(vec![self.root().clone(), self.root().clone()], vec![]);
|
||||
return Proof::new(vec![self.root(), self.root().clone()], vec![]);
|
||||
}
|
||||
let mut lemma: Vec<Self::E> = Vec::with_capacity(self.height()); // path + root
|
||||
let mut path: Vec<bool> = Vec::with_capacity(self.height() - 2); // path - 1
|
||||
let mut index_in_layer = leaf_index;
|
||||
lemma.push(self.node(0, leaf_index).clone());
|
||||
lemma.push(self.node(0, leaf_index));
|
||||
for height in 0..(self.height() - 1) {
|
||||
trace!(
|
||||
"gen_proof: height={} index={} hash={:?}",
|
||||
@ -93,15 +93,15 @@ pub trait MerkleTreeRead {
|
||||
// TODO: This can be skipped if the tree size is available in validation.
|
||||
lemma.push(self.padding_node(height));
|
||||
} else {
|
||||
lemma.push(self.node(height, index_in_layer + 1).clone());
|
||||
lemma.push(self.node(height, index_in_layer + 1));
|
||||
}
|
||||
} else {
|
||||
path.push(false);
|
||||
lemma.push(self.node(height, index_in_layer - 1).clone());
|
||||
lemma.push(self.node(height, index_in_layer - 1));
|
||||
}
|
||||
index_in_layer >>= 1;
|
||||
}
|
||||
lemma.push(self.root().clone());
|
||||
lemma.push(self.root());
|
||||
if lemma.contains(&Self::E::null()) {
|
||||
bail!(
|
||||
"Not enough data to generate proof, lemma={:?} path={:?}",
|
||||
@ -130,6 +130,13 @@ pub trait MerkleTreeRead {
|
||||
}
|
||||
}
|
||||
|
||||
pub trait MerkleTreeWrite {
|
||||
type E: HashElement;
|
||||
fn push_node(&mut self, layer: usize, node: Self::E);
|
||||
fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]);
|
||||
fn update_node(&mut self, layer: usize, pos: usize, node: Self::E);
|
||||
}
|
||||
|
||||
/// This includes the data to reconstruct an `AppendMerkleTree` root where some nodes
|
||||
/// are `null`. Other intermediate nodes will be computed based on these known nodes.
|
||||
pub struct MerkleTreeInitialData<E: HashElement> {
|
||||
|
154
common/append_merkle/src/node_manager.rs
Normal file
154
common/append_merkle/src/node_manager.rs
Normal file
@ -0,0 +1,154 @@
|
||||
use crate::HashElement;
|
||||
use anyhow::Result;
|
||||
use lru::LruCache;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use tracing::error;
|
||||
|
||||
pub struct NodeManager<E: HashElement> {
|
||||
cache: LruCache<(usize, usize), E>,
|
||||
layer_size: Vec<usize>,
|
||||
db: Arc<dyn NodeDatabase<E>>,
|
||||
}
|
||||
|
||||
impl<E: HashElement> NodeManager<E> {
|
||||
pub fn new(db: Arc<dyn NodeDatabase<E>>, capacity: usize) -> Self {
|
||||
Self {
|
||||
cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
|
||||
layer_size: vec![],
|
||||
db,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_dummy() -> Self {
|
||||
Self {
|
||||
cache: LruCache::unbounded(),
|
||||
layer_size: vec![],
|
||||
db: Arc::new(EmptyNodeDatabase {}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_node(&mut self, layer: usize, node: E) {
|
||||
self.add_node(layer, self.layer_size[layer], node);
|
||||
self.layer_size[layer] += 1;
|
||||
}
|
||||
|
||||
pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) {
|
||||
let pos = &mut self.layer_size[layer];
|
||||
let mut saved_nodes = Vec::with_capacity(nodes.len());
|
||||
for node in nodes {
|
||||
self.cache.put((layer, *pos), node.clone());
|
||||
saved_nodes.push((layer, *pos, node));
|
||||
*pos += 1;
|
||||
}
|
||||
if let Err(e) = self.db.save_node_list(&saved_nodes) {
|
||||
error!("Failed to save node list: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_node(&self, layer: usize, pos: usize) -> Option<E> {
|
||||
match self.cache.peek(&(layer, pos)) {
|
||||
Some(node) => Some(node.clone()),
|
||||
None => self.db.get_node(layer, pos).unwrap_or_else(|e| {
|
||||
error!("Failed to get node: {}", e);
|
||||
None
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_nodes(&self, layer: usize, start_pos: usize, end_pos: usize) -> NodeIterator<E> {
|
||||
NodeIterator {
|
||||
node_manager: self,
|
||||
layer,
|
||||
start_pos,
|
||||
end_pos,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_node(&mut self, layer: usize, pos: usize, node: E) {
|
||||
if let Err(e) = self.db.save_node(layer, pos, &node) {
|
||||
error!("Failed to save node: {}", e);
|
||||
}
|
||||
self.cache.put((layer, pos), node);
|
||||
}
|
||||
|
||||
pub fn add_layer(&mut self) {
|
||||
self.layer_size.push(0);
|
||||
}
|
||||
|
||||
pub fn layer_size(&self, layer: usize) -> usize {
|
||||
self.layer_size[layer]
|
||||
}
|
||||
|
||||
pub fn num_layers(&self) -> usize {
|
||||
self.layer_size.len()
|
||||
}
|
||||
|
||||
pub fn truncate_nodes(&mut self, layer: usize, pos_end: usize) {
|
||||
let mut removed_nodes = Vec::new();
|
||||
for pos in pos_end..self.layer_size[layer] {
|
||||
self.cache.pop(&(layer, pos));
|
||||
removed_nodes.push((layer, pos));
|
||||
}
|
||||
if let Err(e) = self.db.remove_node_list(&removed_nodes) {
|
||||
error!("Failed to remove node list: {:?}", e);
|
||||
}
|
||||
self.layer_size[layer] = pos_end;
|
||||
}
|
||||
|
||||
pub fn truncate_layer(&mut self, layer: usize) {
|
||||
self.truncate_nodes(layer, 0);
|
||||
if layer == self.num_layers() - 1 {
|
||||
self.layer_size.pop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NodeIterator<'a, E: HashElement> {
|
||||
node_manager: &'a NodeManager<E>,
|
||||
layer: usize,
|
||||
start_pos: usize,
|
||||
end_pos: usize,
|
||||
}
|
||||
|
||||
impl<'a, E: HashElement> Iterator for NodeIterator<'a, E> {
|
||||
type Item = E;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.start_pos < self.end_pos {
|
||||
let r = self.node_manager.get_node(self.layer, self.start_pos);
|
||||
self.start_pos += 1;
|
||||
r
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait NodeDatabase<E: HashElement>: Send + Sync {
|
||||
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<E>>;
|
||||
fn save_node(&self, layer: usize, pos: usize, node: &E) -> Result<()>;
|
||||
/// `nodes` are a list of tuples `(layer, pos, node)`.
|
||||
fn save_node_list(&self, nodes: &[(usize, usize, &E)]) -> Result<()>;
|
||||
fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()>;
|
||||
}
|
||||
|
||||
/// A dummy database structure for in-memory merkle tree that will not read/write db.
|
||||
pub struct EmptyNodeDatabase {}
|
||||
impl<E: HashElement> NodeDatabase<E> for EmptyNodeDatabase {
|
||||
fn get_node(&self, _layer: usize, _pos: usize) -> Result<Option<E>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn save_node(&self, _layer: usize, _pos: usize, _node: &E) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn save_node_list(&self, _nodes: &[(usize, usize, &E)]) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_node_list(&self, _nodes: &[(usize, usize)]) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -9,5 +9,5 @@ exit-future = "0.2.0"
|
||||
futures = "0.3.21"
|
||||
lazy_static = "1.4.0"
|
||||
lighthouse_metrics = { path = "../lighthouse_metrics" }
|
||||
tokio = { version = "1.19.2", features = ["rt"] }
|
||||
tokio = { version = "1.38.0", features = ["full"] }
|
||||
tracing = "0.1.35"
|
||||
|
@ -222,7 +222,7 @@ impl LogEntryFetcher {
|
||||
) -> UnboundedReceiver<LogFetchProgress> {
|
||||
let provider = self.provider.clone();
|
||||
let (recover_tx, recover_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let contract = ZgsFlow::new(self.contract_address, provider.clone());
|
||||
let contract = self.flow_contract();
|
||||
let log_page_size = self.log_page_size;
|
||||
|
||||
executor.spawn(
|
||||
@ -305,7 +305,7 @@ impl LogEntryFetcher {
|
||||
mut watch_progress_rx: UnboundedReceiver<u64>,
|
||||
) -> UnboundedReceiver<LogFetchProgress> {
|
||||
let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
|
||||
let contract = self.flow_contract();
|
||||
let provider = self.provider.clone();
|
||||
let confirmation_delay = self.confirmation_delay;
|
||||
let log_page_size = self.log_page_size;
|
||||
@ -583,6 +583,10 @@ impl LogEntryFetcher {
|
||||
pub fn provider(&self) -> &Provider<RetryClient<Http>> {
|
||||
self.provider.as_ref()
|
||||
}
|
||||
|
||||
pub fn flow_contract(&self) -> ZgsFlow<Provider<RetryClient<Http>>> {
|
||||
ZgsFlow::new(self.contract_address, self.provider.clone())
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_watch_process(
|
||||
|
@ -510,6 +510,41 @@ impl LogSyncManager {
|
||||
}
|
||||
self.data_cache.garbage_collect(self.next_tx_seq);
|
||||
self.next_tx_seq += 1;
|
||||
|
||||
// Check if the computed data root matches on-chain state.
|
||||
// If the call fails, we won't check the root here and return `true` directly.
|
||||
let flow_contract = self.log_fetcher.flow_contract();
|
||||
match flow_contract
|
||||
.get_flow_root_by_tx_seq(tx.seq.into())
|
||||
.call()
|
||||
.await
|
||||
{
|
||||
Ok(contract_root_bytes) => {
|
||||
let contract_root = H256::from_slice(&contract_root_bytes);
|
||||
// contract_root is zero for tx submitted before upgrading.
|
||||
if !contract_root.is_zero() {
|
||||
match self.store.get_context() {
|
||||
Ok((local_root, _)) => {
|
||||
if contract_root != local_root {
|
||||
error!(
|
||||
?contract_root,
|
||||
?local_root,
|
||||
"local flow root and on-chain flow root mismatch"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the local flow root");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the on-chain flow root");
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,7 @@ use super::{Client, RuntimeContext};
|
||||
use chunk_pool::{ChunkPoolMessage, Config as ChunkPoolConfig, MemoryChunkPool};
|
||||
use file_location_cache::FileLocationCache;
|
||||
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
|
||||
use miner::{MineService, MinerConfig, MinerMessage};
|
||||
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
|
||||
use network::{
|
||||
self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId,
|
||||
Service as LibP2PService,
|
||||
@ -112,7 +112,7 @@ impl ClientBuilder {
|
||||
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
|
||||
let executor = require!("sync", self, runtime_context).clone().executor;
|
||||
let store = Arc::new(
|
||||
LogManager::rocksdb(LogConfig::default(), &config.db_dir, executor)
|
||||
LogManager::rocksdb(config.log_config.clone(), &config.db_dir, executor)
|
||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
|
||||
);
|
||||
|
||||
@ -216,6 +216,16 @@ impl ClientBuilder {
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub async fn with_shard(self, config: ShardConfig) -> Result<Self, String> {
|
||||
self.async_store
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.update_shard_config(config)
|
||||
.await;
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Starts the networking stack.
|
||||
pub fn with_router(mut self, router_config: router::Config) -> Result<Self, String> {
|
||||
let executor = require!("router", self, runtime_context).clone().executor;
|
||||
|
@ -11,6 +11,7 @@ use shared_types::{NetworkIdentity, ProtocolVersion};
|
||||
use std::net::IpAddr;
|
||||
use std::time::Duration;
|
||||
use storage::config::ShardConfig;
|
||||
use storage::log_store::log_manager::LogConfig;
|
||||
use storage::StorageConfig;
|
||||
|
||||
impl ZgsConfig {
|
||||
@ -101,8 +102,11 @@ impl ZgsConfig {
|
||||
}
|
||||
|
||||
pub fn storage_config(&self) -> Result<StorageConfig, String> {
|
||||
let mut log_config = LogConfig::default();
|
||||
log_config.flow.merkle_node_cache_capacity = self.merkle_node_cache_capacity;
|
||||
Ok(StorageConfig {
|
||||
db_dir: self.db_dir.clone().into(),
|
||||
log_config,
|
||||
})
|
||||
}
|
||||
|
||||
@ -228,7 +232,7 @@ impl ZgsConfig {
|
||||
}
|
||||
}
|
||||
|
||||
fn shard_config(&self) -> Result<ShardConfig, String> {
|
||||
pub fn shard_config(&self) -> Result<ShardConfig, String> {
|
||||
self.shard_position.clone().try_into()
|
||||
}
|
||||
}
|
||||
|
@ -60,6 +60,7 @@ build_config! {
|
||||
(prune_check_time_s, (u64), 60)
|
||||
(prune_batch_size, (usize), 16 * 1024)
|
||||
(prune_batch_wait_time_ms, (u64), 1000)
|
||||
(merkle_node_cache_capacity, (usize), 32 * 1024 * 1024)
|
||||
|
||||
// misc
|
||||
(log_config_file, (String), "log_config".to_string())
|
||||
|
@ -17,6 +17,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
|
||||
let miner_config = config.mine_config()?;
|
||||
let router_config = config.router_config(&network_config)?;
|
||||
let pruner_config = config.pruner_config()?;
|
||||
let shard_config = config.shard_config()?;
|
||||
|
||||
ClientBuilder::default()
|
||||
.with_runtime_context(context)
|
||||
@ -30,6 +31,8 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
|
||||
.await?
|
||||
.with_miner(miner_config)
|
||||
.await?
|
||||
.with_shard(shard_config)
|
||||
.await?
|
||||
.with_pruner(pruner_config)
|
||||
.await?
|
||||
.with_rpc(config.rpc, config.chunk_pool_config()?)
|
||||
|
@ -29,7 +29,7 @@ itertools = "0.13.0"
|
||||
serde = { version = "1.0.197", features = ["derive"] }
|
||||
parking_lot = "0.12.3"
|
||||
serde_json = "1.0.127"
|
||||
tokio = { version = "1.10.0", features = ["sync"] }
|
||||
tokio = { version = "1.38.0", features = ["full"] }
|
||||
task_executor = { path = "../../common/task_executor" }
|
||||
|
||||
[dev-dependencies]
|
||||
|
@ -1,3 +1,4 @@
|
||||
use crate::log_store::log_manager::LogConfig;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use std::{cell::RefCell, path::PathBuf, rc::Rc, str::FromStr};
|
||||
@ -7,6 +8,7 @@ pub const SHARD_CONFIG_KEY: &str = "shard_config";
|
||||
#[derive(Clone)]
|
||||
pub struct Config {
|
||||
pub db_dir: PathBuf,
|
||||
pub log_config: LogConfig,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Decode, Encode, Serialize, Deserialize, Eq, PartialEq)]
|
||||
|
@ -10,7 +10,7 @@ use crate::log_store::log_manager::{
|
||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
||||
use crate::{try_option, ZgsKeyValueDB};
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
|
||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase};
|
||||
use itertools::Itertools;
|
||||
use parking_lot::RwLock;
|
||||
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
|
||||
@ -25,15 +25,15 @@ use tracing::{debug, error, trace};
|
||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
||||
|
||||
pub struct FlowStore {
|
||||
db: FlowDBStore,
|
||||
db: Arc<FlowDBStore>,
|
||||
seal_manager: SealTaskManager,
|
||||
config: FlowConfig,
|
||||
}
|
||||
|
||||
impl FlowStore {
|
||||
pub fn new(db: Arc<dyn ZgsKeyValueDB>, config: FlowConfig) -> Self {
|
||||
pub fn new(db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
|
||||
Self {
|
||||
db: FlowDBStore::new(db),
|
||||
db,
|
||||
seal_manager: Default::default(),
|
||||
config,
|
||||
}
|
||||
@ -93,6 +93,7 @@ impl FlowStore {
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct FlowConfig {
|
||||
pub batch_size: usize,
|
||||
pub merkle_node_cache_capacity: usize,
|
||||
pub shard_config: Arc<RwLock<ShardConfig>>,
|
||||
}
|
||||
|
||||
@ -100,6 +101,8 @@ impl Default for FlowConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
batch_size: SECTORS_PER_LOAD,
|
||||
// Each node takes (8+8+32=)48 Bytes, so the default value is 1.5 GB memory size.
|
||||
merkle_node_cache_capacity: 32 * 1024 * 1024,
|
||||
shard_config: Default::default(),
|
||||
}
|
||||
}
|
||||
@ -436,7 +439,7 @@ impl FlowDBStore {
|
||||
let mut expected_index = 0;
|
||||
|
||||
let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
|
||||
let empty_root = *Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
|
||||
let empty_root = Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
|
||||
|
||||
for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
|
||||
let (index_bytes, root_bytes) = r?;
|
||||
@ -666,3 +669,45 @@ fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
|
||||
let position = try_decode_usize(&data[mem::size_of::<u64>()..])?;
|
||||
Ok((layer_index, position))
|
||||
}
|
||||
|
||||
impl NodeDatabase<DataRoot> for FlowDBStore {
|
||||
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<DataRoot>> {
|
||||
Ok(self
|
||||
.kvdb
|
||||
.get(COL_FLOW_MPT_NODES, &encode_mpt_node_key(layer, pos))?
|
||||
.map(|v| DataRoot::from_slice(&v)))
|
||||
}
|
||||
|
||||
fn save_node(&self, layer: usize, pos: usize, node: &DataRoot) -> Result<()> {
|
||||
let mut tx = self.kvdb.transaction();
|
||||
tx.put(
|
||||
COL_FLOW_MPT_NODES,
|
||||
&encode_mpt_node_key(layer, pos),
|
||||
node.as_bytes(),
|
||||
);
|
||||
Ok(self.kvdb.write(tx)?)
|
||||
}
|
||||
|
||||
fn save_node_list(&self, nodes: &[(usize, usize, &DataRoot)]) -> Result<()> {
|
||||
let mut tx = self.kvdb.transaction();
|
||||
for (layer_index, position, data) in nodes {
|
||||
tx.put(
|
||||
COL_FLOW_MPT_NODES,
|
||||
&encode_mpt_node_key(*layer_index, *position),
|
||||
data.as_bytes(),
|
||||
);
|
||||
}
|
||||
Ok(self.kvdb.write(tx)?)
|
||||
}
|
||||
|
||||
fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()> {
|
||||
let mut tx = self.kvdb.transaction();
|
||||
for (layer_index, position) in nodes {
|
||||
tx.delete(
|
||||
COL_FLOW_MPT_NODES,
|
||||
&encode_mpt_node_key(*layer_index, *position),
|
||||
);
|
||||
}
|
||||
Ok(self.kvdb.write(tx)?)
|
||||
}
|
||||
}
|
||||
|
@ -4,11 +4,10 @@ mod seal;
|
||||
mod serde;
|
||||
|
||||
use ::serde::{Deserialize, Serialize};
|
||||
use std::cmp::min;
|
||||
|
||||
use anyhow::Result;
|
||||
use ethereum_types::H256;
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use std::cmp::min;
|
||||
|
||||
use crate::log_store::log_manager::data_to_merkle_leaves;
|
||||
use crate::try_option;
|
||||
@ -206,7 +205,7 @@ impl EntryBatch {
|
||||
}
|
||||
}
|
||||
Ok(Some(
|
||||
*try_option!(self.to_merkle_tree(is_first_chunk)?).root(),
|
||||
try_option!(self.to_merkle_tree(is_first_chunk)?).root(),
|
||||
))
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
use crate::config::ShardConfig;
|
||||
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowStore};
|
||||
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
|
||||
use crate::log_store::tx_store::TransactionStore;
|
||||
use crate::log_store::{
|
||||
FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
|
||||
@ -94,6 +94,7 @@ impl MerkleManager {
|
||||
}
|
||||
|
||||
fn revert_merkle_tree(&mut self, tx_seq: u64, tx_store: &TransactionStore) -> Result<()> {
|
||||
debug!("revert merkle tree {}", tx_seq);
|
||||
// Special case for reverting tx_seq == 0
|
||||
if tx_seq == u64::MAX {
|
||||
self.pora_chunks_merkle.reset();
|
||||
@ -116,7 +117,7 @@ impl MerkleManager {
|
||||
if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 {
|
||||
self.last_chunk_merkle.append(H256::zero());
|
||||
self.pora_chunks_merkle
|
||||
.update_last(*self.last_chunk_merkle.root());
|
||||
.update_last(self.last_chunk_merkle.root());
|
||||
} else if self.last_chunk_merkle.leaves() != 0 {
|
||||
let last_chunk_start_index = self.last_chunk_start_index();
|
||||
let last_chunk_data = flow_store.get_available_entries(
|
||||
@ -355,7 +356,7 @@ impl LogStoreWrite for LogManager {
|
||||
merkle.revert_merkle_tree(tx_seq, &self.tx_store)?;
|
||||
merkle.try_initialize(&self.flow_store)?;
|
||||
assert_eq!(
|
||||
Some(*merkle.last_chunk_merkle.root()),
|
||||
Some(merkle.last_chunk_merkle.root()),
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.leaf_at(merkle.pora_chunks_merkle.leaves() - 1)?
|
||||
@ -577,7 +578,7 @@ impl LogStoreRead for LogManager {
|
||||
fn get_context(&self) -> crate::error::Result<(DataRoot, u64)> {
|
||||
let merkle = self.merkle.read_recursive();
|
||||
Ok((
|
||||
*merkle.pora_chunks_merkle.root(),
|
||||
merkle.pora_chunks_merkle.root(),
|
||||
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
|
||||
))
|
||||
}
|
||||
@ -626,7 +627,8 @@ impl LogManager {
|
||||
executor: task_executor::TaskExecutor,
|
||||
) -> Result<Self> {
|
||||
let tx_store = TransactionStore::new(db.clone())?;
|
||||
let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow));
|
||||
let flow_db = Arc::new(FlowDBStore::new(db.clone()));
|
||||
let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
|
||||
let mut initial_data = flow_store.get_chunk_root_list()?;
|
||||
// If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
|
||||
// first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
|
||||
@ -705,14 +707,19 @@ impl LogManager {
|
||||
}
|
||||
}
|
||||
|
||||
let mut pora_chunks_merkle =
|
||||
Merkle::new_with_subtrees(initial_data, log2_pow2(PORA_CHUNK_SIZE), start_tx_seq)?;
|
||||
let mut pora_chunks_merkle = Merkle::new_with_subtrees(
|
||||
flow_db,
|
||||
config.flow.merkle_node_cache_capacity,
|
||||
initial_data,
|
||||
log2_pow2(PORA_CHUNK_SIZE),
|
||||
start_tx_seq,
|
||||
)?;
|
||||
let last_chunk_merkle = match start_tx_seq {
|
||||
Some(tx_seq) => {
|
||||
tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
|
||||
}
|
||||
// Initialize
|
||||
None => Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None),
|
||||
None => Merkle::new_with_depth(vec![], 1, None),
|
||||
};
|
||||
|
||||
debug!(
|
||||
@ -722,7 +729,7 @@ impl LogManager {
|
||||
last_chunk_merkle.leaves(),
|
||||
);
|
||||
if last_chunk_merkle.leaves() != 0 {
|
||||
pora_chunks_merkle.append(*last_chunk_merkle.root());
|
||||
pora_chunks_merkle.append(last_chunk_merkle.root());
|
||||
// update the merkle root
|
||||
pora_chunks_merkle.commit(start_tx_seq);
|
||||
}
|
||||
@ -761,6 +768,10 @@ impl LogManager {
|
||||
.merkle
|
||||
.write()
|
||||
.try_initialize(&log_manager.flow_store)?;
|
||||
info!(
|
||||
"Log manager initialized, state={:?}",
|
||||
log_manager.get_context()?
|
||||
);
|
||||
Ok(log_manager)
|
||||
}
|
||||
|
||||
@ -889,16 +900,16 @@ impl LogManager {
|
||||
// `last_chunk_merkle` was empty, so this is a new leaf in the top_tree.
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.append_subtree(1, *merkle.last_chunk_merkle.root())?;
|
||||
.append_subtree(1, merkle.last_chunk_merkle.root())?;
|
||||
} else {
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.update_last(*merkle.last_chunk_merkle.root());
|
||||
.update_last(merkle.last_chunk_merkle.root());
|
||||
}
|
||||
if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
|
||||
batch_root_map.insert(
|
||||
merkle.pora_chunks_merkle.leaves() - 1,
|
||||
(*merkle.last_chunk_merkle.root(), 1),
|
||||
(merkle.last_chunk_merkle.root(), 1),
|
||||
);
|
||||
self.complete_last_chunk_merkle(
|
||||
merkle.pora_chunks_merkle.leaves() - 1,
|
||||
@ -954,7 +965,7 @@ impl LogManager {
|
||||
.append_list(data_to_merkle_leaves(&pad_data)?);
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.update_last(*merkle.last_chunk_merkle.root());
|
||||
.update_last(merkle.last_chunk_merkle.root());
|
||||
} else {
|
||||
if last_chunk_pad != 0 {
|
||||
is_full_empty = false;
|
||||
@ -964,10 +975,10 @@ impl LogManager {
|
||||
.append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?);
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.update_last(*merkle.last_chunk_merkle.root());
|
||||
.update_last(merkle.last_chunk_merkle.root());
|
||||
root_map.insert(
|
||||
merkle.pora_chunks_merkle.leaves() - 1,
|
||||
(*merkle.last_chunk_merkle.root(), 1),
|
||||
(merkle.last_chunk_merkle.root(), 1),
|
||||
);
|
||||
completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1);
|
||||
}
|
||||
@ -978,7 +989,7 @@ impl LogManager {
|
||||
let data = pad_data[start_index * ENTRY_SIZE
|
||||
..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE]
|
||||
.to_vec();
|
||||
let root = *Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
|
||||
let root = Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
|
||||
merkle.pora_chunks_merkle.append(root);
|
||||
root_map.insert(merkle.pora_chunks_merkle.leaves() - 1, (root, 1));
|
||||
start_index += PORA_CHUNK_SIZE;
|
||||
@ -1056,7 +1067,7 @@ impl LogManager {
|
||||
}
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.update_last(*merkle.last_chunk_merkle.root());
|
||||
.update_last(merkle.last_chunk_merkle.root());
|
||||
}
|
||||
let chunk_roots = self.flow_store.append_entries(flow_entry_array)?;
|
||||
for (chunk_index, chunk_root) in chunk_roots {
|
||||
|
@ -3,11 +3,14 @@ use crate::log_store::log_manager::{
|
||||
PORA_CHUNK_SIZE,
|
||||
};
|
||||
use crate::log_store::{LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite};
|
||||
use append_merkle::{Algorithm, AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
|
||||
use append_merkle::{
|
||||
Algorithm, AppendMerkleTree, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm,
|
||||
};
|
||||
use ethereum_types::H256;
|
||||
use rand::random;
|
||||
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
|
||||
use std::cmp;
|
||||
use std::sync::Arc;
|
||||
use task_executor::test_utils::TestRuntime;
|
||||
|
||||
#[test]
|
||||
|
@ -335,11 +335,7 @@ impl TransactionStore {
|
||||
}
|
||||
let mut merkle = if last_chunk_start_index == 0 {
|
||||
// The first entry hash is initialized as zero.
|
||||
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
|
||||
vec![H256::zero()],
|
||||
log2_pow2(PORA_CHUNK_SIZE) + 1,
|
||||
None,
|
||||
)
|
||||
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(vec![H256::zero()], 1, None)
|
||||
} else {
|
||||
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
|
||||
vec![],
|
||||
|
@ -1 +1 @@
|
||||
75c251804a29ab22adced50d92478cf0baf834bc
|
||||
66ff70bc88547c7467efd35ba500ae5f25cf8960
|
||||
|
@ -40,8 +40,8 @@
|
||||
"type": "function"
|
||||
}
|
||||
],
|
||||
"bytecode": "0x608060405234801561001057600080fd5b5060be8061001f6000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122044ebf96fcad90f0bbc521513843d64fbc182c5c913a8210a4d638393793be63064736f6c63430008100033",
|
||||
"deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122044ebf96fcad90f0bbc521513843d64fbc182c5c913a8210a4d638393793be63064736f6c63430008100033",
|
||||
"bytecode": "0x608060405234801561001057600080fd5b5060be8061001f6000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
|
||||
"deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
|
||||
"linkReferences": {},
|
||||
"deployedLinkReferences": {}
|
||||
}
|
||||
|
@ -70,8 +70,8 @@
|
||||
"type": "function"
|
||||
}
|
||||
],
|
||||
"bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ce57385afc7714a4000e530d1e1154d214fc1c0e2392abde201018635be1a2ab64736f6c63430008100033",
|
||||
"deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ce57385afc7714a4000e530d1e1154d214fc1c0e2392abde201018635be1a2ab64736f6c63430008100033",
|
||||
"bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
|
||||
"deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
|
||||
"linkReferences": {},
|
||||
"deployedLinkReferences": {}
|
||||
}
|
||||
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue
Block a user