This commit is contained in:
Peilun Li 2024-10-15 00:59:25 +08:00
parent 9c2f6e9d7d
commit 1c4257f692
5 changed files with 73 additions and 33 deletions

View File

@ -47,6 +47,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
leaf_height,
_a: Default::default(),
};
merkle.node_manager.start_transaction();
merkle.node_manager.add_layer();
merkle.node_manager.append_nodes(0, &leaves);
if merkle.leaves() == 0 {

View File

@ -1,6 +1,7 @@
use crate::HashElement;
use anyhow::Result;
use lru::LruCache;
use std::any::Any;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tracing::error;
@ -39,19 +40,18 @@ impl<E: HashElement> NodeManager<E> {
pub fn push_node(&mut self, layer: usize, node: E) {
self.add_node(layer, self.layer_size[layer], node);
self.layer_size[layer] += 1;
self.set_layer_size(layer, self.layer_size[layer] + 1);
}
pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) {
let pos = &mut self.layer_size[layer];
let mut pos = self.layer_size[layer];
let mut saved_nodes = Vec::with_capacity(nodes.len());
for node in nodes {
self.cache.put((layer, *pos), node.clone());
saved_nodes.push((layer, *pos, node));
*pos += 1;
self.cache.put((layer, pos), node.clone());
saved_nodes.push((layer, pos, node));
pos += 1;
}
let size = *pos;
self.db_tx().save_layer_size(layer, size);
self.set_layer_size(layer, pos);
self.db_tx().save_node_list(&saved_nodes);
}
@ -103,8 +103,7 @@ impl<E: HashElement> NodeManager<E> {
removed_nodes.push((layer, pos));
}
self.db_tx().remove_node_list(&removed_nodes);
self.layer_size[layer] = pos_end;
self.db_tx().save_layer_size(layer, pos_end);
self.set_layer_size(layer, pos_end);
}
pub fn truncate_layer(&mut self, layer: usize) {
@ -138,6 +137,11 @@ impl<E: HashElement> NodeManager<E> {
fn db_tx(&mut self) -> &mut dyn NodeTransaction<E> {
(*self.db_tx.as_mut().expect("tx checked")).as_mut()
}
fn set_layer_size(&mut self, layer: usize, size: usize) {
self.layer_size[layer] = size;
self.db_tx().save_layer_size(layer, size);
}
}
pub struct NodeIterator<'a, E: HashElement> {
@ -175,6 +179,8 @@ pub trait NodeTransaction<E: HashElement>: Send + Sync {
fn remove_node_list(&mut self, nodes: &[(usize, usize)]);
fn save_layer_size(&mut self, layer: usize, size: usize);
fn remove_layer_size(&mut self, layer: usize);
fn into_any(self: Box<Self>) -> Box<dyn Any>;
}
/// A dummy database structure for in-memory merkle tree that will not read/write db.
@ -205,4 +211,8 @@ impl<E: HashElement> NodeTransaction<E> for EmptyNodeTransaction {
fn save_layer_size(&mut self, _layer: usize, _size: usize) {}
fn remove_layer_size(&mut self, _layer: usize) {}
fn into_any(self: Box<Self>) -> Box<dyn Any> {
self
}
}

View File

@ -700,10 +700,11 @@ impl NodeDatabase<DataRoot> for FlowDBStore {
}
fn commit(&self, tx: Box<dyn NodeTransaction<DataRoot>>) -> Result<()> {
let db_tx: &NodeDBTransaction = (&tx as &dyn Any)
.downcast_ref()
.ok_or(anyhow!("downcast failed"))?;
self.kvdb.write(db_tx.0.clone()).map_err(Into::into)
let db_tx: Box<NodeDBTransaction> = tx
.into_any()
.downcast()
.map_err(|e| anyhow!("downcast failed, e={:?}", e))?;
self.kvdb.write(db_tx.0).map_err(Into::into)
}
}
@ -746,4 +747,8 @@ impl NodeTransaction<DataRoot> for NodeDBTransaction {
fn remove_layer_size(&mut self, layer: usize) {
self.0.delete(COL_FLOW_MPT_NODES, &layer_size_key(layer));
}
fn into_any(self: Box<Self>) -> Box<dyn Any> {
self
}
}

View File

@ -6,15 +6,10 @@ from utility.utils import wait_until
class ExampleTest(TestFramework):
def setup_params(self):
self.zgs_node_configs[0] = {
"merkle_node_cache_capacity": 1024,
}
def run_test(self):
client = self.nodes[0]
chunk_data = b"\x02" * 256 * 1024 * 1024 * 3
chunk_data = b"\x02" * 5253123
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions)
wait_until(lambda: self.contract.num_submissions() == 1)
@ -24,20 +19,6 @@ class ExampleTest(TestFramework):
self.log.info("segment: %s", len(segment))
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
self.stop_storage_node(0)
self.start_storage_node(0)
self.nodes[0].wait_for_rpc_connection()
chunk_data = b"\x03" * 256 * (1024 * 765 + 5)
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions)
wait_until(lambda: self.contract.num_submissions() == 2)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
segment = submit_data(client, chunk_data)
self.log.info("segment: %s", len(segment))
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
if __name__ == "__main__":
ExampleTest().main()

43
tests/node_cache_test.py Executable file
View File

@ -0,0 +1,43 @@
#!/usr/bin/env python3
from test_framework.test_framework import TestFramework
from utility.submission import create_submission, submit_data
from utility.utils import wait_until
class NodeCacheTest(TestFramework):
def setup_params(self):
self.zgs_node_configs[0] = {
"merkle_node_cache_capacity": 1024,
}
def run_test(self):
client = self.nodes[0]
chunk_data = b"\x02" * 256 * 1024 * 1024 * 3
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions)
wait_until(lambda: self.contract.num_submissions() == 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
segment = submit_data(client, chunk_data)
self.log.info("segment: %s", len(segment))
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
self.stop_storage_node(0)
self.start_storage_node(0)
self.nodes[0].wait_for_rpc_connection()
chunk_data = b"\x03" * 256 * (1024 * 765 + 5)
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions)
wait_until(lambda: self.contract.num_submissions() == 2)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
segment = submit_data(client, chunk_data)
self.log.info("segment: %s", len(segment))
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
if __name__ == "__main__":
NodeCacheTest().main()