diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index eb02147..70faf2e 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -16,7 +16,7 @@ use crate::merkle_tree::MerkleTreeWrite; pub use crate::merkle_tree::{ Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES, }; -pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager}; +pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager, NodeTransaction}; pub use proof::{Proof, RangeProof}; pub use sha3::Sha3Algorithm; @@ -160,8 +160,10 @@ impl> AppendMerkleTree { // appending null is not allowed. return; } + self.node_manager.start_transaction(); self.node_manager.push_node(0, new_leaf); self.recompute_after_append_leaves(self.leaves() - 1); + self.node_manager.commit(); } pub fn append_list(&mut self, leaf_list: Vec) { @@ -169,9 +171,11 @@ impl> AppendMerkleTree { // appending null is not allowed. return; } + self.node_manager.start_transaction(); let start_index = self.leaves(); self.node_manager.append_nodes(0, &leaf_list); self.recompute_after_append_leaves(start_index); + self.node_manager.commit(); } /// Append a leaf list by providing their intermediate node hash. @@ -184,9 +188,11 @@ impl> AppendMerkleTree { // appending null is not allowed. bail!("subtree_root is null"); } + self.node_manager.start_transaction(); let start_index = self.leaves(); self.append_subtree_inner(subtree_depth, subtree_root)?; self.recompute_after_append_subtree(start_index, subtree_depth - 1); + self.node_manager.commit(); Ok(()) } @@ -195,11 +201,13 @@ impl> AppendMerkleTree { // appending null is not allowed. 
bail!("subtree_list contains null"); } + self.node_manager.start_transaction(); for (subtree_depth, subtree_root) in subtree_list { let start_index = self.leaves(); self.append_subtree_inner(subtree_depth, subtree_root)?; self.recompute_after_append_subtree(start_index, subtree_depth - 1); } + self.node_manager.commit(); Ok(()) } @@ -210,6 +218,7 @@ impl> AppendMerkleTree { // updating to null is not allowed. return; } + self.node_manager.start_transaction(); if self.layer_len(0) == 0 { // Special case for the first data. self.push_node(0, updated_leaf); @@ -217,6 +226,7 @@ impl> AppendMerkleTree { self.update_node(0, self.layer_len(0) - 1, updated_leaf); } self.recompute_after_append_leaves(self.leaves() - 1); + self.node_manager.commit(); } /// Fill an unknown `null` leaf with its real value. @@ -226,8 +236,10 @@ impl> AppendMerkleTree { if leaf == E::null() { // fill leaf with null is not allowed. } else if self.node(0, index) == E::null() { + self.node_manager.start_transaction(); self.update_node(0, index, leaf); self.recompute_after_fill_leaves(index, index + 1); + self.node_manager.commit(); } else if self.node(0, index) != leaf { panic!( "Fill with invalid leaf, index={} was={:?} get={:?}", @@ -246,18 +258,26 @@ impl> AppendMerkleTree { &mut self, proof: RangeProof, ) -> Result> { - self.fill_with_proof( - proof - .left_proof - .proof_nodes_in_tree() - .split_off(self.leaf_height), - )?; - self.fill_with_proof( - proof - .right_proof - .proof_nodes_in_tree() - .split_off(self.leaf_height), - ) + self.node_manager.start_transaction(); + let mut updated_nodes = Vec::new(); + updated_nodes.append( + &mut self.fill_with_proof( + proof + .left_proof + .proof_nodes_in_tree() + .split_off(self.leaf_height), + )?, + ); + updated_nodes.append( + &mut self.fill_with_proof( + proof + .right_proof + .proof_nodes_in_tree() + .split_off(self.leaf_height), + )?, + ); + self.node_manager.commit(); + Ok(updated_nodes) } pub fn fill_with_file_proof( @@ -282,13 +302,16 @@ 
impl> AppendMerkleTree { if tx_merkle_nodes.is_empty() { return Ok(Vec::new()); } + self.node_manager.start_transaction(); let mut position_and_data = proof.file_proof_nodes_in_tree(tx_merkle_nodes, tx_merkle_nodes_size); let start_index = (start_index >> self.leaf_height) as usize; for (i, (position, _)) in position_and_data.iter_mut().enumerate() { *position += start_index >> i; } - self.fill_with_proof(position_and_data) + let updated_nodes = self.fill_with_proof(position_and_data)?; + self.node_manager.commit(); + Ok(updated_nodes) } /// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof. @@ -757,11 +780,10 @@ macro_rules! ensure_eq { #[cfg(test)] mod tests { use crate::merkle_tree::MerkleTreeRead; - use crate::node_manager::EmptyNodeDatabase; + use crate::sha3::Sha3Algorithm; use crate::AppendMerkleTree; use ethereum_types::H256; - use std::sync::Arc; #[test] fn test_proof() { diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs index 259e812..a583467 100644 --- a/common/append_merkle/src/node_manager.rs +++ b/common/append_merkle/src/node_manager.rs @@ -9,6 +9,7 @@ pub struct NodeManager { cache: LruCache<(usize, usize), E>, layer_size: Vec, db: Arc>, + db_tx: Option>>, } impl NodeManager { @@ -17,6 +18,7 @@ impl NodeManager { cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")), layer_size: vec![], db, + db_tx: None, } } @@ -25,6 +27,7 @@ impl NodeManager { cache: LruCache::unbounded(), layer_size: vec![], db: Arc::new(EmptyNodeDatabase {}), + db_tx: None, } } @@ -41,9 +44,9 @@ impl NodeManager { saved_nodes.push((layer, *pos, node)); *pos += 1; } - if let Err(e) = self.db.save_node_list(&saved_nodes) { - error!("Failed to save node list: {:?}", e); - } + let size = *pos; + self.db_tx().save_layer_size(layer, size); + self.db_tx().save_node_list(&saved_nodes); } pub fn get_node(&self, layer: usize, pos: usize) -> Option { 
@@ -66,14 +69,17 @@ impl NodeManager { } pub fn add_node(&mut self, layer: usize, pos: usize, node: E) { - if let Err(e) = self.db.save_node(layer, pos, &node) { - error!("Failed to save node: {}", e); + // No need to insert if the value is unchanged. + if self.cache.get(&(layer, pos)) != Some(&node) { + self.db_tx().save_node(layer, pos, &node); + self.cache.put((layer, pos), node); } - self.cache.put((layer, pos), node); } pub fn add_layer(&mut self) { self.layer_size.push(0); + let layer = self.layer_size.len() - 1; + self.db_tx().save_layer_size(layer, 0); } pub fn layer_size(&self, layer: usize) -> usize { @@ -90,18 +96,42 @@ impl NodeManager { self.cache.pop(&(layer, pos)); removed_nodes.push((layer, pos)); } - if let Err(e) = self.db.remove_node_list(&removed_nodes) { - error!("Failed to remove node list: {:?}", e); - } + self.db_tx().remove_node_list(&removed_nodes); self.layer_size[layer] = pos_end; + self.db_tx().save_layer_size(layer, pos_end); } pub fn truncate_layer(&mut self, layer: usize) { self.truncate_nodes(layer, 0); if layer == self.num_layers() - 1 { self.layer_size.pop(); + self.db_tx().remove_layer_size(layer); } } + + pub fn start_transaction(&mut self) { + if self.db_tx.is_some() { + error!("start new tx before commit"); + } + self.db_tx = Some(self.db.start_transaction()); + } + + pub fn commit(&mut self) { + let tx = match self.db_tx.take() { + Some(tx) => tx, + None => { + error!("db_tx is None"); + return; + } + }; + if let Err(e) = self.db.commit(tx) { + error!("Failed to commit db transaction: {}", e); + } + } + + fn db_tx(&mut self) -> &mut dyn NodeTransaction { + (*self.db_tx.as_mut().expect("tx checked")).as_mut() + } } pub struct NodeIterator<'a, E: HashElement> { @@ -127,28 +157,46 @@ impl<'a, E: HashElement> Iterator for NodeIterator<'a, E> { pub trait NodeDatabase: Send + Sync { fn get_node(&self, layer: usize, pos: usize) -> Result>; - fn save_node(&self, layer: usize, pos: usize, node: &E) -> Result<()>; + fn 
get_layer_size(&self, layer: usize) -> Result>; + fn start_transaction(&self) -> Box>; + fn commit(&self, tx: Box>) -> Result<()>; +} + +pub trait NodeTransaction: Send + Sync { + fn save_node(&mut self, layer: usize, pos: usize, node: &E); /// `nodes` are a list of tuples `(layer, pos, node)`. - fn save_node_list(&self, nodes: &[(usize, usize, &E)]) -> Result<()>; - fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()>; + fn save_node_list(&mut self, nodes: &[(usize, usize, &E)]); + fn remove_node_list(&mut self, nodes: &[(usize, usize)]); + fn save_layer_size(&mut self, layer: usize, size: usize); + fn remove_layer_size(&mut self, layer: usize); } /// A dummy database structure for in-memory merkle tree that will not read/write db. pub struct EmptyNodeDatabase {} +pub struct EmptyNodeTransaction {} impl NodeDatabase for EmptyNodeDatabase { fn get_node(&self, _layer: usize, _pos: usize) -> Result> { Ok(None) } - - fn save_node(&self, _layer: usize, _pos: usize, _node: &E) -> Result<()> { - Ok(()) + fn get_layer_size(&self, _layer: usize) -> Result> { + Ok(None) } - - fn save_node_list(&self, _nodes: &[(usize, usize, &E)]) -> Result<()> { - Ok(()) + fn start_transaction(&self) -> Box> { + Box::new(EmptyNodeTransaction {}) } - - fn remove_node_list(&self, _nodes: &[(usize, usize)]) -> Result<()> { + fn commit(&self, _tx: Box>) -> Result<()> { Ok(()) } } + +impl NodeTransaction for EmptyNodeTransaction { + fn save_node(&mut self, _layer: usize, _pos: usize, _node: &E) {} + + fn save_node_list(&mut self, _nodes: &[(usize, usize, &E)]) {} + + fn remove_node_list(&mut self, _nodes: &[(usize, usize)]) {} + + fn save_layer_size(&mut self, _layer: usize, _size: usize) {} + + fn remove_layer_size(&mut self, _layer: usize) {} +} diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index e6c454a..dda1688 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -9,9 +9,11 @@ 
use crate::log_store::log_manager::{ }; use crate::log_store::{FlowRead, FlowSeal, FlowWrite}; use crate::{try_option, ZgsKeyValueDB}; +use any::Any; use anyhow::{anyhow, bail, Result}; -use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase}; +use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase, NodeTransaction}; use itertools::Itertools; +use kvdb::DBTransaction; use parking_lot::RwLock; use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle}; use ssz::{Decode, Encode}; @@ -20,7 +22,7 @@ use std::cmp::Ordering; use std::collections::BTreeMap; use std::fmt::Debug; use std::sync::Arc; -use std::{cmp, mem}; +use std::{any, cmp, mem}; use tracing::{debug, error, trace}; use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL}; @@ -670,6 +672,14 @@ fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> { Ok((layer_index, position)) } +fn layer_size_key(layer: usize) -> Vec { + let mut key = "layer_size".as_bytes().to_vec(); + key.extend_from_slice(&layer.to_be_bytes()); + key +} + +pub struct NodeDBTransaction(DBTransaction); + impl NodeDatabase for FlowDBStore { fn get_node(&self, layer: usize, pos: usize) -> Result> { Ok(self @@ -678,36 +688,62 @@ impl NodeDatabase for FlowDBStore { .map(|v| DataRoot::from_slice(&v))) } - fn save_node(&self, layer: usize, pos: usize, node: &DataRoot) -> Result<()> { - let mut tx = self.kvdb.transaction(); - tx.put( + fn get_layer_size(&self, layer: usize) -> Result> { + match self.kvdb.get(COL_FLOW_MPT_NODES, &layer_size_key(layer))? 
{ + Some(v) => Ok(Some(try_decode_usize(&v)?)), + None => Ok(None), + } + } + + fn start_transaction(&self) -> Box> { + Box::new(NodeDBTransaction(self.kvdb.transaction())) + } + + fn commit(&self, tx: Box>) -> Result<()> { + let db_tx: &NodeDBTransaction = (&tx as &dyn Any) + .downcast_ref() + .ok_or(anyhow!("downcast failed"))?; + self.kvdb.write(db_tx.0.clone()).map_err(Into::into) + } +} + +impl NodeTransaction for NodeDBTransaction { + fn save_node(&mut self, layer: usize, pos: usize, node: &DataRoot) { + self.0.put( COL_FLOW_MPT_NODES, &encode_mpt_node_key(layer, pos), node.as_bytes(), ); - Ok(self.kvdb.write(tx)?) } - fn save_node_list(&self, nodes: &[(usize, usize, &DataRoot)]) -> Result<()> { - let mut tx = self.kvdb.transaction(); + fn save_node_list(&mut self, nodes: &[(usize, usize, &DataRoot)]) { for (layer_index, position, data) in nodes { - tx.put( + self.0.put( COL_FLOW_MPT_NODES, &encode_mpt_node_key(*layer_index, *position), data.as_bytes(), ); } - Ok(self.kvdb.write(tx)?) } - fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()> { - let mut tx = self.kvdb.transaction(); + fn remove_node_list(&mut self, nodes: &[(usize, usize)]) { for (layer_index, position) in nodes { - tx.delete( + self.0.delete( COL_FLOW_MPT_NODES, &encode_mpt_node_key(*layer_index, *position), ); } - Ok(self.kvdb.write(tx)?) 
+ } + + fn save_layer_size(&mut self, layer: usize, size: usize) { + self.0.put( + COL_FLOW_MPT_NODES, + &layer_size_key(layer), + &size.to_be_bytes(), + ); + } + + fn remove_layer_size(&mut self, layer: usize) { + self.0.delete(COL_FLOW_MPT_NODES, &layer_size_key(layer)); } } diff --git a/node/storage/src/log_store/tests.rs b/node/storage/src/log_store/tests.rs index d77b5f5..50b4cc5 100644 --- a/node/storage/src/log_store/tests.rs +++ b/node/storage/src/log_store/tests.rs @@ -3,14 +3,12 @@ use crate::log_store::log_manager::{ PORA_CHUNK_SIZE, }; use crate::log_store::{LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite}; -use append_merkle::{ - Algorithm, AppendMerkleTree, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm, -}; +use append_merkle::{Algorithm, AppendMerkleTree, MerkleTreeRead, Sha3Algorithm}; use ethereum_types::H256; use rand::random; use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE}; use std::cmp; -use std::sync::Arc; + use task_executor::test_utils::TestRuntime; #[test] diff --git a/tests/example_test.py b/tests/example_test.py index 19ff56c..9fd6bf4 100755 --- a/tests/example_test.py +++ b/tests/example_test.py @@ -6,10 +6,15 @@ from utility.utils import wait_until class ExampleTest(TestFramework): + def setup_params(self): + self.zgs_node_configs[0] = { + "merkle_node_cache_capacity": 1024, + } + def run_test(self): client = self.nodes[0] - chunk_data = b"\x02" * 5253123 + chunk_data = b"\x02" * 256 * 1024 * 1024 * 3 submissions, data_root = create_submission(chunk_data) self.contract.submit(submissions) wait_until(lambda: self.contract.num_submissions() == 1) @@ -19,6 +24,20 @@ class ExampleTest(TestFramework): self.log.info("segment: %s", len(segment)) wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) + self.stop_storage_node(0) + self.start_storage_node(0) + self.nodes[0].wait_for_rpc_connection() + + chunk_data = b"\x03" * 256 * (1024 * 765 + 5) + submissions, data_root = 
create_submission(chunk_data) + self.contract.submit(submissions) + wait_until(lambda: self.contract.num_submissions() == 2) + wait_until(lambda: client.zgs_get_file_info(data_root) is not None) + + segment = submit_data(client, chunk_data) + self.log.info("segment: %s", len(segment)) + wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) + if __name__ == "__main__": ExampleTest().main()