mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Save layer size.
This commit is contained in:
parent
90650294af
commit
19829f1def
@ -16,7 +16,7 @@ use crate::merkle_tree::MerkleTreeWrite;
|
|||||||
pub use crate::merkle_tree::{
|
pub use crate::merkle_tree::{
|
||||||
Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES,
|
Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES,
|
||||||
};
|
};
|
||||||
pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager};
|
pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager, NodeTransaction};
|
||||||
pub use proof::{Proof, RangeProof};
|
pub use proof::{Proof, RangeProof};
|
||||||
pub use sha3::Sha3Algorithm;
|
pub use sha3::Sha3Algorithm;
|
||||||
|
|
||||||
@ -160,8 +160,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
self.node_manager.push_node(0, new_leaf);
|
self.node_manager.push_node(0, new_leaf);
|
||||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||||
|
self.node_manager.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn append_list(&mut self, leaf_list: Vec<E>) {
|
pub fn append_list(&mut self, leaf_list: Vec<E>) {
|
||||||
@ -169,9 +171,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
let start_index = self.leaves();
|
let start_index = self.leaves();
|
||||||
self.node_manager.append_nodes(0, &leaf_list);
|
self.node_manager.append_nodes(0, &leaf_list);
|
||||||
self.recompute_after_append_leaves(start_index);
|
self.recompute_after_append_leaves(start_index);
|
||||||
|
self.node_manager.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Append a leaf list by providing their intermediate node hash.
|
/// Append a leaf list by providing their intermediate node hash.
|
||||||
@ -184,9 +188,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
bail!("subtree_root is null");
|
bail!("subtree_root is null");
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
let start_index = self.leaves();
|
let start_index = self.leaves();
|
||||||
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
||||||
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
||||||
|
self.node_manager.commit();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -195,11 +201,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
bail!("subtree_list contains null");
|
bail!("subtree_list contains null");
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
for (subtree_depth, subtree_root) in subtree_list {
|
for (subtree_depth, subtree_root) in subtree_list {
|
||||||
let start_index = self.leaves();
|
let start_index = self.leaves();
|
||||||
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
||||||
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
||||||
}
|
}
|
||||||
|
self.node_manager.commit();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -210,6 +218,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// updating to null is not allowed.
|
// updating to null is not allowed.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
if self.layer_len(0) == 0 {
|
if self.layer_len(0) == 0 {
|
||||||
// Special case for the first data.
|
// Special case for the first data.
|
||||||
self.push_node(0, updated_leaf);
|
self.push_node(0, updated_leaf);
|
||||||
@ -217,6 +226,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
self.update_node(0, self.layer_len(0) - 1, updated_leaf);
|
self.update_node(0, self.layer_len(0) - 1, updated_leaf);
|
||||||
}
|
}
|
||||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||||
|
self.node_manager.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fill an unknown `null` leaf with its real value.
|
/// Fill an unknown `null` leaf with its real value.
|
||||||
@ -226,8 +236,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
if leaf == E::null() {
|
if leaf == E::null() {
|
||||||
// fill leaf with null is not allowed.
|
// fill leaf with null is not allowed.
|
||||||
} else if self.node(0, index) == E::null() {
|
} else if self.node(0, index) == E::null() {
|
||||||
|
self.node_manager.start_transaction();
|
||||||
self.update_node(0, index, leaf);
|
self.update_node(0, index, leaf);
|
||||||
self.recompute_after_fill_leaves(index, index + 1);
|
self.recompute_after_fill_leaves(index, index + 1);
|
||||||
|
self.node_manager.commit();
|
||||||
} else if self.node(0, index) != leaf {
|
} else if self.node(0, index) != leaf {
|
||||||
panic!(
|
panic!(
|
||||||
"Fill with invalid leaf, index={} was={:?} get={:?}",
|
"Fill with invalid leaf, index={} was={:?} get={:?}",
|
||||||
@ -246,18 +258,26 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
proof: RangeProof<E>,
|
proof: RangeProof<E>,
|
||||||
) -> Result<Vec<(usize, usize, E)>> {
|
) -> Result<Vec<(usize, usize, E)>> {
|
||||||
self.fill_with_proof(
|
self.node_manager.start_transaction();
|
||||||
proof
|
let mut updated_nodes = Vec::new();
|
||||||
.left_proof
|
updated_nodes.append(
|
||||||
.proof_nodes_in_tree()
|
&mut self.fill_with_proof(
|
||||||
.split_off(self.leaf_height),
|
proof
|
||||||
)?;
|
.left_proof
|
||||||
self.fill_with_proof(
|
.proof_nodes_in_tree()
|
||||||
proof
|
.split_off(self.leaf_height),
|
||||||
.right_proof
|
)?,
|
||||||
.proof_nodes_in_tree()
|
);
|
||||||
.split_off(self.leaf_height),
|
updated_nodes.append(
|
||||||
)
|
&mut self.fill_with_proof(
|
||||||
|
proof
|
||||||
|
.right_proof
|
||||||
|
.proof_nodes_in_tree()
|
||||||
|
.split_off(self.leaf_height),
|
||||||
|
)?,
|
||||||
|
);
|
||||||
|
self.node_manager.commit();
|
||||||
|
Ok(updated_nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn fill_with_file_proof(
|
pub fn fill_with_file_proof(
|
||||||
@ -282,13 +302,16 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
if tx_merkle_nodes.is_empty() {
|
if tx_merkle_nodes.is_empty() {
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
let mut position_and_data =
|
let mut position_and_data =
|
||||||
proof.file_proof_nodes_in_tree(tx_merkle_nodes, tx_merkle_nodes_size);
|
proof.file_proof_nodes_in_tree(tx_merkle_nodes, tx_merkle_nodes_size);
|
||||||
let start_index = (start_index >> self.leaf_height) as usize;
|
let start_index = (start_index >> self.leaf_height) as usize;
|
||||||
for (i, (position, _)) in position_and_data.iter_mut().enumerate() {
|
for (i, (position, _)) in position_and_data.iter_mut().enumerate() {
|
||||||
*position += start_index >> i;
|
*position += start_index >> i;
|
||||||
}
|
}
|
||||||
self.fill_with_proof(position_and_data)
|
let updated_nodes = self.fill_with_proof(position_and_data)?;
|
||||||
|
self.node_manager.commit();
|
||||||
|
Ok(updated_nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof.
|
/// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof.
|
||||||
@ -757,11 +780,10 @@ macro_rules! ensure_eq {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::merkle_tree::MerkleTreeRead;
|
use crate::merkle_tree::MerkleTreeRead;
|
||||||
use crate::node_manager::EmptyNodeDatabase;
|
|
||||||
use crate::sha3::Sha3Algorithm;
|
use crate::sha3::Sha3Algorithm;
|
||||||
use crate::AppendMerkleTree;
|
use crate::AppendMerkleTree;
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_proof() {
|
fn test_proof() {
|
||||||
|
@ -9,6 +9,7 @@ pub struct NodeManager<E: HashElement> {
|
|||||||
cache: LruCache<(usize, usize), E>,
|
cache: LruCache<(usize, usize), E>,
|
||||||
layer_size: Vec<usize>,
|
layer_size: Vec<usize>,
|
||||||
db: Arc<dyn NodeDatabase<E>>,
|
db: Arc<dyn NodeDatabase<E>>,
|
||||||
|
db_tx: Option<Box<dyn NodeTransaction<E>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E: HashElement> NodeManager<E> {
|
impl<E: HashElement> NodeManager<E> {
|
||||||
@ -17,6 +18,7 @@ impl<E: HashElement> NodeManager<E> {
|
|||||||
cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
|
cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
|
||||||
layer_size: vec![],
|
layer_size: vec![],
|
||||||
db,
|
db,
|
||||||
|
db_tx: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -25,6 +27,7 @@ impl<E: HashElement> NodeManager<E> {
|
|||||||
cache: LruCache::unbounded(),
|
cache: LruCache::unbounded(),
|
||||||
layer_size: vec![],
|
layer_size: vec![],
|
||||||
db: Arc::new(EmptyNodeDatabase {}),
|
db: Arc::new(EmptyNodeDatabase {}),
|
||||||
|
db_tx: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -41,9 +44,9 @@ impl<E: HashElement> NodeManager<E> {
|
|||||||
saved_nodes.push((layer, *pos, node));
|
saved_nodes.push((layer, *pos, node));
|
||||||
*pos += 1;
|
*pos += 1;
|
||||||
}
|
}
|
||||||
if let Err(e) = self.db.save_node_list(&saved_nodes) {
|
let size = *pos;
|
||||||
error!("Failed to save node list: {:?}", e);
|
self.db_tx().save_layer_size(layer, size);
|
||||||
}
|
self.db_tx().save_node_list(&saved_nodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_node(&self, layer: usize, pos: usize) -> Option<E> {
|
pub fn get_node(&self, layer: usize, pos: usize) -> Option<E> {
|
||||||
@ -66,14 +69,17 @@ impl<E: HashElement> NodeManager<E> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_node(&mut self, layer: usize, pos: usize, node: E) {
|
pub fn add_node(&mut self, layer: usize, pos: usize, node: E) {
|
||||||
if let Err(e) = self.db.save_node(layer, pos, &node) {
|
// No need to insert if the value is unchanged.
|
||||||
error!("Failed to save node: {}", e);
|
if self.cache.get(&(layer, pos)) != Some(&node) {
|
||||||
|
self.db_tx().save_node(layer, pos, &node);
|
||||||
|
self.cache.put((layer, pos), node);
|
||||||
}
|
}
|
||||||
self.cache.put((layer, pos), node);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_layer(&mut self) {
|
pub fn add_layer(&mut self) {
|
||||||
self.layer_size.push(0);
|
self.layer_size.push(0);
|
||||||
|
let layer = self.layer_size.len() - 1;
|
||||||
|
self.db_tx().save_layer_size(layer, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn layer_size(&self, layer: usize) -> usize {
|
pub fn layer_size(&self, layer: usize) -> usize {
|
||||||
@ -90,18 +96,42 @@ impl<E: HashElement> NodeManager<E> {
|
|||||||
self.cache.pop(&(layer, pos));
|
self.cache.pop(&(layer, pos));
|
||||||
removed_nodes.push((layer, pos));
|
removed_nodes.push((layer, pos));
|
||||||
}
|
}
|
||||||
if let Err(e) = self.db.remove_node_list(&removed_nodes) {
|
self.db_tx().remove_node_list(&removed_nodes);
|
||||||
error!("Failed to remove node list: {:?}", e);
|
|
||||||
}
|
|
||||||
self.layer_size[layer] = pos_end;
|
self.layer_size[layer] = pos_end;
|
||||||
|
self.db_tx().save_layer_size(layer, pos_end);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn truncate_layer(&mut self, layer: usize) {
|
pub fn truncate_layer(&mut self, layer: usize) {
|
||||||
self.truncate_nodes(layer, 0);
|
self.truncate_nodes(layer, 0);
|
||||||
if layer == self.num_layers() - 1 {
|
if layer == self.num_layers() - 1 {
|
||||||
self.layer_size.pop();
|
self.layer_size.pop();
|
||||||
|
self.db_tx().remove_layer_size(layer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn start_transaction(&mut self) {
|
||||||
|
if self.db_tx.is_none() {
|
||||||
|
error!("start new tx before commit");
|
||||||
|
}
|
||||||
|
self.db_tx = Some(self.db.start_transaction());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn commit(&mut self) {
|
||||||
|
let tx = match self.db_tx.take() {
|
||||||
|
Some(tx) => tx,
|
||||||
|
None => {
|
||||||
|
error!("db_tx is None");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(e) = self.db.commit(tx) {
|
||||||
|
error!("Failed to commit db transaction: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn db_tx(&mut self) -> &mut dyn NodeTransaction<E> {
|
||||||
|
(*self.db_tx.as_mut().expect("tx checked")).as_mut()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct NodeIterator<'a, E: HashElement> {
|
pub struct NodeIterator<'a, E: HashElement> {
|
||||||
@ -127,28 +157,46 @@ impl<'a, E: HashElement> Iterator for NodeIterator<'a, E> {
|
|||||||
|
|
||||||
pub trait NodeDatabase<E: HashElement>: Send + Sync {
|
pub trait NodeDatabase<E: HashElement>: Send + Sync {
|
||||||
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<E>>;
|
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<E>>;
|
||||||
fn save_node(&self, layer: usize, pos: usize, node: &E) -> Result<()>;
|
fn get_layer_size(&self, layer: usize) -> Result<Option<usize>>;
|
||||||
|
fn start_transaction(&self) -> Box<dyn NodeTransaction<E>>;
|
||||||
|
fn commit(&self, tx: Box<dyn NodeTransaction<E>>) -> Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait NodeTransaction<E: HashElement>: Send + Sync {
|
||||||
|
fn save_node(&mut self, layer: usize, pos: usize, node: &E);
|
||||||
/// `nodes` are a list of tuples `(layer, pos, node)`.
|
/// `nodes` are a list of tuples `(layer, pos, node)`.
|
||||||
fn save_node_list(&self, nodes: &[(usize, usize, &E)]) -> Result<()>;
|
fn save_node_list(&mut self, nodes: &[(usize, usize, &E)]);
|
||||||
fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()>;
|
fn remove_node_list(&mut self, nodes: &[(usize, usize)]);
|
||||||
|
fn save_layer_size(&mut self, layer: usize, size: usize);
|
||||||
|
fn remove_layer_size(&mut self, layer: usize);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A dummy database structure for in-memory merkle tree that will not read/write db.
|
/// A dummy database structure for in-memory merkle tree that will not read/write db.
|
||||||
pub struct EmptyNodeDatabase {}
|
pub struct EmptyNodeDatabase {}
|
||||||
|
pub struct EmptyNodeTransaction {}
|
||||||
impl<E: HashElement> NodeDatabase<E> for EmptyNodeDatabase {
|
impl<E: HashElement> NodeDatabase<E> for EmptyNodeDatabase {
|
||||||
fn get_node(&self, _layer: usize, _pos: usize) -> Result<Option<E>> {
|
fn get_node(&self, _layer: usize, _pos: usize) -> Result<Option<E>> {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
fn get_layer_size(&self, _layer: usize) -> Result<Option<usize>> {
|
||||||
fn save_node(&self, _layer: usize, _pos: usize, _node: &E) -> Result<()> {
|
Ok(None)
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
fn start_transaction(&self) -> Box<dyn NodeTransaction<E>> {
|
||||||
fn save_node_list(&self, _nodes: &[(usize, usize, &E)]) -> Result<()> {
|
Box::new(EmptyNodeTransaction {})
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
fn commit(&self, _tx: Box<dyn NodeTransaction<E>>) -> Result<()> {
|
||||||
fn remove_node_list(&self, _nodes: &[(usize, usize)]) -> Result<()> {
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<E: HashElement> NodeTransaction<E> for EmptyNodeTransaction {
|
||||||
|
fn save_node(&mut self, _layer: usize, _pos: usize, _node: &E) {}
|
||||||
|
|
||||||
|
fn save_node_list(&mut self, _nodes: &[(usize, usize, &E)]) {}
|
||||||
|
|
||||||
|
fn remove_node_list(&mut self, _nodes: &[(usize, usize)]) {}
|
||||||
|
|
||||||
|
fn save_layer_size(&mut self, _layer: usize, _size: usize) {}
|
||||||
|
|
||||||
|
fn remove_layer_size(&mut self, _layer: usize) {}
|
||||||
|
}
|
||||||
|
@ -9,9 +9,11 @@ use crate::log_store::log_manager::{
|
|||||||
};
|
};
|
||||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
||||||
use crate::{try_option, ZgsKeyValueDB};
|
use crate::{try_option, ZgsKeyValueDB};
|
||||||
|
use any::Any;
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase};
|
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase, NodeTransaction};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
|
use kvdb::DBTransaction;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
|
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
|
||||||
use ssz::{Decode, Encode};
|
use ssz::{Decode, Encode};
|
||||||
@ -20,7 +22,7 @@ use std::cmp::Ordering;
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{cmp, mem};
|
use std::{any, cmp, mem};
|
||||||
use tracing::{debug, error, trace};
|
use tracing::{debug, error, trace};
|
||||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
||||||
|
|
||||||
@ -670,6 +672,14 @@ fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
|
|||||||
Ok((layer_index, position))
|
Ok((layer_index, position))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn layer_size_key(layer: usize) -> Vec<u8> {
|
||||||
|
let mut key = "layer_size".as_bytes().to_vec();
|
||||||
|
key.extend_from_slice(&layer.to_be_bytes());
|
||||||
|
key
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct NodeDBTransaction(DBTransaction);
|
||||||
|
|
||||||
impl NodeDatabase<DataRoot> for FlowDBStore {
|
impl NodeDatabase<DataRoot> for FlowDBStore {
|
||||||
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<DataRoot>> {
|
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<DataRoot>> {
|
||||||
Ok(self
|
Ok(self
|
||||||
@ -678,36 +688,62 @@ impl NodeDatabase<DataRoot> for FlowDBStore {
|
|||||||
.map(|v| DataRoot::from_slice(&v)))
|
.map(|v| DataRoot::from_slice(&v)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn save_node(&self, layer: usize, pos: usize, node: &DataRoot) -> Result<()> {
|
fn get_layer_size(&self, layer: usize) -> Result<Option<usize>> {
|
||||||
let mut tx = self.kvdb.transaction();
|
match self.kvdb.get(COL_FLOW_MPT_NODES, &layer_size_key(layer))? {
|
||||||
tx.put(
|
Some(v) => Ok(Some(try_decode_usize(&v)?)),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_transaction(&self) -> Box<dyn NodeTransaction<DataRoot>> {
|
||||||
|
Box::new(NodeDBTransaction(self.kvdb.transaction()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn commit(&self, tx: Box<dyn NodeTransaction<DataRoot>>) -> Result<()> {
|
||||||
|
let db_tx: &NodeDBTransaction = (&tx as &dyn Any)
|
||||||
|
.downcast_ref()
|
||||||
|
.ok_or(anyhow!("downcast failed"))?;
|
||||||
|
self.kvdb.write(db_tx.0.clone()).map_err(Into::into)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NodeTransaction<DataRoot> for NodeDBTransaction {
|
||||||
|
fn save_node(&mut self, layer: usize, pos: usize, node: &DataRoot) {
|
||||||
|
self.0.put(
|
||||||
COL_FLOW_MPT_NODES,
|
COL_FLOW_MPT_NODES,
|
||||||
&encode_mpt_node_key(layer, pos),
|
&encode_mpt_node_key(layer, pos),
|
||||||
node.as_bytes(),
|
node.as_bytes(),
|
||||||
);
|
);
|
||||||
Ok(self.kvdb.write(tx)?)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn save_node_list(&self, nodes: &[(usize, usize, &DataRoot)]) -> Result<()> {
|
fn save_node_list(&mut self, nodes: &[(usize, usize, &DataRoot)]) {
|
||||||
let mut tx = self.kvdb.transaction();
|
|
||||||
for (layer_index, position, data) in nodes {
|
for (layer_index, position, data) in nodes {
|
||||||
tx.put(
|
self.0.put(
|
||||||
COL_FLOW_MPT_NODES,
|
COL_FLOW_MPT_NODES,
|
||||||
&encode_mpt_node_key(*layer_index, *position),
|
&encode_mpt_node_key(*layer_index, *position),
|
||||||
data.as_bytes(),
|
data.as_bytes(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Ok(self.kvdb.write(tx)?)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()> {
|
fn remove_node_list(&mut self, nodes: &[(usize, usize)]) {
|
||||||
let mut tx = self.kvdb.transaction();
|
|
||||||
for (layer_index, position) in nodes {
|
for (layer_index, position) in nodes {
|
||||||
tx.delete(
|
self.0.delete(
|
||||||
COL_FLOW_MPT_NODES,
|
COL_FLOW_MPT_NODES,
|
||||||
&encode_mpt_node_key(*layer_index, *position),
|
&encode_mpt_node_key(*layer_index, *position),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Ok(self.kvdb.write(tx)?)
|
}
|
||||||
|
|
||||||
|
fn save_layer_size(&mut self, layer: usize, size: usize) {
|
||||||
|
self.0.put(
|
||||||
|
COL_FLOW_MPT_NODES,
|
||||||
|
&layer_size_key(layer),
|
||||||
|
&size.to_be_bytes(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_layer_size(&mut self, layer: usize) {
|
||||||
|
self.0.delete(COL_FLOW_MPT_NODES, &layer_size_key(layer));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,14 +3,12 @@ use crate::log_store::log_manager::{
|
|||||||
PORA_CHUNK_SIZE,
|
PORA_CHUNK_SIZE,
|
||||||
};
|
};
|
||||||
use crate::log_store::{LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite};
|
use crate::log_store::{LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite};
|
||||||
use append_merkle::{
|
use append_merkle::{Algorithm, AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
|
||||||
Algorithm, AppendMerkleTree, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm,
|
|
||||||
};
|
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use rand::random;
|
use rand::random;
|
||||||
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
|
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::sync::Arc;
|
|
||||||
use task_executor::test_utils::TestRuntime;
|
use task_executor::test_utils::TestRuntime;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -6,10 +6,15 @@ from utility.utils import wait_until
|
|||||||
|
|
||||||
|
|
||||||
class ExampleTest(TestFramework):
|
class ExampleTest(TestFramework):
|
||||||
|
def setup_params(self):
|
||||||
|
self.zgs_node_configs[0] = {
|
||||||
|
"merkle_node_cache_capacity": 1024,
|
||||||
|
}
|
||||||
|
|
||||||
def run_test(self):
|
def run_test(self):
|
||||||
client = self.nodes[0]
|
client = self.nodes[0]
|
||||||
|
|
||||||
chunk_data = b"\x02" * 5253123
|
chunk_data = b"\x02" * 256 * 1024 * 1024 * 3
|
||||||
submissions, data_root = create_submission(chunk_data)
|
submissions, data_root = create_submission(chunk_data)
|
||||||
self.contract.submit(submissions)
|
self.contract.submit(submissions)
|
||||||
wait_until(lambda: self.contract.num_submissions() == 1)
|
wait_until(lambda: self.contract.num_submissions() == 1)
|
||||||
@ -19,6 +24,20 @@ class ExampleTest(TestFramework):
|
|||||||
self.log.info("segment: %s", len(segment))
|
self.log.info("segment: %s", len(segment))
|
||||||
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
|
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
|
||||||
|
|
||||||
|
self.stop_storage_node(0)
|
||||||
|
self.start_storage_node(0)
|
||||||
|
self.nodes[0].wait_for_rpc_connection()
|
||||||
|
|
||||||
|
chunk_data = b"\x03" * 256 * (1024 * 765 + 5)
|
||||||
|
submissions, data_root = create_submission(chunk_data)
|
||||||
|
self.contract.submit(submissions)
|
||||||
|
wait_until(lambda: self.contract.num_submissions() == 2)
|
||||||
|
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
|
||||||
|
|
||||||
|
segment = submit_data(client, chunk_data)
|
||||||
|
self.log.info("segment: %s", len(segment))
|
||||||
|
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
ExampleTest().main()
|
ExampleTest().main()
|
||||||
|
Loading…
Reference in New Issue
Block a user