mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-18 11:05:18 +00:00
Use LRU to cache MPT nodes. (#227)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* Add trait. * Update merkle tree trait. * Use NodeManager. * fix. * Use LRU for cache. * fix clippy. * Save layer size. * Initialize LogManager with NodeManager. * Fix. * Fix test. * fix.
This commit is contained in:
parent
8f17a7ad72
commit
506d234562
34
Cargo.lock
generated
34
Cargo.lock
generated
@ -223,7 +223,9 @@ dependencies = [
|
|||||||
"eth2_ssz",
|
"eth2_ssz",
|
||||||
"eth2_ssz_derive",
|
"eth2_ssz_derive",
|
||||||
"ethereum-types 0.14.1",
|
"ethereum-types 0.14.1",
|
||||||
|
"itertools 0.13.0",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
|
"lru 0.12.5",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"serde",
|
"serde",
|
||||||
"tiny-keccak",
|
"tiny-keccak",
|
||||||
@ -1673,7 +1675,7 @@ dependencies = [
|
|||||||
"hkdf",
|
"hkdf",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"libp2p-core 0.30.2",
|
"libp2p-core 0.30.2",
|
||||||
"lru",
|
"lru 0.7.8",
|
||||||
"parking_lot 0.11.2",
|
"parking_lot 0.11.2",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"rlp",
|
"rlp",
|
||||||
@ -2514,6 +2516,12 @@ version = "1.0.7"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "foldhash"
|
||||||
|
version = "0.1.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "foreign-types"
|
name = "foreign-types"
|
||||||
version = "0.3.2"
|
version = "0.3.2"
|
||||||
@ -2946,6 +2954,17 @@ dependencies = [
|
|||||||
"allocator-api2",
|
"allocator-api2",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hashbrown"
|
||||||
|
version = "0.15.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
|
||||||
|
dependencies = [
|
||||||
|
"allocator-api2",
|
||||||
|
"equivalent",
|
||||||
|
"foldhash",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hashers"
|
name = "hashers"
|
||||||
version = "1.0.1"
|
version = "1.0.1"
|
||||||
@ -4117,7 +4136,7 @@ dependencies = [
|
|||||||
"libp2p-core 0.33.0",
|
"libp2p-core 0.33.0",
|
||||||
"libp2p-swarm",
|
"libp2p-swarm",
|
||||||
"log",
|
"log",
|
||||||
"lru",
|
"lru 0.7.8",
|
||||||
"prost 0.10.4",
|
"prost 0.10.4",
|
||||||
"prost-build 0.10.4",
|
"prost-build 0.10.4",
|
||||||
"prost-codec",
|
"prost-codec",
|
||||||
@ -4650,6 +4669,15 @@ dependencies = [
|
|||||||
"hashbrown 0.12.3",
|
"hashbrown 0.12.3",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "lru"
|
||||||
|
version = "0.12.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
|
||||||
|
dependencies = [
|
||||||
|
"hashbrown 0.15.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lru-cache"
|
name = "lru-cache"
|
||||||
version = "0.1.2"
|
version = "0.1.2"
|
||||||
@ -5019,7 +5047,7 @@ dependencies = [
|
|||||||
"lazy_static",
|
"lazy_static",
|
||||||
"libp2p",
|
"libp2p",
|
||||||
"lighthouse_metrics",
|
"lighthouse_metrics",
|
||||||
"lru",
|
"lru 0.7.8",
|
||||||
"parking_lot 0.12.3",
|
"parking_lot 0.12.3",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"regex",
|
"regex",
|
||||||
|
@ -36,4 +36,8 @@ eth2_ssz = { path = "version-meld/eth2_ssz" }
|
|||||||
enr = { path = "version-meld/enr" }
|
enr = { path = "version-meld/enr" }
|
||||||
|
|
||||||
[profile.bench.package.'storage']
|
[profile.bench.package.'storage']
|
||||||
debug = true
|
debug = true
|
||||||
|
|
||||||
|
[profile.dev]
|
||||||
|
# enabling debug_assertions will make node fail to start because of checks in `clap`.
|
||||||
|
debug-assertions = false
|
@ -12,4 +12,6 @@ eth2_ssz_derive = "0.3.0"
|
|||||||
serde = { version = "1.0.137", features = ["derive"] }
|
serde = { version = "1.0.137", features = ["derive"] }
|
||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
tracing = "0.1.36"
|
tracing = "0.1.36"
|
||||||
once_cell = "1.19.0"
|
once_cell = "1.19.0"
|
||||||
|
itertools = "0.13.0"
|
||||||
|
lru = "0.12.5"
|
@ -1,23 +1,28 @@
|
|||||||
mod merkle_tree;
|
mod merkle_tree;
|
||||||
|
mod node_manager;
|
||||||
mod proof;
|
mod proof;
|
||||||
mod sha3;
|
mod sha3;
|
||||||
|
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
|
use itertools::Itertools;
|
||||||
use std::cmp::Ordering;
|
use std::cmp::Ordering;
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
use std::sync::Arc;
|
||||||
use tracing::{trace, warn};
|
use tracing::{trace, warn};
|
||||||
|
|
||||||
|
use crate::merkle_tree::MerkleTreeWrite;
|
||||||
pub use crate::merkle_tree::{
|
pub use crate::merkle_tree::{
|
||||||
Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES,
|
Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES,
|
||||||
};
|
};
|
||||||
|
pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager, NodeTransaction};
|
||||||
pub use proof::{Proof, RangeProof};
|
pub use proof::{Proof, RangeProof};
|
||||||
pub use sha3::Sha3Algorithm;
|
pub use sha3::Sha3Algorithm;
|
||||||
|
|
||||||
pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
|
pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
|
||||||
/// Keep all the nodes in the latest version. `layers[0]` is the layer of leaves.
|
/// Keep all the nodes in the latest version. `layers[0]` is the layer of leaves.
|
||||||
layers: Vec<Vec<E>>,
|
node_manager: NodeManager<E>,
|
||||||
/// Keep the delta nodes that can be used to construct a history tree.
|
/// Keep the delta nodes that can be used to construct a history tree.
|
||||||
/// The key is the root node of that version.
|
/// The key is the root node of that version.
|
||||||
delta_nodes_map: BTreeMap<u64, DeltaNodes<E>>,
|
delta_nodes_map: BTreeMap<u64, DeltaNodes<E>>,
|
||||||
@ -35,13 +40,16 @@ pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
|
|||||||
impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||||
pub fn new(leaves: Vec<E>, leaf_height: usize, start_tx_seq: Option<u64>) -> Self {
|
pub fn new(leaves: Vec<E>, leaf_height: usize, start_tx_seq: Option<u64>) -> Self {
|
||||||
let mut merkle = Self {
|
let mut merkle = Self {
|
||||||
layers: vec![leaves],
|
node_manager: NodeManager::new_dummy(),
|
||||||
delta_nodes_map: BTreeMap::new(),
|
delta_nodes_map: BTreeMap::new(),
|
||||||
root_to_tx_seq_map: HashMap::new(),
|
root_to_tx_seq_map: HashMap::new(),
|
||||||
min_depth: None,
|
min_depth: None,
|
||||||
leaf_height,
|
leaf_height,
|
||||||
_a: Default::default(),
|
_a: Default::default(),
|
||||||
};
|
};
|
||||||
|
merkle.node_manager.start_transaction();
|
||||||
|
merkle.node_manager.add_layer();
|
||||||
|
merkle.node_manager.append_nodes(0, &leaves);
|
||||||
if merkle.leaves() == 0 {
|
if merkle.leaves() == 0 {
|
||||||
if let Some(seq) = start_tx_seq {
|
if let Some(seq) = start_tx_seq {
|
||||||
merkle.delta_nodes_map.insert(
|
merkle.delta_nodes_map.insert(
|
||||||
@ -51,10 +59,12 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
merkle.node_manager.commit();
|
||||||
return merkle;
|
return merkle;
|
||||||
}
|
}
|
||||||
// Reconstruct the whole tree.
|
// Reconstruct the whole tree.
|
||||||
merkle.recompute(0, 0, None);
|
merkle.recompute(0, 0, None);
|
||||||
|
merkle.node_manager.commit();
|
||||||
// Commit the first version in memory.
|
// Commit the first version in memory.
|
||||||
// TODO(zz): Check when the roots become available.
|
// TODO(zz): Check when the roots become available.
|
||||||
merkle.commit(start_tx_seq);
|
merkle.commit(start_tx_seq);
|
||||||
@ -62,53 +72,44 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_with_subtrees(
|
pub fn new_with_subtrees(
|
||||||
initial_data: MerkleTreeInitialData<E>,
|
node_db: Arc<dyn NodeDatabase<E>>,
|
||||||
|
node_cache_capacity: usize,
|
||||||
leaf_height: usize,
|
leaf_height: usize,
|
||||||
start_tx_seq: Option<u64>,
|
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let mut merkle = Self {
|
let mut merkle = Self {
|
||||||
layers: vec![vec![]],
|
node_manager: NodeManager::new(node_db, node_cache_capacity)?,
|
||||||
delta_nodes_map: BTreeMap::new(),
|
delta_nodes_map: BTreeMap::new(),
|
||||||
root_to_tx_seq_map: HashMap::new(),
|
root_to_tx_seq_map: HashMap::new(),
|
||||||
min_depth: None,
|
min_depth: None,
|
||||||
leaf_height,
|
leaf_height,
|
||||||
_a: Default::default(),
|
_a: Default::default(),
|
||||||
};
|
};
|
||||||
if initial_data.subtree_list.is_empty() {
|
if merkle.height() == 0 {
|
||||||
if let Some(seq) = start_tx_seq {
|
merkle.node_manager.start_transaction();
|
||||||
merkle.delta_nodes_map.insert(
|
merkle.node_manager.add_layer();
|
||||||
seq,
|
merkle.node_manager.commit();
|
||||||
DeltaNodes {
|
|
||||||
right_most_nodes: vec![],
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return Ok(merkle);
|
|
||||||
}
|
|
||||||
merkle.append_subtree_list(initial_data.subtree_list)?;
|
|
||||||
merkle.commit(start_tx_seq);
|
|
||||||
for (index, h) in initial_data.known_leaves {
|
|
||||||
merkle.fill_leaf(index, h);
|
|
||||||
}
|
|
||||||
for (layer_index, position, h) in initial_data.extra_mpt_nodes {
|
|
||||||
// TODO: Delete duplicate nodes from DB.
|
|
||||||
merkle.layers[layer_index][position] = h;
|
|
||||||
}
|
}
|
||||||
Ok(merkle)
|
Ok(merkle)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This is only used for the last chunk, so `leaf_height` is always 0 so far.
|
/// This is only used for the last chunk, so `leaf_height` is always 0 so far.
|
||||||
pub fn new_with_depth(leaves: Vec<E>, depth: usize, start_tx_seq: Option<u64>) -> Self {
|
pub fn new_with_depth(leaves: Vec<E>, depth: usize, start_tx_seq: Option<u64>) -> Self {
|
||||||
|
let mut node_manager = NodeManager::new_dummy();
|
||||||
|
node_manager.start_transaction();
|
||||||
if leaves.is_empty() {
|
if leaves.is_empty() {
|
||||||
// Create an empty merkle tree with `depth`.
|
// Create an empty merkle tree with `depth`.
|
||||||
let mut merkle = Self {
|
let mut merkle = Self {
|
||||||
layers: vec![vec![]; depth],
|
// dummy node manager for the last chunk.
|
||||||
|
node_manager,
|
||||||
delta_nodes_map: BTreeMap::new(),
|
delta_nodes_map: BTreeMap::new(),
|
||||||
root_to_tx_seq_map: HashMap::new(),
|
root_to_tx_seq_map: HashMap::new(),
|
||||||
min_depth: Some(depth),
|
min_depth: Some(depth),
|
||||||
leaf_height: 0,
|
leaf_height: 0,
|
||||||
_a: Default::default(),
|
_a: Default::default(),
|
||||||
};
|
};
|
||||||
|
for _ in 0..depth {
|
||||||
|
merkle.node_manager.add_layer();
|
||||||
|
}
|
||||||
if let Some(seq) = start_tx_seq {
|
if let Some(seq) = start_tx_seq {
|
||||||
merkle.delta_nodes_map.insert(
|
merkle.delta_nodes_map.insert(
|
||||||
seq,
|
seq,
|
||||||
@ -117,20 +118,26 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
merkle.node_manager.commit();
|
||||||
merkle
|
merkle
|
||||||
} else {
|
} else {
|
||||||
let mut layers = vec![vec![]; depth];
|
|
||||||
layers[0] = leaves;
|
|
||||||
let mut merkle = Self {
|
let mut merkle = Self {
|
||||||
layers,
|
// dummy node manager for the last chunk.
|
||||||
|
node_manager,
|
||||||
delta_nodes_map: BTreeMap::new(),
|
delta_nodes_map: BTreeMap::new(),
|
||||||
root_to_tx_seq_map: HashMap::new(),
|
root_to_tx_seq_map: HashMap::new(),
|
||||||
min_depth: Some(depth),
|
min_depth: Some(depth),
|
||||||
leaf_height: 0,
|
leaf_height: 0,
|
||||||
_a: Default::default(),
|
_a: Default::default(),
|
||||||
};
|
};
|
||||||
|
merkle.node_manager.add_layer();
|
||||||
|
merkle.append_nodes(0, &leaves);
|
||||||
|
for _ in 1..depth {
|
||||||
|
merkle.node_manager.add_layer();
|
||||||
|
}
|
||||||
// Reconstruct the whole tree.
|
// Reconstruct the whole tree.
|
||||||
merkle.recompute(0, 0, None);
|
merkle.recompute(0, 0, None);
|
||||||
|
merkle.node_manager.commit();
|
||||||
// Commit the first version in memory.
|
// Commit the first version in memory.
|
||||||
merkle.commit(start_tx_seq);
|
merkle.commit(start_tx_seq);
|
||||||
merkle
|
merkle
|
||||||
@ -142,18 +149,22 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
self.layers[0].push(new_leaf);
|
self.node_manager.start_transaction();
|
||||||
|
self.node_manager.push_node(0, new_leaf);
|
||||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||||
|
self.node_manager.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn append_list(&mut self, mut leaf_list: Vec<E>) {
|
pub fn append_list(&mut self, leaf_list: Vec<E>) {
|
||||||
if leaf_list.contains(&E::null()) {
|
if leaf_list.contains(&E::null()) {
|
||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
let start_index = self.leaves();
|
let start_index = self.leaves();
|
||||||
self.layers[0].append(&mut leaf_list);
|
self.node_manager.append_nodes(0, &leaf_list);
|
||||||
self.recompute_after_append_leaves(start_index);
|
self.recompute_after_append_leaves(start_index);
|
||||||
|
self.node_manager.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Append a leaf list by providing their intermediate node hash.
|
/// Append a leaf list by providing their intermediate node hash.
|
||||||
@ -166,9 +177,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
bail!("subtree_root is null");
|
bail!("subtree_root is null");
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
let start_index = self.leaves();
|
let start_index = self.leaves();
|
||||||
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
||||||
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
||||||
|
self.node_manager.commit();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -177,11 +190,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
bail!("subtree_list contains null");
|
bail!("subtree_list contains null");
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
for (subtree_depth, subtree_root) in subtree_list {
|
for (subtree_depth, subtree_root) in subtree_list {
|
||||||
let start_index = self.leaves();
|
let start_index = self.leaves();
|
||||||
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
||||||
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
||||||
}
|
}
|
||||||
|
self.node_manager.commit();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,13 +207,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// updating to null is not allowed.
|
// updating to null is not allowed.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if self.layers[0].is_empty() {
|
self.node_manager.start_transaction();
|
||||||
|
if self.layer_len(0) == 0 {
|
||||||
// Special case for the first data.
|
// Special case for the first data.
|
||||||
self.layers[0].push(updated_leaf);
|
self.push_node(0, updated_leaf);
|
||||||
} else {
|
} else {
|
||||||
*self.layers[0].last_mut().unwrap() = updated_leaf;
|
self.update_node(0, self.layer_len(0) - 1, updated_leaf);
|
||||||
}
|
}
|
||||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||||
|
self.node_manager.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fill an unknown `null` leaf with its real value.
|
/// Fill an unknown `null` leaf with its real value.
|
||||||
@ -207,13 +224,17 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
pub fn fill_leaf(&mut self, index: usize, leaf: E) {
|
pub fn fill_leaf(&mut self, index: usize, leaf: E) {
|
||||||
if leaf == E::null() {
|
if leaf == E::null() {
|
||||||
// fill leaf with null is not allowed.
|
// fill leaf with null is not allowed.
|
||||||
} else if self.layers[0][index] == E::null() {
|
} else if self.node(0, index) == E::null() {
|
||||||
self.layers[0][index] = leaf;
|
self.node_manager.start_transaction();
|
||||||
|
self.update_node(0, index, leaf);
|
||||||
self.recompute_after_fill_leaves(index, index + 1);
|
self.recompute_after_fill_leaves(index, index + 1);
|
||||||
} else if self.layers[0][index] != leaf {
|
self.node_manager.commit();
|
||||||
|
} else if self.node(0, index) != leaf {
|
||||||
panic!(
|
panic!(
|
||||||
"Fill with invalid leaf, index={} was={:?} get={:?}",
|
"Fill with invalid leaf, index={} was={:?} get={:?}",
|
||||||
index, self.layers[0][index], leaf
|
index,
|
||||||
|
self.node(0, index),
|
||||||
|
leaf
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -226,6 +247,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
proof: RangeProof<E>,
|
proof: RangeProof<E>,
|
||||||
) -> Result<Vec<(usize, usize, E)>> {
|
) -> Result<Vec<(usize, usize, E)>> {
|
||||||
|
self.node_manager.start_transaction();
|
||||||
let mut updated_nodes = Vec::new();
|
let mut updated_nodes = Vec::new();
|
||||||
let mut left_nodes = proof.left_proof.proof_nodes_in_tree();
|
let mut left_nodes = proof.left_proof.proof_nodes_in_tree();
|
||||||
if left_nodes.len() >= self.leaf_height {
|
if left_nodes.len() >= self.leaf_height {
|
||||||
@ -237,6 +259,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
updated_nodes
|
updated_nodes
|
||||||
.append(&mut self.fill_with_proof(right_nodes.split_off(self.leaf_height))?);
|
.append(&mut self.fill_with_proof(right_nodes.split_off(self.leaf_height))?);
|
||||||
}
|
}
|
||||||
|
self.node_manager.commit();
|
||||||
Ok(updated_nodes)
|
Ok(updated_nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -262,13 +285,16 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
if tx_merkle_nodes.is_empty() {
|
if tx_merkle_nodes.is_empty() {
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
let mut position_and_data =
|
let mut position_and_data =
|
||||||
proof.file_proof_nodes_in_tree(tx_merkle_nodes, tx_merkle_nodes_size);
|
proof.file_proof_nodes_in_tree(tx_merkle_nodes, tx_merkle_nodes_size);
|
||||||
let start_index = (start_index >> self.leaf_height) as usize;
|
let start_index = (start_index >> self.leaf_height) as usize;
|
||||||
for (i, (position, _)) in position_and_data.iter_mut().enumerate() {
|
for (i, (position, _)) in position_and_data.iter_mut().enumerate() {
|
||||||
*position += start_index >> i;
|
*position += start_index >> i;
|
||||||
}
|
}
|
||||||
self.fill_with_proof(position_and_data)
|
let updated_nodes = self.fill_with_proof(position_and_data)?;
|
||||||
|
self.node_manager.commit();
|
||||||
|
Ok(updated_nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof.
|
/// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof.
|
||||||
@ -280,28 +306,27 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
let mut updated_nodes = Vec::new();
|
let mut updated_nodes = Vec::new();
|
||||||
// A valid proof should not fail the following checks.
|
// A valid proof should not fail the following checks.
|
||||||
for (i, (position, data)) in position_and_data.into_iter().enumerate() {
|
for (i, (position, data)) in position_and_data.into_iter().enumerate() {
|
||||||
let layer = &mut self.layers[i];
|
if position > self.layer_len(i) {
|
||||||
if position > layer.len() {
|
|
||||||
bail!(
|
bail!(
|
||||||
"proof position out of range, position={} layer.len()={}",
|
"proof position out of range, position={} layer.len()={}",
|
||||||
position,
|
position,
|
||||||
layer.len()
|
self.layer_len(i)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
if position == layer.len() {
|
if position == self.layer_len(i) {
|
||||||
// skip padding node.
|
// skip padding node.
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if layer[position] == E::null() {
|
if self.node(i, position) == E::null() {
|
||||||
layer[position] = data.clone();
|
self.update_node(i, position, data.clone());
|
||||||
updated_nodes.push((i, position, data))
|
updated_nodes.push((i, position, data))
|
||||||
} else if layer[position] != data {
|
} else if self.node(i, position) != data {
|
||||||
// The last node in each layer may have changed in the tree.
|
// The last node in each layer may have changed in the tree.
|
||||||
trace!(
|
trace!(
|
||||||
"conflict data layer={} position={} tree_data={:?} proof_data={:?}",
|
"conflict data layer={} position={} tree_data={:?} proof_data={:?}",
|
||||||
i,
|
i,
|
||||||
position,
|
position,
|
||||||
layer[position],
|
self.node(i, position),
|
||||||
data
|
data
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -317,8 +342,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
if position >= self.leaves() {
|
if position >= self.leaves() {
|
||||||
bail!("Out of bound: position={} end={}", position, self.leaves());
|
bail!("Out of bound: position={} end={}", position, self.leaves());
|
||||||
}
|
}
|
||||||
if self.layers[0][position] != E::null() {
|
if self.node(0, position) != E::null() {
|
||||||
Ok(Some(self.layers[0][position].clone()))
|
Ok(Some(self.node(0, position)))
|
||||||
} else {
|
} else {
|
||||||
// The leaf hash is unknown.
|
// The leaf hash is unknown.
|
||||||
Ok(None)
|
Ok(None)
|
||||||
@ -366,10 +391,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let mut right_most_nodes = Vec::new();
|
let mut right_most_nodes = Vec::new();
|
||||||
for layer in &self.layers {
|
for height in 0..self.height() {
|
||||||
right_most_nodes.push((layer.len() - 1, layer.last().unwrap().clone()));
|
let pos = self.layer_len(height) - 1;
|
||||||
|
right_most_nodes.push((pos, self.node(height, pos)));
|
||||||
}
|
}
|
||||||
let root = self.root().clone();
|
let root = self.root();
|
||||||
self.delta_nodes_map
|
self.delta_nodes_map
|
||||||
.insert(tx_seq, DeltaNodes::new(right_most_nodes));
|
.insert(tx_seq, DeltaNodes::new(right_most_nodes));
|
||||||
self.root_to_tx_seq_map.insert(root, tx_seq);
|
self.root_to_tx_seq_map.insert(root, tx_seq);
|
||||||
@ -377,8 +403,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn before_extend_layer(&mut self, height: usize) {
|
fn before_extend_layer(&mut self, height: usize) {
|
||||||
if height == self.layers.len() {
|
if height == self.height() {
|
||||||
self.layers.push(Vec::new());
|
self.node_manager.add_layer()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -395,7 +421,6 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Given a range of changed leaf nodes and recompute the tree.
|
/// Given a range of changed leaf nodes and recompute the tree.
|
||||||
/// Since this tree is append-only, we always compute to the end.
|
|
||||||
fn recompute(
|
fn recompute(
|
||||||
&mut self,
|
&mut self,
|
||||||
mut start_index: usize,
|
mut start_index: usize,
|
||||||
@ -405,42 +430,51 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
start_index >>= height;
|
start_index >>= height;
|
||||||
maybe_end_index = maybe_end_index.map(|end| end >> height);
|
maybe_end_index = maybe_end_index.map(|end| end >> height);
|
||||||
// Loop until we compute the new root and reach `tree_depth`.
|
// Loop until we compute the new root and reach `tree_depth`.
|
||||||
while self.layers[height].len() > 1 || height < self.layers.len() - 1 {
|
while self.layer_len(height) > 1 || height < self.height() - 1 {
|
||||||
let next_layer_start_index = start_index >> 1;
|
let next_layer_start_index = start_index >> 1;
|
||||||
if start_index % 2 == 1 {
|
if start_index % 2 == 1 {
|
||||||
start_index -= 1;
|
start_index -= 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut end_index = maybe_end_index.unwrap_or(self.layers[height].len());
|
let mut end_index = maybe_end_index.unwrap_or(self.layer_len(height));
|
||||||
if end_index % 2 == 1 && end_index != self.layers[height].len() {
|
if end_index % 2 == 1 && end_index != self.layer_len(height) {
|
||||||
end_index += 1;
|
end_index += 1;
|
||||||
}
|
}
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
let mut iter = self.layers[height][start_index..end_index].chunks_exact(2);
|
let iter = self
|
||||||
|
.node_manager
|
||||||
|
.get_nodes(height, start_index, end_index)
|
||||||
|
.chunks(2);
|
||||||
// We cannot modify the parent layer while iterating the child layer,
|
// We cannot modify the parent layer while iterating the child layer,
|
||||||
// so just keep the changes and update them later.
|
// so just keep the changes and update them later.
|
||||||
let mut parent_update = Vec::new();
|
let mut parent_update = Vec::new();
|
||||||
while let Some([left, right]) = iter.next() {
|
for chunk_iter in &iter {
|
||||||
// If either left or right is null (unknown), we cannot compute the parent hash.
|
let chunk: Vec<_> = chunk_iter.collect();
|
||||||
// Note that if we are recompute a range of an existing tree,
|
if chunk.len() == 2 {
|
||||||
// we do not need to keep these possibly null parent. This is only saved
|
let left = &chunk[0];
|
||||||
// for the case of constructing a new tree from the leaves.
|
let right = &chunk[1];
|
||||||
let parent = if *left == E::null() || *right == E::null() {
|
// If either left or right is null (unknown), we cannot compute the parent hash.
|
||||||
E::null()
|
// Note that if we are recompute a range of an existing tree,
|
||||||
|
// we do not need to keep these possibly null parent. This is only saved
|
||||||
|
// for the case of constructing a new tree from the leaves.
|
||||||
|
let parent = if *left == E::null() || *right == E::null() {
|
||||||
|
E::null()
|
||||||
|
} else {
|
||||||
|
A::parent(left, right)
|
||||||
|
};
|
||||||
|
parent_update.push((next_layer_start_index + i, parent));
|
||||||
|
i += 1;
|
||||||
} else {
|
} else {
|
||||||
A::parent(left, right)
|
assert_eq!(chunk.len(), 1);
|
||||||
};
|
let r = &chunk[0];
|
||||||
parent_update.push((next_layer_start_index + i, parent));
|
// Same as above.
|
||||||
i += 1;
|
let parent = if *r == E::null() {
|
||||||
}
|
E::null()
|
||||||
if let [r] = iter.remainder() {
|
} else {
|
||||||
// Same as above.
|
A::parent_single(r, height + self.leaf_height)
|
||||||
let parent = if *r == E::null() {
|
};
|
||||||
E::null()
|
parent_update.push((next_layer_start_index + i, parent));
|
||||||
} else {
|
}
|
||||||
A::parent_single(r, height + self.leaf_height)
|
|
||||||
};
|
|
||||||
parent_update.push((next_layer_start_index + i, parent));
|
|
||||||
}
|
}
|
||||||
if !parent_update.is_empty() {
|
if !parent_update.is_empty() {
|
||||||
self.before_extend_layer(height + 1);
|
self.before_extend_layer(height + 1);
|
||||||
@ -449,27 +483,27 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// we can just overwrite `last_changed_parent_index` with new values.
|
// we can just overwrite `last_changed_parent_index` with new values.
|
||||||
let mut last_changed_parent_index = None;
|
let mut last_changed_parent_index = None;
|
||||||
for (parent_index, parent) in parent_update {
|
for (parent_index, parent) in parent_update {
|
||||||
match parent_index.cmp(&self.layers[height + 1].len()) {
|
match parent_index.cmp(&self.layer_len(height + 1)) {
|
||||||
Ordering::Less => {
|
Ordering::Less => {
|
||||||
// We do not overwrite with null.
|
// We do not overwrite with null.
|
||||||
if parent != E::null() {
|
if parent != E::null() {
|
||||||
if self.layers[height + 1][parent_index] == E::null()
|
if self.node(height + 1, parent_index) == E::null()
|
||||||
// The last node in a layer can be updated.
|
// The last node in a layer can be updated.
|
||||||
|| (self.layers[height + 1][parent_index] != parent
|
|| (self.node(height + 1, parent_index) != parent
|
||||||
&& parent_index == self.layers[height + 1].len() - 1)
|
&& parent_index == self.layer_len(height + 1) - 1)
|
||||||
{
|
{
|
||||||
self.layers[height + 1][parent_index] = parent;
|
self.update_node(height + 1, parent_index, parent);
|
||||||
last_changed_parent_index = Some(parent_index);
|
last_changed_parent_index = Some(parent_index);
|
||||||
} else if self.layers[height + 1][parent_index] != parent {
|
} else if self.node(height + 1, parent_index) != parent {
|
||||||
// Recompute changes a node in the middle. This should be impossible
|
// Recompute changes a node in the middle. This should be impossible
|
||||||
// if the inputs are valid.
|
// if the inputs are valid.
|
||||||
panic!("Invalid append merkle tree! height={} index={} expected={:?} get={:?}",
|
panic!("Invalid append merkle tree! height={} index={} expected={:?} get={:?}",
|
||||||
height + 1, parent_index, self.layers[height + 1][parent_index], parent);
|
height + 1, parent_index, self.node(height + 1, parent_index), parent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ordering::Equal => {
|
Ordering::Equal => {
|
||||||
self.layers[height + 1].push(parent);
|
self.push_node(height + 1, parent);
|
||||||
last_changed_parent_index = Some(parent_index);
|
last_changed_parent_index = Some(parent_index);
|
||||||
}
|
}
|
||||||
Ordering::Greater => {
|
Ordering::Greater => {
|
||||||
@ -500,10 +534,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
for height in 0..(subtree_depth - 1) {
|
for height in 0..(subtree_depth - 1) {
|
||||||
self.before_extend_layer(height);
|
self.before_extend_layer(height);
|
||||||
let subtree_layer_size = 1 << (subtree_depth - 1 - height);
|
let subtree_layer_size = 1 << (subtree_depth - 1 - height);
|
||||||
self.layers[height].append(&mut vec![E::null(); subtree_layer_size]);
|
self.append_nodes(height, &vec![E::null(); subtree_layer_size]);
|
||||||
}
|
}
|
||||||
self.before_extend_layer(subtree_depth - 1);
|
self.before_extend_layer(subtree_depth - 1);
|
||||||
self.layers[subtree_depth - 1].push(subtree_root);
|
self.push_node(subtree_depth - 1, subtree_root);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -514,23 +548,45 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn revert_to(&mut self, tx_seq: u64) -> Result<()> {
|
pub fn revert_to(&mut self, tx_seq: u64) -> Result<()> {
|
||||||
if self.layers[0].is_empty() {
|
if self.layer_len(0) == 0 {
|
||||||
// Any previous state of an empty tree is always empty.
|
// Any previous state of an empty tree is always empty.
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
let delta_nodes = self
|
let delta_nodes = self
|
||||||
.delta_nodes_map
|
.delta_nodes_map
|
||||||
.get(&tx_seq)
|
.get(&tx_seq)
|
||||||
.ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?;
|
.ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?
|
||||||
|
.clone();
|
||||||
// Dropping the upper layers that are not in the old merkle tree.
|
// Dropping the upper layers that are not in the old merkle tree.
|
||||||
self.layers.truncate(delta_nodes.right_most_nodes.len());
|
for height in (delta_nodes.right_most_nodes.len()..self.height()).rev() {
|
||||||
|
self.node_manager.truncate_layer(height);
|
||||||
|
}
|
||||||
for (height, (last_index, right_most_node)) in
|
for (height, (last_index, right_most_node)) in
|
||||||
delta_nodes.right_most_nodes.iter().enumerate()
|
delta_nodes.right_most_nodes.iter().enumerate()
|
||||||
{
|
{
|
||||||
self.layers[height].truncate(*last_index + 1);
|
self.node_manager.truncate_nodes(height, *last_index + 1);
|
||||||
self.layers[height][*last_index] = right_most_node.clone();
|
self.update_node(height, *last_index, right_most_node.clone())
|
||||||
}
|
}
|
||||||
self.clear_after(tx_seq);
|
self.clear_after(tx_seq);
|
||||||
|
self.node_manager.commit();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Revert to a tx_seq not in `delta_nodes_map`.
|
||||||
|
// This is needed to revert the last unfinished tx after restart.
|
||||||
|
pub fn revert_to_leaves(&mut self, leaves: usize) -> Result<()> {
|
||||||
|
self.node_manager.start_transaction();
|
||||||
|
for height in (0..self.height()).rev() {
|
||||||
|
let kept_nodes = leaves >> height;
|
||||||
|
if kept_nodes == 0 {
|
||||||
|
self.node_manager.truncate_layer(height);
|
||||||
|
} else {
|
||||||
|
self.node_manager.truncate_nodes(height, kept_nodes + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.recompute_after_append_leaves(leaves);
|
||||||
|
self.node_manager.commit();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -550,17 +606,25 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
bail!("empty tree");
|
bail!("empty tree");
|
||||||
}
|
}
|
||||||
Ok(HistoryTree {
|
Ok(HistoryTree {
|
||||||
layers: &self.layers,
|
node_manager: &self.node_manager,
|
||||||
delta_nodes,
|
delta_nodes,
|
||||||
leaf_height: self.leaf_height,
|
leaf_height: self.leaf_height,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn reset(&mut self) {
|
pub fn reset(&mut self) {
|
||||||
self.layers = match self.min_depth {
|
self.node_manager.start_transaction();
|
||||||
None => vec![vec![]],
|
for height in (0..self.height()).rev() {
|
||||||
Some(depth) => vec![vec![]; depth],
|
self.node_manager.truncate_layer(height);
|
||||||
};
|
}
|
||||||
|
if let Some(depth) = self.min_depth {
|
||||||
|
for _ in 0..depth {
|
||||||
|
self.node_manager.add_layer();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.node_manager.add_layer();
|
||||||
|
}
|
||||||
|
self.node_manager.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn clear_after(&mut self, tx_seq: u64) {
|
fn clear_after(&mut self, tx_seq: u64) {
|
||||||
@ -580,10 +644,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
fn first_known_root_at(&self, index: usize) -> (usize, E) {
|
fn first_known_root_at(&self, index: usize) -> (usize, E) {
|
||||||
let mut height = 0;
|
let mut height = 0;
|
||||||
let mut index_in_layer = index;
|
let mut index_in_layer = index;
|
||||||
while height < self.layers.len() {
|
while height < self.height() {
|
||||||
let node = self.node(height, index_in_layer);
|
let node = self.node(height, index_in_layer);
|
||||||
if !node.is_null() {
|
if !node.is_null() {
|
||||||
return (height + 1, node.clone());
|
return (height + 1, node);
|
||||||
}
|
}
|
||||||
height += 1;
|
height += 1;
|
||||||
index_in_layer /= 2;
|
index_in_layer /= 2;
|
||||||
@ -628,7 +692,7 @@ impl<E: HashElement> DeltaNodes<E> {
|
|||||||
|
|
||||||
pub struct HistoryTree<'m, E: HashElement> {
|
pub struct HistoryTree<'m, E: HashElement> {
|
||||||
/// A reference to the global tree nodes.
|
/// A reference to the global tree nodes.
|
||||||
layers: &'m Vec<Vec<E>>,
|
node_manager: &'m NodeManager<E>,
|
||||||
/// The delta nodes that are difference from `layers`.
|
/// The delta nodes that are difference from `layers`.
|
||||||
/// This could be a reference, we just take ownership for convenience.
|
/// This could be a reference, we just take ownership for convenience.
|
||||||
delta_nodes: &'m DeltaNodes<E>,
|
delta_nodes: &'m DeltaNodes<E>,
|
||||||
@ -639,16 +703,18 @@ pub struct HistoryTree<'m, E: HashElement> {
|
|||||||
impl<E: HashElement, A: Algorithm<E>> MerkleTreeRead for AppendMerkleTree<E, A> {
|
impl<E: HashElement, A: Algorithm<E>> MerkleTreeRead for AppendMerkleTree<E, A> {
|
||||||
type E = E;
|
type E = E;
|
||||||
|
|
||||||
fn node(&self, layer: usize, index: usize) -> &Self::E {
|
fn node(&self, layer: usize, index: usize) -> Self::E {
|
||||||
&self.layers[layer][index]
|
self.node_manager
|
||||||
|
.get_node(layer, index)
|
||||||
|
.expect("index checked")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn height(&self) -> usize {
|
fn height(&self) -> usize {
|
||||||
self.layers.len()
|
self.node_manager.num_layers()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn layer_len(&self, layer_height: usize) -> usize {
|
fn layer_len(&self, layer_height: usize) -> usize {
|
||||||
self.layers[layer_height].len()
|
self.node_manager.layer_size(layer_height)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn padding_node(&self, height: usize) -> Self::E {
|
fn padding_node(&self, height: usize) -> Self::E {
|
||||||
@ -658,10 +724,13 @@ impl<E: HashElement, A: Algorithm<E>> MerkleTreeRead for AppendMerkleTree<E, A>
|
|||||||
|
|
||||||
impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
|
impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
|
||||||
type E = E;
|
type E = E;
|
||||||
fn node(&self, layer: usize, index: usize) -> &Self::E {
|
fn node(&self, layer: usize, index: usize) -> Self::E {
|
||||||
match self.delta_nodes.get(layer, index).expect("range checked") {
|
match self.delta_nodes.get(layer, index).expect("range checked") {
|
||||||
Some(node) if *node != E::null() => node,
|
Some(node) if *node != E::null() => node.clone(),
|
||||||
_ => &self.layers[layer][index],
|
_ => self
|
||||||
|
.node_manager
|
||||||
|
.get_node(layer, index)
|
||||||
|
.expect("index checked"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -678,6 +747,22 @@ impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<E: HashElement, A: Algorithm<E>> MerkleTreeWrite for AppendMerkleTree<E, A> {
|
||||||
|
type E = E;
|
||||||
|
|
||||||
|
fn push_node(&mut self, layer: usize, node: Self::E) {
|
||||||
|
self.node_manager.push_node(layer, node);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]) {
|
||||||
|
self.node_manager.append_nodes(layer, nodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_node(&mut self, layer: usize, pos: usize, node: Self::E) {
|
||||||
|
self.node_manager.add_node(layer, pos, node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! ensure_eq {
|
macro_rules! ensure_eq {
|
||||||
($given:expr, $expected:expr) => {
|
($given:expr, $expected:expr) => {
|
||||||
@ -699,6 +784,7 @@ macro_rules! ensure_eq {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::merkle_tree::MerkleTreeRead;
|
use crate::merkle_tree::MerkleTreeRead;
|
||||||
|
|
||||||
use crate::sha3::Sha3Algorithm;
|
use crate::sha3::Sha3Algorithm;
|
||||||
use crate::AppendMerkleTree;
|
use crate::AppendMerkleTree;
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
|
@ -49,7 +49,7 @@ pub trait Algorithm<E: HashElement> {
|
|||||||
|
|
||||||
pub trait MerkleTreeRead {
|
pub trait MerkleTreeRead {
|
||||||
type E: HashElement;
|
type E: HashElement;
|
||||||
fn node(&self, layer: usize, index: usize) -> &Self::E;
|
fn node(&self, layer: usize, index: usize) -> Self::E;
|
||||||
fn height(&self) -> usize;
|
fn height(&self) -> usize;
|
||||||
fn layer_len(&self, layer_height: usize) -> usize;
|
fn layer_len(&self, layer_height: usize) -> usize;
|
||||||
fn padding_node(&self, height: usize) -> Self::E;
|
fn padding_node(&self, height: usize) -> Self::E;
|
||||||
@ -58,7 +58,7 @@ pub trait MerkleTreeRead {
|
|||||||
self.layer_len(0)
|
self.layer_len(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn root(&self) -> &Self::E {
|
fn root(&self) -> Self::E {
|
||||||
self.node(self.height() - 1, 0)
|
self.node(self.height() - 1, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -70,16 +70,16 @@ pub trait MerkleTreeRead {
|
|||||||
self.leaves()
|
self.leaves()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
if self.node(0, leaf_index) == &Self::E::null() {
|
if self.node(0, leaf_index) == Self::E::null() {
|
||||||
bail!("Not ready to generate proof for leaf_index={}", leaf_index);
|
bail!("Not ready to generate proof for leaf_index={}", leaf_index);
|
||||||
}
|
}
|
||||||
if self.height() == 1 {
|
if self.height() == 1 {
|
||||||
return Proof::new(vec![self.root().clone(), self.root().clone()], vec![]);
|
return Proof::new(vec![self.root(), self.root().clone()], vec![]);
|
||||||
}
|
}
|
||||||
let mut lemma: Vec<Self::E> = Vec::with_capacity(self.height()); // path + root
|
let mut lemma: Vec<Self::E> = Vec::with_capacity(self.height()); // path + root
|
||||||
let mut path: Vec<bool> = Vec::with_capacity(self.height() - 2); // path - 1
|
let mut path: Vec<bool> = Vec::with_capacity(self.height() - 2); // path - 1
|
||||||
let mut index_in_layer = leaf_index;
|
let mut index_in_layer = leaf_index;
|
||||||
lemma.push(self.node(0, leaf_index).clone());
|
lemma.push(self.node(0, leaf_index));
|
||||||
for height in 0..(self.height() - 1) {
|
for height in 0..(self.height() - 1) {
|
||||||
trace!(
|
trace!(
|
||||||
"gen_proof: height={} index={} hash={:?}",
|
"gen_proof: height={} index={} hash={:?}",
|
||||||
@ -93,15 +93,15 @@ pub trait MerkleTreeRead {
|
|||||||
// TODO: This can be skipped if the tree size is available in validation.
|
// TODO: This can be skipped if the tree size is available in validation.
|
||||||
lemma.push(self.padding_node(height));
|
lemma.push(self.padding_node(height));
|
||||||
} else {
|
} else {
|
||||||
lemma.push(self.node(height, index_in_layer + 1).clone());
|
lemma.push(self.node(height, index_in_layer + 1));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
path.push(false);
|
path.push(false);
|
||||||
lemma.push(self.node(height, index_in_layer - 1).clone());
|
lemma.push(self.node(height, index_in_layer - 1));
|
||||||
}
|
}
|
||||||
index_in_layer >>= 1;
|
index_in_layer >>= 1;
|
||||||
}
|
}
|
||||||
lemma.push(self.root().clone());
|
lemma.push(self.root());
|
||||||
if lemma.contains(&Self::E::null()) {
|
if lemma.contains(&Self::E::null()) {
|
||||||
bail!(
|
bail!(
|
||||||
"Not enough data to generate proof, lemma={:?} path={:?}",
|
"Not enough data to generate proof, lemma={:?} path={:?}",
|
||||||
@ -130,6 +130,13 @@ pub trait MerkleTreeRead {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub trait MerkleTreeWrite {
|
||||||
|
type E: HashElement;
|
||||||
|
fn push_node(&mut self, layer: usize, node: Self::E);
|
||||||
|
fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]);
|
||||||
|
fn update_node(&mut self, layer: usize, pos: usize, node: Self::E);
|
||||||
|
}
|
||||||
|
|
||||||
/// This includes the data to reconstruct an `AppendMerkleTree` root where some nodes
|
/// This includes the data to reconstruct an `AppendMerkleTree` root where some nodes
|
||||||
/// are `null`. Other intermediate nodes will be computed based on these known nodes.
|
/// are `null`. Other intermediate nodes will be computed based on these known nodes.
|
||||||
pub struct MerkleTreeInitialData<E: HashElement> {
|
pub struct MerkleTreeInitialData<E: HashElement> {
|
||||||
|
219
common/append_merkle/src/node_manager.rs
Normal file
219
common/append_merkle/src/node_manager.rs
Normal file
@ -0,0 +1,219 @@
|
|||||||
|
use crate::HashElement;
|
||||||
|
use anyhow::Result;
|
||||||
|
use lru::LruCache;
|
||||||
|
use std::any::Any;
|
||||||
|
use std::num::NonZeroUsize;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tracing::error;
|
||||||
|
|
||||||
|
pub struct NodeManager<E: HashElement> {
|
||||||
|
cache: LruCache<(usize, usize), E>,
|
||||||
|
layer_size: Vec<usize>,
|
||||||
|
db: Arc<dyn NodeDatabase<E>>,
|
||||||
|
db_tx: Option<Box<dyn NodeTransaction<E>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E: HashElement> NodeManager<E> {
|
||||||
|
pub fn new(db: Arc<dyn NodeDatabase<E>>, capacity: usize) -> Result<Self> {
|
||||||
|
let mut layer = 0;
|
||||||
|
let mut layer_size = Vec::new();
|
||||||
|
while let Some(size) = db.get_layer_size(layer)? {
|
||||||
|
layer_size.push(size);
|
||||||
|
layer += 1;
|
||||||
|
}
|
||||||
|
Ok(Self {
|
||||||
|
cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
|
||||||
|
layer_size,
|
||||||
|
db,
|
||||||
|
db_tx: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_dummy() -> Self {
|
||||||
|
Self {
|
||||||
|
cache: LruCache::unbounded(),
|
||||||
|
layer_size: vec![],
|
||||||
|
db: Arc::new(EmptyNodeDatabase {}),
|
||||||
|
db_tx: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn push_node(&mut self, layer: usize, node: E) {
|
||||||
|
self.add_node(layer, self.layer_size[layer], node);
|
||||||
|
self.set_layer_size(layer, self.layer_size[layer] + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) {
|
||||||
|
let mut pos = self.layer_size[layer];
|
||||||
|
let mut saved_nodes = Vec::with_capacity(nodes.len());
|
||||||
|
for node in nodes {
|
||||||
|
self.cache.put((layer, pos), node.clone());
|
||||||
|
saved_nodes.push((layer, pos, node));
|
||||||
|
pos += 1;
|
||||||
|
}
|
||||||
|
self.set_layer_size(layer, pos);
|
||||||
|
self.db_tx().save_node_list(&saved_nodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_node(&self, layer: usize, pos: usize) -> Option<E> {
|
||||||
|
match self.cache.peek(&(layer, pos)) {
|
||||||
|
Some(node) => Some(node.clone()),
|
||||||
|
None => self.db.get_node(layer, pos).unwrap_or_else(|e| {
|
||||||
|
error!("Failed to get node: {}", e);
|
||||||
|
None
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_nodes(&self, layer: usize, start_pos: usize, end_pos: usize) -> NodeIterator<E> {
|
||||||
|
NodeIterator {
|
||||||
|
node_manager: self,
|
||||||
|
layer,
|
||||||
|
start_pos,
|
||||||
|
end_pos,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_node(&mut self, layer: usize, pos: usize, node: E) {
|
||||||
|
// No need to insert if the value is unchanged.
|
||||||
|
if self.cache.get(&(layer, pos)) != Some(&node) {
|
||||||
|
self.db_tx().save_node(layer, pos, &node);
|
||||||
|
self.cache.put((layer, pos), node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_layer(&mut self) {
|
||||||
|
self.layer_size.push(0);
|
||||||
|
let layer = self.layer_size.len() - 1;
|
||||||
|
self.db_tx().save_layer_size(layer, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn layer_size(&self, layer: usize) -> usize {
|
||||||
|
self.layer_size[layer]
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn num_layers(&self) -> usize {
|
||||||
|
self.layer_size.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn truncate_nodes(&mut self, layer: usize, pos_end: usize) {
|
||||||
|
let mut removed_nodes = Vec::new();
|
||||||
|
for pos in pos_end..self.layer_size[layer] {
|
||||||
|
self.cache.pop(&(layer, pos));
|
||||||
|
removed_nodes.push((layer, pos));
|
||||||
|
}
|
||||||
|
self.db_tx().remove_node_list(&removed_nodes);
|
||||||
|
self.set_layer_size(layer, pos_end);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn truncate_layer(&mut self, layer: usize) {
|
||||||
|
self.truncate_nodes(layer, 0);
|
||||||
|
if layer == self.num_layers() - 1 {
|
||||||
|
self.layer_size.pop();
|
||||||
|
self.db_tx().remove_layer_size(layer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start_transaction(&mut self) {
|
||||||
|
if self.db_tx.is_some() {
|
||||||
|
error!("start new tx before commit");
|
||||||
|
panic!("start new tx before commit");
|
||||||
|
}
|
||||||
|
self.db_tx = Some(self.db.start_transaction());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn commit(&mut self) {
|
||||||
|
let tx = match self.db_tx.take() {
|
||||||
|
Some(tx) => tx,
|
||||||
|
None => {
|
||||||
|
error!("db_tx is None");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(e) = self.db.commit(tx) {
|
||||||
|
error!("Failed to commit db transaction: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn db_tx(&mut self) -> &mut dyn NodeTransaction<E> {
|
||||||
|
(*self.db_tx.as_mut().expect("tx checked")).as_mut()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_layer_size(&mut self, layer: usize, size: usize) {
|
||||||
|
self.layer_size[layer] = size;
|
||||||
|
self.db_tx().save_layer_size(layer, size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct NodeIterator<'a, E: HashElement> {
|
||||||
|
node_manager: &'a NodeManager<E>,
|
||||||
|
layer: usize,
|
||||||
|
start_pos: usize,
|
||||||
|
end_pos: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, E: HashElement> Iterator for NodeIterator<'a, E> {
|
||||||
|
type Item = E;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
if self.start_pos < self.end_pos {
|
||||||
|
let r = self.node_manager.get_node(self.layer, self.start_pos);
|
||||||
|
self.start_pos += 1;
|
||||||
|
r
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait NodeDatabase<E: HashElement>: Send + Sync {
|
||||||
|
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<E>>;
|
||||||
|
fn get_layer_size(&self, layer: usize) -> Result<Option<usize>>;
|
||||||
|
fn start_transaction(&self) -> Box<dyn NodeTransaction<E>>;
|
||||||
|
fn commit(&self, tx: Box<dyn NodeTransaction<E>>) -> Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait NodeTransaction<E: HashElement>: Send + Sync {
|
||||||
|
fn save_node(&mut self, layer: usize, pos: usize, node: &E);
|
||||||
|
/// `nodes` are a list of tuples `(layer, pos, node)`.
|
||||||
|
fn save_node_list(&mut self, nodes: &[(usize, usize, &E)]);
|
||||||
|
fn remove_node_list(&mut self, nodes: &[(usize, usize)]);
|
||||||
|
fn save_layer_size(&mut self, layer: usize, size: usize);
|
||||||
|
fn remove_layer_size(&mut self, layer: usize);
|
||||||
|
|
||||||
|
fn into_any(self: Box<Self>) -> Box<dyn Any>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A dummy database structure for in-memory merkle tree that will not read/write db.
|
||||||
|
pub struct EmptyNodeDatabase {}
|
||||||
|
pub struct EmptyNodeTransaction {}
|
||||||
|
impl<E: HashElement> NodeDatabase<E> for EmptyNodeDatabase {
|
||||||
|
fn get_node(&self, _layer: usize, _pos: usize) -> Result<Option<E>> {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
fn get_layer_size(&self, _layer: usize) -> Result<Option<usize>> {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
fn start_transaction(&self) -> Box<dyn NodeTransaction<E>> {
|
||||||
|
Box::new(EmptyNodeTransaction {})
|
||||||
|
}
|
||||||
|
fn commit(&self, _tx: Box<dyn NodeTransaction<E>>) -> Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E: HashElement> NodeTransaction<E> for EmptyNodeTransaction {
|
||||||
|
fn save_node(&mut self, _layer: usize, _pos: usize, _node: &E) {}
|
||||||
|
|
||||||
|
fn save_node_list(&mut self, _nodes: &[(usize, usize, &E)]) {}
|
||||||
|
|
||||||
|
fn remove_node_list(&mut self, _nodes: &[(usize, usize)]) {}
|
||||||
|
|
||||||
|
fn save_layer_size(&mut self, _layer: usize, _size: usize) {}
|
||||||
|
|
||||||
|
fn remove_layer_size(&mut self, _layer: usize) {}
|
||||||
|
|
||||||
|
fn into_any(self: Box<Self>) -> Box<dyn Any> {
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
@ -112,7 +112,7 @@ impl ClientBuilder {
|
|||||||
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
|
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
|
||||||
let executor = require!("sync", self, runtime_context).clone().executor;
|
let executor = require!("sync", self, runtime_context).clone().executor;
|
||||||
let store = Arc::new(
|
let store = Arc::new(
|
||||||
LogManager::rocksdb(LogConfig::default(), &config.db_dir, executor)
|
LogManager::rocksdb(config.log_config.clone(), &config.db_dir, executor)
|
||||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
|
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -11,6 +11,7 @@ use shared_types::{NetworkIdentity, ProtocolVersion};
|
|||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use storage::config::ShardConfig;
|
use storage::config::ShardConfig;
|
||||||
|
use storage::log_store::log_manager::LogConfig;
|
||||||
use storage::StorageConfig;
|
use storage::StorageConfig;
|
||||||
|
|
||||||
impl ZgsConfig {
|
impl ZgsConfig {
|
||||||
@ -101,8 +102,11 @@ impl ZgsConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn storage_config(&self) -> Result<StorageConfig, String> {
|
pub fn storage_config(&self) -> Result<StorageConfig, String> {
|
||||||
|
let mut log_config = LogConfig::default();
|
||||||
|
log_config.flow.merkle_node_cache_capacity = self.merkle_node_cache_capacity;
|
||||||
Ok(StorageConfig {
|
Ok(StorageConfig {
|
||||||
db_dir: self.db_dir.clone().into(),
|
db_dir: self.db_dir.clone().into(),
|
||||||
|
log_config,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,6 +60,7 @@ build_config! {
|
|||||||
(prune_check_time_s, (u64), 60)
|
(prune_check_time_s, (u64), 60)
|
||||||
(prune_batch_size, (usize), 16 * 1024)
|
(prune_batch_size, (usize), 16 * 1024)
|
||||||
(prune_batch_wait_time_ms, (u64), 1000)
|
(prune_batch_wait_time_ms, (u64), 1000)
|
||||||
|
(merkle_node_cache_capacity, (usize), 32 * 1024 * 1024)
|
||||||
|
|
||||||
// misc
|
// misc
|
||||||
(log_config_file, (String), "log_config".to_string())
|
(log_config_file, (String), "log_config".to_string())
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
use crate::log_store::log_manager::LogConfig;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use ssz_derive::{Decode, Encode};
|
use ssz_derive::{Decode, Encode};
|
||||||
use std::{cell::RefCell, path::PathBuf, rc::Rc, str::FromStr};
|
use std::{cell::RefCell, path::PathBuf, rc::Rc, str::FromStr};
|
||||||
@ -7,6 +8,7 @@ pub const SHARD_CONFIG_KEY: &str = "shard_config";
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub db_dir: PathBuf,
|
pub db_dir: PathBuf,
|
||||||
|
pub log_config: LogConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, Decode, Encode, Serialize, Deserialize, Eq, PartialEq)]
|
#[derive(Clone, Copy, Debug, Decode, Encode, Serialize, Deserialize, Eq, PartialEq)]
|
||||||
|
@ -9,9 +9,11 @@ use crate::log_store::log_manager::{
|
|||||||
};
|
};
|
||||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
||||||
use crate::{try_option, ZgsKeyValueDB};
|
use crate::{try_option, ZgsKeyValueDB};
|
||||||
|
use any::Any;
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
|
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase, NodeTransaction};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
|
use kvdb::DBTransaction;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
|
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
|
||||||
use ssz::{Decode, Encode};
|
use ssz::{Decode, Encode};
|
||||||
@ -20,20 +22,20 @@ use std::cmp::Ordering;
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{cmp, mem};
|
use std::{any, cmp, mem};
|
||||||
use tracing::{debug, error, trace};
|
use tracing::{debug, error, trace};
|
||||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
||||||
|
|
||||||
pub struct FlowStore {
|
pub struct FlowStore {
|
||||||
db: FlowDBStore,
|
db: Arc<FlowDBStore>,
|
||||||
seal_manager: SealTaskManager,
|
seal_manager: SealTaskManager,
|
||||||
config: FlowConfig,
|
config: FlowConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FlowStore {
|
impl FlowStore {
|
||||||
pub fn new(db: Arc<dyn ZgsKeyValueDB>, config: FlowConfig) -> Self {
|
pub fn new(db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
|
||||||
Self {
|
Self {
|
||||||
db: FlowDBStore::new(db),
|
db,
|
||||||
seal_manager: Default::default(),
|
seal_manager: Default::default(),
|
||||||
config,
|
config,
|
||||||
}
|
}
|
||||||
@ -93,6 +95,7 @@ impl FlowStore {
|
|||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct FlowConfig {
|
pub struct FlowConfig {
|
||||||
pub batch_size: usize,
|
pub batch_size: usize,
|
||||||
|
pub merkle_node_cache_capacity: usize,
|
||||||
pub shard_config: Arc<RwLock<ShardConfig>>,
|
pub shard_config: Arc<RwLock<ShardConfig>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -100,6 +103,8 @@ impl Default for FlowConfig {
|
|||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
batch_size: SECTORS_PER_LOAD,
|
batch_size: SECTORS_PER_LOAD,
|
||||||
|
// Each node takes (8+8+32=)48 Bytes, so the default value is 1.5 GB memory size.
|
||||||
|
merkle_node_cache_capacity: 32 * 1024 * 1024,
|
||||||
shard_config: Default::default(),
|
shard_config: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -436,7 +441,7 @@ impl FlowDBStore {
|
|||||||
let mut expected_index = 0;
|
let mut expected_index = 0;
|
||||||
|
|
||||||
let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
|
let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
|
||||||
let empty_root = *Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
|
let empty_root = Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
|
||||||
|
|
||||||
for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
|
for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
|
||||||
let (index_bytes, root_bytes) = r?;
|
let (index_bytes, root_bytes) = r?;
|
||||||
@ -666,3 +671,84 @@ fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
|
|||||||
let position = try_decode_usize(&data[mem::size_of::<u64>()..])?;
|
let position = try_decode_usize(&data[mem::size_of::<u64>()..])?;
|
||||||
Ok((layer_index, position))
|
Ok((layer_index, position))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn layer_size_key(layer: usize) -> Vec<u8> {
|
||||||
|
let mut key = "layer_size".as_bytes().to_vec();
|
||||||
|
key.extend_from_slice(&layer.to_be_bytes());
|
||||||
|
key
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct NodeDBTransaction(DBTransaction);
|
||||||
|
|
||||||
|
impl NodeDatabase<DataRoot> for FlowDBStore {
|
||||||
|
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<DataRoot>> {
|
||||||
|
Ok(self
|
||||||
|
.kvdb
|
||||||
|
.get(COL_FLOW_MPT_NODES, &encode_mpt_node_key(layer, pos))?
|
||||||
|
.map(|v| DataRoot::from_slice(&v)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_layer_size(&self, layer: usize) -> Result<Option<usize>> {
|
||||||
|
match self.kvdb.get(COL_FLOW_MPT_NODES, &layer_size_key(layer))? {
|
||||||
|
Some(v) => Ok(Some(try_decode_usize(&v)?)),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_transaction(&self) -> Box<dyn NodeTransaction<DataRoot>> {
|
||||||
|
Box::new(NodeDBTransaction(self.kvdb.transaction()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn commit(&self, tx: Box<dyn NodeTransaction<DataRoot>>) -> Result<()> {
|
||||||
|
let db_tx: Box<NodeDBTransaction> = tx
|
||||||
|
.into_any()
|
||||||
|
.downcast()
|
||||||
|
.map_err(|e| anyhow!("downcast failed, e={:?}", e))?;
|
||||||
|
self.kvdb.write(db_tx.0).map_err(Into::into)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NodeTransaction<DataRoot> for NodeDBTransaction {
|
||||||
|
fn save_node(&mut self, layer: usize, pos: usize, node: &DataRoot) {
|
||||||
|
self.0.put(
|
||||||
|
COL_FLOW_MPT_NODES,
|
||||||
|
&encode_mpt_node_key(layer, pos),
|
||||||
|
node.as_bytes(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn save_node_list(&mut self, nodes: &[(usize, usize, &DataRoot)]) {
|
||||||
|
for (layer_index, position, data) in nodes {
|
||||||
|
self.0.put(
|
||||||
|
COL_FLOW_MPT_NODES,
|
||||||
|
&encode_mpt_node_key(*layer_index, *position),
|
||||||
|
data.as_bytes(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_node_list(&mut self, nodes: &[(usize, usize)]) {
|
||||||
|
for (layer_index, position) in nodes {
|
||||||
|
self.0.delete(
|
||||||
|
COL_FLOW_MPT_NODES,
|
||||||
|
&encode_mpt_node_key(*layer_index, *position),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn save_layer_size(&mut self, layer: usize, size: usize) {
|
||||||
|
self.0.put(
|
||||||
|
COL_FLOW_MPT_NODES,
|
||||||
|
&layer_size_key(layer),
|
||||||
|
&size.to_be_bytes(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_layer_size(&mut self, layer: usize) {
|
||||||
|
self.0.delete(COL_FLOW_MPT_NODES, &layer_size_key(layer));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn into_any(self: Box<Self>) -> Box<dyn Any> {
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -4,11 +4,10 @@ mod seal;
|
|||||||
mod serde;
|
mod serde;
|
||||||
|
|
||||||
use ::serde::{Deserialize, Serialize};
|
use ::serde::{Deserialize, Serialize};
|
||||||
use std::cmp::min;
|
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use ssz_derive::{Decode, Encode};
|
use ssz_derive::{Decode, Encode};
|
||||||
|
use std::cmp::min;
|
||||||
|
|
||||||
use crate::log_store::log_manager::data_to_merkle_leaves;
|
use crate::log_store::log_manager::data_to_merkle_leaves;
|
||||||
use crate::try_option;
|
use crate::try_option;
|
||||||
@ -206,7 +205,7 @@ impl EntryBatch {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Some(
|
Ok(Some(
|
||||||
*try_option!(self.to_merkle_tree(is_first_chunk)?).root(),
|
try_option!(self.to_merkle_tree(is_first_chunk)?).root(),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use crate::config::ShardConfig;
|
use crate::config::ShardConfig;
|
||||||
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowStore};
|
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
|
||||||
use crate::log_store::tx_store::TransactionStore;
|
use crate::log_store::tx_store::TransactionStore;
|
||||||
use crate::log_store::{
|
use crate::log_store::{
|
||||||
FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
|
FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
|
||||||
@ -94,6 +94,7 @@ impl MerkleManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn revert_merkle_tree(&mut self, tx_seq: u64, tx_store: &TransactionStore) -> Result<()> {
|
fn revert_merkle_tree(&mut self, tx_seq: u64, tx_store: &TransactionStore) -> Result<()> {
|
||||||
|
debug!("revert merkle tree {}", tx_seq);
|
||||||
// Special case for reverting tx_seq == 0
|
// Special case for reverting tx_seq == 0
|
||||||
if tx_seq == u64::MAX {
|
if tx_seq == u64::MAX {
|
||||||
self.pora_chunks_merkle.reset();
|
self.pora_chunks_merkle.reset();
|
||||||
@ -116,7 +117,7 @@ impl MerkleManager {
|
|||||||
if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 {
|
if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 {
|
||||||
self.last_chunk_merkle.append(H256::zero());
|
self.last_chunk_merkle.append(H256::zero());
|
||||||
self.pora_chunks_merkle
|
self.pora_chunks_merkle
|
||||||
.update_last(*self.last_chunk_merkle.root());
|
.update_last(self.last_chunk_merkle.root());
|
||||||
} else if self.last_chunk_merkle.leaves() != 0 {
|
} else if self.last_chunk_merkle.leaves() != 0 {
|
||||||
let last_chunk_start_index = self.last_chunk_start_index();
|
let last_chunk_start_index = self.last_chunk_start_index();
|
||||||
let last_chunk_data = flow_store.get_available_entries(
|
let last_chunk_data = flow_store.get_available_entries(
|
||||||
@ -355,7 +356,7 @@ impl LogStoreWrite for LogManager {
|
|||||||
merkle.revert_merkle_tree(tx_seq, &self.tx_store)?;
|
merkle.revert_merkle_tree(tx_seq, &self.tx_store)?;
|
||||||
merkle.try_initialize(&self.flow_store)?;
|
merkle.try_initialize(&self.flow_store)?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
Some(*merkle.last_chunk_merkle.root()),
|
Some(merkle.last_chunk_merkle.root()),
|
||||||
merkle
|
merkle
|
||||||
.pora_chunks_merkle
|
.pora_chunks_merkle
|
||||||
.leaf_at(merkle.pora_chunks_merkle.leaves() - 1)?
|
.leaf_at(merkle.pora_chunks_merkle.leaves() - 1)?
|
||||||
@ -577,7 +578,7 @@ impl LogStoreRead for LogManager {
|
|||||||
fn get_context(&self) -> crate::error::Result<(DataRoot, u64)> {
|
fn get_context(&self) -> crate::error::Result<(DataRoot, u64)> {
|
||||||
let merkle = self.merkle.read_recursive();
|
let merkle = self.merkle.read_recursive();
|
||||||
Ok((
|
Ok((
|
||||||
*merkle.pora_chunks_merkle.root(),
|
merkle.pora_chunks_merkle.root(),
|
||||||
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
|
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
@ -626,13 +627,10 @@ impl LogManager {
|
|||||||
executor: task_executor::TaskExecutor,
|
executor: task_executor::TaskExecutor,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let tx_store = TransactionStore::new(db.clone())?;
|
let tx_store = TransactionStore::new(db.clone())?;
|
||||||
let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow));
|
let flow_db = Arc::new(FlowDBStore::new(db.clone()));
|
||||||
let mut initial_data = flow_store.get_chunk_root_list()?;
|
let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
|
||||||
// If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
|
// If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
|
||||||
// first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
|
// first and call `put_tx` later.
|
||||||
// and inserted later.
|
|
||||||
let mut extra_leaves = Vec::new();
|
|
||||||
|
|
||||||
let next_tx_seq = tx_store.next_tx_seq();
|
let next_tx_seq = tx_store.next_tx_seq();
|
||||||
let mut start_tx_seq = if next_tx_seq > 0 {
|
let mut start_tx_seq = if next_tx_seq > 0 {
|
||||||
Some(next_tx_seq - 1)
|
Some(next_tx_seq - 1)
|
||||||
@ -640,15 +638,25 @@ impl LogManager {
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
let mut last_tx_to_insert = None;
|
let mut last_tx_to_insert = None;
|
||||||
|
|
||||||
|
let mut pora_chunks_merkle = Merkle::new_with_subtrees(
|
||||||
|
flow_db,
|
||||||
|
config.flow.merkle_node_cache_capacity,
|
||||||
|
log2_pow2(PORA_CHUNK_SIZE),
|
||||||
|
)?;
|
||||||
if let Some(last_tx_seq) = start_tx_seq {
|
if let Some(last_tx_seq) = start_tx_seq {
|
||||||
if !tx_store.check_tx_completed(last_tx_seq)? {
|
if !tx_store.check_tx_completed(last_tx_seq)? {
|
||||||
// Last tx not finalized, we need to check if its `put_tx` is completed.
|
// Last tx not finalized, we need to check if its `put_tx` is completed.
|
||||||
let last_tx = tx_store
|
let last_tx = tx_store
|
||||||
.get_tx_by_seq_number(last_tx_seq)?
|
.get_tx_by_seq_number(last_tx_seq)?
|
||||||
.expect("tx missing");
|
.expect("tx missing");
|
||||||
let mut current_len = initial_data.leaves();
|
let current_len = pora_chunks_merkle.leaves();
|
||||||
let expected_len =
|
let expected_len = sector_to_segment(
|
||||||
sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64);
|
last_tx.start_entry_index
|
||||||
|
+ last_tx.num_entries() as u64
|
||||||
|
+ PORA_CHUNK_SIZE as u64
|
||||||
|
- 1,
|
||||||
|
);
|
||||||
match expected_len.cmp(&(current_len)) {
|
match expected_len.cmp(&(current_len)) {
|
||||||
Ordering::Less => {
|
Ordering::Less => {
|
||||||
bail!(
|
bail!(
|
||||||
@ -676,43 +684,33 @@ impl LogManager {
|
|||||||
previous_tx.start_entry_index + previous_tx.num_entries() as u64,
|
previous_tx.start_entry_index + previous_tx.num_entries() as u64,
|
||||||
);
|
);
|
||||||
if current_len > expected_len {
|
if current_len > expected_len {
|
||||||
while let Some((subtree_depth, _)) = initial_data.subtree_list.pop()
|
pora_chunks_merkle.revert_to_leaves(expected_len)?;
|
||||||
{
|
|
||||||
current_len -= 1 << (subtree_depth - 1);
|
|
||||||
if current_len == expected_len {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
warn!(
|
assert_eq!(current_len, expected_len);
|
||||||
"revert last tx with no-op: {} {}",
|
|
||||||
current_len, expected_len
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
assert_eq!(current_len, expected_len);
|
start_tx_seq = Some(previous_tx.seq);
|
||||||
while let Some((index, h)) = initial_data.known_leaves.pop() {
|
|
||||||
if index < current_len {
|
|
||||||
initial_data.known_leaves.push((index, h));
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
extra_leaves.push((index, h));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
start_tx_seq = Some(last_tx_seq - 1);
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut pora_chunks_merkle =
|
|
||||||
Merkle::new_with_subtrees(initial_data, log2_pow2(PORA_CHUNK_SIZE), start_tx_seq)?;
|
|
||||||
let last_chunk_merkle = match start_tx_seq {
|
let last_chunk_merkle = match start_tx_seq {
|
||||||
Some(tx_seq) => {
|
Some(tx_seq) => {
|
||||||
tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
|
let tx = tx_store.get_tx_by_seq_number(tx_seq)?.expect("tx missing");
|
||||||
|
if (tx.start_entry_index() + tx.num_entries() as u64) % PORA_CHUNK_SIZE as u64 == 0
|
||||||
|
{
|
||||||
|
// The last chunk should be aligned, so it's empty.
|
||||||
|
Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None)
|
||||||
|
} else {
|
||||||
|
tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves() - 1, tx_seq)?
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Initialize
|
// Initialize
|
||||||
None => Merkle::new_with_depth(vec![], 1, None),
|
None => {
|
||||||
|
pora_chunks_merkle.reset();
|
||||||
|
Merkle::new_with_depth(vec![], 1, None)
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
@ -722,10 +720,10 @@ impl LogManager {
|
|||||||
last_chunk_merkle.leaves(),
|
last_chunk_merkle.leaves(),
|
||||||
);
|
);
|
||||||
if last_chunk_merkle.leaves() != 0 {
|
if last_chunk_merkle.leaves() != 0 {
|
||||||
pora_chunks_merkle.append(*last_chunk_merkle.root());
|
pora_chunks_merkle.update_last(last_chunk_merkle.root());
|
||||||
// update the merkle root
|
|
||||||
pora_chunks_merkle.commit(start_tx_seq);
|
|
||||||
}
|
}
|
||||||
|
// update the merkle root
|
||||||
|
pora_chunks_merkle.commit(start_tx_seq);
|
||||||
let merkle = RwLock::new(MerkleManager {
|
let merkle = RwLock::new(MerkleManager {
|
||||||
pora_chunks_merkle,
|
pora_chunks_merkle,
|
||||||
last_chunk_merkle,
|
last_chunk_merkle,
|
||||||
@ -744,18 +742,7 @@ impl LogManager {
|
|||||||
log_manager.start_receiver(receiver, executor);
|
log_manager.start_receiver(receiver, executor);
|
||||||
|
|
||||||
if let Some(tx) = last_tx_to_insert {
|
if let Some(tx) = last_tx_to_insert {
|
||||||
log_manager.revert_to(tx.seq - 1)?;
|
|
||||||
log_manager.put_tx(tx)?;
|
log_manager.put_tx(tx)?;
|
||||||
let mut merkle = log_manager.merkle.write();
|
|
||||||
for (index, h) in extra_leaves {
|
|
||||||
if index < merkle.pora_chunks_merkle.leaves() {
|
|
||||||
merkle.pora_chunks_merkle.fill_leaf(index, h);
|
|
||||||
} else {
|
|
||||||
error!("out of range extra leaf: index={} hash={:?}", index, h);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
assert!(extra_leaves.is_empty());
|
|
||||||
}
|
}
|
||||||
log_manager
|
log_manager
|
||||||
.merkle
|
.merkle
|
||||||
@ -894,16 +881,16 @@ impl LogManager {
|
|||||||
// `last_chunk_merkle` was empty, so this is a new leaf in the top_tree.
|
// `last_chunk_merkle` was empty, so this is a new leaf in the top_tree.
|
||||||
merkle
|
merkle
|
||||||
.pora_chunks_merkle
|
.pora_chunks_merkle
|
||||||
.append_subtree(1, *merkle.last_chunk_merkle.root())?;
|
.append_subtree(1, merkle.last_chunk_merkle.root())?;
|
||||||
} else {
|
} else {
|
||||||
merkle
|
merkle
|
||||||
.pora_chunks_merkle
|
.pora_chunks_merkle
|
||||||
.update_last(*merkle.last_chunk_merkle.root());
|
.update_last(merkle.last_chunk_merkle.root());
|
||||||
}
|
}
|
||||||
if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
|
if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
|
||||||
batch_root_map.insert(
|
batch_root_map.insert(
|
||||||
merkle.pora_chunks_merkle.leaves() - 1,
|
merkle.pora_chunks_merkle.leaves() - 1,
|
||||||
(*merkle.last_chunk_merkle.root(), 1),
|
(merkle.last_chunk_merkle.root(), 1),
|
||||||
);
|
);
|
||||||
self.complete_last_chunk_merkle(
|
self.complete_last_chunk_merkle(
|
||||||
merkle.pora_chunks_merkle.leaves() - 1,
|
merkle.pora_chunks_merkle.leaves() - 1,
|
||||||
@ -959,7 +946,7 @@ impl LogManager {
|
|||||||
.append_list(data_to_merkle_leaves(&pad_data)?);
|
.append_list(data_to_merkle_leaves(&pad_data)?);
|
||||||
merkle
|
merkle
|
||||||
.pora_chunks_merkle
|
.pora_chunks_merkle
|
||||||
.update_last(*merkle.last_chunk_merkle.root());
|
.update_last(merkle.last_chunk_merkle.root());
|
||||||
} else {
|
} else {
|
||||||
if last_chunk_pad != 0 {
|
if last_chunk_pad != 0 {
|
||||||
is_full_empty = false;
|
is_full_empty = false;
|
||||||
@ -969,10 +956,10 @@ impl LogManager {
|
|||||||
.append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?);
|
.append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?);
|
||||||
merkle
|
merkle
|
||||||
.pora_chunks_merkle
|
.pora_chunks_merkle
|
||||||
.update_last(*merkle.last_chunk_merkle.root());
|
.update_last(merkle.last_chunk_merkle.root());
|
||||||
root_map.insert(
|
root_map.insert(
|
||||||
merkle.pora_chunks_merkle.leaves() - 1,
|
merkle.pora_chunks_merkle.leaves() - 1,
|
||||||
(*merkle.last_chunk_merkle.root(), 1),
|
(merkle.last_chunk_merkle.root(), 1),
|
||||||
);
|
);
|
||||||
completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1);
|
completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1);
|
||||||
}
|
}
|
||||||
@ -983,7 +970,7 @@ impl LogManager {
|
|||||||
let data = pad_data[start_index * ENTRY_SIZE
|
let data = pad_data[start_index * ENTRY_SIZE
|
||||||
..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE]
|
..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE]
|
||||||
.to_vec();
|
.to_vec();
|
||||||
let root = *Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
|
let root = Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
|
||||||
merkle.pora_chunks_merkle.append(root);
|
merkle.pora_chunks_merkle.append(root);
|
||||||
root_map.insert(merkle.pora_chunks_merkle.leaves() - 1, (root, 1));
|
root_map.insert(merkle.pora_chunks_merkle.leaves() - 1, (root, 1));
|
||||||
start_index += PORA_CHUNK_SIZE;
|
start_index += PORA_CHUNK_SIZE;
|
||||||
@ -1061,7 +1048,7 @@ impl LogManager {
|
|||||||
}
|
}
|
||||||
merkle
|
merkle
|
||||||
.pora_chunks_merkle
|
.pora_chunks_merkle
|
||||||
.update_last(*merkle.last_chunk_merkle.root());
|
.update_last(merkle.last_chunk_merkle.root());
|
||||||
}
|
}
|
||||||
let chunk_roots = self.flow_store.append_entries(flow_entry_array)?;
|
let chunk_roots = self.flow_store.append_entries(flow_entry_array)?;
|
||||||
for (chunk_index, chunk_root) in chunk_roots {
|
for (chunk_index, chunk_root) in chunk_roots {
|
||||||
|
@ -8,6 +8,7 @@ use ethereum_types::H256;
|
|||||||
use rand::random;
|
use rand::random;
|
||||||
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
|
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
|
|
||||||
use task_executor::test_utils::TestRuntime;
|
use task_executor::test_utils::TestRuntime;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -292,6 +292,9 @@ impl TransactionStore {
|
|||||||
match tx.start_entry_index.cmp(&last_chunk_start_index) {
|
match tx.start_entry_index.cmp(&last_chunk_start_index) {
|
||||||
cmp::Ordering::Greater => {
|
cmp::Ordering::Greater => {
|
||||||
tx_list.push((tx_seq, tx.merkle_nodes));
|
tx_list.push((tx_seq, tx.merkle_nodes));
|
||||||
|
if tx.start_entry_index >= last_chunk_start_index + PORA_CHUNK_SIZE as u64 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
cmp::Ordering::Equal => {
|
cmp::Ordering::Equal => {
|
||||||
tx_list.push((tx_seq, tx.merkle_nodes));
|
tx_list.push((tx_seq, tx.merkle_nodes));
|
||||||
|
0
tests/crash_test.py
Normal file → Executable file
0
tests/crash_test.py
Normal file → Executable file
43
tests/node_cache_test.py
Executable file
43
tests/node_cache_test.py
Executable file
@ -0,0 +1,43 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from test_framework.test_framework import TestFramework
|
||||||
|
from utility.submission import create_submission, submit_data
|
||||||
|
from utility.utils import wait_until
|
||||||
|
|
||||||
|
|
||||||
|
class NodeCacheTest(TestFramework):
|
||||||
|
def setup_params(self):
|
||||||
|
self.zgs_node_configs[0] = {
|
||||||
|
"merkle_node_cache_capacity": 1024,
|
||||||
|
}
|
||||||
|
|
||||||
|
def run_test(self):
|
||||||
|
client = self.nodes[0]
|
||||||
|
|
||||||
|
chunk_data = b"\x02" * 256 * 1024 * 1024 * 3
|
||||||
|
submissions, data_root = create_submission(chunk_data)
|
||||||
|
self.contract.submit(submissions)
|
||||||
|
wait_until(lambda: self.contract.num_submissions() == 1)
|
||||||
|
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
|
||||||
|
|
||||||
|
segment = submit_data(client, chunk_data)
|
||||||
|
self.log.info("segment: %s", len(segment))
|
||||||
|
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
|
||||||
|
|
||||||
|
self.stop_storage_node(0)
|
||||||
|
self.start_storage_node(0)
|
||||||
|
self.nodes[0].wait_for_rpc_connection()
|
||||||
|
|
||||||
|
chunk_data = b"\x03" * 256 * (1024 * 765 + 5)
|
||||||
|
submissions, data_root = create_submission(chunk_data)
|
||||||
|
self.contract.submit(submissions)
|
||||||
|
wait_until(lambda: self.contract.num_submissions() == 2)
|
||||||
|
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
|
||||||
|
|
||||||
|
segment = submit_data(client, chunk_data)
|
||||||
|
self.log.info("segment: %s", len(segment))
|
||||||
|
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
NodeCacheTest().main()
|
Loading…
Reference in New Issue
Block a user