mirror of
				https://github.com/0glabs/0g-storage-node.git
				synced 2025-11-04 00:27:39 +00:00 
			
		
		
		
	Compare commits
	
		
			19 Commits
		
	
	
		
			e31dc973f2
			...
			f47ac0e03a
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					f47ac0e03a | ||
| 
						 | 
					daf261de8d | ||
| 
						 | 
					57d4d81ebf | ||
| 
						 | 
					55fb210da9 | ||
| 
						 | 
					cb7de7f60e | ||
| 
						 | 
					9a199ba714 | ||
| 
						 | 
					06ae2450d0 | ||
| 
						 | 
					a872480faa | ||
| 
						 | 
					a7a2f24784 | ||
| 
						 | 
					5dc7c52d41 | ||
| 
						 | 
					5a48c5ba94 | ||
| 
						 | 
					5409d8f418 | ||
| 
						 | 
					e338dca318 | ||
| 
						 | 
					57ba8f9bb7 | ||
| 
						 | 
					e15143e0a4 | ||
| 
						 | 
					9b68a8b7d7 | ||
| 
						 | 
					789eae5cc1 | ||
| 
						 | 
					2f9960e8e7 | ||
| 
						 | 
					506d234562 | 
							
								
								
									
										35
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										35
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							@ -223,7 +223,9 @@ dependencies = [
 | 
			
		||||
 "eth2_ssz",
 | 
			
		||||
 "eth2_ssz_derive",
 | 
			
		||||
 "ethereum-types 0.14.1",
 | 
			
		||||
 "itertools 0.13.0",
 | 
			
		||||
 "lazy_static",
 | 
			
		||||
 "lru 0.12.5",
 | 
			
		||||
 "metrics",
 | 
			
		||||
 "once_cell",
 | 
			
		||||
 "serde",
 | 
			
		||||
@ -1674,7 +1676,7 @@ dependencies = [
 | 
			
		||||
 "hkdf",
 | 
			
		||||
 "lazy_static",
 | 
			
		||||
 "libp2p-core 0.30.2",
 | 
			
		||||
 "lru",
 | 
			
		||||
 "lru 0.7.8",
 | 
			
		||||
 "parking_lot 0.11.2",
 | 
			
		||||
 "rand 0.8.5",
 | 
			
		||||
 "rlp",
 | 
			
		||||
@ -2515,6 +2517,12 @@ version = "1.0.7"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "foldhash"
 | 
			
		||||
version = "0.1.3"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "foreign-types"
 | 
			
		||||
version = "0.3.2"
 | 
			
		||||
@ -2947,6 +2955,17 @@ dependencies = [
 | 
			
		||||
 "allocator-api2",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "hashbrown"
 | 
			
		||||
version = "0.15.0"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "allocator-api2",
 | 
			
		||||
 "equivalent",
 | 
			
		||||
 "foldhash",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "hashers"
 | 
			
		||||
version = "1.0.1"
 | 
			
		||||
@ -4118,7 +4137,7 @@ dependencies = [
 | 
			
		||||
 "libp2p-core 0.33.0",
 | 
			
		||||
 "libp2p-swarm",
 | 
			
		||||
 "log",
 | 
			
		||||
 "lru",
 | 
			
		||||
 "lru 0.7.8",
 | 
			
		||||
 "prost 0.10.4",
 | 
			
		||||
 "prost-build 0.10.4",
 | 
			
		||||
 "prost-codec",
 | 
			
		||||
@ -4651,6 +4670,15 @@ dependencies = [
 | 
			
		||||
 "hashbrown 0.12.3",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "lru"
 | 
			
		||||
version = "0.12.5"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "hashbrown 0.15.0",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "lru-cache"
 | 
			
		||||
version = "0.1.2"
 | 
			
		||||
@ -5020,7 +5048,7 @@ dependencies = [
 | 
			
		||||
 "lazy_static",
 | 
			
		||||
 "libp2p",
 | 
			
		||||
 "lighthouse_metrics",
 | 
			
		||||
 "lru",
 | 
			
		||||
 "lru 0.7.8",
 | 
			
		||||
 "parking_lot 0.12.3",
 | 
			
		||||
 "rand 0.8.5",
 | 
			
		||||
 "regex",
 | 
			
		||||
@ -7277,6 +7305,7 @@ dependencies = [
 | 
			
		||||
 "merkle_light",
 | 
			
		||||
 "merkle_tree",
 | 
			
		||||
 "metrics",
 | 
			
		||||
 "once_cell",
 | 
			
		||||
 "parking_lot 0.12.3",
 | 
			
		||||
 "rand 0.8.5",
 | 
			
		||||
 "rayon",
 | 
			
		||||
 | 
			
		||||
@ -36,4 +36,8 @@ eth2_ssz = { path = "version-meld/eth2_ssz" }
 | 
			
		||||
enr = { path = "version-meld/enr" }
 | 
			
		||||
 | 
			
		||||
[profile.bench.package.'storage']
 | 
			
		||||
debug = true
 | 
			
		||||
debug = true
 | 
			
		||||
 | 
			
		||||
[profile.dev]
 | 
			
		||||
# enabling debug_assertions will make node fail to start because of checks in `clap`.
 | 
			
		||||
debug-assertions = false
 | 
			
		||||
@ -15,3 +15,6 @@ tracing = "0.1.36"
 | 
			
		||||
once_cell = "1.19.0"
 | 
			
		||||
 | 
			
		||||
metrics = { workspace = true }
 | 
			
		||||
 | 
			
		||||
itertools = "0.13.0"
 | 
			
		||||
lru = "0.12.5"
 | 
			
		||||
@ -1,25 +1,30 @@
 | 
			
		||||
mod merkle_tree;
 | 
			
		||||
mod metrics;
 | 
			
		||||
mod node_manager;
 | 
			
		||||
mod proof;
 | 
			
		||||
mod sha3;
 | 
			
		||||
 | 
			
		||||
use anyhow::{anyhow, bail, Result};
 | 
			
		||||
use itertools::Itertools;
 | 
			
		||||
use std::cmp::Ordering;
 | 
			
		||||
use std::collections::{BTreeMap, HashMap};
 | 
			
		||||
use std::fmt::Debug;
 | 
			
		||||
use std::marker::PhantomData;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
use tracing::{trace, warn};
 | 
			
		||||
 | 
			
		||||
use crate::merkle_tree::MerkleTreeWrite;
 | 
			
		||||
pub use crate::merkle_tree::{
 | 
			
		||||
    Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES,
 | 
			
		||||
};
 | 
			
		||||
pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager, NodeTransaction};
 | 
			
		||||
pub use proof::{Proof, RangeProof};
 | 
			
		||||
pub use sha3::Sha3Algorithm;
 | 
			
		||||
 | 
			
		||||
pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
 | 
			
		||||
    /// Keep all the nodes in the latest version. `layers[0]` is the layer of leaves.
 | 
			
		||||
    layers: Vec<Vec<E>>,
 | 
			
		||||
    node_manager: NodeManager<E>,
 | 
			
		||||
    /// Keep the delta nodes that can be used to construct a history tree.
 | 
			
		||||
    /// The key is the root node of that version.
 | 
			
		||||
    delta_nodes_map: BTreeMap<u64, DeltaNodes<E>>,
 | 
			
		||||
@ -37,13 +42,16 @@ pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
 | 
			
		||||
impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
    pub fn new(leaves: Vec<E>, leaf_height: usize, start_tx_seq: Option<u64>) -> Self {
 | 
			
		||||
        let mut merkle = Self {
 | 
			
		||||
            layers: vec![leaves],
 | 
			
		||||
            node_manager: NodeManager::new_dummy(),
 | 
			
		||||
            delta_nodes_map: BTreeMap::new(),
 | 
			
		||||
            root_to_tx_seq_map: HashMap::new(),
 | 
			
		||||
            min_depth: None,
 | 
			
		||||
            leaf_height,
 | 
			
		||||
            _a: Default::default(),
 | 
			
		||||
        };
 | 
			
		||||
        merkle.node_manager.start_transaction();
 | 
			
		||||
        merkle.node_manager.add_layer();
 | 
			
		||||
        merkle.node_manager.append_nodes(0, &leaves);
 | 
			
		||||
        if merkle.leaves() == 0 {
 | 
			
		||||
            if let Some(seq) = start_tx_seq {
 | 
			
		||||
                merkle.delta_nodes_map.insert(
 | 
			
		||||
@ -53,10 +61,12 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
                    },
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
            merkle.node_manager.commit();
 | 
			
		||||
            return merkle;
 | 
			
		||||
        }
 | 
			
		||||
        // Reconstruct the whole tree.
 | 
			
		||||
        merkle.recompute(0, 0, None);
 | 
			
		||||
        merkle.node_manager.commit();
 | 
			
		||||
        // Commit the first version in memory.
 | 
			
		||||
        // TODO(zz): Check when the roots become available.
 | 
			
		||||
        merkle.commit(start_tx_seq);
 | 
			
		||||
@ -64,53 +74,44 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn new_with_subtrees(
 | 
			
		||||
        initial_data: MerkleTreeInitialData<E>,
 | 
			
		||||
        node_db: Arc<dyn NodeDatabase<E>>,
 | 
			
		||||
        node_cache_capacity: usize,
 | 
			
		||||
        leaf_height: usize,
 | 
			
		||||
        start_tx_seq: Option<u64>,
 | 
			
		||||
    ) -> Result<Self> {
 | 
			
		||||
        let mut merkle = Self {
 | 
			
		||||
            layers: vec![vec![]],
 | 
			
		||||
            node_manager: NodeManager::new(node_db, node_cache_capacity)?,
 | 
			
		||||
            delta_nodes_map: BTreeMap::new(),
 | 
			
		||||
            root_to_tx_seq_map: HashMap::new(),
 | 
			
		||||
            min_depth: None,
 | 
			
		||||
            leaf_height,
 | 
			
		||||
            _a: Default::default(),
 | 
			
		||||
        };
 | 
			
		||||
        if initial_data.subtree_list.is_empty() {
 | 
			
		||||
            if let Some(seq) = start_tx_seq {
 | 
			
		||||
                merkle.delta_nodes_map.insert(
 | 
			
		||||
                    seq,
 | 
			
		||||
                    DeltaNodes {
 | 
			
		||||
                        right_most_nodes: vec![],
 | 
			
		||||
                    },
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
            return Ok(merkle);
 | 
			
		||||
        }
 | 
			
		||||
        merkle.append_subtree_list(initial_data.subtree_list)?;
 | 
			
		||||
        merkle.commit(start_tx_seq);
 | 
			
		||||
        for (index, h) in initial_data.known_leaves {
 | 
			
		||||
            merkle.fill_leaf(index, h);
 | 
			
		||||
        }
 | 
			
		||||
        for (layer_index, position, h) in initial_data.extra_mpt_nodes {
 | 
			
		||||
            // TODO: Delete duplicate nodes from DB.
 | 
			
		||||
            merkle.layers[layer_index][position] = h;
 | 
			
		||||
        if merkle.height() == 0 {
 | 
			
		||||
            merkle.node_manager.start_transaction();
 | 
			
		||||
            merkle.node_manager.add_layer();
 | 
			
		||||
            merkle.node_manager.commit();
 | 
			
		||||
        }
 | 
			
		||||
        Ok(merkle)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// This is only used for the last chunk, so `leaf_height` is always 0 so far.
 | 
			
		||||
    pub fn new_with_depth(leaves: Vec<E>, depth: usize, start_tx_seq: Option<u64>) -> Self {
 | 
			
		||||
        let mut node_manager = NodeManager::new_dummy();
 | 
			
		||||
        node_manager.start_transaction();
 | 
			
		||||
        if leaves.is_empty() {
 | 
			
		||||
            // Create an empty merkle tree with `depth`.
 | 
			
		||||
            let mut merkle = Self {
 | 
			
		||||
                layers: vec![vec![]; depth],
 | 
			
		||||
                // dummy node manager for the last chunk.
 | 
			
		||||
                node_manager,
 | 
			
		||||
                delta_nodes_map: BTreeMap::new(),
 | 
			
		||||
                root_to_tx_seq_map: HashMap::new(),
 | 
			
		||||
                min_depth: Some(depth),
 | 
			
		||||
                leaf_height: 0,
 | 
			
		||||
                _a: Default::default(),
 | 
			
		||||
            };
 | 
			
		||||
            for _ in 0..depth {
 | 
			
		||||
                merkle.node_manager.add_layer();
 | 
			
		||||
            }
 | 
			
		||||
            if let Some(seq) = start_tx_seq {
 | 
			
		||||
                merkle.delta_nodes_map.insert(
 | 
			
		||||
                    seq,
 | 
			
		||||
@ -119,20 +120,26 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
                    },
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
            merkle.node_manager.commit();
 | 
			
		||||
            merkle
 | 
			
		||||
        } else {
 | 
			
		||||
            let mut layers = vec![vec![]; depth];
 | 
			
		||||
            layers[0] = leaves;
 | 
			
		||||
            let mut merkle = Self {
 | 
			
		||||
                layers,
 | 
			
		||||
                // dummy node manager for the last chunk.
 | 
			
		||||
                node_manager,
 | 
			
		||||
                delta_nodes_map: BTreeMap::new(),
 | 
			
		||||
                root_to_tx_seq_map: HashMap::new(),
 | 
			
		||||
                min_depth: Some(depth),
 | 
			
		||||
                leaf_height: 0,
 | 
			
		||||
                _a: Default::default(),
 | 
			
		||||
            };
 | 
			
		||||
            merkle.node_manager.add_layer();
 | 
			
		||||
            merkle.append_nodes(0, &leaves);
 | 
			
		||||
            for _ in 1..depth {
 | 
			
		||||
                merkle.node_manager.add_layer();
 | 
			
		||||
            }
 | 
			
		||||
            // Reconstruct the whole tree.
 | 
			
		||||
            merkle.recompute(0, 0, None);
 | 
			
		||||
            merkle.node_manager.commit();
 | 
			
		||||
            // Commit the first version in memory.
 | 
			
		||||
            merkle.commit(start_tx_seq);
 | 
			
		||||
            merkle
 | 
			
		||||
@ -145,21 +152,25 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
            // appending null is not allowed.
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        self.layers[0].push(new_leaf);
 | 
			
		||||
        self.node_manager.start_transaction();
 | 
			
		||||
        self.node_manager.push_node(0, new_leaf);
 | 
			
		||||
        self.recompute_after_append_leaves(self.leaves() - 1);
 | 
			
		||||
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        metrics::APPEND.update_since(start_time);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn append_list(&mut self, mut leaf_list: Vec<E>) {
 | 
			
		||||
    pub fn append_list(&mut self, leaf_list: Vec<E>) {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        if leaf_list.contains(&E::null()) {
 | 
			
		||||
            // appending null is not allowed.
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        self.node_manager.start_transaction();
 | 
			
		||||
        let start_index = self.leaves();
 | 
			
		||||
        self.layers[0].append(&mut leaf_list);
 | 
			
		||||
        self.node_manager.append_nodes(0, &leaf_list);
 | 
			
		||||
        self.recompute_after_append_leaves(start_index);
 | 
			
		||||
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        metrics::APPEND_LIST.update_since(start_time);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -174,11 +185,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
            // appending null is not allowed.
 | 
			
		||||
            bail!("subtree_root is null");
 | 
			
		||||
        }
 | 
			
		||||
        self.node_manager.start_transaction();
 | 
			
		||||
        let start_index = self.leaves();
 | 
			
		||||
        self.append_subtree_inner(subtree_depth, subtree_root)?;
 | 
			
		||||
        self.recompute_after_append_subtree(start_index, subtree_depth - 1);
 | 
			
		||||
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        metrics::APPEND_SUBTREE.update_since(start_time);
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -188,12 +201,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
            // appending null is not allowed.
 | 
			
		||||
            bail!("subtree_list contains null");
 | 
			
		||||
        }
 | 
			
		||||
        self.node_manager.start_transaction();
 | 
			
		||||
        for (subtree_depth, subtree_root) in subtree_list {
 | 
			
		||||
            let start_index = self.leaves();
 | 
			
		||||
            self.append_subtree_inner(subtree_depth, subtree_root)?;
 | 
			
		||||
            self.recompute_after_append_subtree(start_index, subtree_depth - 1);
 | 
			
		||||
        }
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        metrics::APPEND_SUBTREE_LIST.update_since(start_time);
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -205,13 +221,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
            // updating to null is not allowed.
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        if self.layers[0].is_empty() {
 | 
			
		||||
        self.node_manager.start_transaction();
 | 
			
		||||
        if self.layer_len(0) == 0 {
 | 
			
		||||
            // Special case for the first data.
 | 
			
		||||
            self.layers[0].push(updated_leaf);
 | 
			
		||||
            self.push_node(0, updated_leaf);
 | 
			
		||||
        } else {
 | 
			
		||||
            *self.layers[0].last_mut().unwrap() = updated_leaf;
 | 
			
		||||
            self.update_node(0, self.layer_len(0) - 1, updated_leaf);
 | 
			
		||||
        }
 | 
			
		||||
        self.recompute_after_append_leaves(self.leaves() - 1);
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        metrics::UPDATE_LAST.update_since(start_time);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -221,13 +239,17 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
    pub fn fill_leaf(&mut self, index: usize, leaf: E) {
 | 
			
		||||
        if leaf == E::null() {
 | 
			
		||||
            // fill leaf with null is not allowed.
 | 
			
		||||
        } else if self.layers[0][index] == E::null() {
 | 
			
		||||
            self.layers[0][index] = leaf;
 | 
			
		||||
        } else if self.node(0, index) == E::null() {
 | 
			
		||||
            self.node_manager.start_transaction();
 | 
			
		||||
            self.update_node(0, index, leaf);
 | 
			
		||||
            self.recompute_after_fill_leaves(index, index + 1);
 | 
			
		||||
        } else if self.layers[0][index] != leaf {
 | 
			
		||||
            self.node_manager.commit();
 | 
			
		||||
        } else if self.node(0, index) != leaf {
 | 
			
		||||
            panic!(
 | 
			
		||||
                "Fill with invalid leaf, index={} was={:?} get={:?}",
 | 
			
		||||
                index, self.layers[0][index], leaf
 | 
			
		||||
                index,
 | 
			
		||||
                self.node(0, index),
 | 
			
		||||
                leaf
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -240,6 +262,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
        &mut self,
 | 
			
		||||
        proof: RangeProof<E>,
 | 
			
		||||
    ) -> Result<Vec<(usize, usize, E)>> {
 | 
			
		||||
        self.node_manager.start_transaction();
 | 
			
		||||
        let mut updated_nodes = Vec::new();
 | 
			
		||||
        let mut left_nodes = proof.left_proof.proof_nodes_in_tree();
 | 
			
		||||
        if left_nodes.len() >= self.leaf_height {
 | 
			
		||||
@ -251,6 +274,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
            updated_nodes
 | 
			
		||||
                .append(&mut self.fill_with_proof(right_nodes.split_off(self.leaf_height))?);
 | 
			
		||||
        }
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        Ok(updated_nodes)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -276,13 +300,16 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
        if tx_merkle_nodes.is_empty() {
 | 
			
		||||
            return Ok(Vec::new());
 | 
			
		||||
        }
 | 
			
		||||
        self.node_manager.start_transaction();
 | 
			
		||||
        let mut position_and_data =
 | 
			
		||||
            proof.file_proof_nodes_in_tree(tx_merkle_nodes, tx_merkle_nodes_size);
 | 
			
		||||
        let start_index = (start_index >> self.leaf_height) as usize;
 | 
			
		||||
        for (i, (position, _)) in position_and_data.iter_mut().enumerate() {
 | 
			
		||||
            *position += start_index >> i;
 | 
			
		||||
        }
 | 
			
		||||
        self.fill_with_proof(position_and_data)
 | 
			
		||||
        let updated_nodes = self.fill_with_proof(position_and_data)?;
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        Ok(updated_nodes)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof.
 | 
			
		||||
@ -294,28 +321,27 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
        let mut updated_nodes = Vec::new();
 | 
			
		||||
        // A valid proof should not fail the following checks.
 | 
			
		||||
        for (i, (position, data)) in position_and_data.into_iter().enumerate() {
 | 
			
		||||
            let layer = &mut self.layers[i];
 | 
			
		||||
            if position > layer.len() {
 | 
			
		||||
            if position > self.layer_len(i) {
 | 
			
		||||
                bail!(
 | 
			
		||||
                    "proof position out of range, position={} layer.len()={}",
 | 
			
		||||
                    position,
 | 
			
		||||
                    layer.len()
 | 
			
		||||
                    self.layer_len(i)
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
            if position == layer.len() {
 | 
			
		||||
            if position == self.layer_len(i) {
 | 
			
		||||
                // skip padding node.
 | 
			
		||||
                continue;
 | 
			
		||||
            }
 | 
			
		||||
            if layer[position] == E::null() {
 | 
			
		||||
                layer[position] = data.clone();
 | 
			
		||||
            if self.node(i, position) == E::null() {
 | 
			
		||||
                self.update_node(i, position, data.clone());
 | 
			
		||||
                updated_nodes.push((i, position, data))
 | 
			
		||||
            } else if layer[position] != data {
 | 
			
		||||
            } else if self.node(i, position) != data {
 | 
			
		||||
                // The last node in each layer may have changed in the tree.
 | 
			
		||||
                trace!(
 | 
			
		||||
                    "conflict data layer={} position={} tree_data={:?} proof_data={:?}",
 | 
			
		||||
                    i,
 | 
			
		||||
                    position,
 | 
			
		||||
                    layer[position],
 | 
			
		||||
                    self.node(i, position),
 | 
			
		||||
                    data
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
@ -331,8 +357,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
        if position >= self.leaves() {
 | 
			
		||||
            bail!("Out of bound: position={} end={}", position, self.leaves());
 | 
			
		||||
        }
 | 
			
		||||
        if self.layers[0][position] != E::null() {
 | 
			
		||||
            Ok(Some(self.layers[0][position].clone()))
 | 
			
		||||
        if self.node(0, position) != E::null() {
 | 
			
		||||
            Ok(Some(self.node(0, position)))
 | 
			
		||||
        } else {
 | 
			
		||||
            // The leaf hash is unknown.
 | 
			
		||||
            Ok(None)
 | 
			
		||||
@ -380,10 +406,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
            let mut right_most_nodes = Vec::new();
 | 
			
		||||
            for layer in &self.layers {
 | 
			
		||||
                right_most_nodes.push((layer.len() - 1, layer.last().unwrap().clone()));
 | 
			
		||||
            for height in 0..self.height() {
 | 
			
		||||
                let pos = self.layer_len(height) - 1;
 | 
			
		||||
                right_most_nodes.push((pos, self.node(height, pos)));
 | 
			
		||||
            }
 | 
			
		||||
            let root = self.root().clone();
 | 
			
		||||
            let root = self.root();
 | 
			
		||||
            self.delta_nodes_map
 | 
			
		||||
                .insert(tx_seq, DeltaNodes::new(right_most_nodes));
 | 
			
		||||
            self.root_to_tx_seq_map.insert(root, tx_seq);
 | 
			
		||||
@ -391,8 +418,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn before_extend_layer(&mut self, height: usize) {
 | 
			
		||||
        if height == self.layers.len() {
 | 
			
		||||
            self.layers.push(Vec::new());
 | 
			
		||||
        if height == self.height() {
 | 
			
		||||
            self.node_manager.add_layer()
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -409,7 +436,6 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Given a range of changed leaf nodes and recompute the tree.
 | 
			
		||||
    /// Since this tree is append-only, we always compute to the end.
 | 
			
		||||
    fn recompute(
 | 
			
		||||
        &mut self,
 | 
			
		||||
        mut start_index: usize,
 | 
			
		||||
@ -419,42 +445,51 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
        start_index >>= height;
 | 
			
		||||
        maybe_end_index = maybe_end_index.map(|end| end >> height);
 | 
			
		||||
        // Loop until we compute the new root and reach `tree_depth`.
 | 
			
		||||
        while self.layers[height].len() > 1 || height < self.layers.len() - 1 {
 | 
			
		||||
        while self.layer_len(height) > 1 || height < self.height() - 1 {
 | 
			
		||||
            let next_layer_start_index = start_index >> 1;
 | 
			
		||||
            if start_index % 2 == 1 {
 | 
			
		||||
                start_index -= 1;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            let mut end_index = maybe_end_index.unwrap_or(self.layers[height].len());
 | 
			
		||||
            if end_index % 2 == 1 && end_index != self.layers[height].len() {
 | 
			
		||||
            let mut end_index = maybe_end_index.unwrap_or(self.layer_len(height));
 | 
			
		||||
            if end_index % 2 == 1 && end_index != self.layer_len(height) {
 | 
			
		||||
                end_index += 1;
 | 
			
		||||
            }
 | 
			
		||||
            let mut i = 0;
 | 
			
		||||
            let mut iter = self.layers[height][start_index..end_index].chunks_exact(2);
 | 
			
		||||
            let iter = self
 | 
			
		||||
                .node_manager
 | 
			
		||||
                .get_nodes(height, start_index, end_index)
 | 
			
		||||
                .chunks(2);
 | 
			
		||||
            // We cannot modify the parent layer while iterating the child layer,
 | 
			
		||||
            // so just keep the changes and update them later.
 | 
			
		||||
            let mut parent_update = Vec::new();
 | 
			
		||||
            while let Some([left, right]) = iter.next() {
 | 
			
		||||
                // If either left or right is null (unknown), we cannot compute the parent hash.
 | 
			
		||||
                // Note that if we are recompute a range of an existing tree,
 | 
			
		||||
                // we do not need to keep these possibly null parent. This is only saved
 | 
			
		||||
                // for the case of constructing a new tree from the leaves.
 | 
			
		||||
                let parent = if *left == E::null() || *right == E::null() {
 | 
			
		||||
                    E::null()
 | 
			
		||||
            for chunk_iter in &iter {
 | 
			
		||||
                let chunk: Vec<_> = chunk_iter.collect();
 | 
			
		||||
                if chunk.len() == 2 {
 | 
			
		||||
                    let left = &chunk[0];
 | 
			
		||||
                    let right = &chunk[1];
 | 
			
		||||
                    // If either left or right is null (unknown), we cannot compute the parent hash.
 | 
			
		||||
                    // Note that if we are recompute a range of an existing tree,
 | 
			
		||||
                    // we do not need to keep these possibly null parent. This is only saved
 | 
			
		||||
                    // for the case of constructing a new tree from the leaves.
 | 
			
		||||
                    let parent = if *left == E::null() || *right == E::null() {
 | 
			
		||||
                        E::null()
 | 
			
		||||
                    } else {
 | 
			
		||||
                        A::parent(left, right)
 | 
			
		||||
                    };
 | 
			
		||||
                    parent_update.push((next_layer_start_index + i, parent));
 | 
			
		||||
                    i += 1;
 | 
			
		||||
                } else {
 | 
			
		||||
                    A::parent(left, right)
 | 
			
		||||
                };
 | 
			
		||||
                parent_update.push((next_layer_start_index + i, parent));
 | 
			
		||||
                i += 1;
 | 
			
		||||
            }
 | 
			
		||||
            if let [r] = iter.remainder() {
 | 
			
		||||
                // Same as above.
 | 
			
		||||
                let parent = if *r == E::null() {
 | 
			
		||||
                    E::null()
 | 
			
		||||
                } else {
 | 
			
		||||
                    A::parent_single(r, height + self.leaf_height)
 | 
			
		||||
                };
 | 
			
		||||
                parent_update.push((next_layer_start_index + i, parent));
 | 
			
		||||
                    assert_eq!(chunk.len(), 1);
 | 
			
		||||
                    let r = &chunk[0];
 | 
			
		||||
                    // Same as above.
 | 
			
		||||
                    let parent = if *r == E::null() {
 | 
			
		||||
                        E::null()
 | 
			
		||||
                    } else {
 | 
			
		||||
                        A::parent_single(r, height + self.leaf_height)
 | 
			
		||||
                    };
 | 
			
		||||
                    parent_update.push((next_layer_start_index + i, parent));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            if !parent_update.is_empty() {
 | 
			
		||||
                self.before_extend_layer(height + 1);
 | 
			
		||||
@ -463,27 +498,27 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
            // we can just overwrite `last_changed_parent_index` with new values.
 | 
			
		||||
            let mut last_changed_parent_index = None;
 | 
			
		||||
            for (parent_index, parent) in parent_update {
 | 
			
		||||
                match parent_index.cmp(&self.layers[height + 1].len()) {
 | 
			
		||||
                match parent_index.cmp(&self.layer_len(height + 1)) {
 | 
			
		||||
                    Ordering::Less => {
 | 
			
		||||
                        // We do not overwrite with null.
 | 
			
		||||
                        if parent != E::null() {
 | 
			
		||||
                            if self.layers[height + 1][parent_index] == E::null()
 | 
			
		||||
                            if self.node(height + 1, parent_index) == E::null()
 | 
			
		||||
                                // The last node in a layer can be updated.
 | 
			
		||||
                                || (self.layers[height + 1][parent_index] != parent
 | 
			
		||||
                                    && parent_index == self.layers[height + 1].len() - 1)
 | 
			
		||||
                                || (self.node(height + 1, parent_index) != parent
 | 
			
		||||
                                    && parent_index == self.layer_len(height + 1) - 1)
 | 
			
		||||
                            {
 | 
			
		||||
                                self.layers[height + 1][parent_index] = parent;
 | 
			
		||||
                                self.update_node(height + 1, parent_index, parent);
 | 
			
		||||
                                last_changed_parent_index = Some(parent_index);
 | 
			
		||||
                            } else if self.layers[height + 1][parent_index] != parent {
 | 
			
		||||
                            } else if self.node(height + 1, parent_index) != parent {
 | 
			
		||||
                                // Recompute changes a node in the middle. This should be impossible
 | 
			
		||||
                                // if the inputs are valid.
 | 
			
		||||
                                panic!("Invalid append merkle tree! height={} index={} expected={:?} get={:?}",
 | 
			
		||||
                                       height + 1, parent_index, self.layers[height + 1][parent_index], parent);
 | 
			
		||||
                                       height + 1, parent_index, self.node(height + 1, parent_index), parent);
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    Ordering::Equal => {
 | 
			
		||||
                        self.layers[height + 1].push(parent);
 | 
			
		||||
                        self.push_node(height + 1, parent);
 | 
			
		||||
                        last_changed_parent_index = Some(parent_index);
 | 
			
		||||
                    }
 | 
			
		||||
                    Ordering::Greater => {
 | 
			
		||||
@ -514,10 +549,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
        for height in 0..(subtree_depth - 1) {
 | 
			
		||||
            self.before_extend_layer(height);
 | 
			
		||||
            let subtree_layer_size = 1 << (subtree_depth - 1 - height);
 | 
			
		||||
            self.layers[height].append(&mut vec![E::null(); subtree_layer_size]);
 | 
			
		||||
            self.append_nodes(height, &vec![E::null(); subtree_layer_size]);
 | 
			
		||||
        }
 | 
			
		||||
        self.before_extend_layer(subtree_depth - 1);
 | 
			
		||||
        self.layers[subtree_depth - 1].push(subtree_root);
 | 
			
		||||
        self.push_node(subtree_depth - 1, subtree_root);
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -528,23 +563,45 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn revert_to(&mut self, tx_seq: u64) -> Result<()> {
 | 
			
		||||
        if self.layers[0].is_empty() {
 | 
			
		||||
        if self.layer_len(0) == 0 {
 | 
			
		||||
            // Any previous state of an empty tree is always empty.
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        }
 | 
			
		||||
        self.node_manager.start_transaction();
 | 
			
		||||
        let delta_nodes = self
 | 
			
		||||
            .delta_nodes_map
 | 
			
		||||
            .get(&tx_seq)
 | 
			
		||||
            .ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?;
 | 
			
		||||
            .ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?
 | 
			
		||||
            .clone();
 | 
			
		||||
        // Dropping the upper layers that are not in the old merkle tree.
 | 
			
		||||
        self.layers.truncate(delta_nodes.right_most_nodes.len());
 | 
			
		||||
        for height in (delta_nodes.right_most_nodes.len()..self.height()).rev() {
 | 
			
		||||
            self.node_manager.truncate_layer(height);
 | 
			
		||||
        }
 | 
			
		||||
        for (height, (last_index, right_most_node)) in
 | 
			
		||||
            delta_nodes.right_most_nodes.iter().enumerate()
 | 
			
		||||
        {
 | 
			
		||||
            self.layers[height].truncate(*last_index + 1);
 | 
			
		||||
            self.layers[height][*last_index] = right_most_node.clone();
 | 
			
		||||
            self.node_manager.truncate_nodes(height, *last_index + 1);
 | 
			
		||||
            self.update_node(height, *last_index, right_most_node.clone())
 | 
			
		||||
        }
 | 
			
		||||
        self.clear_after(tx_seq);
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Revert to a tx_seq not in `delta_nodes_map`.
 | 
			
		||||
    // This is needed to revert the last unfinished tx after restart.
 | 
			
		||||
    pub fn revert_to_leaves(&mut self, leaves: usize) -> Result<()> {
 | 
			
		||||
        self.node_manager.start_transaction();
 | 
			
		||||
        for height in (0..self.height()).rev() {
 | 
			
		||||
            let kept_nodes = leaves >> height;
 | 
			
		||||
            if kept_nodes == 0 {
 | 
			
		||||
                self.node_manager.truncate_layer(height);
 | 
			
		||||
            } else {
 | 
			
		||||
                self.node_manager.truncate_nodes(height, kept_nodes + 1);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        self.recompute_after_append_leaves(leaves);
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -564,17 +621,25 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
            bail!("empty tree");
 | 
			
		||||
        }
 | 
			
		||||
        Ok(HistoryTree {
 | 
			
		||||
            layers: &self.layers,
 | 
			
		||||
            node_manager: &self.node_manager,
 | 
			
		||||
            delta_nodes,
 | 
			
		||||
            leaf_height: self.leaf_height,
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn reset(&mut self) {
 | 
			
		||||
        self.layers = match self.min_depth {
 | 
			
		||||
            None => vec![vec![]],
 | 
			
		||||
            Some(depth) => vec![vec![]; depth],
 | 
			
		||||
        };
 | 
			
		||||
        self.node_manager.start_transaction();
 | 
			
		||||
        for height in (0..self.height()).rev() {
 | 
			
		||||
            self.node_manager.truncate_layer(height);
 | 
			
		||||
        }
 | 
			
		||||
        if let Some(depth) = self.min_depth {
 | 
			
		||||
            for _ in 0..depth {
 | 
			
		||||
                self.node_manager.add_layer();
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            self.node_manager.add_layer();
 | 
			
		||||
        }
 | 
			
		||||
        self.node_manager.commit();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn clear_after(&mut self, tx_seq: u64) {
 | 
			
		||||
@ -594,10 +659,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
    fn first_known_root_at(&self, index: usize) -> (usize, E) {
 | 
			
		||||
        let mut height = 0;
 | 
			
		||||
        let mut index_in_layer = index;
 | 
			
		||||
        while height < self.layers.len() {
 | 
			
		||||
        while height < self.height() {
 | 
			
		||||
            let node = self.node(height, index_in_layer);
 | 
			
		||||
            if !node.is_null() {
 | 
			
		||||
                return (height + 1, node.clone());
 | 
			
		||||
                return (height + 1, node);
 | 
			
		||||
            }
 | 
			
		||||
            height += 1;
 | 
			
		||||
            index_in_layer /= 2;
 | 
			
		||||
@ -642,7 +707,7 @@ impl<E: HashElement> DeltaNodes<E> {
 | 
			
		||||
 | 
			
		||||
pub struct HistoryTree<'m, E: HashElement> {
 | 
			
		||||
    /// A reference to the global tree nodes.
 | 
			
		||||
    layers: &'m Vec<Vec<E>>,
 | 
			
		||||
    node_manager: &'m NodeManager<E>,
 | 
			
		||||
    /// The delta nodes that are difference from `layers`.
 | 
			
		||||
    /// This could be a reference, we just take ownership for convenience.
 | 
			
		||||
    delta_nodes: &'m DeltaNodes<E>,
 | 
			
		||||
@ -653,16 +718,18 @@ pub struct HistoryTree<'m, E: HashElement> {
 | 
			
		||||
impl<E: HashElement, A: Algorithm<E>> MerkleTreeRead for AppendMerkleTree<E, A> {
 | 
			
		||||
    type E = E;
 | 
			
		||||
 | 
			
		||||
    fn node(&self, layer: usize, index: usize) -> &Self::E {
 | 
			
		||||
        &self.layers[layer][index]
 | 
			
		||||
    fn node(&self, layer: usize, index: usize) -> Self::E {
 | 
			
		||||
        self.node_manager
 | 
			
		||||
            .get_node(layer, index)
 | 
			
		||||
            .expect("index checked")
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn height(&self) -> usize {
 | 
			
		||||
        self.layers.len()
 | 
			
		||||
        self.node_manager.num_layers()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn layer_len(&self, layer_height: usize) -> usize {
 | 
			
		||||
        self.layers[layer_height].len()
 | 
			
		||||
        self.node_manager.layer_size(layer_height)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn padding_node(&self, height: usize) -> Self::E {
 | 
			
		||||
@ -672,10 +739,13 @@ impl<E: HashElement, A: Algorithm<E>> MerkleTreeRead for AppendMerkleTree<E, A>
 | 
			
		||||
 | 
			
		||||
impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
 | 
			
		||||
    type E = E;
 | 
			
		||||
    fn node(&self, layer: usize, index: usize) -> &Self::E {
 | 
			
		||||
    fn node(&self, layer: usize, index: usize) -> Self::E {
 | 
			
		||||
        match self.delta_nodes.get(layer, index).expect("range checked") {
 | 
			
		||||
            Some(node) if *node != E::null() => node,
 | 
			
		||||
            _ => &self.layers[layer][index],
 | 
			
		||||
            Some(node) if *node != E::null() => node.clone(),
 | 
			
		||||
            _ => self
 | 
			
		||||
                .node_manager
 | 
			
		||||
                .get_node(layer, index)
 | 
			
		||||
                .expect("index checked"),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -692,6 +762,22 @@ impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Write operations of `AppendMerkleTree` delegate to the backing
/// `NodeManager`, which maintains both the LRU node cache and DB persistence.
impl<E: HashElement, A: Algorithm<E>> MerkleTreeWrite for AppendMerkleTree<E, A> {
    type E = E;

    /// Append `node` at the end of `layer`.
    fn push_node(&mut self, layer: usize, node: Self::E) {
        self.node_manager.push_node(layer, node);
    }

    /// Append `nodes` at the end of `layer` in order.
    fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]) {
        self.node_manager.append_nodes(layer, nodes);
    }

    /// Overwrite the node at `(layer, pos)`. `NodeManager::add_node` skips
    /// the write when the cached value already equals `node`.
    fn update_node(&mut self, layer: usize, pos: usize, node: Self::E) {
        self.node_manager.add_node(layer, pos, node);
    }
}
 | 
			
		||||
 | 
			
		||||
#[macro_export]
 | 
			
		||||
macro_rules! ensure_eq {
 | 
			
		||||
    ($given:expr, $expected:expr) => {
 | 
			
		||||
@ -713,6 +799,7 @@ macro_rules! ensure_eq {
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod tests {
 | 
			
		||||
    use crate::merkle_tree::MerkleTreeRead;
 | 
			
		||||
 | 
			
		||||
    use crate::sha3::Sha3Algorithm;
 | 
			
		||||
    use crate::AppendMerkleTree;
 | 
			
		||||
    use ethereum_types::H256;
 | 
			
		||||
 | 
			
		||||
@ -49,7 +49,7 @@ pub trait Algorithm<E: HashElement> {
 | 
			
		||||
 | 
			
		||||
pub trait MerkleTreeRead {
 | 
			
		||||
    type E: HashElement;
 | 
			
		||||
    fn node(&self, layer: usize, index: usize) -> &Self::E;
 | 
			
		||||
    fn node(&self, layer: usize, index: usize) -> Self::E;
 | 
			
		||||
    fn height(&self) -> usize;
 | 
			
		||||
    fn layer_len(&self, layer_height: usize) -> usize;
 | 
			
		||||
    fn padding_node(&self, height: usize) -> Self::E;
 | 
			
		||||
@ -58,7 +58,7 @@ pub trait MerkleTreeRead {
 | 
			
		||||
        self.layer_len(0)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn root(&self) -> &Self::E {
 | 
			
		||||
    fn root(&self) -> Self::E {
 | 
			
		||||
        self.node(self.height() - 1, 0)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -70,16 +70,16 @@ pub trait MerkleTreeRead {
 | 
			
		||||
                self.leaves()
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
        if self.node(0, leaf_index) == &Self::E::null() {
 | 
			
		||||
        if self.node(0, leaf_index) == Self::E::null() {
 | 
			
		||||
            bail!("Not ready to generate proof for leaf_index={}", leaf_index);
 | 
			
		||||
        }
 | 
			
		||||
        if self.height() == 1 {
 | 
			
		||||
            return Proof::new(vec![self.root().clone(), self.root().clone()], vec![]);
 | 
			
		||||
            return Proof::new(vec![self.root(), self.root().clone()], vec![]);
 | 
			
		||||
        }
 | 
			
		||||
        let mut lemma: Vec<Self::E> = Vec::with_capacity(self.height()); // path + root
 | 
			
		||||
        let mut path: Vec<bool> = Vec::with_capacity(self.height() - 2); // path - 1
 | 
			
		||||
        let mut index_in_layer = leaf_index;
 | 
			
		||||
        lemma.push(self.node(0, leaf_index).clone());
 | 
			
		||||
        lemma.push(self.node(0, leaf_index));
 | 
			
		||||
        for height in 0..(self.height() - 1) {
 | 
			
		||||
            trace!(
 | 
			
		||||
                "gen_proof: height={} index={} hash={:?}",
 | 
			
		||||
@ -93,15 +93,15 @@ pub trait MerkleTreeRead {
 | 
			
		||||
                    // TODO: This can be skipped if the tree size is available in validation.
 | 
			
		||||
                    lemma.push(self.padding_node(height));
 | 
			
		||||
                } else {
 | 
			
		||||
                    lemma.push(self.node(height, index_in_layer + 1).clone());
 | 
			
		||||
                    lemma.push(self.node(height, index_in_layer + 1));
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                path.push(false);
 | 
			
		||||
                lemma.push(self.node(height, index_in_layer - 1).clone());
 | 
			
		||||
                lemma.push(self.node(height, index_in_layer - 1));
 | 
			
		||||
            }
 | 
			
		||||
            index_in_layer >>= 1;
 | 
			
		||||
        }
 | 
			
		||||
        lemma.push(self.root().clone());
 | 
			
		||||
        lemma.push(self.root());
 | 
			
		||||
        if lemma.contains(&Self::E::null()) {
 | 
			
		||||
            bail!(
 | 
			
		||||
                "Not enough data to generate proof, lemma={:?} path={:?}",
 | 
			
		||||
@ -130,6 +130,13 @@ pub trait MerkleTreeRead {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Write-side interface of a layered merkle tree.
///
/// `layer` is the layer height (0 = leaf layer) and positions index nodes
/// within a layer from the left.
pub trait MerkleTreeWrite {
    /// Hash element type stored in tree nodes.
    type E: HashElement;
    /// Append one node at the end of `layer`.
    fn push_node(&mut self, layer: usize, node: Self::E);
    /// Append a list of nodes at the end of `layer`.
    fn append_nodes(&mut self, layer: usize, nodes: &[Self::E]);
    /// Overwrite the node at position `pos` of `layer`.
    fn update_node(&mut self, layer: usize, pos: usize, node: Self::E);
}
 | 
			
		||||
 | 
			
		||||
/// This includes the data to reconstruct an `AppendMerkleTree` root where some nodes
 | 
			
		||||
/// are `null`. Other intermediate nodes will be computed based on these known nodes.
 | 
			
		||||
pub struct MerkleTreeInitialData<E: HashElement> {
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										219
									
								
								common/append_merkle/src/node_manager.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										219
									
								
								common/append_merkle/src/node_manager.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,219 @@
 | 
			
		||||
use crate::HashElement;
 | 
			
		||||
use anyhow::Result;
 | 
			
		||||
use lru::LruCache;
 | 
			
		||||
use std::any::Any;
 | 
			
		||||
use std::num::NonZeroUsize;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tracing::error;
 | 
			
		||||
 | 
			
		||||
/// Stores merkle-tree nodes addressed by `(layer, position)`, combining an
/// in-memory LRU cache with a persistent `NodeDatabase`.
///
/// Mutations must happen inside a transaction: callers bracket writes with
/// `start_transaction()` and `commit()`.
pub struct NodeManager<E: HashElement> {
    /// LRU cache keyed by `(layer, position)`.
    cache: LruCache<(usize, usize), E>,
    /// Current number of nodes in each layer (index = layer height).
    layer_size: Vec<usize>,
    /// Persistent backing store for nodes and layer sizes.
    db: Arc<dyn NodeDatabase<E>>,
    /// In-flight DB transaction; `Some` between `start_transaction` and `commit`.
    db_tx: Option<Box<dyn NodeTransaction<E>>>,
}
 | 
			
		||||
 | 
			
		||||
impl<E: HashElement> NodeManager<E> {
    /// Open a manager backed by `db`, recovering the size of every persisted
    /// layer. `capacity` bounds the in-memory LRU node cache.
    ///
    /// # Panics
    /// Panics if `capacity` is zero.
    pub fn new(db: Arc<dyn NodeDatabase<E>>, capacity: usize) -> Result<Self> {
        let mut layer = 0;
        let mut layer_size = Vec::new();
        // Layer sizes are stored contiguously from 0; the first missing
        // entry marks the end of the persisted layers.
        while let Some(size) = db.get_layer_size(layer)? {
            layer_size.push(size);
            layer += 1;
        }
        Ok(Self {
            cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
            layer_size,
            db,
            db_tx: None,
        })
    }

    /// Purely in-memory manager: an unbounded cache over a no-op database.
    /// Intended for trees that never persist (see `EmptyNodeDatabase`).
    pub fn new_dummy() -> Self {
        Self {
            cache: LruCache::unbounded(),
            layer_size: vec![],
            db: Arc::new(EmptyNodeDatabase {}),
            db_tx: None,
        }
    }

    /// Append `node` at the end of `layer` and bump the layer size.
    ///
    /// # Panics
    /// Panics if `layer` does not exist yet (see `add_layer`) or if no
    /// transaction is active.
    pub fn push_node(&mut self, layer: usize, node: E) {
        self.add_node(layer, self.layer_size[layer], node);
        self.set_layer_size(layer, self.layer_size[layer] + 1);
    }

    /// Append `nodes` at the end of `layer` in one batch: each node is put in
    /// the cache, the layer size is advanced, and all nodes are saved to the
    /// transaction with a single `save_node_list` call.
    ///
    /// # Panics
    /// Panics if `layer` does not exist or if no transaction is active.
    pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) {
        let mut pos = self.layer_size[layer];
        let mut saved_nodes = Vec::with_capacity(nodes.len());
        for node in nodes {
            self.cache.put((layer, pos), node.clone());
            saved_nodes.push((layer, pos, node));
            pos += 1;
        }
        self.set_layer_size(layer, pos);
        self.db_tx().save_node_list(&saved_nodes);
    }

    /// Read the node at `(layer, pos)`: cache first, then the database.
    /// Uses `peek` so the read does not change LRU recency (the method only
    /// takes `&self`). A DB read error is logged and reported as `None`.
    pub fn get_node(&self, layer: usize, pos: usize) -> Option<E> {
        match self.cache.peek(&(layer, pos)) {
            Some(node) => Some(node.clone()),
            None => self.db.get_node(layer, pos).unwrap_or_else(|e| {
                error!("Failed to get node: {}", e);
                None
            }),
        }
    }

    /// Lazily iterate the nodes of `layer` in `[start_pos, end_pos)`.
    pub fn get_nodes(&self, layer: usize, start_pos: usize, end_pos: usize) -> NodeIterator<E> {
        NodeIterator {
            node_manager: self,
            layer,
            start_pos,
            end_pos,
        }
    }

    /// Write `node` at `(layer, pos)` into the cache and the active
    /// transaction. Does NOT change `layer_size`; callers appending must use
    /// `push_node`/`append_nodes`.
    ///
    /// # Panics
    /// Panics if the value changed and no transaction is active.
    pub fn add_node(&mut self, layer: usize, pos: usize, node: E) {
        // No need to insert if the value is unchanged.
        if self.cache.get(&(layer, pos)) != Some(&node) {
            self.db_tx().save_node(layer, pos, &node);
            self.cache.put((layer, pos), node);
        }
    }

    /// Register a new empty top layer, persisting its size (0).
    ///
    /// # Panics
    /// Panics if no transaction is active.
    pub fn add_layer(&mut self) {
        self.layer_size.push(0);
        let layer = self.layer_size.len() - 1;
        self.db_tx().save_layer_size(layer, 0);
    }

    /// Number of nodes currently in `layer`.
    ///
    /// # Panics
    /// Panics if `layer >= self.num_layers()`.
    pub fn layer_size(&self, layer: usize) -> usize {
        self.layer_size[layer]
    }

    /// Number of layers (i.e. the tree height).
    pub fn num_layers(&self) -> usize {
        self.layer_size.len()
    }

    /// Shrink `layer` to `pos_end` nodes: evict the removed positions from the
    /// cache, delete them from the DB, and persist the new layer size.
    ///
    /// # Panics
    /// Panics if no transaction is active.
    pub fn truncate_nodes(&mut self, layer: usize, pos_end: usize) {
        let mut removed_nodes = Vec::new();
        for pos in pos_end..self.layer_size[layer] {
            self.cache.pop(&(layer, pos));
            removed_nodes.push((layer, pos));
        }
        self.db_tx().remove_node_list(&removed_nodes);
        self.set_layer_size(layer, pos_end);
    }

    /// Empty `layer`, and drop the layer entirely when it is the topmost one.
    /// Callers removing several layers iterate from the top down so each call
    /// sees its layer as the last one.
    pub fn truncate_layer(&mut self, layer: usize) {
        self.truncate_nodes(layer, 0);
        if layer == self.num_layers() - 1 {
            self.layer_size.pop();
            self.db_tx().remove_layer_size(layer);
        }
    }

    /// Begin a DB transaction. Nested transactions are a caller bug.
    ///
    /// # Panics
    /// Panics if a transaction is already active.
    pub fn start_transaction(&mut self) {
        if self.db_tx.is_some() {
            error!("start new tx before commit");
            panic!("start new tx before commit");
        }
        self.db_tx = Some(self.db.start_transaction());
    }

    /// Commit the active transaction to the DB. A missing transaction or a
    /// failed commit is logged, not propagated.
    pub fn commit(&mut self) {
        let tx = match self.db_tx.take() {
            Some(tx) => tx,
            None => {
                error!("db_tx is None");
                return;
            }
        };
        if let Err(e) = self.db.commit(tx) {
            error!("Failed to commit db transaction: {}", e);
        }
    }

    /// The active transaction; every mutating method funnels through this.
    ///
    /// # Panics
    /// Panics if no transaction is active ("tx checked").
    fn db_tx(&mut self) -> &mut dyn NodeTransaction<E> {
        (*self.db_tx.as_mut().expect("tx checked")).as_mut()
    }

    /// Update the in-memory size of `layer` and persist it in the transaction.
    fn set_layer_size(&mut self, layer: usize, size: usize) {
        self.layer_size[layer] = size;
        self.db_tx().save_layer_size(layer, size);
    }
}
 | 
			
		||||
 | 
			
		||||
/// Iterator over the nodes of a single layer in `[start_pos, end_pos)`,
/// reading each node through `NodeManager::get_node` (cache first, then DB).
pub struct NodeIterator<'a, E: HashElement> {
    /// Manager the nodes are read from.
    node_manager: &'a NodeManager<E>,
    /// Layer being iterated.
    layer: usize,
    /// Next position to yield; advanced by each `next()` call.
    start_pos: usize,
    /// Exclusive end position.
    end_pos: usize,
}
 | 
			
		||||
 | 
			
		||||
/// Yields the nodes of the configured layer range in ascending position
/// order. A missing node surfaces as `None`, which terminates iteration.
impl<'a, E: HashElement> Iterator for NodeIterator<'a, E> {
    type Item = E;

    fn next(&mut self) -> Option<Self::Item> {
        // Guard: the range is exhausted once start catches up with end.
        if self.start_pos >= self.end_pos {
            return None;
        }
        let node = self.node_manager.get_node(self.layer, self.start_pos);
        self.start_pos += 1;
        node
    }
}
 | 
			
		||||
 | 
			
		||||
/// Read side and transaction factory of the persistent node store.
pub trait NodeDatabase<E: HashElement>: Send + Sync {
    /// Load the node at `(layer, pos)`; `Ok(None)` when it is not stored.
    fn get_node(&self, layer: usize, pos: usize) -> Result<Option<E>>;
    /// Size of `layer`; `Ok(None)` once `layer` is past the stored layers.
    fn get_layer_size(&self, layer: usize) -> Result<Option<usize>>;
    /// Open a new write transaction to be filled and later passed to `commit`.
    fn start_transaction(&self) -> Box<dyn NodeTransaction<E>>;
    /// Apply a transaction previously created by `start_transaction`.
    fn commit(&self, tx: Box<dyn NodeTransaction<E>>) -> Result<()>;
}
 | 
			
		||||
 | 
			
		||||
/// Buffered write operations, applied when the owning `NodeDatabase`
/// commits the transaction.
pub trait NodeTransaction<E: HashElement>: Send + Sync {
    /// Record a single node write at `(layer, pos)`.
    fn save_node(&mut self, layer: usize, pos: usize, node: &E);
    /// `nodes` are a list of tuples `(layer, pos, node)`.
    fn save_node_list(&mut self, nodes: &[(usize, usize, &E)]);
    /// Record deletion of the listed `(layer, pos)` nodes.
    fn remove_node_list(&mut self, nodes: &[(usize, usize)]);
    /// Record the new size of `layer`.
    fn save_layer_size(&mut self, layer: usize, size: usize);
    /// Record removal of the size entry for `layer`.
    fn remove_layer_size(&mut self, layer: usize);

    /// Convert into `Box<dyn Any>` so implementors can downcast to the
    /// concrete transaction type.
    fn into_any(self: Box<Self>) -> Box<dyn Any>;
}
 | 
			
		||||
 | 
			
		||||
/// A dummy database structure for in-memory merkle tree that will not read/write db.
pub struct EmptyNodeDatabase {}
/// No-op transaction paired with `EmptyNodeDatabase`.
pub struct EmptyNodeTransaction {}
/// Every read reports "not found" and commits succeed without doing
/// anything, so nodes effectively live only in the `NodeManager` cache.
impl<E: HashElement> NodeDatabase<E> for EmptyNodeDatabase {
    fn get_node(&self, _layer: usize, _pos: usize) -> Result<Option<E>> {
        Ok(None)
    }
    fn get_layer_size(&self, _layer: usize) -> Result<Option<usize>> {
        Ok(None)
    }
    fn start_transaction(&self) -> Box<dyn NodeTransaction<E>> {
        Box::new(EmptyNodeTransaction {})
    }
    fn commit(&self, _tx: Box<dyn NodeTransaction<E>>) -> Result<()> {
        Ok(())
    }
}
 | 
			
		||||
 | 
			
		||||
/// All writes are silently discarded; used with `EmptyNodeDatabase` for
/// purely in-memory trees.
impl<E: HashElement> NodeTransaction<E> for EmptyNodeTransaction {
    fn save_node(&mut self, _layer: usize, _pos: usize, _node: &E) {}

    fn save_node_list(&mut self, _nodes: &[(usize, usize, &E)]) {}

    fn remove_node_list(&mut self, _nodes: &[(usize, usize)]) {}

    fn save_layer_size(&mut self, _layer: usize, _size: usize) {}

    fn remove_layer_size(&mut self, _layer: usize) {}

    // `Box<EmptyNodeTransaction>` coerces directly to `Box<dyn Any>`.
    fn into_any(self: Box<Self>) -> Box<dyn Any> {
        self
    }
}
 | 
			
		||||
@ -112,7 +112,7 @@ impl ClientBuilder {
 | 
			
		||||
    pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
 | 
			
		||||
        let executor = require!("sync", self, runtime_context).clone().executor;
 | 
			
		||||
        let store = Arc::new(
 | 
			
		||||
            LogManager::rocksdb(LogConfig::default(), &config.db_dir, executor)
 | 
			
		||||
            LogManager::rocksdb(config.log_config.clone(), &config.db_dir, executor)
 | 
			
		||||
                .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -11,6 +11,7 @@ use shared_types::{NetworkIdentity, ProtocolVersion};
 | 
			
		||||
use std::net::IpAddr;
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
use storage::config::ShardConfig;
 | 
			
		||||
use storage::log_store::log_manager::LogConfig;
 | 
			
		||||
use storage::StorageConfig;
 | 
			
		||||
 | 
			
		||||
impl ZgsConfig {
 | 
			
		||||
@ -101,8 +102,11 @@ impl ZgsConfig {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn storage_config(&self) -> Result<StorageConfig, String> {
 | 
			
		||||
        let mut log_config = LogConfig::default();
 | 
			
		||||
        log_config.flow.merkle_node_cache_capacity = self.merkle_node_cache_capacity;
 | 
			
		||||
        Ok(StorageConfig {
 | 
			
		||||
            db_dir: self.db_dir.clone().into(),
 | 
			
		||||
            log_config,
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -60,6 +60,7 @@ build_config! {
 | 
			
		||||
    (prune_check_time_s, (u64), 60)
 | 
			
		||||
    (prune_batch_size, (usize), 16 * 1024)
 | 
			
		||||
    (prune_batch_wait_time_ms, (u64), 1000)
 | 
			
		||||
    (merkle_node_cache_capacity, (usize), 32 * 1024 * 1024)
 | 
			
		||||
 | 
			
		||||
    // misc
 | 
			
		||||
    (log_config_file, (String), "log_config".to_string())
 | 
			
		||||
 | 
			
		||||
@ -33,6 +33,7 @@ tokio = { version = "1.38.0", features = ["full"] }
 | 
			
		||||
task_executor = { path = "../../common/task_executor" }
 | 
			
		||||
lazy_static = "1.4.0"
 | 
			
		||||
metrics = { workspace = true }
 | 
			
		||||
once_cell = { version = "1.19.0", features = [] }
 | 
			
		||||
 | 
			
		||||
[dev-dependencies]
 | 
			
		||||
rand = "0.8.5"
 | 
			
		||||
 | 
			
		||||
@ -1,3 +1,4 @@
 | 
			
		||||
use crate::log_store::log_manager::LogConfig;
 | 
			
		||||
use serde::{Deserialize, Serialize};
 | 
			
		||||
use ssz_derive::{Decode, Encode};
 | 
			
		||||
use std::{cell::RefCell, path::PathBuf, rc::Rc, str::FromStr};
 | 
			
		||||
@ -7,6 +8,7 @@ pub const SHARD_CONFIG_KEY: &str = "shard_config";
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub struct Config {
 | 
			
		||||
    pub db_dir: PathBuf,
 | 
			
		||||
    pub log_config: LogConfig,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Copy, Debug, Decode, Encode, Serialize, Deserialize, Eq, PartialEq)]
 | 
			
		||||
 | 
			
		||||
@ -10,9 +10,11 @@ use crate::log_store::{
 | 
			
		||||
    metrics, FlowRead, FlowSeal, FlowWrite, MineLoadChunk, SealAnswer, SealTask,
 | 
			
		||||
};
 | 
			
		||||
use crate::{try_option, ZgsKeyValueDB};
 | 
			
		||||
use any::Any;
 | 
			
		||||
use anyhow::{anyhow, bail, Result};
 | 
			
		||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
 | 
			
		||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase, NodeTransaction};
 | 
			
		||||
use itertools::Itertools;
 | 
			
		||||
use kvdb::DBTransaction;
 | 
			
		||||
use parking_lot::RwLock;
 | 
			
		||||
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
 | 
			
		||||
use ssz::{Decode, Encode};
 | 
			
		||||
@ -22,20 +24,20 @@ use std::collections::BTreeMap;
 | 
			
		||||
use std::fmt::Debug;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
use std::{cmp, mem};
 | 
			
		||||
use std::{any, cmp, mem};
 | 
			
		||||
use tracing::{debug, error, trace};
 | 
			
		||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
 | 
			
		||||
 | 
			
		||||
pub struct FlowStore {
 | 
			
		||||
    db: FlowDBStore,
 | 
			
		||||
    db: Arc<FlowDBStore>,
 | 
			
		||||
    seal_manager: SealTaskManager,
 | 
			
		||||
    config: FlowConfig,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl FlowStore {
 | 
			
		||||
    pub fn new(db: Arc<dyn ZgsKeyValueDB>, config: FlowConfig) -> Self {
 | 
			
		||||
    pub fn new(db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            db: FlowDBStore::new(db),
 | 
			
		||||
            db,
 | 
			
		||||
            seal_manager: Default::default(),
 | 
			
		||||
            config,
 | 
			
		||||
        }
 | 
			
		||||
@ -105,6 +107,7 @@ impl FlowStore {
 | 
			
		||||
#[derive(Clone, Debug)]
 | 
			
		||||
pub struct FlowConfig {
 | 
			
		||||
    pub batch_size: usize,
 | 
			
		||||
    pub merkle_node_cache_capacity: usize,
 | 
			
		||||
    pub shard_config: Arc<RwLock<ShardConfig>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -112,6 +115,8 @@ impl Default for FlowConfig {
 | 
			
		||||
    fn default() -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            batch_size: SECTORS_PER_LOAD,
 | 
			
		||||
            // Each node takes (8+8+32=)48 Bytes, so the default value is 1.5 GB memory size.
 | 
			
		||||
            merkle_node_cache_capacity: 32 * 1024 * 1024,
 | 
			
		||||
            shard_config: Default::default(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -453,7 +458,7 @@ impl FlowDBStore {
 | 
			
		||||
        let mut expected_index = 0;
 | 
			
		||||
 | 
			
		||||
        let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
 | 
			
		||||
        let empty_root = *Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
 | 
			
		||||
        let empty_root = Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
 | 
			
		||||
 | 
			
		||||
        for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
 | 
			
		||||
            let (index_bytes, root_bytes) = r?;
 | 
			
		||||
@ -683,3 +688,84 @@ fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
 | 
			
		||||
    let position = try_decode_usize(&data[mem::size_of::<u64>()..])?;
 | 
			
		||||
    Ok((layer_index, position))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn layer_size_key(layer: usize) -> Vec<u8> {
 | 
			
		||||
    let mut key = "layer_size".as_bytes().to_vec();
 | 
			
		||||
    key.extend_from_slice(&layer.to_be_bytes());
 | 
			
		||||
    key
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub struct NodeDBTransaction(DBTransaction);
 | 
			
		||||
 | 
			
		||||
impl NodeDatabase<DataRoot> for FlowDBStore {
 | 
			
		||||
    fn get_node(&self, layer: usize, pos: usize) -> Result<Option<DataRoot>> {
 | 
			
		||||
        Ok(self
 | 
			
		||||
            .kvdb
 | 
			
		||||
            .get(COL_FLOW_MPT_NODES, &encode_mpt_node_key(layer, pos))?
 | 
			
		||||
            .map(|v| DataRoot::from_slice(&v)))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn get_layer_size(&self, layer: usize) -> Result<Option<usize>> {
 | 
			
		||||
        match self.kvdb.get(COL_FLOW_MPT_NODES, &layer_size_key(layer))? {
 | 
			
		||||
            Some(v) => Ok(Some(try_decode_usize(&v)?)),
 | 
			
		||||
            None => Ok(None),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn start_transaction(&self) -> Box<dyn NodeTransaction<DataRoot>> {
 | 
			
		||||
        Box::new(NodeDBTransaction(self.kvdb.transaction()))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn commit(&self, tx: Box<dyn NodeTransaction<DataRoot>>) -> Result<()> {
 | 
			
		||||
        let db_tx: Box<NodeDBTransaction> = tx
 | 
			
		||||
            .into_any()
 | 
			
		||||
            .downcast()
 | 
			
		||||
            .map_err(|e| anyhow!("downcast failed, e={:?}", e))?;
 | 
			
		||||
        self.kvdb.write(db_tx.0).map_err(Into::into)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl NodeTransaction<DataRoot> for NodeDBTransaction {
 | 
			
		||||
    fn save_node(&mut self, layer: usize, pos: usize, node: &DataRoot) {
 | 
			
		||||
        self.0.put(
 | 
			
		||||
            COL_FLOW_MPT_NODES,
 | 
			
		||||
            &encode_mpt_node_key(layer, pos),
 | 
			
		||||
            node.as_bytes(),
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn save_node_list(&mut self, nodes: &[(usize, usize, &DataRoot)]) {
 | 
			
		||||
        for (layer_index, position, data) in nodes {
 | 
			
		||||
            self.0.put(
 | 
			
		||||
                COL_FLOW_MPT_NODES,
 | 
			
		||||
                &encode_mpt_node_key(*layer_index, *position),
 | 
			
		||||
                data.as_bytes(),
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn remove_node_list(&mut self, nodes: &[(usize, usize)]) {
 | 
			
		||||
        for (layer_index, position) in nodes {
 | 
			
		||||
            self.0.delete(
 | 
			
		||||
                COL_FLOW_MPT_NODES,
 | 
			
		||||
                &encode_mpt_node_key(*layer_index, *position),
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn save_layer_size(&mut self, layer: usize, size: usize) {
 | 
			
		||||
        self.0.put(
 | 
			
		||||
            COL_FLOW_MPT_NODES,
 | 
			
		||||
            &layer_size_key(layer),
 | 
			
		||||
            &size.to_be_bytes(),
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn remove_layer_size(&mut self, layer: usize) {
 | 
			
		||||
        self.0.delete(COL_FLOW_MPT_NODES, &layer_size_key(layer));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn into_any(self: Box<Self>) -> Box<dyn Any> {
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -4,11 +4,10 @@ mod seal;
 | 
			
		||||
mod serde;
 | 
			
		||||
 | 
			
		||||
use ::serde::{Deserialize, Serialize};
 | 
			
		||||
use std::cmp::min;
 | 
			
		||||
 | 
			
		||||
use anyhow::Result;
 | 
			
		||||
use ethereum_types::H256;
 | 
			
		||||
use ssz_derive::{Decode, Encode};
 | 
			
		||||
use std::cmp::min;
 | 
			
		||||
 | 
			
		||||
use crate::log_store::log_manager::data_to_merkle_leaves;
 | 
			
		||||
use crate::try_option;
 | 
			
		||||
@ -206,7 +205,7 @@ impl EntryBatch {
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        Ok(Some(
 | 
			
		||||
            *try_option!(self.to_merkle_tree(is_first_chunk)?).root(),
 | 
			
		||||
            try_option!(self.to_merkle_tree(is_first_chunk)?).root(),
 | 
			
		||||
        ))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1,5 +1,5 @@
 | 
			
		||||
use crate::config::ShardConfig;
 | 
			
		||||
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowStore};
 | 
			
		||||
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
 | 
			
		||||
use crate::log_store::tx_store::TransactionStore;
 | 
			
		||||
use crate::log_store::{
 | 
			
		||||
    FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
 | 
			
		||||
@ -11,6 +11,7 @@ use ethereum_types::H256;
 | 
			
		||||
use kvdb_rocksdb::{Database, DatabaseConfig};
 | 
			
		||||
use merkle_light::merkle::{log2_pow2, MerkleTree};
 | 
			
		||||
use merkle_tree::RawLeafSha3Algorithm;
 | 
			
		||||
use once_cell::sync::Lazy;
 | 
			
		||||
use parking_lot::RwLock;
 | 
			
		||||
use rayon::iter::ParallelIterator;
 | 
			
		||||
use rayon::prelude::ParallelSlice;
 | 
			
		||||
@ -49,6 +50,14 @@ pub const COL_NUM: u32 = 9;
 | 
			
		||||
// Process at most 1M entries (256MB) pad data at a time.
 | 
			
		||||
const PAD_MAX_SIZE: usize = 1 << 20;
 | 
			
		||||
 | 
			
		||||
static PAD_SEGMENT_ROOT: Lazy<H256> = Lazy::new(|| {
 | 
			
		||||
    Merkle::new(
 | 
			
		||||
        data_to_merkle_leaves(&[0; ENTRY_SIZE * PORA_CHUNK_SIZE]).unwrap(),
 | 
			
		||||
        0,
 | 
			
		||||
        None,
 | 
			
		||||
    )
 | 
			
		||||
    .root()
 | 
			
		||||
});
 | 
			
		||||
pub struct UpdateFlowMessage {
 | 
			
		||||
    pub root_map: BTreeMap<usize, (H256, usize)>,
 | 
			
		||||
    pub pad_data: usize,
 | 
			
		||||
@ -96,6 +105,7 @@ impl MerkleManager {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn revert_merkle_tree(&mut self, tx_seq: u64, tx_store: &TransactionStore) -> Result<()> {
 | 
			
		||||
        debug!("revert merkle tree {}", tx_seq);
 | 
			
		||||
        // Special case for reverting tx_seq == 0
 | 
			
		||||
        if tx_seq == u64::MAX {
 | 
			
		||||
            self.pora_chunks_merkle.reset();
 | 
			
		||||
@ -118,7 +128,7 @@ impl MerkleManager {
 | 
			
		||||
        if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 {
 | 
			
		||||
            self.last_chunk_merkle.append(H256::zero());
 | 
			
		||||
            self.pora_chunks_merkle
 | 
			
		||||
                .update_last(*self.last_chunk_merkle.root());
 | 
			
		||||
                .update_last(self.last_chunk_merkle.root());
 | 
			
		||||
        } else if self.last_chunk_merkle.leaves() != 0 {
 | 
			
		||||
            let last_chunk_start_index = self.last_chunk_start_index();
 | 
			
		||||
            let last_chunk_data = flow_store.get_available_entries(
 | 
			
		||||
@ -357,7 +367,7 @@ impl LogStoreWrite for LogManager {
 | 
			
		||||
        merkle.revert_merkle_tree(tx_seq, &self.tx_store)?;
 | 
			
		||||
        merkle.try_initialize(&self.flow_store)?;
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            Some(*merkle.last_chunk_merkle.root()),
 | 
			
		||||
            Some(merkle.last_chunk_merkle.root()),
 | 
			
		||||
            merkle
 | 
			
		||||
                .pora_chunks_merkle
 | 
			
		||||
                .leaf_at(merkle.pora_chunks_merkle.leaves() - 1)?
 | 
			
		||||
@ -579,7 +589,7 @@ impl LogStoreRead for LogManager {
 | 
			
		||||
    fn get_context(&self) -> crate::error::Result<(DataRoot, u64)> {
 | 
			
		||||
        let merkle = self.merkle.read_recursive();
 | 
			
		||||
        Ok((
 | 
			
		||||
            *merkle.pora_chunks_merkle.root(),
 | 
			
		||||
            merkle.pora_chunks_merkle.root(),
 | 
			
		||||
            merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
 | 
			
		||||
        ))
 | 
			
		||||
    }
 | 
			
		||||
@ -628,13 +638,10 @@ impl LogManager {
 | 
			
		||||
        executor: task_executor::TaskExecutor,
 | 
			
		||||
    ) -> Result<Self> {
 | 
			
		||||
        let tx_store = TransactionStore::new(db.clone())?;
 | 
			
		||||
        let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow));
 | 
			
		||||
        let mut initial_data = flow_store.get_chunk_root_list()?;
 | 
			
		||||
        // If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
 | 
			
		||||
        // first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
 | 
			
		||||
        // and inserted later.
 | 
			
		||||
        let mut extra_leaves = Vec::new();
 | 
			
		||||
 | 
			
		||||
        let flow_db = Arc::new(FlowDBStore::new(db.clone()));
 | 
			
		||||
        let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
 | 
			
		||||
        // If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
 | 
			
		||||
        // first and call `put_tx` later.
 | 
			
		||||
        let next_tx_seq = tx_store.next_tx_seq();
 | 
			
		||||
        let mut start_tx_seq = if next_tx_seq > 0 {
 | 
			
		||||
            Some(next_tx_seq - 1)
 | 
			
		||||
@ -642,15 +649,25 @@ impl LogManager {
 | 
			
		||||
            None
 | 
			
		||||
        };
 | 
			
		||||
        let mut last_tx_to_insert = None;
 | 
			
		||||
 | 
			
		||||
        let mut pora_chunks_merkle = Merkle::new_with_subtrees(
 | 
			
		||||
            flow_db,
 | 
			
		||||
            config.flow.merkle_node_cache_capacity,
 | 
			
		||||
            log2_pow2(PORA_CHUNK_SIZE),
 | 
			
		||||
        )?;
 | 
			
		||||
        if let Some(last_tx_seq) = start_tx_seq {
 | 
			
		||||
            if !tx_store.check_tx_completed(last_tx_seq)? {
 | 
			
		||||
                // Last tx not finalized, we need to check if its `put_tx` is completed.
 | 
			
		||||
                let last_tx = tx_store
 | 
			
		||||
                    .get_tx_by_seq_number(last_tx_seq)?
 | 
			
		||||
                    .expect("tx missing");
 | 
			
		||||
                let mut current_len = initial_data.leaves();
 | 
			
		||||
                let expected_len =
 | 
			
		||||
                    sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64);
 | 
			
		||||
                let current_len = pora_chunks_merkle.leaves();
 | 
			
		||||
                let expected_len = sector_to_segment(
 | 
			
		||||
                    last_tx.start_entry_index
 | 
			
		||||
                        + last_tx.num_entries() as u64
 | 
			
		||||
                        + PORA_CHUNK_SIZE as u64
 | 
			
		||||
                        - 1,
 | 
			
		||||
                );
 | 
			
		||||
                match expected_len.cmp(&(current_len)) {
 | 
			
		||||
                    Ordering::Less => {
 | 
			
		||||
                        bail!(
 | 
			
		||||
@ -678,43 +695,33 @@ impl LogManager {
 | 
			
		||||
                                previous_tx.start_entry_index + previous_tx.num_entries() as u64,
 | 
			
		||||
                            );
 | 
			
		||||
                            if current_len > expected_len {
 | 
			
		||||
                                while let Some((subtree_depth, _)) = initial_data.subtree_list.pop()
 | 
			
		||||
                                {
 | 
			
		||||
                                    current_len -= 1 << (subtree_depth - 1);
 | 
			
		||||
                                    if current_len == expected_len {
 | 
			
		||||
                                        break;
 | 
			
		||||
                                    }
 | 
			
		||||
                                }
 | 
			
		||||
                                pora_chunks_merkle.revert_to_leaves(expected_len)?;
 | 
			
		||||
                            } else {
 | 
			
		||||
                                warn!(
 | 
			
		||||
                                    "revert last tx with no-op: {} {}",
 | 
			
		||||
                                    current_len, expected_len
 | 
			
		||||
                                );
 | 
			
		||||
                                assert_eq!(current_len, expected_len);
 | 
			
		||||
                            }
 | 
			
		||||
                            assert_eq!(current_len, expected_len);
 | 
			
		||||
                            while let Some((index, h)) = initial_data.known_leaves.pop() {
 | 
			
		||||
                                if index < current_len {
 | 
			
		||||
                                    initial_data.known_leaves.push((index, h));
 | 
			
		||||
                                    break;
 | 
			
		||||
                                } else {
 | 
			
		||||
                                    extra_leaves.push((index, h));
 | 
			
		||||
                                }
 | 
			
		||||
                            }
 | 
			
		||||
                            start_tx_seq = Some(last_tx_seq - 1);
 | 
			
		||||
                            start_tx_seq = Some(previous_tx.seq);
 | 
			
		||||
                        };
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let mut pora_chunks_merkle =
 | 
			
		||||
            Merkle::new_with_subtrees(initial_data, log2_pow2(PORA_CHUNK_SIZE), start_tx_seq)?;
 | 
			
		||||
        let last_chunk_merkle = match start_tx_seq {
 | 
			
		||||
            Some(tx_seq) => {
 | 
			
		||||
                tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
 | 
			
		||||
                let tx = tx_store.get_tx_by_seq_number(tx_seq)?.expect("tx missing");
 | 
			
		||||
                if (tx.start_entry_index() + tx.num_entries() as u64) % PORA_CHUNK_SIZE as u64 == 0
 | 
			
		||||
                {
 | 
			
		||||
                    // The last chunk should be aligned, so it's empty.
 | 
			
		||||
                    Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None)
 | 
			
		||||
                } else {
 | 
			
		||||
                    tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves() - 1, tx_seq)?
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            // Initialize
 | 
			
		||||
            None => Merkle::new_with_depth(vec![], 1, None),
 | 
			
		||||
            None => {
 | 
			
		||||
                pora_chunks_merkle.reset();
 | 
			
		||||
                Merkle::new_with_depth(vec![], 1, None)
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        debug!(
 | 
			
		||||
@ -724,10 +731,10 @@ impl LogManager {
 | 
			
		||||
            last_chunk_merkle.leaves(),
 | 
			
		||||
        );
 | 
			
		||||
        if last_chunk_merkle.leaves() != 0 {
 | 
			
		||||
            pora_chunks_merkle.append(*last_chunk_merkle.root());
 | 
			
		||||
            // update the merkle root
 | 
			
		||||
            pora_chunks_merkle.commit(start_tx_seq);
 | 
			
		||||
            pora_chunks_merkle.update_last(last_chunk_merkle.root());
 | 
			
		||||
        }
 | 
			
		||||
        // update the merkle root
 | 
			
		||||
        pora_chunks_merkle.commit(start_tx_seq);
 | 
			
		||||
        let merkle = RwLock::new(MerkleManager {
 | 
			
		||||
            pora_chunks_merkle,
 | 
			
		||||
            last_chunk_merkle,
 | 
			
		||||
@ -746,18 +753,7 @@ impl LogManager {
 | 
			
		||||
        log_manager.start_receiver(receiver, executor);
 | 
			
		||||
 | 
			
		||||
        if let Some(tx) = last_tx_to_insert {
 | 
			
		||||
            log_manager.revert_to(tx.seq - 1)?;
 | 
			
		||||
            log_manager.put_tx(tx)?;
 | 
			
		||||
            let mut merkle = log_manager.merkle.write();
 | 
			
		||||
            for (index, h) in extra_leaves {
 | 
			
		||||
                if index < merkle.pora_chunks_merkle.leaves() {
 | 
			
		||||
                    merkle.pora_chunks_merkle.fill_leaf(index, h);
 | 
			
		||||
                } else {
 | 
			
		||||
                    error!("out of range extra leaf: index={} hash={:?}", index, h);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            assert!(extra_leaves.is_empty());
 | 
			
		||||
        }
 | 
			
		||||
        log_manager
 | 
			
		||||
            .merkle
 | 
			
		||||
@ -897,16 +893,16 @@ impl LogManager {
 | 
			
		||||
                    // `last_chunk_merkle` was empty, so this is a new leaf in the top_tree.
 | 
			
		||||
                    merkle
 | 
			
		||||
                        .pora_chunks_merkle
 | 
			
		||||
                        .append_subtree(1, *merkle.last_chunk_merkle.root())?;
 | 
			
		||||
                        .append_subtree(1, merkle.last_chunk_merkle.root())?;
 | 
			
		||||
                } else {
 | 
			
		||||
                    merkle
 | 
			
		||||
                        .pora_chunks_merkle
 | 
			
		||||
                        .update_last(*merkle.last_chunk_merkle.root());
 | 
			
		||||
                        .update_last(merkle.last_chunk_merkle.root());
 | 
			
		||||
                }
 | 
			
		||||
                if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
 | 
			
		||||
                    batch_root_map.insert(
 | 
			
		||||
                        merkle.pora_chunks_merkle.leaves() - 1,
 | 
			
		||||
                        (*merkle.last_chunk_merkle.root(), 1),
 | 
			
		||||
                        (merkle.last_chunk_merkle.root(), 1),
 | 
			
		||||
                    );
 | 
			
		||||
                    self.complete_last_chunk_merkle(
 | 
			
		||||
                        merkle.pora_chunks_merkle.leaves() - 1,
 | 
			
		||||
@ -965,7 +961,7 @@ impl LogManager {
 | 
			
		||||
                        .append_list(data_to_merkle_leaves(&pad_data)?);
 | 
			
		||||
                    merkle
 | 
			
		||||
                        .pora_chunks_merkle
 | 
			
		||||
                        .update_last(*merkle.last_chunk_merkle.root());
 | 
			
		||||
                        .update_last(merkle.last_chunk_merkle.root());
 | 
			
		||||
                } else {
 | 
			
		||||
                    if last_chunk_pad != 0 {
 | 
			
		||||
                        is_full_empty = false;
 | 
			
		||||
@ -975,10 +971,10 @@ impl LogManager {
 | 
			
		||||
                            .append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?);
 | 
			
		||||
                        merkle
 | 
			
		||||
                            .pora_chunks_merkle
 | 
			
		||||
                            .update_last(*merkle.last_chunk_merkle.root());
 | 
			
		||||
                            .update_last(merkle.last_chunk_merkle.root());
 | 
			
		||||
                        root_map.insert(
 | 
			
		||||
                            merkle.pora_chunks_merkle.leaves() - 1,
 | 
			
		||||
                            (*merkle.last_chunk_merkle.root(), 1),
 | 
			
		||||
                            (merkle.last_chunk_merkle.root(), 1),
 | 
			
		||||
                        );
 | 
			
		||||
                        completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1);
 | 
			
		||||
                    }
 | 
			
		||||
@ -986,12 +982,11 @@ impl LogManager {
 | 
			
		||||
                    // Pad with more complete chunks.
 | 
			
		||||
                    let mut start_index = last_chunk_pad / ENTRY_SIZE;
 | 
			
		||||
                    while pad_data.len() >= (start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE {
 | 
			
		||||
                        let data = pad_data[start_index * ENTRY_SIZE
 | 
			
		||||
                            ..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE]
 | 
			
		||||
                            .to_vec();
 | 
			
		||||
                        let root = *Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
 | 
			
		||||
                        merkle.pora_chunks_merkle.append(root);
 | 
			
		||||
                        root_map.insert(merkle.pora_chunks_merkle.leaves() - 1, (root, 1));
 | 
			
		||||
                        merkle.pora_chunks_merkle.append(*PAD_SEGMENT_ROOT);
 | 
			
		||||
                        root_map.insert(
 | 
			
		||||
                            merkle.pora_chunks_merkle.leaves() - 1,
 | 
			
		||||
                            (*PAD_SEGMENT_ROOT, 1),
 | 
			
		||||
                        );
 | 
			
		||||
                        start_index += PORA_CHUNK_SIZE;
 | 
			
		||||
                    }
 | 
			
		||||
                    assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
 | 
			
		||||
@ -1069,7 +1064,7 @@ impl LogManager {
 | 
			
		||||
            }
 | 
			
		||||
            merkle
 | 
			
		||||
                .pora_chunks_merkle
 | 
			
		||||
                .update_last(*merkle.last_chunk_merkle.root());
 | 
			
		||||
                .update_last(merkle.last_chunk_merkle.root());
 | 
			
		||||
        }
 | 
			
		||||
        let chunk_roots = self.flow_store.append_entries(flow_entry_array)?;
 | 
			
		||||
        for (chunk_index, chunk_root) in chunk_roots {
 | 
			
		||||
 | 
			
		||||
@ -8,6 +8,7 @@ use ethereum_types::H256;
 | 
			
		||||
use rand::random;
 | 
			
		||||
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
 | 
			
		||||
use std::cmp;
 | 
			
		||||
 | 
			
		||||
use task_executor::test_utils::TestRuntime;
 | 
			
		||||
 | 
			
		||||
#[test]
 | 
			
		||||
 | 
			
		||||
@ -301,6 +301,9 @@ impl TransactionStore {
 | 
			
		||||
            match tx.start_entry_index.cmp(&last_chunk_start_index) {
 | 
			
		||||
                cmp::Ordering::Greater => {
 | 
			
		||||
                    tx_list.push((tx_seq, tx.merkle_nodes));
 | 
			
		||||
                    if tx.start_entry_index >= last_chunk_start_index + PORA_CHUNK_SIZE as u64 {
 | 
			
		||||
                        break;
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                cmp::Ordering::Equal => {
 | 
			
		||||
                    tx_list.push((tx_seq, tx.merkle_nodes));
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										0
									
								
								tests/crash_test.py
									
									
									
									
									
										
										
										Normal file → Executable file
									
								
							
							
						
						
									
										0
									
								
								tests/crash_test.py
									
									
									
									
									
										
										
										Normal file → Executable file
									
								
							
							
								
								
									
										43
									
								
								tests/node_cache_test.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										43
									
								
								tests/node_cache_test.py
									
									
									
									
									
										Executable file
									
								
							@ -0,0 +1,43 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
from test_framework.test_framework import TestFramework
 | 
			
		||||
from utility.submission import create_submission, submit_data
 | 
			
		||||
from utility.utils import wait_until
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class NodeCacheTest(TestFramework):
 | 
			
		||||
    def setup_params(self):
 | 
			
		||||
        self.zgs_node_configs[0] = {
 | 
			
		||||
            "merkle_node_cache_capacity": 1024,
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    def run_test(self):
 | 
			
		||||
        client = self.nodes[0]
 | 
			
		||||
 | 
			
		||||
        chunk_data = b"\x02" * 256 * 1024 * 1024 * 3
 | 
			
		||||
        submissions, data_root = create_submission(chunk_data)
 | 
			
		||||
        self.contract.submit(submissions)
 | 
			
		||||
        wait_until(lambda: self.contract.num_submissions() == 1)
 | 
			
		||||
        wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
 | 
			
		||||
 | 
			
		||||
        segment = submit_data(client, chunk_data)
 | 
			
		||||
        self.log.info("segment: %s", len(segment))
 | 
			
		||||
        wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
 | 
			
		||||
 | 
			
		||||
        self.stop_storage_node(0)
 | 
			
		||||
        self.start_storage_node(0)
 | 
			
		||||
        self.nodes[0].wait_for_rpc_connection()
 | 
			
		||||
 | 
			
		||||
        chunk_data = b"\x03" * 256 * (1024 * 765 + 5)
 | 
			
		||||
        submissions, data_root = create_submission(chunk_data)
 | 
			
		||||
        self.contract.submit(submissions)
 | 
			
		||||
        wait_until(lambda: self.contract.num_submissions() == 2)
 | 
			
		||||
        wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
 | 
			
		||||
 | 
			
		||||
        segment = submit_data(client, chunk_data)
 | 
			
		||||
        self.log.info("segment: %s", len(segment))
 | 
			
		||||
        wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    NodeCacheTest().main()
 | 
			
		||||
@ -3,6 +3,7 @@
 | 
			
		||||
from test_framework.test_framework import TestFramework
 | 
			
		||||
from utility.utils import wait_until
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class AutoRandomSyncV2Test(TestFramework):
 | 
			
		||||
    def setup_params(self):
 | 
			
		||||
        self.num_nodes = 4
 | 
			
		||||
@ -30,5 +31,6 @@ class AutoRandomSyncV2Test(TestFramework):
 | 
			
		||||
            wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
 | 
			
		||||
            wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    AutoRandomSyncV2Test().main()
 | 
			
		||||
 | 
			
		||||
@ -209,13 +209,11 @@ class TestFramework:
 | 
			
		||||
            if i > 0:
 | 
			
		||||
                time.sleep(1)
 | 
			
		||||
            node.start()
 | 
			
		||||
            node.wait_for_rpc_connection()
 | 
			
		||||
 | 
			
		||||
        self.log.info("Wait the zgs_node launch for %d seconds", self.launch_wait_seconds)
 | 
			
		||||
        time.sleep(self.launch_wait_seconds)
 | 
			
		||||
        
 | 
			
		||||
        for node in self.nodes:
 | 
			
		||||
            node.wait_for_rpc_connection()
 | 
			
		||||
 | 
			
		||||
    def add_arguments(self, parser: argparse.ArgumentParser):
 | 
			
		||||
        parser.add_argument(
 | 
			
		||||
            "--conflux-binary",
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user