Mirror of https://github.com/0glabs/0g-storage-node.git (synced 2025-04-04 15:35:18 +00:00)

Compare commits: 13 commits, 2a5a230f21...0cd07d8903

Commits (newest first):
0cd07d8903
506d234562
8f17a7ad72
2947cb7ac6
39efb721c5
9fe5a2c18b
80b4d63cba
b2a70501c2
e701c8fdbd
a4b02a21b7
3fc1543fb4
82fd29968b
45fa344564

Cargo.lock (generated, 37 changed lines)
@@ -223,7 +223,9 @@ dependencies = [
  "eth2_ssz",
  "eth2_ssz_derive",
  "ethereum-types 0.14.1",
+ "itertools 0.13.0",
  "lazy_static",
+ "lru 0.12.5",
  "once_cell",
  "serde",
  "tiny-keccak",
@@ -1673,7 +1675,7 @@ dependencies = [
  "hkdf",
  "lazy_static",
  "libp2p-core 0.30.2",
- "lru",
+ "lru 0.7.8",
  "parking_lot 0.11.2",
  "rand 0.8.5",
  "rlp",
@@ -2514,6 +2516,12 @@ version = "1.0.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
+
+[[package]]
+name = "foldhash"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2"

 [[package]]
 name = "foreign-types"
 version = "0.3.2"
@@ -2946,6 +2954,17 @@ dependencies = [
  "allocator-api2",
 ]

+[[package]]
+name = "hashbrown"
+version = "0.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
+dependencies = [
+ "allocator-api2",
+ "equivalent",
+ "foldhash",
+]
+
 [[package]]
 name = "hashers"
 version = "1.0.1"
@@ -4117,7 +4136,7 @@ dependencies = [
  "libp2p-core 0.33.0",
  "libp2p-swarm",
  "log",
- "lru",
+ "lru 0.7.8",
  "prost 0.10.4",
  "prost-build 0.10.4",
  "prost-codec",
@@ -4650,6 +4669,15 @@ dependencies = [
  "hashbrown 0.12.3",
 ]

+[[package]]
+name = "lru"
+version = "0.12.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
+dependencies = [
+ "hashbrown 0.15.0",
+]
+
 [[package]]
 name = "lru-cache"
 version = "0.1.2"
@@ -4715,9 +4743,10 @@ dependencies = [
 [[package]]
 name = "metrics"
 version = "0.1.0"
-source = "git+https://github.com/Conflux-Chain/conflux-rust.git?rev=992ebc5483d937c8f6b883e266f8ed2a67a7fa9a#992ebc5483d937c8f6b883e266f8ed2a67a7fa9a"
+source = "git+https://github.com/Conflux-Chain/conflux-rust.git?rev=c4734e337c66d38e6396742cd5117b596e8d2603#c4734e337c66d38e6396742cd5117b596e8d2603"
 dependencies = [
  "chrono",
  "duration-str",
  "futures",
  "influx_db_client",
  "lazy_static",
@@ -5018,7 +5047,7 @@ dependencies = [
  "lazy_static",
  "libp2p",
  "lighthouse_metrics",
- "lru",
+ "lru 0.7.8",
  "parking_lot 0.12.3",
  "rand 0.8.5",
  "regex",
@@ -28,7 +28,7 @@ members = [
 resolver = "2"

 [workspace.dependencies]
-metrics = { git = "https://github.com/Conflux-Chain/conflux-rust.git", rev = "992ebc5483d937c8f6b883e266f8ed2a67a7fa9a" }
+metrics = { git = "https://github.com/Conflux-Chain/conflux-rust.git", rev = "c4734e337c66d38e6396742cd5117b596e8d2603" }

 [patch.crates-io]
 discv5 = { path = "version-meld/discv5" }
@@ -36,4 +36,8 @@ eth2_ssz = { path = "version-meld/eth2_ssz" }
 enr = { path = "version-meld/enr" }

 [profile.bench.package.'storage']
 debug = true
+
+[profile.dev]
+# enabling debug_assertions will make node fail to start because of checks in `clap`.
+debug-assertions = false
@ -12,4 +12,6 @@ eth2_ssz_derive = "0.3.0"
|
||||
serde = { version = "1.0.137", features = ["derive"] }
|
||||
lazy_static = "1.4.0"
|
||||
tracing = "0.1.36"
|
||||
once_cell = "1.19.0"
|
||||
once_cell = "1.19.0"
|
||||
itertools = "0.13.0"
|
||||
lru = "0.12.5"
|
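The two new dependencies feed the merkle-node cache introduced later in this diff: `itertools` drives the chunked iteration in `recompute`, and `lru` 0.12 backs the `NodeManager` node cache. As a quick orientation, a minimal sketch of the lru 0.12 API (crate usage only; the key and value types here are illustrative):

use lru::LruCache;
use std::num::NonZeroUsize;

fn main() {
    // lru 0.12 requires a NonZeroUsize capacity, which is why NodeManager
    // wraps its configured capacity in NonZeroUsize::new(...).
    let mut cache: LruCache<(usize, usize), [u8; 32]> =
        LruCache::new(NonZeroUsize::new(1024).unwrap());
    cache.put((0, 0), [0u8; 32]); // key: (layer, position), value: a node hash
    assert!(cache.peek(&(0, 0)).is_some()); // peek does not update recency
    assert!(cache.get(&(0, 0)).is_some()); // get promotes the entry
}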
@@ -1,23 +1,28 @@
 mod merkle_tree;
+mod node_manager;
 mod proof;
 mod sha3;

 use anyhow::{anyhow, bail, Result};
+use itertools::Itertools;
 use std::cmp::Ordering;
 use std::collections::{BTreeMap, HashMap};
 use std::fmt::Debug;
 use std::marker::PhantomData;
+use std::sync::Arc;
 use tracing::{trace, warn};

+use crate::merkle_tree::MerkleTreeWrite;
 pub use crate::merkle_tree::{
     Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES,
 };
+pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager, NodeTransaction};
 pub use proof::{Proof, RangeProof};
 pub use sha3::Sha3Algorithm;

 pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
-    /// Keep all the nodes in the latest version. `layers[0]` is the layer of leaves.
-    layers: Vec<Vec<E>>,
+    node_manager: NodeManager<E>,
     /// Keep the delta nodes that can be used to construct a history tree.
     /// The key is the root node of that version.
     delta_nodes_map: BTreeMap<u64, DeltaNodes<E>>,
@@ -35,13 +40,16 @@ pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
 impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
     pub fn new(leaves: Vec<E>, leaf_height: usize, start_tx_seq: Option<u64>) -> Self {
         let mut merkle = Self {
-            layers: vec![leaves],
+            node_manager: NodeManager::new_dummy(),
             delta_nodes_map: BTreeMap::new(),
             root_to_tx_seq_map: HashMap::new(),
             min_depth: None,
             leaf_height,
             _a: Default::default(),
         };
+        merkle.node_manager.start_transaction();
+        merkle.node_manager.add_layer();
+        merkle.node_manager.append_nodes(0, &leaves);
         if merkle.leaves() == 0 {
             if let Some(seq) = start_tx_seq {
                 merkle.delta_nodes_map.insert(
@@ -51,10 +59,12 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
                     },
                 );
             }
+            merkle.node_manager.commit();
             return merkle;
         }
         // Reconstruct the whole tree.
         merkle.recompute(0, 0, None);
+        merkle.node_manager.commit();
         // Commit the first version in memory.
         // TODO(zz): Check when the roots become available.
         merkle.commit(start_tx_seq);
@@ -62,53 +72,44 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
     }

     pub fn new_with_subtrees(
-        initial_data: MerkleTreeInitialData<E>,
+        node_db: Arc<dyn NodeDatabase<E>>,
+        node_cache_capacity: usize,
         leaf_height: usize,
-        start_tx_seq: Option<u64>,
     ) -> Result<Self> {
         let mut merkle = Self {
-            layers: vec![vec![]],
+            node_manager: NodeManager::new(node_db, node_cache_capacity)?,
             delta_nodes_map: BTreeMap::new(),
             root_to_tx_seq_map: HashMap::new(),
             min_depth: None,
             leaf_height,
             _a: Default::default(),
         };
-        if initial_data.subtree_list.is_empty() {
-            if let Some(seq) = start_tx_seq {
-                merkle.delta_nodes_map.insert(
-                    seq,
-                    DeltaNodes {
-                        right_most_nodes: vec![],
-                    },
-                );
-            }
-            return Ok(merkle);
-        }
-        merkle.append_subtree_list(initial_data.subtree_list)?;
-        merkle.commit(start_tx_seq);
-        for (index, h) in initial_data.known_leaves {
-            merkle.fill_leaf(index, h);
-        }
-        for (layer_index, position, h) in initial_data.extra_mpt_nodes {
-            // TODO: Delete duplicate nodes from DB.
-            merkle.layers[layer_index][position] = h;
+        if merkle.height() == 0 {
+            merkle.node_manager.start_transaction();
+            merkle.node_manager.add_layer();
+            merkle.node_manager.commit();
         }
         Ok(merkle)
     }

     /// This is only used for the last chunk, so `leaf_height` is always 0 so far.
     pub fn new_with_depth(leaves: Vec<E>, depth: usize, start_tx_seq: Option<u64>) -> Self {
+        let mut node_manager = NodeManager::new_dummy();
+        node_manager.start_transaction();
         if leaves.is_empty() {
             // Create an empty merkle tree with `depth`.
             let mut merkle = Self {
-                layers: vec![vec![]; depth],
+                // dummy node manager for the last chunk.
+                node_manager,
                 delta_nodes_map: BTreeMap::new(),
                 root_to_tx_seq_map: HashMap::new(),
                 min_depth: Some(depth),
                 leaf_height: 0,
                 _a: Default::default(),
             };
+            for _ in 0..depth {
+                merkle.node_manager.add_layer();
+            }
             if let Some(seq) = start_tx_seq {
                 merkle.delta_nodes_map.insert(
                     seq,
@@ -117,20 +118,26 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
                     },
                 );
             }
+            merkle.node_manager.commit();
             merkle
         } else {
-            let mut layers = vec![vec![]; depth];
-            layers[0] = leaves;
             let mut merkle = Self {
-                layers,
+                // dummy node manager for the last chunk.
+                node_manager,
                 delta_nodes_map: BTreeMap::new(),
                 root_to_tx_seq_map: HashMap::new(),
                 min_depth: Some(depth),
                 leaf_height: 0,
                 _a: Default::default(),
             };
+            merkle.node_manager.add_layer();
+            merkle.append_nodes(0, &leaves);
+            for _ in 1..depth {
+                merkle.node_manager.add_layer();
+            }
             // Reconstruct the whole tree.
             merkle.recompute(0, 0, None);
+            merkle.node_manager.commit();
             // Commit the first version in memory.
             merkle.commit(start_tx_seq);
             merkle
@@ -142,18 +149,22 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
             // appending null is not allowed.
             return;
         }
-        self.layers[0].push(new_leaf);
+        self.node_manager.start_transaction();
+        self.node_manager.push_node(0, new_leaf);
         self.recompute_after_append_leaves(self.leaves() - 1);
+        self.node_manager.commit();
     }

-    pub fn append_list(&mut self, mut leaf_list: Vec<E>) {
+    pub fn append_list(&mut self, leaf_list: Vec<E>) {
         if leaf_list.contains(&E::null()) {
             // appending null is not allowed.
             return;
         }
+        self.node_manager.start_transaction();
         let start_index = self.leaves();
-        self.layers[0].append(&mut leaf_list);
+        self.node_manager.append_nodes(0, &leaf_list);
         self.recompute_after_append_leaves(start_index);
+        self.node_manager.commit();
     }

     /// Append a leaf list by providing their intermediate node hash.
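Note the shape shared by every mutating method from here on: `start_transaction()` before the first node write, `commit()` after the recompute, so the cache and the backing database change together. A minimal sketch of that bracketing, using the `NodeManager` API added later in this diff:

// Sketch only: the bracketing used by append/append_list/append_subtree above.
// NodeManager panics if a second transaction starts before the previous commit.
fn bracketed_push<E: HashElement>(node_manager: &mut NodeManager<E>, leaf: E) {
    node_manager.start_transaction(); // opens a Box<dyn NodeTransaction<E>>
    node_manager.push_node(0, leaf); // cache insert + save_node into the pending tx
    node_manager.commit(); // hands the tx to NodeDatabase::commit
}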
@@ -166,9 +177,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
             // appending null is not allowed.
             bail!("subtree_root is null");
         }
+        self.node_manager.start_transaction();
         let start_index = self.leaves();
         self.append_subtree_inner(subtree_depth, subtree_root)?;
         self.recompute_after_append_subtree(start_index, subtree_depth - 1);
+        self.node_manager.commit();
         Ok(())
     }
@@ -177,11 +190,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
             // appending null is not allowed.
             bail!("subtree_list contains null");
         }
+        self.node_manager.start_transaction();
         for (subtree_depth, subtree_root) in subtree_list {
             let start_index = self.leaves();
             self.append_subtree_inner(subtree_depth, subtree_root)?;
             self.recompute_after_append_subtree(start_index, subtree_depth - 1);
         }
+        self.node_manager.commit();
         Ok(())
     }
@@ -192,13 +207,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
             // updating to null is not allowed.
             return;
         }
-        if self.layers[0].is_empty() {
+        self.node_manager.start_transaction();
+        if self.layer_len(0) == 0 {
             // Special case for the first data.
-            self.layers[0].push(updated_leaf);
+            self.push_node(0, updated_leaf);
         } else {
-            *self.layers[0].last_mut().unwrap() = updated_leaf;
+            self.update_node(0, self.layer_len(0) - 1, updated_leaf);
         }
         self.recompute_after_append_leaves(self.leaves() - 1);
+        self.node_manager.commit();
     }

     /// Fill an unknown `null` leaf with its real value.
@@ -207,13 +224,17 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
     pub fn fill_leaf(&mut self, index: usize, leaf: E) {
         if leaf == E::null() {
             // fill leaf with null is not allowed.
-        } else if self.layers[0][index] == E::null() {
-            self.layers[0][index] = leaf;
+        } else if self.node(0, index) == E::null() {
+            self.node_manager.start_transaction();
+            self.update_node(0, index, leaf);
             self.recompute_after_fill_leaves(index, index + 1);
-        } else if self.layers[0][index] != leaf {
+            self.node_manager.commit();
+        } else if self.node(0, index) != leaf {
             panic!(
                 "Fill with invalid leaf, index={} was={:?} get={:?}",
-                index, self.layers[0][index], leaf
+                index,
+                self.node(0, index),
+                leaf
             );
         }
     }
@@ -226,18 +247,20 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
         &mut self,
         proof: RangeProof<E>,
     ) -> Result<Vec<(usize, usize, E)>> {
-        self.fill_with_proof(
-            proof
-                .left_proof
-                .proof_nodes_in_tree()
-                .split_off(self.leaf_height),
-        )?;
-        self.fill_with_proof(
-            proof
-                .right_proof
-                .proof_nodes_in_tree()
-                .split_off(self.leaf_height),
-        )
+        self.node_manager.start_transaction();
+        let mut updated_nodes = Vec::new();
+        let mut left_nodes = proof.left_proof.proof_nodes_in_tree();
+        if left_nodes.len() >= self.leaf_height {
+            updated_nodes
+                .append(&mut self.fill_with_proof(left_nodes.split_off(self.leaf_height))?);
+        }
+        let mut right_nodes = proof.right_proof.proof_nodes_in_tree();
+        if right_nodes.len() >= self.leaf_height {
+            updated_nodes
+                .append(&mut self.fill_with_proof(right_nodes.split_off(self.leaf_height))?);
+        }
+        self.node_manager.commit();
+        Ok(updated_nodes)
     }

     pub fn fill_with_file_proof(
@@ -262,13 +285,16 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
         if tx_merkle_nodes.is_empty() {
             return Ok(Vec::new());
         }
+        self.node_manager.start_transaction();
         let mut position_and_data =
             proof.file_proof_nodes_in_tree(tx_merkle_nodes, tx_merkle_nodes_size);
         let start_index = (start_index >> self.leaf_height) as usize;
         for (i, (position, _)) in position_and_data.iter_mut().enumerate() {
             *position += start_index >> i;
         }
-        self.fill_with_proof(position_and_data)
+        let updated_nodes = self.fill_with_proof(position_and_data)?;
+        self.node_manager.commit();
+        Ok(updated_nodes)
     }

     /// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof.
@@ -280,28 +306,27 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
         let mut updated_nodes = Vec::new();
         // A valid proof should not fail the following checks.
         for (i, (position, data)) in position_and_data.into_iter().enumerate() {
-            let layer = &mut self.layers[i];
-            if position > layer.len() {
+            if position > self.layer_len(i) {
                 bail!(
                     "proof position out of range, position={} layer.len()={}",
                     position,
-                    layer.len()
+                    self.layer_len(i)
                 );
             }
-            if position == layer.len() {
+            if position == self.layer_len(i) {
                 // skip padding node.
                 continue;
             }
-            if layer[position] == E::null() {
-                layer[position] = data.clone();
+            if self.node(i, position) == E::null() {
+                self.update_node(i, position, data.clone());
                 updated_nodes.push((i, position, data))
-            } else if layer[position] != data {
+            } else if self.node(i, position) != data {
                 // The last node in each layer may have changed in the tree.
                 trace!(
                     "conflict data layer={} position={} tree_data={:?} proof_data={:?}",
                     i,
                     position,
-                    layer[position],
+                    self.node(i, position),
                     data
                 );
             }
@@ -317,8 +342,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
         if position >= self.leaves() {
             bail!("Out of bound: position={} end={}", position, self.leaves());
         }
-        if self.layers[0][position] != E::null() {
-            Ok(Some(self.layers[0][position].clone()))
+        if self.node(0, position) != E::null() {
+            Ok(Some(self.node(0, position)))
         } else {
             // The leaf hash is unknown.
             Ok(None)
@@ -366,10 +391,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
             return;
         }
         let mut right_most_nodes = Vec::new();
-        for layer in &self.layers {
-            right_most_nodes.push((layer.len() - 1, layer.last().unwrap().clone()));
+        for height in 0..self.height() {
+            let pos = self.layer_len(height) - 1;
+            right_most_nodes.push((pos, self.node(height, pos)));
         }
-        let root = self.root().clone();
+        let root = self.root();
         self.delta_nodes_map
             .insert(tx_seq, DeltaNodes::new(right_most_nodes));
         self.root_to_tx_seq_map.insert(root, tx_seq);
@@ -377,8 +403,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
     }

     fn before_extend_layer(&mut self, height: usize) {
-        if height == self.layers.len() {
-            self.layers.push(Vec::new());
+        if height == self.height() {
+            self.node_manager.add_layer()
         }
     }
@@ -395,7 +421,6 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
     }

     /// Given a range of changed leaf nodes and recompute the tree.
     /// Since this tree is append-only, we always compute to the end.
     fn recompute(
         &mut self,
         mut start_index: usize,
@@ -405,42 +430,51 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
         start_index >>= height;
         maybe_end_index = maybe_end_index.map(|end| end >> height);
         // Loop until we compute the new root and reach `tree_depth`.
-        while self.layers[height].len() > 1 || height < self.layers.len() - 1 {
+        while self.layer_len(height) > 1 || height < self.height() - 1 {
             let next_layer_start_index = start_index >> 1;
             if start_index % 2 == 1 {
                 start_index -= 1;
             }

-            let mut end_index = maybe_end_index.unwrap_or(self.layers[height].len());
-            if end_index % 2 == 1 && end_index != self.layers[height].len() {
+            let mut end_index = maybe_end_index.unwrap_or(self.layer_len(height));
+            if end_index % 2 == 1 && end_index != self.layer_len(height) {
                 end_index += 1;
             }
             let mut i = 0;
-            let mut iter = self.layers[height][start_index..end_index].chunks_exact(2);
+            let iter = self
+                .node_manager
+                .get_nodes(height, start_index, end_index)
+                .chunks(2);
             // We cannot modify the parent layer while iterating the child layer,
             // so just keep the changes and update them later.
             let mut parent_update = Vec::new();
-            while let Some([left, right]) = iter.next() {
-                // If either left or right is null (unknown), we cannot compute the parent hash.
-                // Note that if we are recompute a range of an existing tree,
-                // we do not need to keep these possibly null parent. This is only saved
-                // for the case of constructing a new tree from the leaves.
-                let parent = if *left == E::null() || *right == E::null() {
-                    E::null()
-                } else {
-                    A::parent(left, right)
-                };
-                parent_update.push((next_layer_start_index + i, parent));
-                i += 1;
-            }
-            if let [r] = iter.remainder() {
-                // Same as above.
-                let parent = if *r == E::null() {
-                    E::null()
-                } else {
-                    A::parent_single(r, height + self.leaf_height)
-                };
-                parent_update.push((next_layer_start_index + i, parent));
+            for chunk_iter in &iter {
+                let chunk: Vec<_> = chunk_iter.collect();
+                if chunk.len() == 2 {
+                    let left = &chunk[0];
+                    let right = &chunk[1];
+                    // If either left or right is null (unknown), we cannot compute the parent hash.
+                    // Note that if we are recompute a range of an existing tree,
+                    // we do not need to keep these possibly null parent. This is only saved
+                    // for the case of constructing a new tree from the leaves.
+                    let parent = if *left == E::null() || *right == E::null() {
+                        E::null()
+                    } else {
+                        A::parent(left, right)
+                    };
+                    parent_update.push((next_layer_start_index + i, parent));
+                    i += 1;
+                } else {
+                    assert_eq!(chunk.len(), 1);
+                    let r = &chunk[0];
+                    // Same as above.
+                    let parent = if *r == E::null() {
+                        E::null()
+                    } else {
+                        A::parent_single(r, height + self.leaf_height)
+                    };
+                    parent_update.push((next_layer_start_index + i, parent));
+                }
             }
             if !parent_update.is_empty() {
                 self.before_extend_layer(height + 1);
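The move from slice-based `chunks_exact(2)` to itertools' `chunks(2)` is forced by the data source: `get_nodes` now yields owned nodes (possibly read from the DB) through an iterator, not a slice. A self-contained sketch of the itertools pattern used above:

use itertools::Itertools;

fn main() {
    let nodes = (0..5).map(|i| i * 10); // stand-in for node_manager.get_nodes(...)
    let chunks = nodes.chunks(2); // IntoChunks: iterate it by reference
    for chunk_iter in &chunks {
        let chunk: Vec<_> = chunk_iter.collect();
        match chunk.as_slice() {
            [left, right] => println!("pair: {left} {right}"), // A::parent case
            [last] => println!("remainder: {last}"),           // A::parent_single case
            _ => unreachable!("chunks(2) yields 1 or 2 items"),
        }
    }
}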
@@ -449,27 +483,27 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
                 // we can just overwrite `last_changed_parent_index` with new values.
                 let mut last_changed_parent_index = None;
                 for (parent_index, parent) in parent_update {
-                    match parent_index.cmp(&self.layers[height + 1].len()) {
+                    match parent_index.cmp(&self.layer_len(height + 1)) {
                         Ordering::Less => {
                             // We do not overwrite with null.
                             if parent != E::null() {
-                                if self.layers[height + 1][parent_index] == E::null()
+                                if self.node(height + 1, parent_index) == E::null()
                                     // The last node in a layer can be updated.
-                                    || (self.layers[height + 1][parent_index] != parent
-                                        && parent_index == self.layers[height + 1].len() - 1)
+                                    || (self.node(height + 1, parent_index) != parent
+                                        && parent_index == self.layer_len(height + 1) - 1)
                                 {
-                                    self.layers[height + 1][parent_index] = parent;
+                                    self.update_node(height + 1, parent_index, parent);
                                     last_changed_parent_index = Some(parent_index);
-                                } else if self.layers[height + 1][parent_index] != parent {
+                                } else if self.node(height + 1, parent_index) != parent {
                                     // Recompute changes a node in the middle. This should be impossible
                                     // if the inputs are valid.
                                     panic!("Invalid append merkle tree! height={} index={} expected={:?} get={:?}",
-                                        height + 1, parent_index, self.layers[height + 1][parent_index], parent);
+                                        height + 1, parent_index, self.node(height + 1, parent_index), parent);
                                 }
                             }
                         }
                         Ordering::Equal => {
-                            self.layers[height + 1].push(parent);
+                            self.push_node(height + 1, parent);
                             last_changed_parent_index = Some(parent_index);
                         }
                         Ordering::Greater => {
@@ -500,10 +534,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
         for height in 0..(subtree_depth - 1) {
             self.before_extend_layer(height);
             let subtree_layer_size = 1 << (subtree_depth - 1 - height);
-            self.layers[height].append(&mut vec![E::null(); subtree_layer_size]);
+            self.append_nodes(height, &vec![E::null(); subtree_layer_size]);
         }
         self.before_extend_layer(subtree_depth - 1);
-        self.layers[subtree_depth - 1].push(subtree_root);
+        self.push_node(subtree_depth - 1, subtree_root);
         Ok(())
     }
@@ -514,23 +548,45 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
     }

     pub fn revert_to(&mut self, tx_seq: u64) -> Result<()> {
-        if self.layers[0].is_empty() {
+        if self.layer_len(0) == 0 {
             // Any previous state of an empty tree is always empty.
             return Ok(());
         }
+        self.node_manager.start_transaction();
         let delta_nodes = self
             .delta_nodes_map
             .get(&tx_seq)
-            .ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?;
+            .ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?
+            .clone();
         // Dropping the upper layers that are not in the old merkle tree.
-        self.layers.truncate(delta_nodes.right_most_nodes.len());
+        for height in (delta_nodes.right_most_nodes.len()..self.height()).rev() {
+            self.node_manager.truncate_layer(height);
+        }
         for (height, (last_index, right_most_node)) in
             delta_nodes.right_most_nodes.iter().enumerate()
         {
-            self.layers[height].truncate(*last_index + 1);
-            self.layers[height][*last_index] = right_most_node.clone();
+            self.node_manager.truncate_nodes(height, *last_index + 1);
+            self.update_node(height, *last_index, right_most_node.clone())
         }
         self.clear_after(tx_seq);
+        self.node_manager.commit();
         Ok(())
     }

+    // Revert to a tx_seq not in `delta_nodes_map`.
+    // This is needed to revert the last unfinished tx after restart.
+    pub fn revert_to_leaves(&mut self, leaves: usize) -> Result<()> {
+        self.node_manager.start_transaction();
+        for height in (0..self.height()).rev() {
+            let kept_nodes = leaves >> height;
+            if kept_nodes == 0 {
+                self.node_manager.truncate_layer(height);
+            } else {
+                self.node_manager.truncate_nodes(height, kept_nodes + 1);
+            }
+        }
+        self.recompute_after_append_leaves(leaves);
+        self.node_manager.commit();
+        Ok(())
+    }
@@ -550,17 +606,25 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
             bail!("empty tree");
         }
         Ok(HistoryTree {
-            layers: &self.layers,
+            node_manager: &self.node_manager,
             delta_nodes,
             leaf_height: self.leaf_height,
         })
     }

     pub fn reset(&mut self) {
-        self.layers = match self.min_depth {
-            None => vec![vec![]],
-            Some(depth) => vec![vec![]; depth],
-        };
+        self.node_manager.start_transaction();
+        for height in (0..self.height()).rev() {
+            self.node_manager.truncate_layer(height);
+        }
+        if let Some(depth) = self.min_depth {
+            for _ in 0..depth {
+                self.node_manager.add_layer();
+            }
+        } else {
+            self.node_manager.add_layer();
+        }
+        self.node_manager.commit();
     }

     fn clear_after(&mut self, tx_seq: u64) {
@@ -580,10 +644,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
     fn first_known_root_at(&self, index: usize) -> (usize, E) {
         let mut height = 0;
         let mut index_in_layer = index;
-        while height < self.layers.len() {
+        while height < self.height() {
             let node = self.node(height, index_in_layer);
             if !node.is_null() {
-                return (height + 1, node.clone());
+                return (height + 1, node);
             }
             height += 1;
             index_in_layer /= 2;
@ -628,7 +692,7 @@ impl<E: HashElement> DeltaNodes<E> {
|
||||
|
||||
pub struct HistoryTree<'m, E: HashElement> {
|
||||
/// A reference to the global tree nodes.
|
||||
layers: &'m Vec<Vec<E>>,
|
||||
node_manager: &'m NodeManager<E>,
|
||||
/// The delta nodes that are difference from `layers`.
|
||||
/// This could be a reference, we just take ownership for convenience.
|
||||
delta_nodes: &'m DeltaNodes<E>,
|
||||
@@ -639,16 +703,18 @@ pub struct HistoryTree<'m, E: HashElement> {
 impl<E: HashElement, A: Algorithm<E>> MerkleTreeRead for AppendMerkleTree<E, A> {
     type E = E;

-    fn node(&self, layer: usize, index: usize) -> &Self::E {
-        &self.layers[layer][index]
+    fn node(&self, layer: usize, index: usize) -> Self::E {
+        self.node_manager
+            .get_node(layer, index)
+            .expect("index checked")
     }

     fn height(&self) -> usize {
-        self.layers.len()
+        self.node_manager.num_layers()
     }

     fn layer_len(&self, layer_height: usize) -> usize {
-        self.layers[layer_height].len()
+        self.node_manager.layer_size(layer_height)
     }

     fn padding_node(&self, height: usize) -> Self::E {
@@ -658,10 +724,13 @@ impl<E: HashElement, A: Algorithm<E>> MerkleTreeRead for AppendMerkleTree<E, A>

 impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
     type E = E;
-    fn node(&self, layer: usize, index: usize) -> &Self::E {
+    fn node(&self, layer: usize, index: usize) -> Self::E {
         match self.delta_nodes.get(layer, index).expect("range checked") {
-            Some(node) if *node != E::null() => node,
-            _ => &self.layers[layer][index],
+            Some(node) if *node != E::null() => node.clone(),
+            _ => self
+                .node_manager
+                .get_node(layer, index)
+                .expect("index checked"),
         }
     }
@@ -678,6 +747,22 @@ impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
     }
 }

+impl<E: HashElement, A: Algorithm<E>> MerkleTreeWrite for AppendMerkleTree<E, A> {
+    type E = E;
+
+    fn push_node(&mut self, layer: usize, node: Self::E) {
+        self.node_manager.push_node(layer, node);
+    }
+
+    fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]) {
+        self.node_manager.append_nodes(layer, nodes);
+    }
+
+    fn update_node(&mut self, layer: usize, pos: usize, node: Self::E) {
+        self.node_manager.add_node(layer, pos, node);
+    }
+}
+
 #[macro_export]
 macro_rules! ensure_eq {
     ($given:expr, $expected:expr) => {
@@ -699,6 +784,7 @@ macro_rules! ensure_eq {
 #[cfg(test)]
 mod tests {
     use crate::merkle_tree::MerkleTreeRead;
+
     use crate::sha3::Sha3Algorithm;
     use crate::AppendMerkleTree;
     use ethereum_types::H256;
@@ -49,7 +49,7 @@ pub trait Algorithm<E: HashElement> {

 pub trait MerkleTreeRead {
     type E: HashElement;
-    fn node(&self, layer: usize, index: usize) -> &Self::E;
+    fn node(&self, layer: usize, index: usize) -> Self::E;
     fn height(&self) -> usize;
     fn layer_len(&self, layer_height: usize) -> usize;
     fn padding_node(&self, height: usize) -> Self::E;
@@ -58,7 +58,7 @@ pub trait MerkleTreeRead {
         self.layer_len(0)
     }

-    fn root(&self) -> &Self::E {
+    fn root(&self) -> Self::E {
         self.node(self.height() - 1, 0)
     }
@@ -70,16 +70,16 @@ pub trait MerkleTreeRead {
             self.leaves()
         );
     }
-    if self.node(0, leaf_index) == &Self::E::null() {
+    if self.node(0, leaf_index) == Self::E::null() {
         bail!("Not ready to generate proof for leaf_index={}", leaf_index);
     }
     if self.height() == 1 {
-        return Proof::new(vec![self.root().clone(), self.root().clone()], vec![]);
+        return Proof::new(vec![self.root(), self.root().clone()], vec![]);
     }
     let mut lemma: Vec<Self::E> = Vec::with_capacity(self.height()); // path + root
     let mut path: Vec<bool> = Vec::with_capacity(self.height() - 2); // path - 1
     let mut index_in_layer = leaf_index;
-    lemma.push(self.node(0, leaf_index).clone());
+    lemma.push(self.node(0, leaf_index));
     for height in 0..(self.height() - 1) {
         trace!(
             "gen_proof: height={} index={} hash={:?}",
@@ -93,15 +93,15 @@ pub trait MerkleTreeRead {
                 // TODO: This can be skipped if the tree size is available in validation.
                 lemma.push(self.padding_node(height));
             } else {
-                lemma.push(self.node(height, index_in_layer + 1).clone());
+                lemma.push(self.node(height, index_in_layer + 1));
             }
         } else {
             path.push(false);
-            lemma.push(self.node(height, index_in_layer - 1).clone());
+            lemma.push(self.node(height, index_in_layer - 1));
         }
         index_in_layer >>= 1;
     }
-    lemma.push(self.root().clone());
+    lemma.push(self.root());
     if lemma.contains(&Self::E::null()) {
         bail!(
             "Not enough data to generate proof, lemma={:?} path={:?}",
@@ -130,6 +130,13 @@ pub trait MerkleTreeRead {
     }
 }

+pub trait MerkleTreeWrite {
+    type E: HashElement;
+    fn push_node(&mut self, layer: usize, node: Self::E);
+    fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]);
+    fn update_node(&mut self, layer: usize, pos: usize, node: Self::E);
+}
+
 /// This includes the data to reconstruct an `AppendMerkleTree` root where some nodes
 /// are `null`. Other intermediate nodes will be computed based on these known nodes.
 pub struct MerkleTreeInitialData<E: HashElement> {
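`node` and `root` now return `Self::E` by value instead of `&Self::E`: once nodes may come out of an LRU cache or a database read there is no stable storage to borrow from, so every implementor must hand back an owned (cloned) element. A minimal sketch of an in-memory implementor under the new signature (hypothetical type, only to illustrate the contract; it assumes `HashElement` implies `Clone`, as the surrounding code suggests):

// Hypothetical Vec-backed tree, not from the repo; shows the owned-return contract.
struct VecTree<E: HashElement> {
    layers: Vec<Vec<E>>,
}

impl<E: HashElement> MerkleTreeRead for VecTree<E> {
    type E = E;
    fn node(&self, layer: usize, index: usize) -> E {
        // An in-memory implementor can simply clone; a DB/LRU-backed one could
        // not return a reference at all, which is why the trait takes this form.
        self.layers[layer][index].clone()
    }
    fn height(&self) -> usize {
        self.layers.len()
    }
    fn layer_len(&self, layer_height: usize) -> usize {
        self.layers[layer_height].len()
    }
    fn padding_node(&self, _height: usize) -> E {
        E::null()
    }
}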
common/append_merkle/src/node_manager.rs (new file, 219 lines)

@@ -0,0 +1,219 @@
use crate::HashElement;
use anyhow::Result;
use lru::LruCache;
use std::any::Any;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tracing::error;

pub struct NodeManager<E: HashElement> {
    cache: LruCache<(usize, usize), E>,
    layer_size: Vec<usize>,
    db: Arc<dyn NodeDatabase<E>>,
    db_tx: Option<Box<dyn NodeTransaction<E>>>,
}

impl<E: HashElement> NodeManager<E> {
    pub fn new(db: Arc<dyn NodeDatabase<E>>, capacity: usize) -> Result<Self> {
        let mut layer = 0;
        let mut layer_size = Vec::new();
        while let Some(size) = db.get_layer_size(layer)? {
            layer_size.push(size);
            layer += 1;
        }
        Ok(Self {
            cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
            layer_size,
            db,
            db_tx: None,
        })
    }

    pub fn new_dummy() -> Self {
        Self {
            cache: LruCache::unbounded(),
            layer_size: vec![],
            db: Arc::new(EmptyNodeDatabase {}),
            db_tx: None,
        }
    }

    pub fn push_node(&mut self, layer: usize, node: E) {
        self.add_node(layer, self.layer_size[layer], node);
        self.set_layer_size(layer, self.layer_size[layer] + 1);
    }

    pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) {
        let mut pos = self.layer_size[layer];
        let mut saved_nodes = Vec::with_capacity(nodes.len());
        for node in nodes {
            self.cache.put((layer, pos), node.clone());
            saved_nodes.push((layer, pos, node));
            pos += 1;
        }
        self.set_layer_size(layer, pos);
        self.db_tx().save_node_list(&saved_nodes);
    }

    pub fn get_node(&self, layer: usize, pos: usize) -> Option<E> {
        match self.cache.peek(&(layer, pos)) {
            Some(node) => Some(node.clone()),
            None => self.db.get_node(layer, pos).unwrap_or_else(|e| {
                error!("Failed to get node: {}", e);
                None
            }),
        }
    }

    pub fn get_nodes(&self, layer: usize, start_pos: usize, end_pos: usize) -> NodeIterator<E> {
        NodeIterator {
            node_manager: self,
            layer,
            start_pos,
            end_pos,
        }
    }

    pub fn add_node(&mut self, layer: usize, pos: usize, node: E) {
        // No need to insert if the value is unchanged.
        if self.cache.get(&(layer, pos)) != Some(&node) {
            self.db_tx().save_node(layer, pos, &node);
            self.cache.put((layer, pos), node);
        }
    }

    pub fn add_layer(&mut self) {
        self.layer_size.push(0);
        let layer = self.layer_size.len() - 1;
        self.db_tx().save_layer_size(layer, 0);
    }

    pub fn layer_size(&self, layer: usize) -> usize {
        self.layer_size[layer]
    }

    pub fn num_layers(&self) -> usize {
        self.layer_size.len()
    }

    pub fn truncate_nodes(&mut self, layer: usize, pos_end: usize) {
        let mut removed_nodes = Vec::new();
        for pos in pos_end..self.layer_size[layer] {
            self.cache.pop(&(layer, pos));
            removed_nodes.push((layer, pos));
        }
        self.db_tx().remove_node_list(&removed_nodes);
        self.set_layer_size(layer, pos_end);
    }

    pub fn truncate_layer(&mut self, layer: usize) {
        self.truncate_nodes(layer, 0);
        if layer == self.num_layers() - 1 {
            self.layer_size.pop();
            self.db_tx().remove_layer_size(layer);
        }
    }

    pub fn start_transaction(&mut self) {
        if self.db_tx.is_some() {
            error!("start new tx before commit");
            panic!("start new tx before commit");
        }
        self.db_tx = Some(self.db.start_transaction());
    }

    pub fn commit(&mut self) {
        let tx = match self.db_tx.take() {
            Some(tx) => tx,
            None => {
                error!("db_tx is None");
                return;
            }
        };
        if let Err(e) = self.db.commit(tx) {
            error!("Failed to commit db transaction: {}", e);
        }
    }

    fn db_tx(&mut self) -> &mut dyn NodeTransaction<E> {
        (*self.db_tx.as_mut().expect("tx checked")).as_mut()
    }

    fn set_layer_size(&mut self, layer: usize, size: usize) {
        self.layer_size[layer] = size;
        self.db_tx().save_layer_size(layer, size);
    }
}

pub struct NodeIterator<'a, E: HashElement> {
    node_manager: &'a NodeManager<E>,
    layer: usize,
    start_pos: usize,
    end_pos: usize,
}

impl<'a, E: HashElement> Iterator for NodeIterator<'a, E> {
    type Item = E;

    fn next(&mut self) -> Option<Self::Item> {
        if self.start_pos < self.end_pos {
            let r = self.node_manager.get_node(self.layer, self.start_pos);
            self.start_pos += 1;
            r
        } else {
            None
        }
    }
}

pub trait NodeDatabase<E: HashElement>: Send + Sync {
    fn get_node(&self, layer: usize, pos: usize) -> Result<Option<E>>;
    fn get_layer_size(&self, layer: usize) -> Result<Option<usize>>;
    fn start_transaction(&self) -> Box<dyn NodeTransaction<E>>;
    fn commit(&self, tx: Box<dyn NodeTransaction<E>>) -> Result<()>;
}

pub trait NodeTransaction<E: HashElement>: Send + Sync {
    fn save_node(&mut self, layer: usize, pos: usize, node: &E);
    /// `nodes` are a list of tuples `(layer, pos, node)`.
    fn save_node_list(&mut self, nodes: &[(usize, usize, &E)]);
    fn remove_node_list(&mut self, nodes: &[(usize, usize)]);
    fn save_layer_size(&mut self, layer: usize, size: usize);
    fn remove_layer_size(&mut self, layer: usize);

    fn into_any(self: Box<Self>) -> Box<dyn Any>;
}

/// A dummy database structure for in-memory merkle tree that will not read/write db.
pub struct EmptyNodeDatabase {}
pub struct EmptyNodeTransaction {}
impl<E: HashElement> NodeDatabase<E> for EmptyNodeDatabase {
    fn get_node(&self, _layer: usize, _pos: usize) -> Result<Option<E>> {
        Ok(None)
    }
    fn get_layer_size(&self, _layer: usize) -> Result<Option<usize>> {
        Ok(None)
    }
    fn start_transaction(&self) -> Box<dyn NodeTransaction<E>> {
        Box::new(EmptyNodeTransaction {})
    }
    fn commit(&self, _tx: Box<dyn NodeTransaction<E>>) -> Result<()> {
        Ok(())
    }
}

impl<E: HashElement> NodeTransaction<E> for EmptyNodeTransaction {
    fn save_node(&mut self, _layer: usize, _pos: usize, _node: &E) {}

    fn save_node_list(&mut self, _nodes: &[(usize, usize, &E)]) {}

    fn remove_node_list(&mut self, _nodes: &[(usize, usize)]) {}

    fn save_layer_size(&mut self, _layer: usize, _size: usize) {}

    fn remove_layer_size(&mut self, _layer: usize) {}

    fn into_any(self: Box<Self>) -> Box<dyn Any> {
        self
    }
}
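A short usage sketch of the new `NodeManager` with the dummy backend (the purely in-memory path that `AppendMerkleTree::new` takes); the transaction bracketing matches what the tree code above does:

// Sketch: E is any HashElement implementor, e.g. the H256 wrapper used in the repo.
fn node_manager_demo<E: HashElement>(leaf: E) {
    let mut nm: NodeManager<E> = NodeManager::new_dummy(); // unbounded cache, EmptyNodeDatabase
    nm.start_transaction();
    nm.add_layer(); // layer 0 now exists with size 0
    nm.push_node(0, leaf); // cached and recorded in the pending transaction
    assert_eq!(nm.layer_size(0), 1);
    assert_eq!(nm.num_layers(), 1);
    nm.commit(); // EmptyNodeDatabase::commit is a no-op
}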
@ -9,5 +9,5 @@ exit-future = "0.2.0"
|
||||
futures = "0.3.21"
|
||||
lazy_static = "1.4.0"
|
||||
lighthouse_metrics = { path = "../lighthouse_metrics" }
|
||||
tokio = { version = "1.19.2", features = ["rt"] }
|
||||
tokio = { version = "1.38.0", features = ["full"] }
|
||||
tracing = "0.1.35"
|
||||
|
@@ -222,7 +222,7 @@ impl LogEntryFetcher {
     ) -> UnboundedReceiver<LogFetchProgress> {
         let provider = self.provider.clone();
         let (recover_tx, recover_rx) = tokio::sync::mpsc::unbounded_channel();
-        let contract = ZgsFlow::new(self.contract_address, provider.clone());
+        let contract = self.flow_contract();
         let log_page_size = self.log_page_size;

         executor.spawn(
@@ -236,22 +236,29 @@ impl LogEntryFetcher {
                     .filter;
                 let mut stream = LogQuery::new(&provider, &filter, log_query_delay)
                     .with_page_size(log_page_size);
-                debug!(
+                info!(
                     "start_recover starts, start={} end={}",
                     start_block_number, end_block_number
                 );
+                let (mut block_hash_sent, mut block_number_sent) = (None, None);
                 while let Some(maybe_log) = stream.next().await {
                     match maybe_log {
                         Ok(log) => {
                             let sync_progress =
                                 if log.block_hash.is_some() && log.block_number.is_some() {
-                                    let synced_block = LogFetchProgress::SyncedBlock((
-                                        log.block_number.unwrap().as_u64(),
-                                        log.block_hash.unwrap(),
-                                        None,
-                                    ));
-                                    progress = log.block_number.unwrap().as_u64();
-                                    Some(synced_block)
+                                    if block_hash_sent != log.block_hash
+                                        || block_number_sent != log.block_number
+                                    {
+                                        let synced_block = LogFetchProgress::SyncedBlock((
+                                            log.block_number.unwrap().as_u64(),
+                                            log.block_hash.unwrap(),
+                                            None,
+                                        ));
+                                        progress = log.block_number.unwrap().as_u64();
+                                        Some(synced_block)
+                                    } else {
+                                        None
+                                    }
                                 } else {
                                     None
                                 };
@@ -268,11 +275,17 @@ impl LogEntryFetcher {
                                 log.block_number.expect("block number exist").as_u64(),
                             ))
                             .and_then(|_| match sync_progress {
-                                Some(b) => recover_tx.send(b),
+                                Some(b) => {
+                                    recover_tx.send(b)?;
+                                    block_hash_sent = log.block_hash;
+                                    block_number_sent = log.block_number;
+                                    Ok(())
+                                }
                                 None => Ok(()),
                             })
                         {
                             error!("send error: e={:?}", e);
                             break;
                         }
                     }
                     Err(e) => {
@@ -289,6 +302,8 @@ impl LogEntryFetcher {
                         }
                     }
                 }
+
+                info!("log recover end");
             },
             "log recover",
         );
@@ -305,7 +320,7 @@ impl LogEntryFetcher {
         mut watch_progress_rx: UnboundedReceiver<u64>,
     ) -> UnboundedReceiver<LogFetchProgress> {
         let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel();
-        let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
+        let contract = self.flow_contract();
         let provider = self.provider.clone();
         let confirmation_delay = self.confirmation_delay;
         let log_page_size = self.log_page_size;
@@ -583,6 +598,10 @@ impl LogEntryFetcher {
     pub fn provider(&self) -> &Provider<RetryClient<Http>> {
         self.provider.as_ref()
     }
+
+    pub fn flow_contract(&self) -> ZgsFlow<Provider<RetryClient<Http>>> {
+        ZgsFlow::new(self.contract_address, self.provider.clone())
+    }
 }

 async fn check_watch_process(
@@ -658,17 +677,24 @@ async fn check_watch_process(
                         "get block hash for block {} from RPC, assume there is no org",
                         *progress - 1
                     );
-                    match provider.get_block(*progress - 1).await {
-                        Ok(Some(v)) => {
-                            break v.hash.expect("parent block hash expect exist");
-                        }
-                        Ok(None) => {
-                            panic!("parent block {} expect exist", *progress - 1);
-                        }
-                        Err(e) => {
-                            panic!("parent block {} expect exist, error {}", *progress - 1, e);
-                        }
-                    }
+                    let hash = loop {
+                        match provider.get_block(*progress - 1).await {
+                            Ok(Some(v)) => {
+                                break v.hash.expect("parent block hash expect exist");
+                            }
+                            Ok(None) => {
+                                panic!("parent block {} expect exist", *progress - 1);
+                            }
+                            Err(e) => {
+                                if e.to_string().contains("server is too busy") {
+                                    warn!("server busy, wait for parent block {}", *progress - 1);
+                                } else {
+                                    panic!("parent block {} expect exist, error {}", *progress - 1, e);
+                                }
+                            }
+                        }
+                    };
+                    break hash;
                 }
             };
         }
@@ -510,6 +510,41 @@ impl LogSyncManager {
         }
         self.data_cache.garbage_collect(self.next_tx_seq);
         self.next_tx_seq += 1;
+
+        // Check if the computed data root matches on-chain state.
+        // If the call fails, we won't check the root here and return `true` directly.
+        let flow_contract = self.log_fetcher.flow_contract();
+        match flow_contract
+            .get_flow_root_by_tx_seq(tx.seq.into())
+            .call()
+            .await
+        {
+            Ok(contract_root_bytes) => {
+                let contract_root = H256::from_slice(&contract_root_bytes);
+                // contract_root is zero for tx submitted before upgrading.
+                if !contract_root.is_zero() {
+                    match self.store.get_context() {
+                        Ok((local_root, _)) => {
+                            if contract_root != local_root {
+                                error!(
+                                    ?contract_root,
+                                    ?local_root,
+                                    "local flow root and on-chain flow root mismatch"
+                                );
+                                return false;
+                            }
+                        }
+                        Err(e) => {
+                            warn!(?e, "fail to read the local flow root");
+                        }
+                    }
+                }
+            }
+            Err(e) => {
+                warn!(?e, "fail to read the on-chain flow root");
+            }
+        }
+
         true
     }
 }
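The check above leans on ethers-rs typed bindings: `.call()` is a read-only `eth_call`, so no transaction is sent and a failure can be tolerated. A hedged sketch of the same read in isolation (the `get_flow_root_by_tx_seq` method name comes from this diff; the binding itself is generated code):

// Sketch: read-only flow-root query, mirroring the check above.
async fn read_flow_root(
    flow: &ZgsFlow<Provider<RetryClient<Http>>>,
    tx_seq: u64,
) -> anyhow::Result<H256> {
    let bytes = flow.get_flow_root_by_tx_seq(tx_seq.into()).call().await?;
    // A zero root marks transactions submitted before the contract upgrade.
    Ok(H256::from_slice(&bytes))
}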
@@ -67,8 +67,8 @@ impl MinerConfig {
         })
     }

-    pub(crate) async fn make_provider(&self) -> Result<MineServiceMiddleware, String> {
-        let provider = Arc::new(Provider::new(
+    pub(crate) fn make_provider(&self) -> Result<Arc<Provider<RetryClient<Http>>>, String> {
+        Ok(Arc::new(Provider::new(
             RetryClientBuilder::default()
                 .rate_limit_retries(self.rate_limit_retries)
                 .timeout_retries(self.timeout_retries)
@@ -78,7 +78,11 @@ impl MinerConfig {
                 .map_err(|e| format!("Cannot parse blockchain endpoint: {:?}", e))?,
             Box::new(HttpRateLimitRetryPolicy),
         ),
-        ));
+        )))
     }

+    pub(crate) async fn make_signing_provider(&self) -> Result<MineServiceMiddleware, String> {
+        let provider = self.make_provider()?;
+        let chain_id = provider
+            .get_chainid()
+            .await
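The point of the split: queries now ride a plain `Provider<RetryClient<Http>>`, and only transaction submission pays for signer middleware. A rough sketch of the distinction with ethers' stock `SignerMiddleware` (the repo's `MineServiceMiddleware` alias is presumably built in a similar way):

use ethers::prelude::{Http, LocalWallet, Middleware, Provider, SignerMiddleware};
use std::sync::Arc;

// Read-only calls need no key material; sending transactions does.
async fn demo(provider: Provider<Http>, wallet: LocalWallet) -> anyhow::Result<()> {
    let tip = provider.get_block_number().await?; // query path: plain provider
    println!("tip: {tip}");
    // Submission path: wrap the provider with a signer before instantiating
    // contracts that send transactions (e.g. PoraMine in this diff).
    let signing = Arc::new(SignerMiddleware::new(provider, wallet));
    let _ = signing;
    Ok(())
}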
@@ -1,6 +1,7 @@
 use std::{collections::BTreeMap, sync::Arc};

 use ethereum_types::H256;
+use ethers::prelude::{Http, Provider, RetryClient};
 use tokio::time::{sleep, Duration, Instant};

 use contract_interface::{EpochRangeWithContextDigest, ZgsFlow};
@@ -12,14 +13,14 @@ use storage_async::Store;
 use task_executor::TaskExecutor;
 use zgs_spec::SECTORS_PER_SEAL;

-use crate::config::{MineServiceMiddleware, MinerConfig};
+use crate::config::MinerConfig;

 const DB_QUERY_PERIOD_ON_NO_TASK: u64 = 1;
 const DB_QUERY_PERIOD_ON_ERROR: u64 = 5;
 const CHAIN_STATUS_QUERY_PERIOD: u64 = 5;

 pub struct Sealer {
-    flow_contract: ZgsFlow<MineServiceMiddleware>,
+    flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
     store: Arc<Store>,
     context_cache: BTreeMap<u128, EpochRangeWithContextDigest>,
     last_context_flow_length: u64,
@@ -29,7 +30,7 @@ pub struct Sealer {
 impl Sealer {
     pub fn spawn(
         executor: TaskExecutor,
-        provider: Arc<MineServiceMiddleware>,
+        provider: Arc<Provider<RetryClient<Http>>>,
         store: Arc<Store>,
         config: &MinerConfig,
         miner_id: H256,
@@ -33,11 +33,13 @@ impl MineService {
         config: MinerConfig,
         store: Arc<Store>,
     ) -> Result<broadcast::Sender<MinerMessage>, String> {
-        let provider = Arc::new(config.make_provider().await?);
+        let provider = config.make_provider()?;
+        let signing_provider = Arc::new(config.make_signing_provider().await?);

         let (msg_send, msg_recv) = broadcast::channel(1024);

-        let miner_id = check_and_request_miner_id(&config, store.as_ref(), &provider).await?;
+        let miner_id =
+            check_and_request_miner_id(&config, store.as_ref(), &signing_provider).await?;
         debug!("miner id setting complete.");

         let mine_context_receiver = MineContextWatcher::spawn(
@@ -61,6 +63,7 @@ impl MineService {
             mine_answer_receiver,
             mine_context_receiver,
             provider.clone(),
+            signing_provider,
             store.clone(),
             &config,
         );
@@ -2,6 +2,7 @@ use contract_interface::PoraAnswer;
 use contract_interface::{PoraMine, ZgsFlow};
 use ethereum_types::U256;
 use ethers::contract::ContractCall;
+use ethers::prelude::{Http, Provider, RetryClient};
 use ethers::providers::PendingTransaction;
 use hex::ToHex;
 use shared_types::FlowRangeProof;
@@ -24,7 +25,7 @@ pub struct Submitter {
     mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
     mine_context_receiver: broadcast::Receiver<MineContextMessage>,
     mine_contract: PoraMine<MineServiceMiddleware>,
-    flow_contract: ZgsFlow<MineServiceMiddleware>,
+    flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
     default_gas_limit: Option<U256>,
     store: Arc<Store>,
 }
@@ -34,11 +35,12 @@ impl Submitter {
         executor: TaskExecutor,
         mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
         mine_context_receiver: broadcast::Receiver<MineContextMessage>,
-        provider: Arc<MineServiceMiddleware>,
+        provider: Arc<Provider<RetryClient<Http>>>,
+        signing_provider: Arc<MineServiceMiddleware>,
         store: Arc<Store>,
         config: &MinerConfig,
     ) {
-        let mine_contract = PoraMine::new(config.mine_address, provider.clone());
+        let mine_contract = PoraMine::new(config.mine_address, signing_provider);
         let flow_contract = ZgsFlow::new(config.flow_address, provider);
         let default_gas_limit = config.submission_gas;
@@ -14,13 +14,13 @@ use tokio::{
     try_join,
 };

-use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
+use ethers::prelude::{Http, RetryClient};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
 use std::{ops::DerefMut, str::FromStr};

+use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};

 pub type MineContextMessage = Option<PoraPuzzle>;
@@ -29,9 +29,9 @@ lazy_static! {
 }

 pub struct MineContextWatcher {
-    provider: Arc<MineServiceMiddleware>,
-    flow_contract: ZgsFlow<MineServiceMiddleware>,
-    mine_contract: PoraMine<MineServiceMiddleware>,
+    provider: Arc<Provider<RetryClient<Http>>>,
+    flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
+    mine_contract: PoraMine<Provider<RetryClient<Http>>>,

     mine_context_sender: broadcast::Sender<MineContextMessage>,
     last_report: MineContextMessage,
@@ -44,7 +44,7 @@ impl MineContextWatcher {
     pub fn spawn(
         executor: TaskExecutor,
         msg_recv: broadcast::Receiver<MinerMessage>,
-        provider: Arc<MineServiceMiddleware>,
+        provider: Arc<Provider<RetryClient<Http>>>,
         config: &MinerConfig,
     ) -> broadcast::Receiver<MineContextMessage> {
         let mine_contract = PoraMine::new(config.mine_address, provider.clone());
@@ -10,7 +10,7 @@ mod service;
 use duration_str::deserialize_duration;
 use network::Multiaddr;
 use serde::Deserialize;
-use std::time::Duration;
+use std::{net::IpAddr, time::Duration};

 pub use crate::service::RouterService;
@@ -26,6 +26,7 @@ pub struct Config {
     pub libp2p_nodes: Vec<Multiaddr>,
     pub private_ip_enabled: bool,
     pub check_announced_ip: bool,
+    pub public_address: Option<IpAddr>,

     // batcher
     /// Timeout to publish messages in batch
@@ -47,6 +48,7 @@ impl Default for Config {
             libp2p_nodes: vec![],
             private_ip_enabled: false,
             check_announced_ip: false,
+            public_address: None,

             batcher_timeout: Duration::from_secs(1),
             batcher_file_capacity: 1,
@@ -348,17 +348,26 @@ impl Libp2pEventHandler {
         }
     }

-    async fn get_listen_addr_or_add(&self) -> Option<Multiaddr> {
+    async fn construct_announced_ip(&self) -> Option<Multiaddr> {
+        // public address configured
+        if let Some(ip) = self.config.public_address {
+            let mut addr = Multiaddr::empty();
+            addr.push(ip.into());
+            addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
+            return Some(addr);
+        }
+
+        // public listen address
         if let Some(addr) = self.get_listen_addr() {
             return Some(addr);
         }

+        // auto detect public IP address
         let ipv4_addr = public_ip::addr_v4().await?;

         let mut addr = Multiaddr::empty();
         addr.push(Protocol::Ip4(ipv4_addr));
         addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
         addr.push(Protocol::P2p(self.network_globals.local_peer_id().into()));

         self.network_globals
             .listen_multiaddrs
@@ -420,7 +429,7 @@ impl Libp2pEventHandler {

         let peer_id = *self.network_globals.peer_id.read();

-        let addr = self.get_listen_addr_or_add().await?;
+        let addr = self.construct_announced_ip().await?;

         let timestamp = timestamp_now();
         let shard_config = self.store.get_store().get_shard_config();
@@ -452,7 +461,7 @@ impl Libp2pEventHandler {
         shard_config: ShardConfig,
     ) -> Option<PubsubMessage> {
         let peer_id = *self.network_globals.peer_id.read();
-        let addr = self.get_listen_addr_or_add().await?;
+        let addr = self.construct_announced_ip().await?;
         let timestamp = timestamp_now();

         let msg = AnnounceShardConfig {
@@ -528,7 +537,7 @@ impl Libp2pEventHandler {
         index_end: u64,
     ) -> Option<PubsubMessage> {
         let peer_id = *self.network_globals.peer_id.read();
-        let addr = self.get_listen_addr_or_add().await?;
+        let addr = self.construct_announced_ip().await?;
         let timestamp = timestamp_now();

         let msg = AnnounceChunks {
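`construct_announced_ip` assembles the announced multiaddr by hand from the configured `IpAddr` plus the TCP listen port. The same construction in isolation, using the libp2p API the handler calls above:

use libp2p::multiaddr::{Multiaddr, Protocol};
use std::net::IpAddr;

// Builds e.g. /ip4/1.2.3.4/tcp/1234 from a configured public address.
fn announced_addr(ip: IpAddr, tcp_port: u16) -> Multiaddr {
    let mut addr = Multiaddr::empty();
    addr.push(ip.into()); // becomes an Ip4 or Ip6 segment via From<IpAddr>
    addr.push(Protocol::Tcp(tcp_port));
    addr
}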
@@ -2,7 +2,7 @@ use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
 use jsonrpsee::core::RpcResult;
 use jsonrpsee::proc_macros::rpc;
 use shared_types::{DataRoot, FlowProof, TxSeqOrRoot};
-use storage::config::ShardConfig;
+use storage::{config::ShardConfig, H256};

 #[rpc(server, client, namespace = "zgs")]
 pub trait Rpc {
@@ -77,4 +77,7 @@ pub trait Rpc {
         sector_index: u64,
         flow_root: Option<DataRoot>,
     ) -> RpcResult<FlowProof>;
+
+    #[method(name = "getFlowContext")]
+    async fn get_flow_context(&self) -> RpcResult<(H256, u64)>;
 }
@@ -8,7 +8,7 @@ use jsonrpsee::core::RpcResult;
 use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
 use std::fmt::{Debug, Formatter, Result};
 use storage::config::ShardConfig;
-use storage::try_option;
+use storage::{try_option, H256};

 pub struct RpcServerImpl {
     pub ctx: Context,
@@ -198,6 +198,10 @@ impl RpcServer for RpcServerImpl {
         assert_eq!(proof.left_proof, proof.right_proof);
         Ok(proof.right_proof)
     }
+
+    async fn get_flow_context(&self) -> RpcResult<(H256, u64)> {
+        Ok(self.ctx.log_store.get_context().await?)
+    }
 }

 impl RpcServerImpl {
@ -2,7 +2,7 @@ use super::{Client, RuntimeContext};
use chunk_pool::{ChunkPoolMessage, Config as ChunkPoolConfig, MemoryChunkPool};
use file_location_cache::FileLocationCache;
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
use miner::{MineService, MinerConfig, MinerMessage};
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
use network::{
    self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId,
    Service as LibP2PService,
@ -112,7 +112,7 @@ impl ClientBuilder {
    pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
        let executor = require!("sync", self, runtime_context).clone().executor;
        let store = Arc::new(
            LogManager::rocksdb(LogConfig::default(), &config.db_dir, executor)
            LogManager::rocksdb(config.log_config.clone(), &config.db_dir, executor)
                .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
        );
@ -216,6 +216,16 @@ impl ClientBuilder {
        Ok(self)
    }

    pub async fn with_shard(self, config: ShardConfig) -> Result<Self, String> {
        self.async_store
            .as_ref()
            .unwrap()
            .update_shard_config(config)
            .await;

        Ok(self)
    }

    /// Starts the networking stack.
    pub fn with_router(mut self, router_config: router::Config) -> Result<Self, String> {
        let executor = require!("router", self, runtime_context).clone().executor;
@ -11,6 +11,7 @@ use shared_types::{NetworkIdentity, ProtocolVersion};
use std::net::IpAddr;
use std::time::Duration;
use storage::config::ShardConfig;
use storage::log_store::log_manager::LogConfig;
use storage::StorageConfig;

impl ZgsConfig {
@ -101,8 +102,11 @@ impl ZgsConfig {
    }

    pub fn storage_config(&self) -> Result<StorageConfig, String> {
        let mut log_config = LogConfig::default();
        log_config.flow.merkle_node_cache_capacity = self.merkle_node_cache_capacity;
        Ok(StorageConfig {
            db_dir: self.db_dir.clone().into(),
            log_config,
        })
    }
@ -200,6 +204,13 @@ impl ZgsConfig {
    pub fn router_config(&self, network_config: &NetworkConfig) -> Result<router::Config, String> {
        let mut router_config = self.router.clone();
        router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec();

        if router_config.public_address.is_none() {
            if let Some(addr) = &self.network_enr_address {
                router_config.public_address = Some(addr.parse().unwrap());
            }
        }

        Ok(router_config)
    }
@ -228,7 +239,7 @@ impl ZgsConfig {
        }
    }

    fn shard_config(&self) -> Result<ShardConfig, String> {
    pub fn shard_config(&self) -> Result<ShardConfig, String> {
        self.shard_position.clone().try_into()
    }
}
@ -60,6 +60,7 @@ build_config! {
        (prune_check_time_s, (u64), 60)
        (prune_batch_size, (usize), 16 * 1024)
        (prune_batch_wait_time_ms, (u64), 1000)
        (merkle_node_cache_capacity, (usize), 32 * 1024 * 1024)

        // misc
        (log_config_file, (String), "log_config".to_string())
@ -17,6 +17,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
    let miner_config = config.mine_config()?;
    let router_config = config.router_config(&network_config)?;
    let pruner_config = config.pruner_config()?;
    let shard_config = config.shard_config()?;

    ClientBuilder::default()
        .with_runtime_context(context)
@ -30,6 +31,8 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
        .await?
        .with_miner(miner_config)
        .await?
        .with_shard(shard_config)
        .await?
        .with_pruner(pruner_config)
        .await?
        .with_rpc(config.rpc, config.chunk_pool_config()?)
@ -29,7 +29,7 @@ itertools = "0.13.0"
serde = { version = "1.0.197", features = ["derive"] }
parking_lot = "0.12.3"
serde_json = "1.0.127"
tokio = { version = "1.10.0", features = ["sync"] }
tokio = { version = "1.38.0", features = ["full"] }
task_executor = { path = "../../common/task_executor" }

[dev-dependencies]
@ -1,3 +1,4 @@
use crate::log_store::log_manager::LogConfig;
use serde::{Deserialize, Serialize};
use ssz_derive::{Decode, Encode};
use std::{cell::RefCell, path::PathBuf, rc::Rc, str::FromStr};
@ -7,6 +8,7 @@ pub const SHARD_CONFIG_KEY: &str = "shard_config";
#[derive(Clone)]
pub struct Config {
    pub db_dir: PathBuf,
    pub log_config: LogConfig,
}

#[derive(Clone, Copy, Debug, Decode, Encode, Serialize, Deserialize, Eq, PartialEq)]
@ -9,9 +9,11 @@ use crate::log_store::log_manager::{
};
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
use crate::{try_option, ZgsKeyValueDB};
use any::Any;
use anyhow::{anyhow, bail, Result};
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase, NodeTransaction};
use itertools::Itertools;
use kvdb::DBTransaction;
use parking_lot::RwLock;
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
use ssz::{Decode, Encode};
@ -20,20 +22,20 @@ use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::{cmp, mem};
use std::{any, cmp, mem};
use tracing::{debug, error, trace};
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};

pub struct FlowStore {
    db: FlowDBStore,
    db: Arc<FlowDBStore>,
    seal_manager: SealTaskManager,
    config: FlowConfig,
}

impl FlowStore {
    pub fn new(db: Arc<dyn ZgsKeyValueDB>, config: FlowConfig) -> Self {
    pub fn new(db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
        Self {
            db: FlowDBStore::new(db),
            db,
            seal_manager: Default::default(),
            config,
        }
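With FlowStore now holding an Arc<FlowDBStore> instead of constructing the wrapper itself, the same database handle can be shared with the node-cached Merkle tree. A minimal construction sketch, assuming `db` is an open Arc<dyn ZgsKeyValueDB> as before:

let flow_db = Arc::new(FlowDBStore::new(db.clone()));
let flow_store = Arc::new(FlowStore::new(flow_db.clone(), FlowConfig::default()));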
@ -93,6 +95,7 @@ impl FlowStore {
#[derive(Clone, Debug)]
pub struct FlowConfig {
    pub batch_size: usize,
    pub merkle_node_cache_capacity: usize,
    pub shard_config: Arc<RwLock<ShardConfig>>,
}
@ -100,6 +103,8 @@ impl Default for FlowConfig {
    fn default() -> Self {
        Self {
            batch_size: SECTORS_PER_LOAD,
            // Each node takes (8+8+32=)48 Bytes, so the default value is 1.5 GB memory size.
            merkle_node_cache_capacity: 32 * 1024 * 1024,
            shard_config: Default::default(),
        }
    }
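A quick check of the comment above: with an 8-byte layer index, an 8-byte position, and a 32-byte hash per cached node, the default capacity works out to 1.5 GiB.

const NODE_BYTES: usize = 8 + 8 + 32; // layer index + position + hash
const DEFAULT_CAPACITY: usize = 32 * 1024 * 1024; // nodes

fn main() {
    // 48 B * 32 Mi nodes = 1536 MiB = 1.5 GiB
    assert_eq!(NODE_BYTES * DEFAULT_CAPACITY, 1536 * 1024 * 1024);
}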
@ -436,7 +441,7 @@ impl FlowDBStore {
        let mut expected_index = 0;

        let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
        let empty_root = *Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
        let empty_root = Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();

        for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
            let (index_bytes, root_bytes) = r?;
@ -666,3 +671,84 @@ fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
    let position = try_decode_usize(&data[mem::size_of::<u64>()..])?;
    Ok((layer_index, position))
}

fn layer_size_key(layer: usize) -> Vec<u8> {
    let mut key = "layer_size".as_bytes().to_vec();
    key.extend_from_slice(&layer.to_be_bytes());
    key
}

pub struct NodeDBTransaction(DBTransaction);

impl NodeDatabase<DataRoot> for FlowDBStore {
    fn get_node(&self, layer: usize, pos: usize) -> Result<Option<DataRoot>> {
        Ok(self
            .kvdb
            .get(COL_FLOW_MPT_NODES, &encode_mpt_node_key(layer, pos))?
            .map(|v| DataRoot::from_slice(&v)))
    }

    fn get_layer_size(&self, layer: usize) -> Result<Option<usize>> {
        match self.kvdb.get(COL_FLOW_MPT_NODES, &layer_size_key(layer))? {
            Some(v) => Ok(Some(try_decode_usize(&v)?)),
            None => Ok(None),
        }
    }

    fn start_transaction(&self) -> Box<dyn NodeTransaction<DataRoot>> {
        Box::new(NodeDBTransaction(self.kvdb.transaction()))
    }

    fn commit(&self, tx: Box<dyn NodeTransaction<DataRoot>>) -> Result<()> {
        let db_tx: Box<NodeDBTransaction> = tx
            .into_any()
            .downcast()
            .map_err(|e| anyhow!("downcast failed, e={:?}", e))?;
        self.kvdb.write(db_tx.0).map_err(Into::into)
    }
}

impl NodeTransaction<DataRoot> for NodeDBTransaction {
    fn save_node(&mut self, layer: usize, pos: usize, node: &DataRoot) {
        self.0.put(
            COL_FLOW_MPT_NODES,
            &encode_mpt_node_key(layer, pos),
            node.as_bytes(),
        );
    }

    fn save_node_list(&mut self, nodes: &[(usize, usize, &DataRoot)]) {
        for (layer_index, position, data) in nodes {
            self.0.put(
                COL_FLOW_MPT_NODES,
                &encode_mpt_node_key(*layer_index, *position),
                data.as_bytes(),
            );
        }
    }

    fn remove_node_list(&mut self, nodes: &[(usize, usize)]) {
        for (layer_index, position) in nodes {
            self.0.delete(
                COL_FLOW_MPT_NODES,
                &encode_mpt_node_key(*layer_index, *position),
            );
        }
    }

    fn save_layer_size(&mut self, layer: usize, size: usize) {
        self.0.put(
            COL_FLOW_MPT_NODES,
            &layer_size_key(layer),
            &size.to_be_bytes(),
        );
    }

    fn remove_layer_size(&mut self, layer: usize) {
        self.0.delete(COL_FLOW_MPT_NODES, &layer_size_key(layer));
    }

    fn into_any(self: Box<Self>) -> Box<dyn Any> {
        self
    }
}
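A minimal usage sketch of the new persistence hooks (assumptions: `flow_db` is an Arc<FlowDBStore>, and `root_a`/`root_b` are DataRoot values; neither appears in this diff). Nodes are staged in a NodeTransaction and written atomically on commit, with per-layer sizes kept under the "layer_size"-prefixed keys defined above:

let mut tx = flow_db.start_transaction();
tx.save_node(0, 0, &root_a);
tx.save_node(0, 1, &root_b);
tx.save_layer_size(0, 2);
flow_db.commit(tx)?;
assert_eq!(flow_db.get_node(0, 1)?, Some(root_b));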
@ -4,11 +4,10 @@ mod seal;
mod serde;

use ::serde::{Deserialize, Serialize};
use std::cmp::min;

use anyhow::Result;
use ethereum_types::H256;
use ssz_derive::{Decode, Encode};
use std::cmp::min;

use crate::log_store::log_manager::data_to_merkle_leaves;
use crate::try_option;
@ -206,7 +205,7 @@ impl EntryBatch {
            }
        }
        Ok(Some(
            *try_option!(self.to_merkle_tree(is_first_chunk)?).root(),
            try_option!(self.to_merkle_tree(is_first_chunk)?).root(),
        ))
    }
@ -1,5 +1,5 @@
use crate::config::ShardConfig;
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowStore};
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
use crate::log_store::tx_store::TransactionStore;
use crate::log_store::{
    FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
@ -94,6 +94,7 @@ impl MerkleManager {
    }

    fn revert_merkle_tree(&mut self, tx_seq: u64, tx_store: &TransactionStore) -> Result<()> {
        debug!("revert merkle tree {}", tx_seq);
        // Special case for reverting tx_seq == 0
        if tx_seq == u64::MAX {
            self.pora_chunks_merkle.reset();
@ -116,7 +117,7 @@ impl MerkleManager {
        if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 {
            self.last_chunk_merkle.append(H256::zero());
            self.pora_chunks_merkle
                .update_last(*self.last_chunk_merkle.root());
                .update_last(self.last_chunk_merkle.root());
        } else if self.last_chunk_merkle.leaves() != 0 {
            let last_chunk_start_index = self.last_chunk_start_index();
            let last_chunk_data = flow_store.get_available_entries(
@ -355,7 +356,7 @@ impl LogStoreWrite for LogManager {
        merkle.revert_merkle_tree(tx_seq, &self.tx_store)?;
        merkle.try_initialize(&self.flow_store)?;
        assert_eq!(
            Some(*merkle.last_chunk_merkle.root()),
            Some(merkle.last_chunk_merkle.root()),
            merkle
                .pora_chunks_merkle
                .leaf_at(merkle.pora_chunks_merkle.leaves() - 1)?
@ -577,7 +578,7 @@ impl LogStoreRead for LogManager {
    fn get_context(&self) -> crate::error::Result<(DataRoot, u64)> {
        let merkle = self.merkle.read_recursive();
        Ok((
            *merkle.pora_chunks_merkle.root(),
            merkle.pora_chunks_merkle.root(),
            merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
        ))
    }
@ -626,13 +627,10 @@ impl LogManager {
        executor: task_executor::TaskExecutor,
    ) -> Result<Self> {
        let tx_store = TransactionStore::new(db.clone())?;
        let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow));
        let mut initial_data = flow_store.get_chunk_root_list()?;
        // If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
        // first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
        // and inserted later.
        let mut extra_leaves = Vec::new();

        let flow_db = Arc::new(FlowDBStore::new(db.clone()));
        let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
        // If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
        // first and call `put_tx` later.
        let next_tx_seq = tx_store.next_tx_seq();
        let mut start_tx_seq = if next_tx_seq > 0 {
            Some(next_tx_seq - 1)
@ -640,15 +638,25 @@ impl LogManager {
        } else {
            None
        };
        let mut last_tx_to_insert = None;

        let mut pora_chunks_merkle = Merkle::new_with_subtrees(
            flow_db,
            config.flow.merkle_node_cache_capacity,
            log2_pow2(PORA_CHUNK_SIZE),
        )?;
        if let Some(last_tx_seq) = start_tx_seq {
            if !tx_store.check_tx_completed(last_tx_seq)? {
                // Last tx not finalized, we need to check if its `put_tx` is completed.
                let last_tx = tx_store
                    .get_tx_by_seq_number(last_tx_seq)?
                    .expect("tx missing");
                let mut current_len = initial_data.leaves();
                let expected_len =
                    sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64);
                let current_len = pora_chunks_merkle.leaves();
                let expected_len = sector_to_segment(
                    last_tx.start_entry_index
                        + last_tx.num_entries() as u64
                        + PORA_CHUNK_SIZE as u64
                        - 1,
                );
                match expected_len.cmp(&(current_len)) {
                    Ordering::Less => {
                        bail!(
@ -676,43 +684,33 @@ impl LogManager {
                        previous_tx.start_entry_index + previous_tx.num_entries() as u64,
                    );
                    if current_len > expected_len {
                        while let Some((subtree_depth, _)) = initial_data.subtree_list.pop()
                        {
                            current_len -= 1 << (subtree_depth - 1);
                            if current_len == expected_len {
                                break;
                            }
                        }
                        pora_chunks_merkle.revert_to_leaves(expected_len)?;
                    } else {
                        warn!(
                            "revert last tx with no-op: {} {}",
                            current_len, expected_len
                        );
                        assert_eq!(current_len, expected_len);
                    }
                    assert_eq!(current_len, expected_len);
                    while let Some((index, h)) = initial_data.known_leaves.pop() {
                        if index < current_len {
                            initial_data.known_leaves.push((index, h));
                            break;
                        } else {
                            extra_leaves.push((index, h));
                        }
                    }
                    start_tx_seq = Some(last_tx_seq - 1);
                    start_tx_seq = Some(previous_tx.seq);
                };
            }
        }
    }
}

        let mut pora_chunks_merkle =
            Merkle::new_with_subtrees(initial_data, log2_pow2(PORA_CHUNK_SIZE), start_tx_seq)?;
        let last_chunk_merkle = match start_tx_seq {
            Some(tx_seq) => {
                tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
                let tx = tx_store.get_tx_by_seq_number(tx_seq)?.expect("tx missing");
                if (tx.start_entry_index() + tx.num_entries() as u64) % PORA_CHUNK_SIZE as u64 == 0
                {
                    // The last chunk should be aligned, so it's empty.
                    Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None)
                } else {
                    tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves() - 1, tx_seq)?
                }
            }
            // Initialize
            None => Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None),
            None => {
                pora_chunks_merkle.reset();
                Merkle::new_with_depth(vec![], 1, None)
            }
        };
        debug!(
@ -722,10 +720,10 @@ impl LogManager {
            last_chunk_merkle.leaves(),
        );
        if last_chunk_merkle.leaves() != 0 {
            pora_chunks_merkle.append(*last_chunk_merkle.root());
            // update the merkle root
            pora_chunks_merkle.commit(start_tx_seq);
            pora_chunks_merkle.update_last(last_chunk_merkle.root());
        }
        // update the merkle root
        pora_chunks_merkle.commit(start_tx_seq);
        let merkle = RwLock::new(MerkleManager {
            pora_chunks_merkle,
            last_chunk_merkle,
@ -744,23 +742,16 @@ impl LogManager {
        log_manager.start_receiver(receiver, executor);

        if let Some(tx) = last_tx_to_insert {
            log_manager.revert_to(tx.seq - 1)?;
            log_manager.put_tx(tx)?;
            let mut merkle = log_manager.merkle.write();
            for (index, h) in extra_leaves {
                if index < merkle.pora_chunks_merkle.leaves() {
                    merkle.pora_chunks_merkle.fill_leaf(index, h);
                } else {
                    error!("out of range extra leaf: index={} hash={:?}", index, h);
                }
            }
        } else {
            assert!(extra_leaves.is_empty());
        }
        log_manager
            .merkle
            .write()
            .try_initialize(&log_manager.flow_store)?;
        info!(
            "Log manager initialized, state={:?}",
            log_manager.get_context()?
        );
        Ok(log_manager)
    }
@ -788,7 +779,8 @@ impl LogManager {
                        .unwrap();
                }
                std::result::Result::Err(_) => {
                    error!("Receiver error");
                    debug!("Log manager inner channel closed");
                    break;
                }
            };
        }
@ -889,16 +881,16 @@ impl LogManager {
            // `last_chunk_merkle` was empty, so this is a new leaf in the top_tree.
            merkle
                .pora_chunks_merkle
                .append_subtree(1, *merkle.last_chunk_merkle.root())?;
                .append_subtree(1, merkle.last_chunk_merkle.root())?;
        } else {
            merkle
                .pora_chunks_merkle
                .update_last(*merkle.last_chunk_merkle.root());
                .update_last(merkle.last_chunk_merkle.root());
        }
        if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
            batch_root_map.insert(
                merkle.pora_chunks_merkle.leaves() - 1,
                (*merkle.last_chunk_merkle.root(), 1),
                (merkle.last_chunk_merkle.root(), 1),
            );
            self.complete_last_chunk_merkle(
                merkle.pora_chunks_merkle.leaves() - 1,
@ -954,7 +946,7 @@ impl LogManager {
                .append_list(data_to_merkle_leaves(&pad_data)?);
            merkle
                .pora_chunks_merkle
                .update_last(*merkle.last_chunk_merkle.root());
                .update_last(merkle.last_chunk_merkle.root());
        } else {
            if last_chunk_pad != 0 {
                is_full_empty = false;
@ -964,10 +956,10 @@ impl LogManager {
                    .append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?);
                merkle
                    .pora_chunks_merkle
                    .update_last(*merkle.last_chunk_merkle.root());
                    .update_last(merkle.last_chunk_merkle.root());
                root_map.insert(
                    merkle.pora_chunks_merkle.leaves() - 1,
                    (*merkle.last_chunk_merkle.root(), 1),
                    (merkle.last_chunk_merkle.root(), 1),
                );
                completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1);
            }
@ -978,7 +970,7 @@ impl LogManager {
                let data = pad_data[start_index * ENTRY_SIZE
                    ..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE]
                    .to_vec();
                let root = *Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
                let root = Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
                merkle.pora_chunks_merkle.append(root);
                root_map.insert(merkle.pora_chunks_merkle.leaves() - 1, (root, 1));
                start_index += PORA_CHUNK_SIZE;
@ -1056,7 +1048,7 @@ impl LogManager {
            }
            merkle
                .pora_chunks_merkle
                .update_last(*merkle.last_chunk_merkle.root());
                .update_last(merkle.last_chunk_merkle.root());
        }
        let chunk_roots = self.flow_store.append_entries(flow_entry_array)?;
        for (chunk_index, chunk_root) in chunk_roots {
@ -8,6 +8,7 @@ use ethereum_types::H256;
use rand::random;
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
use std::cmp;

use task_executor::test_utils::TestRuntime;

#[test]
@ -292,6 +292,9 @@ impl TransactionStore {
            match tx.start_entry_index.cmp(&last_chunk_start_index) {
                cmp::Ordering::Greater => {
                    tx_list.push((tx_seq, tx.merkle_nodes));
                    if tx.start_entry_index >= last_chunk_start_index + PORA_CHUNK_SIZE as u64 {
                        break;
                    }
                }
                cmp::Ordering::Equal => {
                    tx_list.push((tx_seq, tx.merkle_nodes));
@ -335,11 +338,7 @@ impl TransactionStore {
        }
        let mut merkle = if last_chunk_start_index == 0 {
            // The first entry hash is initialized as zero.
            AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
                vec![H256::zero()],
                log2_pow2(PORA_CHUNK_SIZE) + 1,
                None,
            )
            AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(vec![H256::zero()], 1, None)
        } else {
            AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
                vec![],
@ -324,7 +324,7 @@ auto_sync_enabled = true
# enabled = false

# Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
# report_interval = ""
# report_interval = "10s"

# File name to output metrics periodically.
# file_report_output = ""
@ -336,7 +336,7 @@ auto_sync_enabled = true
# enabled = false

# Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
# report_interval = ""
# report_interval = "10s"

# File name to output metrics periodically.
# file_report_output = ""
@ -338,7 +338,7 @@
# enabled = false

# Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
# report_interval = ""
# report_interval = "10s"

# File name to output metrics periodically.
# file_report_output = ""
@ -1 +1 @@
75c251804a29ab22adced50d92478cf0baf834bc
bea58429e436e4952ae69235d9079cfc4ac5f3b3
@ -40,8 +40,8 @@
      "type": "function"
    }
  ],
  "bytecode": "0x608060405234801561001057600080fd5b5060be8061001f6000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122044ebf96fcad90f0bbc521513843d64fbc182c5c913a8210a4d638393793be63064736f6c63430008100033",
  "deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122044ebf96fcad90f0bbc521513843d64fbc182c5c913a8210a4d638393793be63064736f6c63430008100033",
  "bytecode": "0x608060405234801561001057600080fd5b5060be8061001f6000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
  "deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
  "linkReferences": {},
  "deployedLinkReferences": {}
}
@ -70,8 +70,8 @@
      "type": "function"
    }
  ],
  "bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ce57385afc7714a4000e530d1e1154d214fc1c0e2392abde201018635be1a2ab64736f6c63430008100033",
  "deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ce57385afc7714a4000e530d1e1154d214fc1c0e2392abde201018635be1a2ab64736f6c63430008100033",
  "bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
  "deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
  "linkReferences": {},
  "deployedLinkReferences": {}
}
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
0
tests/crash_test.py
Normal file → Executable file
43
tests/node_cache_test.py
Executable file
@ -0,0 +1,43 @@
#!/usr/bin/env python3

from test_framework.test_framework import TestFramework
from utility.submission import create_submission, submit_data
from utility.utils import wait_until


class NodeCacheTest(TestFramework):
    def setup_params(self):
        self.zgs_node_configs[0] = {
            "merkle_node_cache_capacity": 1024,
        }

    def run_test(self):
        client = self.nodes[0]

        chunk_data = b"\x02" * 256 * 1024 * 1024 * 3
        submissions, data_root = create_submission(chunk_data)
        self.contract.submit(submissions)
        wait_until(lambda: self.contract.num_submissions() == 1)
        wait_until(lambda: client.zgs_get_file_info(data_root) is not None)

        segment = submit_data(client, chunk_data)
        self.log.info("segment: %s", len(segment))
        wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])

        self.stop_storage_node(0)
        self.start_storage_node(0)
        self.nodes[0].wait_for_rpc_connection()

        chunk_data = b"\x03" * 256 * (1024 * 765 + 5)
        submissions, data_root = create_submission(chunk_data)
        self.contract.submit(submissions)
        wait_until(lambda: self.contract.num_submissions() == 2)
        wait_until(lambda: client.zgs_get_file_info(data_root) is not None)

        segment = submit_data(client, chunk_data)
        self.log.info("segment: %s", len(segment))
        wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])


if __name__ == "__main__":
    NodeCacheTest().main()
55
tests/root_consistency_test.py
Executable file
@ -0,0 +1,55 @@
#!/usr/bin/env python3
from test_framework.test_framework import TestFramework
from config.node_config import MINER_ID, GENESIS_PRIV_KEY
from utility.submission import create_submission, submit_data
from utility.utils import wait_until, assert_equal
from test_framework.blockchain_node import BlockChainNodeType


class RootConsistencyTest(TestFramework):
    def setup_params(self):
        self.num_blockchain_nodes = 1
        self.num_nodes = 1

    def submit_data(self, item, size):
        submissions_before = self.contract.num_submissions()
        client = self.nodes[0]
        chunk_data = item * 256 * size
        submissions, data_root = create_submission(chunk_data)
        self.contract.submit(submissions)
        wait_until(lambda: self.contract.num_submissions() == submissions_before + 1)
        wait_until(lambda: client.zgs_get_file_info(data_root) is not None)

        segment = submit_data(client, chunk_data)
        wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])

    def assert_flow_status(self, expected_length):
        contract_root = self.contract.get_flow_root().hex()
        contract_length = self.contract.get_flow_length()
        (node_root, node_length) = tuple(self.nodes[0].zgs_getFlowContext())

        assert_equal(contract_length, node_length)
        assert_equal(contract_length, expected_length)
        assert_equal(contract_root, node_root[2:])

    def run_test(self):
        self.assert_flow_status(1)

        self.submit_data(b"\x11", 1)
        self.assert_flow_status(2)

        self.submit_data(b"\x11", 8 + 4 + 2)
        self.assert_flow_status(16 + 4 + 2)

        self.submit_data(b"\x12", 128 + 64)
        self.assert_flow_status(256 + 64)

        self.submit_data(b"\x13", 512 + 256)
        self.assert_flow_status(1024 + 512 + 256)


if __name__ == "__main__":
    RootConsistencyTest().main()
@ -91,7 +91,12 @@ class FlowContractProxy(ContractProxy):

    def get_mine_context(self, node_idx=0):
        return self._call("makeContextWithResult", node_idx)

    def get_flow_root(self, node_idx=0):
        return self._call("computeFlowRoot", node_idx)

    def get_flow_length(self, node_idx=0):
        return self._call("tree", node_idx)[0]


class MineContractProxy(ContractProxy):
    def last_mined_epoch(self, node_idx=0):
@ -100,6 +100,9 @@ class ZgsNode(TestNode):

    def zgs_get_file_info_by_tx_seq(self, tx_seq):
        return self.rpc.zgs_getFileInfoByTxSeq([tx_seq])

    def zgs_get_flow_context(self, tx_seq):
        return self.rpc.zgs_getFlowContext([tx_seq])

    def shutdown(self):
        self.rpc.admin_shutdown()