mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Compare commits
6 Commits
04ad257724
...
065b436fba
Author | SHA1 | Date | |
---|---|---|---|
![]() |
065b436fba | ||
![]() |
703d926a23 | ||
![]() |
6a4b246e8b | ||
![]() |
1c4257f692 | ||
![]() |
9c2f6e9d7d | ||
![]() |
19829f1def |
@ -16,7 +16,7 @@ use crate::merkle_tree::MerkleTreeWrite;
|
||||
pub use crate::merkle_tree::{
|
||||
Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES,
|
||||
};
|
||||
pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager};
|
||||
pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager, NodeTransaction};
|
||||
pub use proof::{Proof, RangeProof};
|
||||
pub use sha3::Sha3Algorithm;
|
||||
|
||||
@ -47,6 +47,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
leaf_height,
|
||||
_a: Default::default(),
|
||||
};
|
||||
merkle.node_manager.start_transaction();
|
||||
merkle.node_manager.add_layer();
|
||||
merkle.node_manager.append_nodes(0, &leaves);
|
||||
if merkle.leaves() == 0 {
|
||||
@ -71,49 +72,33 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
pub fn new_with_subtrees(
|
||||
node_db: Arc<dyn NodeDatabase<E>>,
|
||||
node_cache_capacity: usize,
|
||||
initial_data: MerkleTreeInitialData<E>,
|
||||
leaf_height: usize,
|
||||
start_tx_seq: Option<u64>,
|
||||
) -> Result<Self> {
|
||||
let mut merkle = Self {
|
||||
node_manager: NodeManager::new(node_db, node_cache_capacity),
|
||||
node_manager: NodeManager::new(node_db, node_cache_capacity)?,
|
||||
delta_nodes_map: BTreeMap::new(),
|
||||
root_to_tx_seq_map: HashMap::new(),
|
||||
min_depth: None,
|
||||
leaf_height,
|
||||
_a: Default::default(),
|
||||
};
|
||||
merkle.node_manager.add_layer();
|
||||
if initial_data.subtree_list.is_empty() {
|
||||
if let Some(seq) = start_tx_seq {
|
||||
merkle.delta_nodes_map.insert(
|
||||
seq,
|
||||
DeltaNodes {
|
||||
right_most_nodes: vec![],
|
||||
},
|
||||
);
|
||||
}
|
||||
return Ok(merkle);
|
||||
}
|
||||
merkle.append_subtree_list(initial_data.subtree_list)?;
|
||||
merkle.commit(start_tx_seq);
|
||||
for (index, h) in initial_data.known_leaves {
|
||||
merkle.fill_leaf(index, h);
|
||||
}
|
||||
for (layer_index, position, h) in initial_data.extra_mpt_nodes {
|
||||
// TODO: Delete duplicate nodes from DB.
|
||||
merkle.update_node(layer_index, position, h);
|
||||
if merkle.height() == 0 {
|
||||
merkle.node_manager.start_transaction();
|
||||
merkle.node_manager.add_layer();
|
||||
merkle.node_manager.commit();
|
||||
}
|
||||
Ok(merkle)
|
||||
}
|
||||
|
||||
/// This is only used for the last chunk, so `leaf_height` is always 0 so far.
|
||||
pub fn new_with_depth(leaves: Vec<E>, depth: usize, start_tx_seq: Option<u64>) -> Self {
|
||||
let mut node_manager = NodeManager::new_dummy();
|
||||
node_manager.start_transaction();
|
||||
if leaves.is_empty() {
|
||||
// Create an empty merkle tree with `depth`.
|
||||
let mut merkle = Self {
|
||||
// dummy node manager for the last chunk.
|
||||
node_manager: NodeManager::new_dummy(),
|
||||
node_manager,
|
||||
delta_nodes_map: BTreeMap::new(),
|
||||
root_to_tx_seq_map: HashMap::new(),
|
||||
min_depth: Some(depth),
|
||||
@ -135,7 +120,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
} else {
|
||||
let mut merkle = Self {
|
||||
// dummy node manager for the last chunk.
|
||||
node_manager: NodeManager::new_dummy(),
|
||||
node_manager,
|
||||
delta_nodes_map: BTreeMap::new(),
|
||||
root_to_tx_seq_map: HashMap::new(),
|
||||
min_depth: Some(depth),
|
||||
@ -160,8 +145,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
// appending null is not allowed.
|
||||
return;
|
||||
}
|
||||
self.node_manager.start_transaction();
|
||||
self.node_manager.push_node(0, new_leaf);
|
||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||
self.node_manager.commit();
|
||||
}
|
||||
|
||||
pub fn append_list(&mut self, leaf_list: Vec<E>) {
|
||||
@ -169,9 +156,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
// appending null is not allowed.
|
||||
return;
|
||||
}
|
||||
self.node_manager.start_transaction();
|
||||
let start_index = self.leaves();
|
||||
self.node_manager.append_nodes(0, &leaf_list);
|
||||
self.recompute_after_append_leaves(start_index);
|
||||
self.node_manager.commit();
|
||||
}
|
||||
|
||||
/// Append a leaf list by providing their intermediate node hash.
|
||||
@ -184,9 +173,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
// appending null is not allowed.
|
||||
bail!("subtree_root is null");
|
||||
}
|
||||
self.node_manager.start_transaction();
|
||||
let start_index = self.leaves();
|
||||
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
||||
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
||||
self.node_manager.commit();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -195,11 +186,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
// appending null is not allowed.
|
||||
bail!("subtree_list contains null");
|
||||
}
|
||||
self.node_manager.start_transaction();
|
||||
for (subtree_depth, subtree_root) in subtree_list {
|
||||
let start_index = self.leaves();
|
||||
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
||||
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
||||
}
|
||||
self.node_manager.commit();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -210,6 +203,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
// updating to null is not allowed.
|
||||
return;
|
||||
}
|
||||
self.node_manager.start_transaction();
|
||||
if self.layer_len(0) == 0 {
|
||||
// Special case for the first data.
|
||||
self.push_node(0, updated_leaf);
|
||||
@ -217,6 +211,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
self.update_node(0, self.layer_len(0) - 1, updated_leaf);
|
||||
}
|
||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||
self.node_manager.commit();
|
||||
}
|
||||
|
||||
/// Fill an unknown `null` leaf with its real value.
|
||||
@ -226,8 +221,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
if leaf == E::null() {
|
||||
// fill leaf with null is not allowed.
|
||||
} else if self.node(0, index) == E::null() {
|
||||
self.node_manager.start_transaction();
|
||||
self.update_node(0, index, leaf);
|
||||
self.recompute_after_fill_leaves(index, index + 1);
|
||||
self.node_manager.commit();
|
||||
} else if self.node(0, index) != leaf {
|
||||
panic!(
|
||||
"Fill with invalid leaf, index={} was={:?} get={:?}",
|
||||
@ -246,6 +243,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
&mut self,
|
||||
proof: RangeProof<E>,
|
||||
) -> Result<Vec<(usize, usize, E)>> {
|
||||
self.node_manager.start_transaction();
|
||||
let mut updated_nodes = Vec::new();
|
||||
let mut left_nodes = proof.left_proof.proof_nodes_in_tree();
|
||||
if left_nodes.len() >= self.leaf_height {
|
||||
@ -257,6 +255,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
updated_nodes
|
||||
.append(&mut self.fill_with_proof(right_nodes.split_off(self.leaf_height))?);
|
||||
}
|
||||
self.node_manager.commit();
|
||||
Ok(updated_nodes)
|
||||
}
|
||||
|
||||
@ -282,13 +281,16 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
if tx_merkle_nodes.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
self.node_manager.start_transaction();
|
||||
let mut position_and_data =
|
||||
proof.file_proof_nodes_in_tree(tx_merkle_nodes, tx_merkle_nodes_size);
|
||||
let start_index = (start_index >> self.leaf_height) as usize;
|
||||
for (i, (position, _)) in position_and_data.iter_mut().enumerate() {
|
||||
*position += start_index >> i;
|
||||
}
|
||||
self.fill_with_proof(position_and_data)
|
||||
let updated_nodes = self.fill_with_proof(position_and_data)?;
|
||||
self.node_manager.commit();
|
||||
Ok(updated_nodes)
|
||||
}
|
||||
|
||||
/// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof.
|
||||
@ -546,13 +548,14 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
// Any previous state of an empty tree is always empty.
|
||||
return Ok(());
|
||||
}
|
||||
self.node_manager.start_transaction();
|
||||
let delta_nodes = self
|
||||
.delta_nodes_map
|
||||
.get(&tx_seq)
|
||||
.ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?
|
||||
.clone();
|
||||
// Dropping the upper layers that are not in the old merkle tree.
|
||||
for height in (delta_nodes.right_most_nodes.len()..(self.height() - 1)).rev() {
|
||||
for height in (delta_nodes.right_most_nodes.len()..self.height()).rev() {
|
||||
self.node_manager.truncate_layer(height);
|
||||
}
|
||||
for (height, (last_index, right_most_node)) in
|
||||
@ -562,6 +565,24 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
self.update_node(height, *last_index, right_most_node.clone())
|
||||
}
|
||||
self.clear_after(tx_seq);
|
||||
self.node_manager.commit();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Revert to a tx_seq not in `delta_nodes_map`.
|
||||
// This is needed to revert the last unfinished tx after restart.
|
||||
pub fn revert_to_leaves(&mut self, leaves: usize) -> Result<()> {
|
||||
self.node_manager.start_transaction();
|
||||
for height in (0..self.height()).rev() {
|
||||
let kept_nodes = leaves >> height;
|
||||
if kept_nodes == 0 {
|
||||
self.node_manager.truncate_layer(height);
|
||||
} else {
|
||||
self.node_manager.truncate_nodes(height, kept_nodes + 1);
|
||||
}
|
||||
}
|
||||
self.recompute_after_append_leaves(leaves);
|
||||
self.node_manager.commit();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -588,6 +609,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
}
|
||||
|
||||
pub fn reset(&mut self) {
|
||||
self.node_manager.start_transaction();
|
||||
for height in (0..self.height()).rev() {
|
||||
self.node_manager.truncate_layer(height);
|
||||
}
|
||||
@ -598,6 +620,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
} else {
|
||||
self.node_manager.add_layer();
|
||||
}
|
||||
self.node_manager.commit();
|
||||
}
|
||||
|
||||
fn clear_after(&mut self, tx_seq: u64) {
|
||||
@ -757,11 +780,10 @@ macro_rules! ensure_eq {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::merkle_tree::MerkleTreeRead;
|
||||
use crate::node_manager::EmptyNodeDatabase;
|
||||
|
||||
use crate::sha3::Sha3Algorithm;
|
||||
use crate::AppendMerkleTree;
|
||||
use ethereum_types::H256;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[test]
|
||||
fn test_proof() {
|
||||
|
@ -1,6 +1,7 @@
|
||||
use crate::HashElement;
|
||||
use anyhow::Result;
|
||||
use lru::LruCache;
|
||||
use std::any::Any;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use tracing::error;
|
||||
@ -9,15 +10,23 @@ pub struct NodeManager<E: HashElement> {
|
||||
cache: LruCache<(usize, usize), E>,
|
||||
layer_size: Vec<usize>,
|
||||
db: Arc<dyn NodeDatabase<E>>,
|
||||
db_tx: Option<Box<dyn NodeTransaction<E>>>,
|
||||
}
|
||||
|
||||
impl<E: HashElement> NodeManager<E> {
|
||||
pub fn new(db: Arc<dyn NodeDatabase<E>>, capacity: usize) -> Self {
|
||||
Self {
|
||||
cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
|
||||
layer_size: vec![],
|
||||
db,
|
||||
pub fn new(db: Arc<dyn NodeDatabase<E>>, capacity: usize) -> Result<Self> {
|
||||
let mut layer = 0;
|
||||
let mut layer_size = Vec::new();
|
||||
while let Some(size) = db.get_layer_size(layer)? {
|
||||
layer_size.push(size);
|
||||
layer += 1;
|
||||
}
|
||||
Ok(Self {
|
||||
cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
|
||||
layer_size,
|
||||
db,
|
||||
db_tx: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn new_dummy() -> Self {
|
||||
@ -25,25 +34,25 @@ impl<E: HashElement> NodeManager<E> {
|
||||
cache: LruCache::unbounded(),
|
||||
layer_size: vec![],
|
||||
db: Arc::new(EmptyNodeDatabase {}),
|
||||
db_tx: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_node(&mut self, layer: usize, node: E) {
|
||||
self.add_node(layer, self.layer_size[layer], node);
|
||||
self.layer_size[layer] += 1;
|
||||
self.set_layer_size(layer, self.layer_size[layer] + 1);
|
||||
}
|
||||
|
||||
pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) {
|
||||
let pos = &mut self.layer_size[layer];
|
||||
let mut pos = self.layer_size[layer];
|
||||
let mut saved_nodes = Vec::with_capacity(nodes.len());
|
||||
for node in nodes {
|
||||
self.cache.put((layer, *pos), node.clone());
|
||||
saved_nodes.push((layer, *pos, node));
|
||||
*pos += 1;
|
||||
}
|
||||
if let Err(e) = self.db.save_node_list(&saved_nodes) {
|
||||
error!("Failed to save node list: {:?}", e);
|
||||
self.cache.put((layer, pos), node.clone());
|
||||
saved_nodes.push((layer, pos, node));
|
||||
pos += 1;
|
||||
}
|
||||
self.set_layer_size(layer, pos);
|
||||
self.db_tx().save_node_list(&saved_nodes);
|
||||
}
|
||||
|
||||
pub fn get_node(&self, layer: usize, pos: usize) -> Option<E> {
|
||||
@ -66,14 +75,17 @@ impl<E: HashElement> NodeManager<E> {
|
||||
}
|
||||
|
||||
pub fn add_node(&mut self, layer: usize, pos: usize, node: E) {
|
||||
if let Err(e) = self.db.save_node(layer, pos, &node) {
|
||||
error!("Failed to save node: {}", e);
|
||||
// No need to insert if the value is unchanged.
|
||||
if self.cache.get(&(layer, pos)) != Some(&node) {
|
||||
self.db_tx().save_node(layer, pos, &node);
|
||||
self.cache.put((layer, pos), node);
|
||||
}
|
||||
self.cache.put((layer, pos), node);
|
||||
}
|
||||
|
||||
pub fn add_layer(&mut self) {
|
||||
self.layer_size.push(0);
|
||||
let layer = self.layer_size.len() - 1;
|
||||
self.db_tx().save_layer_size(layer, 0);
|
||||
}
|
||||
|
||||
pub fn layer_size(&self, layer: usize) -> usize {
|
||||
@ -90,18 +102,46 @@ impl<E: HashElement> NodeManager<E> {
|
||||
self.cache.pop(&(layer, pos));
|
||||
removed_nodes.push((layer, pos));
|
||||
}
|
||||
if let Err(e) = self.db.remove_node_list(&removed_nodes) {
|
||||
error!("Failed to remove node list: {:?}", e);
|
||||
}
|
||||
self.layer_size[layer] = pos_end;
|
||||
self.db_tx().remove_node_list(&removed_nodes);
|
||||
self.set_layer_size(layer, pos_end);
|
||||
}
|
||||
|
||||
pub fn truncate_layer(&mut self, layer: usize) {
|
||||
self.truncate_nodes(layer, 0);
|
||||
if layer == self.num_layers() - 1 {
|
||||
self.layer_size.pop();
|
||||
self.db_tx().remove_layer_size(layer);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_transaction(&mut self) {
|
||||
if self.db_tx.is_none() {
|
||||
error!("start new tx before commit");
|
||||
}
|
||||
self.db_tx = Some(self.db.start_transaction());
|
||||
}
|
||||
|
||||
pub fn commit(&mut self) {
|
||||
let tx = match self.db_tx.take() {
|
||||
Some(tx) => tx,
|
||||
None => {
|
||||
error!("db_tx is None");
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(e) = self.db.commit(tx) {
|
||||
error!("Failed to commit db transaction: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
fn db_tx(&mut self) -> &mut dyn NodeTransaction<E> {
|
||||
(*self.db_tx.as_mut().expect("tx checked")).as_mut()
|
||||
}
|
||||
|
||||
fn set_layer_size(&mut self, layer: usize, size: usize) {
|
||||
self.layer_size[layer] = size;
|
||||
self.db_tx().save_layer_size(layer, size);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NodeIterator<'a, E: HashElement> {
|
||||
@ -127,28 +167,52 @@ impl<'a, E: HashElement> Iterator for NodeIterator<'a, E> {
|
||||
|
||||
pub trait NodeDatabase<E: HashElement>: Send + Sync {
|
||||
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<E>>;
|
||||
fn save_node(&self, layer: usize, pos: usize, node: &E) -> Result<()>;
|
||||
fn get_layer_size(&self, layer: usize) -> Result<Option<usize>>;
|
||||
fn start_transaction(&self) -> Box<dyn NodeTransaction<E>>;
|
||||
fn commit(&self, tx: Box<dyn NodeTransaction<E>>) -> Result<()>;
|
||||
}
|
||||
|
||||
pub trait NodeTransaction<E: HashElement>: Send + Sync {
|
||||
fn save_node(&mut self, layer: usize, pos: usize, node: &E);
|
||||
/// `nodes` are a list of tuples `(layer, pos, node)`.
|
||||
fn save_node_list(&self, nodes: &[(usize, usize, &E)]) -> Result<()>;
|
||||
fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()>;
|
||||
fn save_node_list(&mut self, nodes: &[(usize, usize, &E)]);
|
||||
fn remove_node_list(&mut self, nodes: &[(usize, usize)]);
|
||||
fn save_layer_size(&mut self, layer: usize, size: usize);
|
||||
fn remove_layer_size(&mut self, layer: usize);
|
||||
|
||||
fn into_any(self: Box<Self>) -> Box<dyn Any>;
|
||||
}
|
||||
|
||||
/// A dummy database structure for in-memory merkle tree that will not read/write db.
|
||||
pub struct EmptyNodeDatabase {}
|
||||
pub struct EmptyNodeTransaction {}
|
||||
impl<E: HashElement> NodeDatabase<E> for EmptyNodeDatabase {
|
||||
fn get_node(&self, _layer: usize, _pos: usize) -> Result<Option<E>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn save_node(&self, _layer: usize, _pos: usize, _node: &E) -> Result<()> {
|
||||
Ok(())
|
||||
fn get_layer_size(&self, _layer: usize) -> Result<Option<usize>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn save_node_list(&self, _nodes: &[(usize, usize, &E)]) -> Result<()> {
|
||||
Ok(())
|
||||
fn start_transaction(&self) -> Box<dyn NodeTransaction<E>> {
|
||||
Box::new(EmptyNodeTransaction {})
|
||||
}
|
||||
|
||||
fn remove_node_list(&self, _nodes: &[(usize, usize)]) -> Result<()> {
|
||||
fn commit(&self, _tx: Box<dyn NodeTransaction<E>>) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: HashElement> NodeTransaction<E> for EmptyNodeTransaction {
|
||||
fn save_node(&mut self, _layer: usize, _pos: usize, _node: &E) {}
|
||||
|
||||
fn save_node_list(&mut self, _nodes: &[(usize, usize, &E)]) {}
|
||||
|
||||
fn remove_node_list(&mut self, _nodes: &[(usize, usize)]) {}
|
||||
|
||||
fn save_layer_size(&mut self, _layer: usize, _size: usize) {}
|
||||
|
||||
fn remove_layer_size(&mut self, _layer: usize) {}
|
||||
|
||||
fn into_any(self: Box<Self>) -> Box<dyn Any> {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
@ -9,9 +9,11 @@ use crate::log_store::log_manager::{
|
||||
};
|
||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
||||
use crate::{try_option, ZgsKeyValueDB};
|
||||
use any::Any;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase};
|
||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase, NodeTransaction};
|
||||
use itertools::Itertools;
|
||||
use kvdb::DBTransaction;
|
||||
use parking_lot::RwLock;
|
||||
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
|
||||
use ssz::{Decode, Encode};
|
||||
@ -20,7 +22,7 @@ use std::cmp::Ordering;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
use std::{cmp, mem};
|
||||
use std::{any, cmp, mem};
|
||||
use tracing::{debug, error, trace};
|
||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
||||
|
||||
@ -670,6 +672,14 @@ fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
|
||||
Ok((layer_index, position))
|
||||
}
|
||||
|
||||
fn layer_size_key(layer: usize) -> Vec<u8> {
|
||||
let mut key = "layer_size".as_bytes().to_vec();
|
||||
key.extend_from_slice(&layer.to_be_bytes());
|
||||
key
|
||||
}
|
||||
|
||||
pub struct NodeDBTransaction(DBTransaction);
|
||||
|
||||
impl NodeDatabase<DataRoot> for FlowDBStore {
|
||||
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<DataRoot>> {
|
||||
Ok(self
|
||||
@ -678,36 +688,67 @@ impl NodeDatabase<DataRoot> for FlowDBStore {
|
||||
.map(|v| DataRoot::from_slice(&v)))
|
||||
}
|
||||
|
||||
fn save_node(&self, layer: usize, pos: usize, node: &DataRoot) -> Result<()> {
|
||||
let mut tx = self.kvdb.transaction();
|
||||
tx.put(
|
||||
fn get_layer_size(&self, layer: usize) -> Result<Option<usize>> {
|
||||
match self.kvdb.get(COL_FLOW_MPT_NODES, &layer_size_key(layer))? {
|
||||
Some(v) => Ok(Some(try_decode_usize(&v)?)),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn start_transaction(&self) -> Box<dyn NodeTransaction<DataRoot>> {
|
||||
Box::new(NodeDBTransaction(self.kvdb.transaction()))
|
||||
}
|
||||
|
||||
fn commit(&self, tx: Box<dyn NodeTransaction<DataRoot>>) -> Result<()> {
|
||||
let db_tx: Box<NodeDBTransaction> = tx
|
||||
.into_any()
|
||||
.downcast()
|
||||
.map_err(|e| anyhow!("downcast failed, e={:?}", e))?;
|
||||
self.kvdb.write(db_tx.0).map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
impl NodeTransaction<DataRoot> for NodeDBTransaction {
|
||||
fn save_node(&mut self, layer: usize, pos: usize, node: &DataRoot) {
|
||||
self.0.put(
|
||||
COL_FLOW_MPT_NODES,
|
||||
&encode_mpt_node_key(layer, pos),
|
||||
node.as_bytes(),
|
||||
);
|
||||
Ok(self.kvdb.write(tx)?)
|
||||
}
|
||||
|
||||
fn save_node_list(&self, nodes: &[(usize, usize, &DataRoot)]) -> Result<()> {
|
||||
let mut tx = self.kvdb.transaction();
|
||||
fn save_node_list(&mut self, nodes: &[(usize, usize, &DataRoot)]) {
|
||||
for (layer_index, position, data) in nodes {
|
||||
tx.put(
|
||||
self.0.put(
|
||||
COL_FLOW_MPT_NODES,
|
||||
&encode_mpt_node_key(*layer_index, *position),
|
||||
data.as_bytes(),
|
||||
);
|
||||
}
|
||||
Ok(self.kvdb.write(tx)?)
|
||||
}
|
||||
|
||||
fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()> {
|
||||
let mut tx = self.kvdb.transaction();
|
||||
fn remove_node_list(&mut self, nodes: &[(usize, usize)]) {
|
||||
for (layer_index, position) in nodes {
|
||||
tx.delete(
|
||||
self.0.delete(
|
||||
COL_FLOW_MPT_NODES,
|
||||
&encode_mpt_node_key(*layer_index, *position),
|
||||
);
|
||||
}
|
||||
Ok(self.kvdb.write(tx)?)
|
||||
}
|
||||
|
||||
fn save_layer_size(&mut self, layer: usize, size: usize) {
|
||||
self.0.put(
|
||||
COL_FLOW_MPT_NODES,
|
||||
&layer_size_key(layer),
|
||||
&size.to_be_bytes(),
|
||||
);
|
||||
}
|
||||
|
||||
fn remove_layer_size(&mut self, layer: usize) {
|
||||
self.0.delete(COL_FLOW_MPT_NODES, &layer_size_key(layer));
|
||||
}
|
||||
|
||||
fn into_any(self: Box<Self>) -> Box<dyn Any> {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
@ -629,12 +629,8 @@ impl LogManager {
|
||||
let tx_store = TransactionStore::new(db.clone())?;
|
||||
let flow_db = Arc::new(FlowDBStore::new(db.clone()));
|
||||
let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
|
||||
let mut initial_data = flow_store.get_chunk_root_list()?;
|
||||
// If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
|
||||
// first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
|
||||
// and inserted later.
|
||||
let mut extra_leaves = Vec::new();
|
||||
|
||||
// If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
|
||||
// first and call `put_tx` later.
|
||||
let next_tx_seq = tx_store.next_tx_seq();
|
||||
let mut start_tx_seq = if next_tx_seq > 0 {
|
||||
Some(next_tx_seq - 1)
|
||||
@ -642,13 +638,19 @@ impl LogManager {
|
||||
None
|
||||
};
|
||||
let mut last_tx_to_insert = None;
|
||||
|
||||
let mut pora_chunks_merkle = Merkle::new_with_subtrees(
|
||||
flow_db,
|
||||
config.flow.merkle_node_cache_capacity,
|
||||
log2_pow2(PORA_CHUNK_SIZE),
|
||||
)?;
|
||||
if let Some(last_tx_seq) = start_tx_seq {
|
||||
if !tx_store.check_tx_completed(last_tx_seq)? {
|
||||
// Last tx not finalized, we need to check if its `put_tx` is completed.
|
||||
let last_tx = tx_store
|
||||
.get_tx_by_seq_number(last_tx_seq)?
|
||||
.expect("tx missing");
|
||||
let mut current_len = initial_data.leaves();
|
||||
let current_len = pora_chunks_merkle.leaves();
|
||||
let expected_len =
|
||||
sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64);
|
||||
match expected_len.cmp(&(current_len)) {
|
||||
@ -678,42 +680,15 @@ impl LogManager {
|
||||
previous_tx.start_entry_index + previous_tx.num_entries() as u64,
|
||||
);
|
||||
if current_len > expected_len {
|
||||
while let Some((subtree_depth, _)) = initial_data.subtree_list.pop()
|
||||
{
|
||||
current_len -= 1 << (subtree_depth - 1);
|
||||
if current_len == expected_len {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
"revert last tx with no-op: {} {}",
|
||||
current_len, expected_len
|
||||
);
|
||||
pora_chunks_merkle.revert_to_leaves(expected_len)?;
|
||||
}
|
||||
assert_eq!(current_len, expected_len);
|
||||
while let Some((index, h)) = initial_data.known_leaves.pop() {
|
||||
if index < current_len {
|
||||
initial_data.known_leaves.push((index, h));
|
||||
break;
|
||||
} else {
|
||||
extra_leaves.push((index, h));
|
||||
}
|
||||
}
|
||||
start_tx_seq = Some(last_tx_seq - 1);
|
||||
start_tx_seq = Some(previous_tx.seq);
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut pora_chunks_merkle = Merkle::new_with_subtrees(
|
||||
flow_db,
|
||||
config.flow.merkle_node_cache_capacity,
|
||||
initial_data,
|
||||
log2_pow2(PORA_CHUNK_SIZE),
|
||||
start_tx_seq,
|
||||
)?;
|
||||
let last_chunk_merkle = match start_tx_seq {
|
||||
Some(tx_seq) => {
|
||||
tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
|
||||
@ -751,18 +726,7 @@ impl LogManager {
|
||||
log_manager.start_receiver(receiver, executor);
|
||||
|
||||
if let Some(tx) = last_tx_to_insert {
|
||||
log_manager.revert_to(tx.seq - 1)?;
|
||||
log_manager.put_tx(tx)?;
|
||||
let mut merkle = log_manager.merkle.write();
|
||||
for (index, h) in extra_leaves {
|
||||
if index < merkle.pora_chunks_merkle.leaves() {
|
||||
merkle.pora_chunks_merkle.fill_leaf(index, h);
|
||||
} else {
|
||||
error!("out of range extra leaf: index={} hash={:?}", index, h);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
assert!(extra_leaves.is_empty());
|
||||
}
|
||||
log_manager
|
||||
.merkle
|
||||
|
@ -3,14 +3,12 @@ use crate::log_store::log_manager::{
|
||||
PORA_CHUNK_SIZE,
|
||||
};
|
||||
use crate::log_store::{LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite};
|
||||
use append_merkle::{
|
||||
Algorithm, AppendMerkleTree, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm,
|
||||
};
|
||||
use append_merkle::{Algorithm, AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
|
||||
use ethereum_types::H256;
|
||||
use rand::random;
|
||||
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
|
||||
use std::cmp;
|
||||
use std::sync::Arc;
|
||||
|
||||
use task_executor::test_utils::TestRuntime;
|
||||
|
||||
#[test]
|
||||
|
43
tests/node_cache_test.py
Executable file
43
tests/node_cache_test.py
Executable file
@ -0,0 +1,43 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from test_framework.test_framework import TestFramework
|
||||
from utility.submission import create_submission, submit_data
|
||||
from utility.utils import wait_until
|
||||
|
||||
|
||||
class NodeCacheTest(TestFramework):
|
||||
def setup_params(self):
|
||||
self.zgs_node_configs[0] = {
|
||||
"merkle_node_cache_capacity": 1024,
|
||||
}
|
||||
|
||||
def run_test(self):
|
||||
client = self.nodes[0]
|
||||
|
||||
chunk_data = b"\x02" * 256 * 1024 * 1024 * 3
|
||||
submissions, data_root = create_submission(chunk_data)
|
||||
self.contract.submit(submissions)
|
||||
wait_until(lambda: self.contract.num_submissions() == 1)
|
||||
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
|
||||
|
||||
segment = submit_data(client, chunk_data)
|
||||
self.log.info("segment: %s", len(segment))
|
||||
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
|
||||
|
||||
self.stop_storage_node(0)
|
||||
self.start_storage_node(0)
|
||||
self.nodes[0].wait_for_rpc_connection()
|
||||
|
||||
chunk_data = b"\x03" * 256 * (1024 * 765 + 5)
|
||||
submissions, data_root = create_submission(chunk_data)
|
||||
self.contract.submit(submissions)
|
||||
wait_until(lambda: self.contract.num_submissions() == 2)
|
||||
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
|
||||
|
||||
segment = submit_data(client, chunk_data)
|
||||
self.log.info("segment: %s", len(segment))
|
||||
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
NodeCacheTest().main()
|
Loading…
Reference in New Issue
Block a user