From 1c4257f6925bacdaa92d6323bf60cbaea19a08bf Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Tue, 15 Oct 2024 00:59:25 +0800 Subject: [PATCH] Fix. --- common/append_merkle/src/lib.rs | 1 + common/append_merkle/src/node_manager.rs | 28 ++++++++++----- node/storage/src/log_store/flow_store.rs | 13 ++++--- tests/example_test.py | 21 +----------- tests/node_cache_test.py | 43 ++++++++++++++++++++++++ 5 files changed, 73 insertions(+), 33 deletions(-) create mode 100755 tests/node_cache_test.py diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index 12f8fcf..6623342 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -47,6 +47,7 @@ impl> AppendMerkleTree { leaf_height, _a: Default::default(), }; + merkle.node_manager.start_transaction(); merkle.node_manager.add_layer(); merkle.node_manager.append_nodes(0, &leaves); if merkle.leaves() == 0 { diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs index 50c30f1..005787e 100644 --- a/common/append_merkle/src/node_manager.rs +++ b/common/append_merkle/src/node_manager.rs @@ -1,6 +1,7 @@ use crate::HashElement; use anyhow::Result; use lru::LruCache; +use std::any::Any; use std::num::NonZeroUsize; use std::sync::Arc; use tracing::error; @@ -39,19 +40,18 @@ impl NodeManager { pub fn push_node(&mut self, layer: usize, node: E) { self.add_node(layer, self.layer_size[layer], node); - self.layer_size[layer] += 1; + self.set_layer_size(layer, self.layer_size[layer] + 1); } pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) { - let pos = &mut self.layer_size[layer]; + let mut pos = self.layer_size[layer]; let mut saved_nodes = Vec::with_capacity(nodes.len()); for node in nodes { - self.cache.put((layer, *pos), node.clone()); - saved_nodes.push((layer, *pos, node)); - *pos += 1; + self.cache.put((layer, pos), node.clone()); + saved_nodes.push((layer, pos, node)); + pos += 1; } - let size = *pos; - self.db_tx().save_layer_size(layer, size); + self.set_layer_size(layer, pos); self.db_tx().save_node_list(&saved_nodes); } @@ -103,8 +103,7 @@ impl NodeManager { removed_nodes.push((layer, pos)); } self.db_tx().remove_node_list(&removed_nodes); - self.layer_size[layer] = pos_end; - self.db_tx().save_layer_size(layer, pos_end); + self.set_layer_size(layer, pos_end); } pub fn truncate_layer(&mut self, layer: usize) { @@ -138,6 +137,11 @@ impl NodeManager { fn db_tx(&mut self) -> &mut dyn NodeTransaction { (*self.db_tx.as_mut().expect("tx checked")).as_mut() } + + fn set_layer_size(&mut self, layer: usize, size: usize) { + self.layer_size[layer] = size; + self.db_tx().save_layer_size(layer, size); + } } pub struct NodeIterator<'a, E: HashElement> { @@ -175,6 +179,8 @@ pub trait NodeTransaction: Send + Sync { fn remove_node_list(&mut self, nodes: &[(usize, usize)]); fn save_layer_size(&mut self, layer: usize, size: usize); fn remove_layer_size(&mut self, layer: usize); + + fn into_any(self: Box) -> Box; } /// A dummy database structure for in-memory merkle tree that will not read/write db. @@ -205,4 +211,8 @@ impl NodeTransaction for EmptyNodeTransaction { fn save_layer_size(&mut self, _layer: usize, _size: usize) {} fn remove_layer_size(&mut self, _layer: usize) {} + + fn into_any(self: Box) -> Box { + self + } } diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index dda1688..88b11e9 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -700,10 +700,11 @@ impl NodeDatabase for FlowDBStore { } fn commit(&self, tx: Box>) -> Result<()> { - let db_tx: &NodeDBTransaction = (&tx as &dyn Any) - .downcast_ref() - .ok_or(anyhow!("downcast failed"))?; - self.kvdb.write(db_tx.0.clone()).map_err(Into::into) + let db_tx: Box = tx + .into_any() + .downcast() + .map_err(|e| anyhow!("downcast failed, e={:?}", e))?; + self.kvdb.write(db_tx.0).map_err(Into::into) } } @@ -746,4 +747,8 @@ impl NodeTransaction for NodeDBTransaction { fn remove_layer_size(&mut self, layer: usize) { self.0.delete(COL_FLOW_MPT_NODES, &layer_size_key(layer)); } + + fn into_any(self: Box) -> Box { + self + } } diff --git a/tests/example_test.py b/tests/example_test.py index 9fd6bf4..19ff56c 100755 --- a/tests/example_test.py +++ b/tests/example_test.py @@ -6,15 +6,10 @@ from utility.utils import wait_until class ExampleTest(TestFramework): - def setup_params(self): - self.zgs_node_configs[0] = { - "merkle_node_cache_capacity": 1024, - } - def run_test(self): client = self.nodes[0] - chunk_data = b"\x02" * 256 * 1024 * 1024 * 3 + chunk_data = b"\x02" * 5253123 submissions, data_root = create_submission(chunk_data) self.contract.submit(submissions) wait_until(lambda: self.contract.num_submissions() == 1) @@ -24,20 +19,6 @@ class ExampleTest(TestFramework): self.log.info("segment: %s", len(segment)) wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) - self.stop_storage_node(0) - self.start_storage_node(0) - self.nodes[0].wait_for_rpc_connection() - - chunk_data = b"\x03" * 256 * (1024 * 765 + 5) - submissions, data_root = create_submission(chunk_data) - self.contract.submit(submissions) - wait_until(lambda: self.contract.num_submissions() == 2) - wait_until(lambda: client.zgs_get_file_info(data_root) is not None) - - segment = submit_data(client, chunk_data) - self.log.info("segment: %s", len(segment)) - wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) - if __name__ == "__main__": ExampleTest().main() diff --git a/tests/node_cache_test.py b/tests/node_cache_test.py new file mode 100755 index 0000000..354cc01 --- /dev/null +++ b/tests/node_cache_test.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 + +from test_framework.test_framework import TestFramework +from utility.submission import create_submission, submit_data +from utility.utils import wait_until + + +class NodeCacheTest(TestFramework): + def setup_params(self): + self.zgs_node_configs[0] = { + "merkle_node_cache_capacity": 1024, + } + + def run_test(self): + client = self.nodes[0] + + chunk_data = b"\x02" * 256 * 1024 * 1024 * 3 + submissions, data_root = create_submission(chunk_data) + self.contract.submit(submissions) + wait_until(lambda: self.contract.num_submissions() == 1) + wait_until(lambda: client.zgs_get_file_info(data_root) is not None) + + segment = submit_data(client, chunk_data) + self.log.info("segment: %s", len(segment)) + wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) + + self.stop_storage_node(0) + self.start_storage_node(0) + self.nodes[0].wait_for_rpc_connection() + + chunk_data = b"\x03" * 256 * (1024 * 765 + 5) + submissions, data_root = create_submission(chunk_data) + self.contract.submit(submissions) + wait_until(lambda: self.contract.num_submissions() == 2) + wait_until(lambda: client.zgs_get_file_info(data_root) is not None) + + segment = submit_data(client, chunk_data) + self.log.info("segment: %s", len(segment)) + wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) + + +if __name__ == "__main__": + NodeCacheTest().main()