mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Compare commits
15 Commits
90650294af
...
703d926a23
Author | SHA1 | Date | |
---|---|---|---|
![]() |
703d926a23 | ||
![]() |
6a4b246e8b | ||
![]() |
1c4257f692 | ||
![]() |
9c2f6e9d7d | ||
![]() |
19829f1def | ||
![]() |
e701c8fdbd | ||
![]() |
a4b02a21b7 | ||
![]() |
3fc1543fb4 | ||
![]() |
82fd29968b | ||
![]() |
45fa344564 | ||
![]() |
48868b60db | ||
![]() |
649f6e5e9f | ||
![]() |
79d960d4ea | ||
![]() |
b131dc532f | ||
![]() |
4e2c5fa8a4 |
@ -16,7 +16,7 @@ use crate::merkle_tree::MerkleTreeWrite;
|
|||||||
pub use crate::merkle_tree::{
|
pub use crate::merkle_tree::{
|
||||||
Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES,
|
Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES,
|
||||||
};
|
};
|
||||||
pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager};
|
pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager, NodeTransaction};
|
||||||
pub use proof::{Proof, RangeProof};
|
pub use proof::{Proof, RangeProof};
|
||||||
pub use sha3::Sha3Algorithm;
|
pub use sha3::Sha3Algorithm;
|
||||||
|
|
||||||
@ -47,6 +47,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
leaf_height,
|
leaf_height,
|
||||||
_a: Default::default(),
|
_a: Default::default(),
|
||||||
};
|
};
|
||||||
|
merkle.node_manager.start_transaction();
|
||||||
merkle.node_manager.add_layer();
|
merkle.node_manager.add_layer();
|
||||||
merkle.node_manager.append_nodes(0, &leaves);
|
merkle.node_manager.append_nodes(0, &leaves);
|
||||||
if merkle.leaves() == 0 {
|
if merkle.leaves() == 0 {
|
||||||
@ -71,49 +72,33 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
pub fn new_with_subtrees(
|
pub fn new_with_subtrees(
|
||||||
node_db: Arc<dyn NodeDatabase<E>>,
|
node_db: Arc<dyn NodeDatabase<E>>,
|
||||||
node_cache_capacity: usize,
|
node_cache_capacity: usize,
|
||||||
initial_data: MerkleTreeInitialData<E>,
|
|
||||||
leaf_height: usize,
|
leaf_height: usize,
|
||||||
start_tx_seq: Option<u64>,
|
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let mut merkle = Self {
|
let mut merkle = Self {
|
||||||
node_manager: NodeManager::new(node_db, node_cache_capacity),
|
node_manager: NodeManager::new(node_db, node_cache_capacity)?,
|
||||||
delta_nodes_map: BTreeMap::new(),
|
delta_nodes_map: BTreeMap::new(),
|
||||||
root_to_tx_seq_map: HashMap::new(),
|
root_to_tx_seq_map: HashMap::new(),
|
||||||
min_depth: None,
|
min_depth: None,
|
||||||
leaf_height,
|
leaf_height,
|
||||||
_a: Default::default(),
|
_a: Default::default(),
|
||||||
};
|
};
|
||||||
merkle.node_manager.add_layer();
|
if merkle.height() == 0 {
|
||||||
if initial_data.subtree_list.is_empty() {
|
merkle.node_manager.start_transaction();
|
||||||
if let Some(seq) = start_tx_seq {
|
merkle.node_manager.add_layer();
|
||||||
merkle.delta_nodes_map.insert(
|
merkle.node_manager.commit();
|
||||||
seq,
|
|
||||||
DeltaNodes {
|
|
||||||
right_most_nodes: vec![],
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return Ok(merkle);
|
|
||||||
}
|
|
||||||
merkle.append_subtree_list(initial_data.subtree_list)?;
|
|
||||||
merkle.commit(start_tx_seq);
|
|
||||||
for (index, h) in initial_data.known_leaves {
|
|
||||||
merkle.fill_leaf(index, h);
|
|
||||||
}
|
|
||||||
for (layer_index, position, h) in initial_data.extra_mpt_nodes {
|
|
||||||
// TODO: Delete duplicate nodes from DB.
|
|
||||||
merkle.update_node(layer_index, position, h);
|
|
||||||
}
|
}
|
||||||
Ok(merkle)
|
Ok(merkle)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This is only used for the last chunk, so `leaf_height` is always 0 so far.
|
/// This is only used for the last chunk, so `leaf_height` is always 0 so far.
|
||||||
pub fn new_with_depth(leaves: Vec<E>, depth: usize, start_tx_seq: Option<u64>) -> Self {
|
pub fn new_with_depth(leaves: Vec<E>, depth: usize, start_tx_seq: Option<u64>) -> Self {
|
||||||
|
let mut node_manager = NodeManager::new_dummy();
|
||||||
|
node_manager.start_transaction();
|
||||||
if leaves.is_empty() {
|
if leaves.is_empty() {
|
||||||
// Create an empty merkle tree with `depth`.
|
// Create an empty merkle tree with `depth`.
|
||||||
let mut merkle = Self {
|
let mut merkle = Self {
|
||||||
// dummy node manager for the last chunk.
|
// dummy node manager for the last chunk.
|
||||||
node_manager: NodeManager::new_dummy(),
|
node_manager,
|
||||||
delta_nodes_map: BTreeMap::new(),
|
delta_nodes_map: BTreeMap::new(),
|
||||||
root_to_tx_seq_map: HashMap::new(),
|
root_to_tx_seq_map: HashMap::new(),
|
||||||
min_depth: Some(depth),
|
min_depth: Some(depth),
|
||||||
@ -135,7 +120,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
} else {
|
} else {
|
||||||
let mut merkle = Self {
|
let mut merkle = Self {
|
||||||
// dummy node manager for the last chunk.
|
// dummy node manager for the last chunk.
|
||||||
node_manager: NodeManager::new_dummy(),
|
node_manager,
|
||||||
delta_nodes_map: BTreeMap::new(),
|
delta_nodes_map: BTreeMap::new(),
|
||||||
root_to_tx_seq_map: HashMap::new(),
|
root_to_tx_seq_map: HashMap::new(),
|
||||||
min_depth: Some(depth),
|
min_depth: Some(depth),
|
||||||
@ -160,8 +145,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
self.node_manager.push_node(0, new_leaf);
|
self.node_manager.push_node(0, new_leaf);
|
||||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||||
|
self.node_manager.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn append_list(&mut self, leaf_list: Vec<E>) {
|
pub fn append_list(&mut self, leaf_list: Vec<E>) {
|
||||||
@ -169,9 +156,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
let start_index = self.leaves();
|
let start_index = self.leaves();
|
||||||
self.node_manager.append_nodes(0, &leaf_list);
|
self.node_manager.append_nodes(0, &leaf_list);
|
||||||
self.recompute_after_append_leaves(start_index);
|
self.recompute_after_append_leaves(start_index);
|
||||||
|
self.node_manager.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Append a leaf list by providing their intermediate node hash.
|
/// Append a leaf list by providing their intermediate node hash.
|
||||||
@ -184,9 +173,11 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
bail!("subtree_root is null");
|
bail!("subtree_root is null");
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
let start_index = self.leaves();
|
let start_index = self.leaves();
|
||||||
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
||||||
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
||||||
|
self.node_manager.commit();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -195,11 +186,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// appending null is not allowed.
|
// appending null is not allowed.
|
||||||
bail!("subtree_list contains null");
|
bail!("subtree_list contains null");
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
for (subtree_depth, subtree_root) in subtree_list {
|
for (subtree_depth, subtree_root) in subtree_list {
|
||||||
let start_index = self.leaves();
|
let start_index = self.leaves();
|
||||||
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
self.append_subtree_inner(subtree_depth, subtree_root)?;
|
||||||
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
|
||||||
}
|
}
|
||||||
|
self.node_manager.commit();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -210,6 +203,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// updating to null is not allowed.
|
// updating to null is not allowed.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
if self.layer_len(0) == 0 {
|
if self.layer_len(0) == 0 {
|
||||||
// Special case for the first data.
|
// Special case for the first data.
|
||||||
self.push_node(0, updated_leaf);
|
self.push_node(0, updated_leaf);
|
||||||
@ -217,6 +211,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
self.update_node(0, self.layer_len(0) - 1, updated_leaf);
|
self.update_node(0, self.layer_len(0) - 1, updated_leaf);
|
||||||
}
|
}
|
||||||
self.recompute_after_append_leaves(self.leaves() - 1);
|
self.recompute_after_append_leaves(self.leaves() - 1);
|
||||||
|
self.node_manager.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fill an unknown `null` leaf with its real value.
|
/// Fill an unknown `null` leaf with its real value.
|
||||||
@ -226,8 +221,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
if leaf == E::null() {
|
if leaf == E::null() {
|
||||||
// fill leaf with null is not allowed.
|
// fill leaf with null is not allowed.
|
||||||
} else if self.node(0, index) == E::null() {
|
} else if self.node(0, index) == E::null() {
|
||||||
|
self.node_manager.start_transaction();
|
||||||
self.update_node(0, index, leaf);
|
self.update_node(0, index, leaf);
|
||||||
self.recompute_after_fill_leaves(index, index + 1);
|
self.recompute_after_fill_leaves(index, index + 1);
|
||||||
|
self.node_manager.commit();
|
||||||
} else if self.node(0, index) != leaf {
|
} else if self.node(0, index) != leaf {
|
||||||
panic!(
|
panic!(
|
||||||
"Fill with invalid leaf, index={} was={:?} get={:?}",
|
"Fill with invalid leaf, index={} was={:?} get={:?}",
|
||||||
@ -246,18 +243,20 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
proof: RangeProof<E>,
|
proof: RangeProof<E>,
|
||||||
) -> Result<Vec<(usize, usize, E)>> {
|
) -> Result<Vec<(usize, usize, E)>> {
|
||||||
self.fill_with_proof(
|
self.node_manager.start_transaction();
|
||||||
proof
|
let mut updated_nodes = Vec::new();
|
||||||
.left_proof
|
let mut left_nodes = proof.left_proof.proof_nodes_in_tree();
|
||||||
.proof_nodes_in_tree()
|
if left_nodes.len() >= self.leaf_height {
|
||||||
.split_off(self.leaf_height),
|
updated_nodes
|
||||||
)?;
|
.append(&mut self.fill_with_proof(left_nodes.split_off(self.leaf_height))?);
|
||||||
self.fill_with_proof(
|
}
|
||||||
proof
|
let mut right_nodes = proof.right_proof.proof_nodes_in_tree();
|
||||||
.right_proof
|
if right_nodes.len() >= self.leaf_height {
|
||||||
.proof_nodes_in_tree()
|
updated_nodes
|
||||||
.split_off(self.leaf_height),
|
.append(&mut self.fill_with_proof(right_nodes.split_off(self.leaf_height))?);
|
||||||
)
|
}
|
||||||
|
self.node_manager.commit();
|
||||||
|
Ok(updated_nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn fill_with_file_proof(
|
pub fn fill_with_file_proof(
|
||||||
@ -282,13 +281,16 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
if tx_merkle_nodes.is_empty() {
|
if tx_merkle_nodes.is_empty() {
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
let mut position_and_data =
|
let mut position_and_data =
|
||||||
proof.file_proof_nodes_in_tree(tx_merkle_nodes, tx_merkle_nodes_size);
|
proof.file_proof_nodes_in_tree(tx_merkle_nodes, tx_merkle_nodes_size);
|
||||||
let start_index = (start_index >> self.leaf_height) as usize;
|
let start_index = (start_index >> self.leaf_height) as usize;
|
||||||
for (i, (position, _)) in position_and_data.iter_mut().enumerate() {
|
for (i, (position, _)) in position_and_data.iter_mut().enumerate() {
|
||||||
*position += start_index >> i;
|
*position += start_index >> i;
|
||||||
}
|
}
|
||||||
self.fill_with_proof(position_and_data)
|
let updated_nodes = self.fill_with_proof(position_and_data)?;
|
||||||
|
self.node_manager.commit();
|
||||||
|
Ok(updated_nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof.
|
/// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof.
|
||||||
@ -546,13 +548,14 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// Any previous state of an empty tree is always empty.
|
// Any previous state of an empty tree is always empty.
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
let delta_nodes = self
|
let delta_nodes = self
|
||||||
.delta_nodes_map
|
.delta_nodes_map
|
||||||
.get(&tx_seq)
|
.get(&tx_seq)
|
||||||
.ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?
|
.ok_or_else(|| anyhow!("tx_seq unavailable, root={:?}", tx_seq))?
|
||||||
.clone();
|
.clone();
|
||||||
// Dropping the upper layers that are not in the old merkle tree.
|
// Dropping the upper layers that are not in the old merkle tree.
|
||||||
for height in (delta_nodes.right_most_nodes.len()..(self.height() - 1)).rev() {
|
for height in (delta_nodes.right_most_nodes.len()..self.height()).rev() {
|
||||||
self.node_manager.truncate_layer(height);
|
self.node_manager.truncate_layer(height);
|
||||||
}
|
}
|
||||||
for (height, (last_index, right_most_node)) in
|
for (height, (last_index, right_most_node)) in
|
||||||
@ -562,6 +565,24 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
self.update_node(height, *last_index, right_most_node.clone())
|
self.update_node(height, *last_index, right_most_node.clone())
|
||||||
}
|
}
|
||||||
self.clear_after(tx_seq);
|
self.clear_after(tx_seq);
|
||||||
|
self.node_manager.commit();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Revert to a tx_seq not in `delta_nodes_map`.
|
||||||
|
// This is needed to revert the last unfinished tx after restart.
|
||||||
|
pub fn revert_to_leaves(&mut self, leaves: usize) -> Result<()> {
|
||||||
|
self.node_manager.start_transaction();
|
||||||
|
for height in (0..self.height()).rev() {
|
||||||
|
let kept_nodes = leaves >> height;
|
||||||
|
if kept_nodes == 0 {
|
||||||
|
self.node_manager.truncate_layer(height);
|
||||||
|
} else {
|
||||||
|
self.node_manager.truncate_nodes(height, kept_nodes + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.recompute_after_append_leaves(leaves);
|
||||||
|
self.node_manager.commit();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -588,6 +609,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn reset(&mut self) {
|
pub fn reset(&mut self) {
|
||||||
|
self.node_manager.start_transaction();
|
||||||
for height in (0..self.height()).rev() {
|
for height in (0..self.height()).rev() {
|
||||||
self.node_manager.truncate_layer(height);
|
self.node_manager.truncate_layer(height);
|
||||||
}
|
}
|
||||||
@ -598,6 +620,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
} else {
|
} else {
|
||||||
self.node_manager.add_layer();
|
self.node_manager.add_layer();
|
||||||
}
|
}
|
||||||
|
self.node_manager.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn clear_after(&mut self, tx_seq: u64) {
|
fn clear_after(&mut self, tx_seq: u64) {
|
||||||
@ -757,11 +780,10 @@ macro_rules! ensure_eq {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::merkle_tree::MerkleTreeRead;
|
use crate::merkle_tree::MerkleTreeRead;
|
||||||
use crate::node_manager::EmptyNodeDatabase;
|
|
||||||
use crate::sha3::Sha3Algorithm;
|
use crate::sha3::Sha3Algorithm;
|
||||||
use crate::AppendMerkleTree;
|
use crate::AppendMerkleTree;
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_proof() {
|
fn test_proof() {
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
use crate::HashElement;
|
use crate::HashElement;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
|
use std::any::Any;
|
||||||
use std::num::NonZeroUsize;
|
use std::num::NonZeroUsize;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tracing::error;
|
use tracing::error;
|
||||||
@ -9,15 +10,23 @@ pub struct NodeManager<E: HashElement> {
|
|||||||
cache: LruCache<(usize, usize), E>,
|
cache: LruCache<(usize, usize), E>,
|
||||||
layer_size: Vec<usize>,
|
layer_size: Vec<usize>,
|
||||||
db: Arc<dyn NodeDatabase<E>>,
|
db: Arc<dyn NodeDatabase<E>>,
|
||||||
|
db_tx: Option<Box<dyn NodeTransaction<E>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E: HashElement> NodeManager<E> {
|
impl<E: HashElement> NodeManager<E> {
|
||||||
pub fn new(db: Arc<dyn NodeDatabase<E>>, capacity: usize) -> Self {
|
pub fn new(db: Arc<dyn NodeDatabase<E>>, capacity: usize) -> Result<Self> {
|
||||||
Self {
|
let mut layer = 0;
|
||||||
cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
|
let mut layer_size = Vec::new();
|
||||||
layer_size: vec![],
|
while let Some(size) = db.get_layer_size(layer)? {
|
||||||
db,
|
layer_size.push(size);
|
||||||
|
layer += 1;
|
||||||
}
|
}
|
||||||
|
Ok(Self {
|
||||||
|
cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
|
||||||
|
layer_size,
|
||||||
|
db,
|
||||||
|
db_tx: None,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_dummy() -> Self {
|
pub fn new_dummy() -> Self {
|
||||||
@ -25,25 +34,25 @@ impl<E: HashElement> NodeManager<E> {
|
|||||||
cache: LruCache::unbounded(),
|
cache: LruCache::unbounded(),
|
||||||
layer_size: vec![],
|
layer_size: vec![],
|
||||||
db: Arc::new(EmptyNodeDatabase {}),
|
db: Arc::new(EmptyNodeDatabase {}),
|
||||||
|
db_tx: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn push_node(&mut self, layer: usize, node: E) {
|
pub fn push_node(&mut self, layer: usize, node: E) {
|
||||||
self.add_node(layer, self.layer_size[layer], node);
|
self.add_node(layer, self.layer_size[layer], node);
|
||||||
self.layer_size[layer] += 1;
|
self.set_layer_size(layer, self.layer_size[layer] + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) {
|
pub fn append_nodes(&mut self, layer: usize, nodes: &[E]) {
|
||||||
let pos = &mut self.layer_size[layer];
|
let mut pos = self.layer_size[layer];
|
||||||
let mut saved_nodes = Vec::with_capacity(nodes.len());
|
let mut saved_nodes = Vec::with_capacity(nodes.len());
|
||||||
for node in nodes {
|
for node in nodes {
|
||||||
self.cache.put((layer, *pos), node.clone());
|
self.cache.put((layer, pos), node.clone());
|
||||||
saved_nodes.push((layer, *pos, node));
|
saved_nodes.push((layer, pos, node));
|
||||||
*pos += 1;
|
pos += 1;
|
||||||
}
|
|
||||||
if let Err(e) = self.db.save_node_list(&saved_nodes) {
|
|
||||||
error!("Failed to save node list: {:?}", e);
|
|
||||||
}
|
}
|
||||||
|
self.set_layer_size(layer, pos);
|
||||||
|
self.db_tx().save_node_list(&saved_nodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_node(&self, layer: usize, pos: usize) -> Option<E> {
|
pub fn get_node(&self, layer: usize, pos: usize) -> Option<E> {
|
||||||
@ -66,14 +75,17 @@ impl<E: HashElement> NodeManager<E> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_node(&mut self, layer: usize, pos: usize, node: E) {
|
pub fn add_node(&mut self, layer: usize, pos: usize, node: E) {
|
||||||
if let Err(e) = self.db.save_node(layer, pos, &node) {
|
// No need to insert if the value is unchanged.
|
||||||
error!("Failed to save node: {}", e);
|
if self.cache.get(&(layer, pos)) != Some(&node) {
|
||||||
|
self.db_tx().save_node(layer, pos, &node);
|
||||||
|
self.cache.put((layer, pos), node);
|
||||||
}
|
}
|
||||||
self.cache.put((layer, pos), node);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_layer(&mut self) {
|
pub fn add_layer(&mut self) {
|
||||||
self.layer_size.push(0);
|
self.layer_size.push(0);
|
||||||
|
let layer = self.layer_size.len() - 1;
|
||||||
|
self.db_tx().save_layer_size(layer, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn layer_size(&self, layer: usize) -> usize {
|
pub fn layer_size(&self, layer: usize) -> usize {
|
||||||
@ -90,18 +102,46 @@ impl<E: HashElement> NodeManager<E> {
|
|||||||
self.cache.pop(&(layer, pos));
|
self.cache.pop(&(layer, pos));
|
||||||
removed_nodes.push((layer, pos));
|
removed_nodes.push((layer, pos));
|
||||||
}
|
}
|
||||||
if let Err(e) = self.db.remove_node_list(&removed_nodes) {
|
self.db_tx().remove_node_list(&removed_nodes);
|
||||||
error!("Failed to remove node list: {:?}", e);
|
self.set_layer_size(layer, pos_end);
|
||||||
}
|
|
||||||
self.layer_size[layer] = pos_end;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn truncate_layer(&mut self, layer: usize) {
|
pub fn truncate_layer(&mut self, layer: usize) {
|
||||||
self.truncate_nodes(layer, 0);
|
self.truncate_nodes(layer, 0);
|
||||||
if layer == self.num_layers() - 1 {
|
if layer == self.num_layers() - 1 {
|
||||||
self.layer_size.pop();
|
self.layer_size.pop();
|
||||||
|
self.db_tx().remove_layer_size(layer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn start_transaction(&mut self) {
|
||||||
|
if self.db_tx.is_none() {
|
||||||
|
error!("start new tx before commit");
|
||||||
|
}
|
||||||
|
self.db_tx = Some(self.db.start_transaction());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn commit(&mut self) {
|
||||||
|
let tx = match self.db_tx.take() {
|
||||||
|
Some(tx) => tx,
|
||||||
|
None => {
|
||||||
|
error!("db_tx is None");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(e) = self.db.commit(tx) {
|
||||||
|
error!("Failed to commit db transaction: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn db_tx(&mut self) -> &mut dyn NodeTransaction<E> {
|
||||||
|
(*self.db_tx.as_mut().expect("tx checked")).as_mut()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_layer_size(&mut self, layer: usize, size: usize) {
|
||||||
|
self.layer_size[layer] = size;
|
||||||
|
self.db_tx().save_layer_size(layer, size);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct NodeIterator<'a, E: HashElement> {
|
pub struct NodeIterator<'a, E: HashElement> {
|
||||||
@ -127,28 +167,52 @@ impl<'a, E: HashElement> Iterator for NodeIterator<'a, E> {
|
|||||||
|
|
||||||
pub trait NodeDatabase<E: HashElement>: Send + Sync {
|
pub trait NodeDatabase<E: HashElement>: Send + Sync {
|
||||||
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<E>>;
|
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<E>>;
|
||||||
fn save_node(&self, layer: usize, pos: usize, node: &E) -> Result<()>;
|
fn get_layer_size(&self, layer: usize) -> Result<Option<usize>>;
|
||||||
|
fn start_transaction(&self) -> Box<dyn NodeTransaction<E>>;
|
||||||
|
fn commit(&self, tx: Box<dyn NodeTransaction<E>>) -> Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait NodeTransaction<E: HashElement>: Send + Sync {
|
||||||
|
fn save_node(&mut self, layer: usize, pos: usize, node: &E);
|
||||||
/// `nodes` are a list of tuples `(layer, pos, node)`.
|
/// `nodes` are a list of tuples `(layer, pos, node)`.
|
||||||
fn save_node_list(&self, nodes: &[(usize, usize, &E)]) -> Result<()>;
|
fn save_node_list(&mut self, nodes: &[(usize, usize, &E)]);
|
||||||
fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()>;
|
fn remove_node_list(&mut self, nodes: &[(usize, usize)]);
|
||||||
|
fn save_layer_size(&mut self, layer: usize, size: usize);
|
||||||
|
fn remove_layer_size(&mut self, layer: usize);
|
||||||
|
|
||||||
|
fn into_any(self: Box<Self>) -> Box<dyn Any>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A dummy database structure for in-memory merkle tree that will not read/write db.
|
/// A dummy database structure for in-memory merkle tree that will not read/write db.
|
||||||
pub struct EmptyNodeDatabase {}
|
pub struct EmptyNodeDatabase {}
|
||||||
|
pub struct EmptyNodeTransaction {}
|
||||||
impl<E: HashElement> NodeDatabase<E> for EmptyNodeDatabase {
|
impl<E: HashElement> NodeDatabase<E> for EmptyNodeDatabase {
|
||||||
fn get_node(&self, _layer: usize, _pos: usize) -> Result<Option<E>> {
|
fn get_node(&self, _layer: usize, _pos: usize) -> Result<Option<E>> {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
fn get_layer_size(&self, _layer: usize) -> Result<Option<usize>> {
|
||||||
fn save_node(&self, _layer: usize, _pos: usize, _node: &E) -> Result<()> {
|
Ok(None)
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
fn start_transaction(&self) -> Box<dyn NodeTransaction<E>> {
|
||||||
fn save_node_list(&self, _nodes: &[(usize, usize, &E)]) -> Result<()> {
|
Box::new(EmptyNodeTransaction {})
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
fn commit(&self, _tx: Box<dyn NodeTransaction<E>>) -> Result<()> {
|
||||||
fn remove_node_list(&self, _nodes: &[(usize, usize)]) -> Result<()> {
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<E: HashElement> NodeTransaction<E> for EmptyNodeTransaction {
|
||||||
|
fn save_node(&mut self, _layer: usize, _pos: usize, _node: &E) {}
|
||||||
|
|
||||||
|
fn save_node_list(&mut self, _nodes: &[(usize, usize, &E)]) {}
|
||||||
|
|
||||||
|
fn remove_node_list(&mut self, _nodes: &[(usize, usize)]) {}
|
||||||
|
|
||||||
|
fn save_layer_size(&mut self, _layer: usize, _size: usize) {}
|
||||||
|
|
||||||
|
fn remove_layer_size(&mut self, _layer: usize) {}
|
||||||
|
|
||||||
|
fn into_any(self: Box<Self>) -> Box<dyn Any> {
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -9,5 +9,5 @@ exit-future = "0.2.0"
|
|||||||
futures = "0.3.21"
|
futures = "0.3.21"
|
||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
lighthouse_metrics = { path = "../lighthouse_metrics" }
|
lighthouse_metrics = { path = "../lighthouse_metrics" }
|
||||||
tokio = { version = "1.19.2", features = ["rt"] }
|
tokio = { version = "1.38.0", features = ["full"] }
|
||||||
tracing = "0.1.35"
|
tracing = "0.1.35"
|
||||||
|
@ -178,7 +178,10 @@ impl LogEntryFetcher {
|
|||||||
|
|
||||||
if let Some(finalized_block_number) = finalized_block_number {
|
if let Some(finalized_block_number) = finalized_block_number {
|
||||||
let safe_block_number = std::cmp::min(
|
let safe_block_number = std::cmp::min(
|
||||||
std::cmp::min(log_latest_block_number, finalized_block_number),
|
std::cmp::min(
|
||||||
|
log_latest_block_number.saturating_sub(1),
|
||||||
|
finalized_block_number,
|
||||||
|
),
|
||||||
processed_block_number,
|
processed_block_number,
|
||||||
);
|
);
|
||||||
let mut pending_keys = vec![];
|
let mut pending_keys = vec![];
|
||||||
@ -219,7 +222,7 @@ impl LogEntryFetcher {
|
|||||||
) -> UnboundedReceiver<LogFetchProgress> {
|
) -> UnboundedReceiver<LogFetchProgress> {
|
||||||
let provider = self.provider.clone();
|
let provider = self.provider.clone();
|
||||||
let (recover_tx, recover_rx) = tokio::sync::mpsc::unbounded_channel();
|
let (recover_tx, recover_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
let contract = ZgsFlow::new(self.contract_address, provider.clone());
|
let contract = self.flow_contract();
|
||||||
let log_page_size = self.log_page_size;
|
let log_page_size = self.log_page_size;
|
||||||
|
|
||||||
executor.spawn(
|
executor.spawn(
|
||||||
@ -302,7 +305,7 @@ impl LogEntryFetcher {
|
|||||||
mut watch_progress_rx: UnboundedReceiver<u64>,
|
mut watch_progress_rx: UnboundedReceiver<u64>,
|
||||||
) -> UnboundedReceiver<LogFetchProgress> {
|
) -> UnboundedReceiver<LogFetchProgress> {
|
||||||
let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel();
|
let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
|
let contract = self.flow_contract();
|
||||||
let provider = self.provider.clone();
|
let provider = self.provider.clone();
|
||||||
let confirmation_delay = self.confirmation_delay;
|
let confirmation_delay = self.confirmation_delay;
|
||||||
let log_page_size = self.log_page_size;
|
let log_page_size = self.log_page_size;
|
||||||
@ -580,6 +583,10 @@ impl LogEntryFetcher {
|
|||||||
pub fn provider(&self) -> &Provider<RetryClient<Http>> {
|
pub fn provider(&self) -> &Provider<RetryClient<Http>> {
|
||||||
self.provider.as_ref()
|
self.provider.as_ref()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn flow_contract(&self) -> ZgsFlow<Provider<RetryClient<Http>>> {
|
||||||
|
ZgsFlow::new(self.contract_address, self.provider.clone())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn check_watch_process(
|
async fn check_watch_process(
|
||||||
@ -655,17 +662,24 @@ async fn check_watch_process(
|
|||||||
"get block hash for block {} from RPC, assume there is no org",
|
"get block hash for block {} from RPC, assume there is no org",
|
||||||
*progress - 1
|
*progress - 1
|
||||||
);
|
);
|
||||||
match provider.get_block(*progress - 1).await {
|
let hash = loop {
|
||||||
Ok(Some(v)) => {
|
match provider.get_block(*progress - 1).await {
|
||||||
v.hash.expect("parent block hash expect exist");
|
Ok(Some(v)) => {
|
||||||
|
break v.hash.expect("parent block hash expect exist");
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
panic!("parent block {} expect exist", *progress - 1);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
if e.to_string().contains("server is too busy") {
|
||||||
|
warn!("server busy, wait for parent block {}", *progress - 1);
|
||||||
|
} else {
|
||||||
|
panic!("parent block {} expect exist, error {}", *progress - 1, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
};
|
||||||
panic!("parent block {} expect exist", *progress - 1);
|
break hash;
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
panic!("parent block {} expect exist, error {}", *progress - 1, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -510,6 +510,41 @@ impl LogSyncManager {
|
|||||||
}
|
}
|
||||||
self.data_cache.garbage_collect(self.next_tx_seq);
|
self.data_cache.garbage_collect(self.next_tx_seq);
|
||||||
self.next_tx_seq += 1;
|
self.next_tx_seq += 1;
|
||||||
|
|
||||||
|
// Check if the computed data root matches on-chain state.
|
||||||
|
// If the call fails, we won't check the root here and return `true` directly.
|
||||||
|
let flow_contract = self.log_fetcher.flow_contract();
|
||||||
|
match flow_contract
|
||||||
|
.get_flow_root_by_tx_seq(tx.seq.into())
|
||||||
|
.call()
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(contract_root_bytes) => {
|
||||||
|
let contract_root = H256::from_slice(&contract_root_bytes);
|
||||||
|
// contract_root is zero for tx submitted before upgrading.
|
||||||
|
if !contract_root.is_zero() {
|
||||||
|
match self.store.get_context() {
|
||||||
|
Ok((local_root, _)) => {
|
||||||
|
if contract_root != local_root {
|
||||||
|
error!(
|
||||||
|
?contract_root,
|
||||||
|
?local_root,
|
||||||
|
"local flow root and on-chain flow root mismatch"
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(?e, "fail to read the local flow root");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(?e, "fail to read the on-chain flow root");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,7 @@ mod service;
|
|||||||
use duration_str::deserialize_duration;
|
use duration_str::deserialize_duration;
|
||||||
use network::Multiaddr;
|
use network::Multiaddr;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::time::Duration;
|
use std::{net::IpAddr, time::Duration};
|
||||||
|
|
||||||
pub use crate::service::RouterService;
|
pub use crate::service::RouterService;
|
||||||
|
|
||||||
@ -26,6 +26,7 @@ pub struct Config {
|
|||||||
pub libp2p_nodes: Vec<Multiaddr>,
|
pub libp2p_nodes: Vec<Multiaddr>,
|
||||||
pub private_ip_enabled: bool,
|
pub private_ip_enabled: bool,
|
||||||
pub check_announced_ip: bool,
|
pub check_announced_ip: bool,
|
||||||
|
pub public_address: Option<IpAddr>,
|
||||||
|
|
||||||
// batcher
|
// batcher
|
||||||
/// Timeout to publish messages in batch
|
/// Timeout to publish messages in batch
|
||||||
@ -47,6 +48,7 @@ impl Default for Config {
|
|||||||
libp2p_nodes: vec![],
|
libp2p_nodes: vec![],
|
||||||
private_ip_enabled: false,
|
private_ip_enabled: false,
|
||||||
check_announced_ip: false,
|
check_announced_ip: false,
|
||||||
|
public_address: None,
|
||||||
|
|
||||||
batcher_timeout: Duration::from_secs(1),
|
batcher_timeout: Duration::from_secs(1),
|
||||||
batcher_file_capacity: 1,
|
batcher_file_capacity: 1,
|
||||||
|
@ -348,17 +348,26 @@ impl Libp2pEventHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_listen_addr_or_add(&self) -> Option<Multiaddr> {
|
async fn construct_announced_ip(&self) -> Option<Multiaddr> {
|
||||||
|
// public address configured
|
||||||
|
if let Some(ip) = self.config.public_address {
|
||||||
|
let mut addr = Multiaddr::empty();
|
||||||
|
addr.push(ip.into());
|
||||||
|
addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
|
||||||
|
return Some(addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
// public listen address
|
||||||
if let Some(addr) = self.get_listen_addr() {
|
if let Some(addr) = self.get_listen_addr() {
|
||||||
return Some(addr);
|
return Some(addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// auto detect public IP address
|
||||||
let ipv4_addr = public_ip::addr_v4().await?;
|
let ipv4_addr = public_ip::addr_v4().await?;
|
||||||
|
|
||||||
let mut addr = Multiaddr::empty();
|
let mut addr = Multiaddr::empty();
|
||||||
addr.push(Protocol::Ip4(ipv4_addr));
|
addr.push(Protocol::Ip4(ipv4_addr));
|
||||||
addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
|
addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
|
||||||
addr.push(Protocol::P2p(self.network_globals.local_peer_id().into()));
|
|
||||||
|
|
||||||
self.network_globals
|
self.network_globals
|
||||||
.listen_multiaddrs
|
.listen_multiaddrs
|
||||||
@ -420,7 +429,7 @@ impl Libp2pEventHandler {
|
|||||||
|
|
||||||
let peer_id = *self.network_globals.peer_id.read();
|
let peer_id = *self.network_globals.peer_id.read();
|
||||||
|
|
||||||
let addr = self.get_listen_addr_or_add().await?;
|
let addr = self.construct_announced_ip().await?;
|
||||||
|
|
||||||
let timestamp = timestamp_now();
|
let timestamp = timestamp_now();
|
||||||
let shard_config = self.store.get_store().get_shard_config();
|
let shard_config = self.store.get_store().get_shard_config();
|
||||||
@ -452,7 +461,7 @@ impl Libp2pEventHandler {
|
|||||||
shard_config: ShardConfig,
|
shard_config: ShardConfig,
|
||||||
) -> Option<PubsubMessage> {
|
) -> Option<PubsubMessage> {
|
||||||
let peer_id = *self.network_globals.peer_id.read();
|
let peer_id = *self.network_globals.peer_id.read();
|
||||||
let addr = self.get_listen_addr_or_add().await?;
|
let addr = self.construct_announced_ip().await?;
|
||||||
let timestamp = timestamp_now();
|
let timestamp = timestamp_now();
|
||||||
|
|
||||||
let msg = AnnounceShardConfig {
|
let msg = AnnounceShardConfig {
|
||||||
@ -528,7 +537,7 @@ impl Libp2pEventHandler {
|
|||||||
index_end: u64,
|
index_end: u64,
|
||||||
) -> Option<PubsubMessage> {
|
) -> Option<PubsubMessage> {
|
||||||
let peer_id = *self.network_globals.peer_id.read();
|
let peer_id = *self.network_globals.peer_id.read();
|
||||||
let addr = self.get_listen_addr_or_add().await?;
|
let addr = self.construct_announced_ip().await?;
|
||||||
let timestamp = timestamp_now();
|
let timestamp = timestamp_now();
|
||||||
|
|
||||||
let msg = AnnounceChunks {
|
let msg = AnnounceChunks {
|
||||||
|
@ -12,9 +12,23 @@ pub trait Rpc {
|
|||||||
#[method(name = "uploadSegment")]
|
#[method(name = "uploadSegment")]
|
||||||
async fn upload_segment(&self, segment: SegmentWithProof) -> RpcResult<()>;
|
async fn upload_segment(&self, segment: SegmentWithProof) -> RpcResult<()>;
|
||||||
|
|
||||||
|
#[method(name = "uploadSegmentByTxSeq")]
|
||||||
|
async fn upload_segment_by_tx_seq(
|
||||||
|
&self,
|
||||||
|
segment: SegmentWithProof,
|
||||||
|
tx_seq: u64,
|
||||||
|
) -> RpcResult<()>;
|
||||||
|
|
||||||
#[method(name = "uploadSegments")]
|
#[method(name = "uploadSegments")]
|
||||||
async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()>;
|
async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()>;
|
||||||
|
|
||||||
|
#[method(name = "uploadSegmentsByTxSeq")]
|
||||||
|
async fn upload_segments_by_tx_seq(
|
||||||
|
&self,
|
||||||
|
segments: Vec<SegmentWithProof>,
|
||||||
|
tx_seq: u64,
|
||||||
|
) -> RpcResult<()>;
|
||||||
|
|
||||||
#[method(name = "downloadSegment")]
|
#[method(name = "downloadSegment")]
|
||||||
async fn download_segment(
|
async fn download_segment(
|
||||||
&self,
|
&self,
|
||||||
@ -23,6 +37,14 @@ pub trait Rpc {
|
|||||||
end_index: usize,
|
end_index: usize,
|
||||||
) -> RpcResult<Option<Segment>>;
|
) -> RpcResult<Option<Segment>>;
|
||||||
|
|
||||||
|
#[method(name = "downloadSegmentByTxSeq")]
|
||||||
|
async fn download_segment_by_tx_seq(
|
||||||
|
&self,
|
||||||
|
tx_seq: u64,
|
||||||
|
start_index: usize,
|
||||||
|
end_index: usize,
|
||||||
|
) -> RpcResult<Option<Segment>>;
|
||||||
|
|
||||||
#[method(name = "downloadSegmentWithProof")]
|
#[method(name = "downloadSegmentWithProof")]
|
||||||
async fn download_segment_with_proof(
|
async fn download_segment_with_proof(
|
||||||
&self,
|
&self,
|
||||||
@ -30,6 +52,13 @@ pub trait Rpc {
|
|||||||
index: usize,
|
index: usize,
|
||||||
) -> RpcResult<Option<SegmentWithProof>>;
|
) -> RpcResult<Option<SegmentWithProof>>;
|
||||||
|
|
||||||
|
#[method(name = "downloadSegmentWithProofByTxSeq")]
|
||||||
|
async fn download_segment_with_proof_by_tx_seq(
|
||||||
|
&self,
|
||||||
|
tx_seq: u64,
|
||||||
|
index: usize,
|
||||||
|
) -> RpcResult<Option<SegmentWithProof>>;
|
||||||
|
|
||||||
#[method(name = "checkFileFinalized")]
|
#[method(name = "checkFileFinalized")]
|
||||||
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>>;
|
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>>;
|
||||||
|
|
||||||
|
@ -42,6 +42,16 @@ impl RpcServer for RpcServerImpl {
|
|||||||
self.put_segment(segment).await
|
self.put_segment(segment).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn upload_segment_by_tx_seq(
|
||||||
|
&self,
|
||||||
|
segment: SegmentWithProof,
|
||||||
|
tx_seq: u64,
|
||||||
|
) -> RpcResult<()> {
|
||||||
|
info!(tx_seq = %tx_seq, index = %segment.index, "zgs_uploadSegmentByTxSeq");
|
||||||
|
let maybe_tx = self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?;
|
||||||
|
self.put_segment_with_maybe_tx(segment, maybe_tx).await
|
||||||
|
}
|
||||||
|
|
||||||
async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()> {
|
async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()> {
|
||||||
let root = match segments.first() {
|
let root = match segments.first() {
|
||||||
None => return Ok(()),
|
None => return Ok(()),
|
||||||
@ -57,6 +67,23 @@ impl RpcServer for RpcServerImpl {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn upload_segments_by_tx_seq(
|
||||||
|
&self,
|
||||||
|
segments: Vec<SegmentWithProof>,
|
||||||
|
tx_seq: u64,
|
||||||
|
) -> RpcResult<()> {
|
||||||
|
let indices = SegmentIndexArray::new(&segments);
|
||||||
|
info!(%tx_seq, ?indices, "zgs_uploadSegmentsByTxSeq");
|
||||||
|
|
||||||
|
let maybe_tx = self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?;
|
||||||
|
for segment in segments.into_iter() {
|
||||||
|
self.put_segment_with_maybe_tx(segment, maybe_tx.clone())
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn download_segment(
|
async fn download_segment(
|
||||||
&self,
|
&self,
|
||||||
data_root: DataRoot,
|
data_root: DataRoot,
|
||||||
@ -65,34 +92,26 @@ impl RpcServer for RpcServerImpl {
|
|||||||
) -> RpcResult<Option<Segment>> {
|
) -> RpcResult<Option<Segment>> {
|
||||||
info!(%data_root, %start_index, %end_index, "zgs_downloadSegment");
|
info!(%data_root, %start_index, %end_index, "zgs_downloadSegment");
|
||||||
|
|
||||||
if start_index >= end_index {
|
|
||||||
return Err(error::invalid_params("end_index", "invalid chunk index"));
|
|
||||||
}
|
|
||||||
|
|
||||||
if end_index - start_index > self.ctx.config.chunks_per_segment {
|
|
||||||
return Err(error::invalid_params(
|
|
||||||
"end_index",
|
|
||||||
format!(
|
|
||||||
"exceeds maximum chunks {}",
|
|
||||||
self.ctx.config.chunks_per_segment
|
|
||||||
),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
let tx_seq = try_option!(
|
let tx_seq = try_option!(
|
||||||
self.ctx
|
self.ctx
|
||||||
.log_store
|
.log_store
|
||||||
.get_tx_seq_by_data_root(&data_root)
|
.get_tx_seq_by_data_root(&data_root)
|
||||||
.await?
|
.await?
|
||||||
);
|
);
|
||||||
let segment = try_option!(
|
|
||||||
self.ctx
|
|
||||||
.log_store
|
|
||||||
.get_chunks_by_tx_and_index_range(tx_seq, start_index, end_index)
|
|
||||||
.await?
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(Some(Segment(segment.data)))
|
self.get_segment_by_tx_seq(tx_seq, start_index, end_index)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn download_segment_by_tx_seq(
|
||||||
|
&self,
|
||||||
|
tx_seq: u64,
|
||||||
|
start_index: usize,
|
||||||
|
end_index: usize,
|
||||||
|
) -> RpcResult<Option<Segment>> {
|
||||||
|
info!(%tx_seq, %start_index, %end_index, "zgs_downloadSegmentByTxSeq");
|
||||||
|
self.get_segment_by_tx_seq(tx_seq, start_index, end_index)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn download_segment_with_proof(
|
async fn download_segment_with_proof(
|
||||||
@ -104,40 +123,19 @@ impl RpcServer for RpcServerImpl {
|
|||||||
|
|
||||||
let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?);
|
let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?);
|
||||||
|
|
||||||
// validate index
|
self.get_segment_with_proof_by_tx(tx, index).await
|
||||||
let chunks_per_segment = self.ctx.config.chunks_per_segment;
|
}
|
||||||
let (num_segments, last_segment_size) =
|
|
||||||
SegmentWithProof::split_file_into_segments(tx.size as usize, chunks_per_segment)?;
|
|
||||||
|
|
||||||
if index >= num_segments {
|
async fn download_segment_with_proof_by_tx_seq(
|
||||||
return Err(error::invalid_params("index", "index out of bound"));
|
&self,
|
||||||
}
|
tx_seq: u64,
|
||||||
|
index: usize,
|
||||||
|
) -> RpcResult<Option<SegmentWithProof>> {
|
||||||
|
info!(%tx_seq, %index, "zgs_downloadSegmentWithProofByTxSeq");
|
||||||
|
|
||||||
// calculate chunk start and end index
|
let tx = try_option!(self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?);
|
||||||
let start_index = index * chunks_per_segment;
|
|
||||||
let end_index = if index == num_segments - 1 {
|
|
||||||
// last segment without padding chunks by flow
|
|
||||||
start_index + last_segment_size / CHUNK_SIZE
|
|
||||||
} else {
|
|
||||||
start_index + chunks_per_segment
|
|
||||||
};
|
|
||||||
|
|
||||||
let segment = try_option!(
|
self.get_segment_with_proof_by_tx(tx, index).await
|
||||||
self.ctx
|
|
||||||
.log_store
|
|
||||||
.get_chunks_with_proof_by_tx_and_index_range(tx.seq, start_index, end_index, None)
|
|
||||||
.await?
|
|
||||||
);
|
|
||||||
|
|
||||||
let proof = tx.compute_segment_proof(&segment, chunks_per_segment)?;
|
|
||||||
|
|
||||||
Ok(Some(SegmentWithProof {
|
|
||||||
root: data_root,
|
|
||||||
data: segment.chunks.data,
|
|
||||||
index,
|
|
||||||
proof,
|
|
||||||
file_size: tx.size as usize,
|
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>> {
|
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>> {
|
||||||
@ -277,15 +275,29 @@ impl RpcServerImpl {
|
|||||||
async fn put_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
|
async fn put_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
|
||||||
debug!(root = %segment.root, index = %segment.index, "putSegment");
|
debug!(root = %segment.root, index = %segment.index, "putSegment");
|
||||||
|
|
||||||
self.ctx.chunk_pool.validate_segment_size(&segment.data)?;
|
|
||||||
|
|
||||||
let maybe_tx = self
|
let maybe_tx = self
|
||||||
.ctx
|
.ctx
|
||||||
.log_store
|
.log_store
|
||||||
.get_tx_by_data_root(&segment.root)
|
.get_tx_by_data_root(&segment.root)
|
||||||
.await?;
|
.await?;
|
||||||
let mut need_cache = false;
|
|
||||||
|
|
||||||
|
self.put_segment_with_maybe_tx(segment, maybe_tx).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn put_segment_with_maybe_tx(
|
||||||
|
&self,
|
||||||
|
segment: SegmentWithProof,
|
||||||
|
maybe_tx: Option<Transaction>,
|
||||||
|
) -> RpcResult<()> {
|
||||||
|
self.ctx.chunk_pool.validate_segment_size(&segment.data)?;
|
||||||
|
|
||||||
|
if let Some(tx) = &maybe_tx {
|
||||||
|
if tx.data_merkle_root != segment.root {
|
||||||
|
return Err(error::internal_error("data root and tx seq not match"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut need_cache = false;
|
||||||
if self
|
if self
|
||||||
.ctx
|
.ctx
|
||||||
.chunk_pool
|
.chunk_pool
|
||||||
@ -323,6 +335,77 @@ impl RpcServerImpl {
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_segment_by_tx_seq(
|
||||||
|
&self,
|
||||||
|
tx_seq: u64,
|
||||||
|
start_index: usize,
|
||||||
|
end_index: usize,
|
||||||
|
) -> RpcResult<Option<Segment>> {
|
||||||
|
if start_index >= end_index {
|
||||||
|
return Err(error::invalid_params("end_index", "invalid chunk index"));
|
||||||
|
}
|
||||||
|
|
||||||
|
if end_index - start_index > self.ctx.config.chunks_per_segment {
|
||||||
|
return Err(error::invalid_params(
|
||||||
|
"end_index",
|
||||||
|
format!(
|
||||||
|
"exceeds maximum chunks {}",
|
||||||
|
self.ctx.config.chunks_per_segment
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let segment = try_option!(
|
||||||
|
self.ctx
|
||||||
|
.log_store
|
||||||
|
.get_chunks_by_tx_and_index_range(tx_seq, start_index, end_index)
|
||||||
|
.await?
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(Some(Segment(segment.data)))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_segment_with_proof_by_tx(
|
||||||
|
&self,
|
||||||
|
tx: Transaction,
|
||||||
|
index: usize,
|
||||||
|
) -> RpcResult<Option<SegmentWithProof>> {
|
||||||
|
// validate index
|
||||||
|
let chunks_per_segment = self.ctx.config.chunks_per_segment;
|
||||||
|
let (num_segments, last_segment_size) =
|
||||||
|
SegmentWithProof::split_file_into_segments(tx.size as usize, chunks_per_segment)?;
|
||||||
|
|
||||||
|
if index >= num_segments {
|
||||||
|
return Err(error::invalid_params("index", "index out of bound"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// calculate chunk start and end index
|
||||||
|
let start_index = index * chunks_per_segment;
|
||||||
|
let end_index = if index == num_segments - 1 {
|
||||||
|
// last segment without padding chunks by flow
|
||||||
|
start_index + last_segment_size / CHUNK_SIZE
|
||||||
|
} else {
|
||||||
|
start_index + chunks_per_segment
|
||||||
|
};
|
||||||
|
|
||||||
|
let segment = try_option!(
|
||||||
|
self.ctx
|
||||||
|
.log_store
|
||||||
|
.get_chunks_with_proof_by_tx_and_index_range(tx.seq, start_index, end_index, None)
|
||||||
|
.await?
|
||||||
|
);
|
||||||
|
|
||||||
|
let proof = tx.compute_segment_proof(&segment, chunks_per_segment)?;
|
||||||
|
|
||||||
|
Ok(Some(SegmentWithProof {
|
||||||
|
root: tx.data_merkle_root,
|
||||||
|
data: segment.chunks.data,
|
||||||
|
index,
|
||||||
|
proof,
|
||||||
|
file_size: tx.size as usize,
|
||||||
|
}))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum SegmentIndex {
|
enum SegmentIndex {
|
||||||
|
@ -113,12 +113,16 @@ impl Transaction {
|
|||||||
1 << (depth - 1)
|
1 << (depth - 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn num_entries(&self) -> usize {
|
pub fn num_entries_of_list(merkle_nodes: &[(usize, DataRoot)]) -> usize {
|
||||||
self.merkle_nodes.iter().fold(0, |size, &(depth, _)| {
|
merkle_nodes.iter().fold(0, |size, &(depth, _)| {
|
||||||
size + Transaction::num_entries_of_node(depth)
|
size + Transaction::num_entries_of_node(depth)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn num_entries(&self) -> usize {
|
||||||
|
Self::num_entries_of_list(&self.merkle_nodes)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn hash(&self) -> H256 {
|
pub fn hash(&self) -> H256 {
|
||||||
let bytes = self.as_ssz_bytes();
|
let bytes = self.as_ssz_bytes();
|
||||||
let mut h = Keccak::v256();
|
let mut h = Keccak::v256();
|
||||||
|
@ -2,7 +2,7 @@ use super::{Client, RuntimeContext};
|
|||||||
use chunk_pool::{ChunkPoolMessage, Config as ChunkPoolConfig, MemoryChunkPool};
|
use chunk_pool::{ChunkPoolMessage, Config as ChunkPoolConfig, MemoryChunkPool};
|
||||||
use file_location_cache::FileLocationCache;
|
use file_location_cache::FileLocationCache;
|
||||||
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
|
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
|
||||||
use miner::{MineService, MinerConfig, MinerMessage};
|
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
|
||||||
use network::{
|
use network::{
|
||||||
self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId,
|
self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId,
|
||||||
Service as LibP2PService,
|
Service as LibP2PService,
|
||||||
@ -216,6 +216,16 @@ impl ClientBuilder {
|
|||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn with_shard(self, config: ShardConfig) -> Result<Self, String> {
|
||||||
|
self.async_store
|
||||||
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
|
.update_shard_config(config)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
Ok(self)
|
||||||
|
}
|
||||||
|
|
||||||
/// Starts the networking stack.
|
/// Starts the networking stack.
|
||||||
pub fn with_router(mut self, router_config: router::Config) -> Result<Self, String> {
|
pub fn with_router(mut self, router_config: router::Config) -> Result<Self, String> {
|
||||||
let executor = require!("router", self, runtime_context).clone().executor;
|
let executor = require!("router", self, runtime_context).clone().executor;
|
||||||
|
@ -204,6 +204,13 @@ impl ZgsConfig {
|
|||||||
pub fn router_config(&self, network_config: &NetworkConfig) -> Result<router::Config, String> {
|
pub fn router_config(&self, network_config: &NetworkConfig) -> Result<router::Config, String> {
|
||||||
let mut router_config = self.router.clone();
|
let mut router_config = self.router.clone();
|
||||||
router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec();
|
router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec();
|
||||||
|
|
||||||
|
if router_config.public_address.is_none() {
|
||||||
|
if let Some(addr) = &self.network_enr_address {
|
||||||
|
router_config.public_address = Some(addr.parse().unwrap());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(router_config)
|
Ok(router_config)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -232,7 +239,7 @@ impl ZgsConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn shard_config(&self) -> Result<ShardConfig, String> {
|
pub fn shard_config(&self) -> Result<ShardConfig, String> {
|
||||||
self.shard_position.clone().try_into()
|
self.shard_position.clone().try_into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
|
|||||||
let miner_config = config.mine_config()?;
|
let miner_config = config.mine_config()?;
|
||||||
let router_config = config.router_config(&network_config)?;
|
let router_config = config.router_config(&network_config)?;
|
||||||
let pruner_config = config.pruner_config()?;
|
let pruner_config = config.pruner_config()?;
|
||||||
|
let shard_config = config.shard_config()?;
|
||||||
|
|
||||||
ClientBuilder::default()
|
ClientBuilder::default()
|
||||||
.with_runtime_context(context)
|
.with_runtime_context(context)
|
||||||
@ -30,6 +31,8 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
|
|||||||
.await?
|
.await?
|
||||||
.with_miner(miner_config)
|
.with_miner(miner_config)
|
||||||
.await?
|
.await?
|
||||||
|
.with_shard(shard_config)
|
||||||
|
.await?
|
||||||
.with_pruner(pruner_config)
|
.with_pruner(pruner_config)
|
||||||
.await?
|
.await?
|
||||||
.with_rpc(config.rpc, config.chunk_pool_config()?)
|
.with_rpc(config.rpc, config.chunk_pool_config()?)
|
||||||
|
@ -29,7 +29,7 @@ itertools = "0.13.0"
|
|||||||
serde = { version = "1.0.197", features = ["derive"] }
|
serde = { version = "1.0.197", features = ["derive"] }
|
||||||
parking_lot = "0.12.3"
|
parking_lot = "0.12.3"
|
||||||
serde_json = "1.0.127"
|
serde_json = "1.0.127"
|
||||||
tokio = { version = "1.10.0", features = ["sync"] }
|
tokio = { version = "1.38.0", features = ["full"] }
|
||||||
task_executor = { path = "../../common/task_executor" }
|
task_executor = { path = "../../common/task_executor" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
@ -9,9 +9,11 @@ use crate::log_store::log_manager::{
|
|||||||
};
|
};
|
||||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
||||||
use crate::{try_option, ZgsKeyValueDB};
|
use crate::{try_option, ZgsKeyValueDB};
|
||||||
|
use any::Any;
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase};
|
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase, NodeTransaction};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
|
use kvdb::DBTransaction;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
|
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
|
||||||
use ssz::{Decode, Encode};
|
use ssz::{Decode, Encode};
|
||||||
@ -20,7 +22,7 @@ use std::cmp::Ordering;
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{cmp, mem};
|
use std::{any, cmp, mem};
|
||||||
use tracing::{debug, error, trace};
|
use tracing::{debug, error, trace};
|
||||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
||||||
|
|
||||||
@ -670,6 +672,14 @@ fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
|
|||||||
Ok((layer_index, position))
|
Ok((layer_index, position))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn layer_size_key(layer: usize) -> Vec<u8> {
|
||||||
|
let mut key = "layer_size".as_bytes().to_vec();
|
||||||
|
key.extend_from_slice(&layer.to_be_bytes());
|
||||||
|
key
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct NodeDBTransaction(DBTransaction);
|
||||||
|
|
||||||
impl NodeDatabase<DataRoot> for FlowDBStore {
|
impl NodeDatabase<DataRoot> for FlowDBStore {
|
||||||
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<DataRoot>> {
|
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<DataRoot>> {
|
||||||
Ok(self
|
Ok(self
|
||||||
@ -678,36 +688,67 @@ impl NodeDatabase<DataRoot> for FlowDBStore {
|
|||||||
.map(|v| DataRoot::from_slice(&v)))
|
.map(|v| DataRoot::from_slice(&v)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn save_node(&self, layer: usize, pos: usize, node: &DataRoot) -> Result<()> {
|
fn get_layer_size(&self, layer: usize) -> Result<Option<usize>> {
|
||||||
let mut tx = self.kvdb.transaction();
|
match self.kvdb.get(COL_FLOW_MPT_NODES, &layer_size_key(layer))? {
|
||||||
tx.put(
|
Some(v) => Ok(Some(try_decode_usize(&v)?)),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_transaction(&self) -> Box<dyn NodeTransaction<DataRoot>> {
|
||||||
|
Box::new(NodeDBTransaction(self.kvdb.transaction()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn commit(&self, tx: Box<dyn NodeTransaction<DataRoot>>) -> Result<()> {
|
||||||
|
let db_tx: Box<NodeDBTransaction> = tx
|
||||||
|
.into_any()
|
||||||
|
.downcast()
|
||||||
|
.map_err(|e| anyhow!("downcast failed, e={:?}", e))?;
|
||||||
|
self.kvdb.write(db_tx.0).map_err(Into::into)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NodeTransaction<DataRoot> for NodeDBTransaction {
|
||||||
|
fn save_node(&mut self, layer: usize, pos: usize, node: &DataRoot) {
|
||||||
|
self.0.put(
|
||||||
COL_FLOW_MPT_NODES,
|
COL_FLOW_MPT_NODES,
|
||||||
&encode_mpt_node_key(layer, pos),
|
&encode_mpt_node_key(layer, pos),
|
||||||
node.as_bytes(),
|
node.as_bytes(),
|
||||||
);
|
);
|
||||||
Ok(self.kvdb.write(tx)?)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn save_node_list(&self, nodes: &[(usize, usize, &DataRoot)]) -> Result<()> {
|
fn save_node_list(&mut self, nodes: &[(usize, usize, &DataRoot)]) {
|
||||||
let mut tx = self.kvdb.transaction();
|
|
||||||
for (layer_index, position, data) in nodes {
|
for (layer_index, position, data) in nodes {
|
||||||
tx.put(
|
self.0.put(
|
||||||
COL_FLOW_MPT_NODES,
|
COL_FLOW_MPT_NODES,
|
||||||
&encode_mpt_node_key(*layer_index, *position),
|
&encode_mpt_node_key(*layer_index, *position),
|
||||||
data.as_bytes(),
|
data.as_bytes(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Ok(self.kvdb.write(tx)?)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_node_list(&self, nodes: &[(usize, usize)]) -> Result<()> {
|
fn remove_node_list(&mut self, nodes: &[(usize, usize)]) {
|
||||||
let mut tx = self.kvdb.transaction();
|
|
||||||
for (layer_index, position) in nodes {
|
for (layer_index, position) in nodes {
|
||||||
tx.delete(
|
self.0.delete(
|
||||||
COL_FLOW_MPT_NODES,
|
COL_FLOW_MPT_NODES,
|
||||||
&encode_mpt_node_key(*layer_index, *position),
|
&encode_mpt_node_key(*layer_index, *position),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Ok(self.kvdb.write(tx)?)
|
}
|
||||||
|
|
||||||
|
fn save_layer_size(&mut self, layer: usize, size: usize) {
|
||||||
|
self.0.put(
|
||||||
|
COL_FLOW_MPT_NODES,
|
||||||
|
&layer_size_key(layer),
|
||||||
|
&size.to_be_bytes(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_layer_size(&mut self, layer: usize) {
|
||||||
|
self.0.delete(COL_FLOW_MPT_NODES, &layer_size_key(layer));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn into_any(self: Box<Self>) -> Box<dyn Any> {
|
||||||
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -258,7 +258,7 @@ impl LogStoreWrite for LogManager {
|
|||||||
}
|
}
|
||||||
let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned();
|
let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned();
|
||||||
// TODO(zz): Should we validate received tx?
|
// TODO(zz): Should we validate received tx?
|
||||||
self.append_subtree_list(tx.merkle_nodes.clone(), &mut merkle)?;
|
self.append_subtree_list(tx.start_entry_index, tx.merkle_nodes.clone(), &mut merkle)?;
|
||||||
merkle.commit_merkle(tx.seq)?;
|
merkle.commit_merkle(tx.seq)?;
|
||||||
debug!(
|
debug!(
|
||||||
"commit flow root: root={:?}",
|
"commit flow root: root={:?}",
|
||||||
@ -629,12 +629,8 @@ impl LogManager {
|
|||||||
let tx_store = TransactionStore::new(db.clone())?;
|
let tx_store = TransactionStore::new(db.clone())?;
|
||||||
let flow_db = Arc::new(FlowDBStore::new(db.clone()));
|
let flow_db = Arc::new(FlowDBStore::new(db.clone()));
|
||||||
let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
|
let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
|
||||||
let mut initial_data = flow_store.get_chunk_root_list()?;
|
// If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
|
||||||
// If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
|
// first and call `put_tx` later.
|
||||||
// first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
|
|
||||||
// and inserted later.
|
|
||||||
let mut extra_leaves = Vec::new();
|
|
||||||
|
|
||||||
let next_tx_seq = tx_store.next_tx_seq();
|
let next_tx_seq = tx_store.next_tx_seq();
|
||||||
let mut start_tx_seq = if next_tx_seq > 0 {
|
let mut start_tx_seq = if next_tx_seq > 0 {
|
||||||
Some(next_tx_seq - 1)
|
Some(next_tx_seq - 1)
|
||||||
@ -642,13 +638,19 @@ impl LogManager {
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
let mut last_tx_to_insert = None;
|
let mut last_tx_to_insert = None;
|
||||||
|
|
||||||
|
let mut pora_chunks_merkle = Merkle::new_with_subtrees(
|
||||||
|
flow_db,
|
||||||
|
config.flow.merkle_node_cache_capacity,
|
||||||
|
log2_pow2(PORA_CHUNK_SIZE),
|
||||||
|
)?;
|
||||||
if let Some(last_tx_seq) = start_tx_seq {
|
if let Some(last_tx_seq) = start_tx_seq {
|
||||||
if !tx_store.check_tx_completed(last_tx_seq)? {
|
if !tx_store.check_tx_completed(last_tx_seq)? {
|
||||||
// Last tx not finalized, we need to check if its `put_tx` is completed.
|
// Last tx not finalized, we need to check if its `put_tx` is completed.
|
||||||
let last_tx = tx_store
|
let last_tx = tx_store
|
||||||
.get_tx_by_seq_number(last_tx_seq)?
|
.get_tx_by_seq_number(last_tx_seq)?
|
||||||
.expect("tx missing");
|
.expect("tx missing");
|
||||||
let mut current_len = initial_data.leaves();
|
let current_len = pora_chunks_merkle.leaves();
|
||||||
let expected_len =
|
let expected_len =
|
||||||
sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64);
|
sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64);
|
||||||
match expected_len.cmp(&(current_len)) {
|
match expected_len.cmp(&(current_len)) {
|
||||||
@ -678,48 +680,21 @@ impl LogManager {
|
|||||||
previous_tx.start_entry_index + previous_tx.num_entries() as u64,
|
previous_tx.start_entry_index + previous_tx.num_entries() as u64,
|
||||||
);
|
);
|
||||||
if current_len > expected_len {
|
if current_len > expected_len {
|
||||||
while let Some((subtree_depth, _)) = initial_data.subtree_list.pop()
|
pora_chunks_merkle.revert_to_leaves(expected_len)?;
|
||||||
{
|
|
||||||
current_len -= 1 << (subtree_depth - 1);
|
|
||||||
if current_len == expected_len {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
warn!(
|
|
||||||
"revert last tx with no-op: {} {}",
|
|
||||||
current_len, expected_len
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
assert_eq!(current_len, expected_len);
|
start_tx_seq = Some(previous_tx.seq);
|
||||||
while let Some((index, h)) = initial_data.known_leaves.pop() {
|
|
||||||
if index < current_len {
|
|
||||||
initial_data.known_leaves.push((index, h));
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
extra_leaves.push((index, h));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
start_tx_seq = Some(last_tx_seq - 1);
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut pora_chunks_merkle = Merkle::new_with_subtrees(
|
|
||||||
flow_db,
|
|
||||||
config.flow.merkle_node_cache_capacity,
|
|
||||||
initial_data,
|
|
||||||
log2_pow2(PORA_CHUNK_SIZE),
|
|
||||||
start_tx_seq,
|
|
||||||
)?;
|
|
||||||
let last_chunk_merkle = match start_tx_seq {
|
let last_chunk_merkle = match start_tx_seq {
|
||||||
Some(tx_seq) => {
|
Some(tx_seq) => {
|
||||||
tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
|
tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
|
||||||
}
|
}
|
||||||
// Initialize
|
// Initialize
|
||||||
None => Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None),
|
None => Merkle::new_with_depth(vec![], 1, None),
|
||||||
};
|
};
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
@ -751,23 +726,16 @@ impl LogManager {
|
|||||||
log_manager.start_receiver(receiver, executor);
|
log_manager.start_receiver(receiver, executor);
|
||||||
|
|
||||||
if let Some(tx) = last_tx_to_insert {
|
if let Some(tx) = last_tx_to_insert {
|
||||||
log_manager.revert_to(tx.seq - 1)?;
|
|
||||||
log_manager.put_tx(tx)?;
|
log_manager.put_tx(tx)?;
|
||||||
let mut merkle = log_manager.merkle.write();
|
|
||||||
for (index, h) in extra_leaves {
|
|
||||||
if index < merkle.pora_chunks_merkle.leaves() {
|
|
||||||
merkle.pora_chunks_merkle.fill_leaf(index, h);
|
|
||||||
} else {
|
|
||||||
error!("out of range extra leaf: index={} hash={:?}", index, h);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
assert!(extra_leaves.is_empty());
|
|
||||||
}
|
}
|
||||||
log_manager
|
log_manager
|
||||||
.merkle
|
.merkle
|
||||||
.write()
|
.write()
|
||||||
.try_initialize(&log_manager.flow_store)?;
|
.try_initialize(&log_manager.flow_store)?;
|
||||||
|
info!(
|
||||||
|
"Log manager initialized, state={:?}",
|
||||||
|
log_manager.get_context()?
|
||||||
|
);
|
||||||
Ok(log_manager)
|
Ok(log_manager)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -875,6 +843,7 @@ impl LogManager {
|
|||||||
#[instrument(skip(self, merkle))]
|
#[instrument(skip(self, merkle))]
|
||||||
fn append_subtree_list(
|
fn append_subtree_list(
|
||||||
&self,
|
&self,
|
||||||
|
tx_start_index: u64,
|
||||||
merkle_list: Vec<(usize, DataRoot)>,
|
merkle_list: Vec<(usize, DataRoot)>,
|
||||||
merkle: &mut MerkleManager,
|
merkle: &mut MerkleManager,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
@ -882,7 +851,7 @@ impl LogManager {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
self.pad_tx(1 << (merkle_list[0].0 - 1), &mut *merkle)?;
|
self.pad_tx(tx_start_index, &mut *merkle)?;
|
||||||
|
|
||||||
let mut batch_root_map = BTreeMap::new();
|
let mut batch_root_map = BTreeMap::new();
|
||||||
for (subtree_depth, subtree_root) in merkle_list {
|
for (subtree_depth, subtree_root) in merkle_list {
|
||||||
@ -930,18 +899,18 @@ impl LogManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(skip(self, merkle))]
|
#[instrument(skip(self, merkle))]
|
||||||
fn pad_tx(&self, first_subtree_size: u64, merkle: &mut MerkleManager) -> Result<()> {
|
fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
|
||||||
// Check if we need to pad the flow.
|
// Check if we need to pad the flow.
|
||||||
let mut tx_start_flow_index =
|
let mut tx_start_flow_index =
|
||||||
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
|
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
|
||||||
let extra = tx_start_flow_index % first_subtree_size;
|
let pad_size = tx_start_index - tx_start_flow_index;
|
||||||
trace!(
|
trace!(
|
||||||
"before pad_tx {} {}",
|
"before pad_tx {} {}",
|
||||||
merkle.pora_chunks_merkle.leaves(),
|
merkle.pora_chunks_merkle.leaves(),
|
||||||
merkle.last_chunk_merkle.leaves()
|
merkle.last_chunk_merkle.leaves()
|
||||||
);
|
);
|
||||||
if extra != 0 {
|
if pad_size != 0 {
|
||||||
for pad_data in Self::padding((first_subtree_size - extra) as usize) {
|
for pad_data in Self::padding(pad_size as usize) {
|
||||||
let mut is_full_empty = true;
|
let mut is_full_empty = true;
|
||||||
let mut root_map = BTreeMap::new();
|
let mut root_map = BTreeMap::new();
|
||||||
|
|
||||||
@ -1004,12 +973,10 @@ impl LogManager {
|
|||||||
// Update the flow database.
|
// Update the flow database.
|
||||||
// This should be called before `complete_last_chunk_merkle` so that we do not save
|
// This should be called before `complete_last_chunk_merkle` so that we do not save
|
||||||
// subtrees with data known.
|
// subtrees with data known.
|
||||||
self.flow_store
|
self.flow_store.append_entries(ChunkArray {
|
||||||
.append_entries(ChunkArray {
|
data: pad_data.to_vec(),
|
||||||
data: pad_data.to_vec(),
|
start_index: tx_start_flow_index,
|
||||||
start_index: tx_start_flow_index,
|
})?;
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tx_start_flow_index += data_size as u64;
|
tx_start_flow_index += data_size as u64;
|
||||||
|
@ -3,14 +3,12 @@ use crate::log_store::log_manager::{
|
|||||||
PORA_CHUNK_SIZE,
|
PORA_CHUNK_SIZE,
|
||||||
};
|
};
|
||||||
use crate::log_store::{LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite};
|
use crate::log_store::{LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite};
|
||||||
use append_merkle::{
|
use append_merkle::{Algorithm, AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
|
||||||
Algorithm, AppendMerkleTree, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm,
|
|
||||||
};
|
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use rand::random;
|
use rand::random;
|
||||||
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
|
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::sync::Arc;
|
|
||||||
use task_executor::test_utils::TestRuntime;
|
use task_executor::test_utils::TestRuntime;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -335,11 +335,7 @@ impl TransactionStore {
|
|||||||
}
|
}
|
||||||
let mut merkle = if last_chunk_start_index == 0 {
|
let mut merkle = if last_chunk_start_index == 0 {
|
||||||
// The first entry hash is initialized as zero.
|
// The first entry hash is initialized as zero.
|
||||||
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
|
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(vec![H256::zero()], 1, None)
|
||||||
vec![H256::zero()],
|
|
||||||
log2_pow2(PORA_CHUNK_SIZE) + 1,
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
} else {
|
} else {
|
||||||
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
|
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
|
||||||
vec![],
|
vec![],
|
||||||
|
@ -33,7 +33,7 @@
|
|||||||
|
|
||||||
# List of nodes to bootstrap UDP discovery. Note, `network_enr_address` should be
|
# List of nodes to bootstrap UDP discovery. Note, `network_enr_address` should be
|
||||||
# configured as well to enable UDP discovery.
|
# configured as well to enable UDP discovery.
|
||||||
network_boot_nodes = ["/ip4/35.95.5.134/udp/1234/p2p/16Uiu2HAmFGrDV8wKToa1dd8uh6bz8bSY28n33iRP3pvfeBU6ysCw","/ip4/35.84.189.77/udp/1234/p2p/16Uiu2HAmF7t5iuRoWLMvQVfHbbJr5TFgHo2oU1CDxJm56eLdxRAY","/ip4/8.154.34.28/udp/1234/p2p/16Uiu2HAmBb7PQzvfZjHBENcF7E7mZaiHSrpBoH7mKTyNijYdqMM6"]
|
network_boot_nodes = ["/ip4/47.251.88.201/udp/1234/p2p/16Uiu2HAmFGrDV8wKToa1dd8uh6bz8bSY28n33iRP3pvfeBU6ysCw","/ip4/47.76.49.188/udp/1234/p2p/16Uiu2HAmBb7PQzvfZjHBENcF7E7mZaiHSrpBoH7mKTyNijYdqMM6"]
|
||||||
|
|
||||||
# List of libp2p nodes to initially connect to.
|
# List of libp2p nodes to initially connect to.
|
||||||
# network_libp2p_nodes = []
|
# network_libp2p_nodes = []
|
||||||
|
@ -33,7 +33,7 @@
|
|||||||
|
|
||||||
# List of nodes to bootstrap UDP discovery. Note, `network_enr_address` should be
|
# List of nodes to bootstrap UDP discovery. Note, `network_enr_address` should be
|
||||||
# configured as well to enable UDP discovery.
|
# configured as well to enable UDP discovery.
|
||||||
network_boot_nodes = ["/ip4/54.219.26.22/udp/1234/p2p/16Uiu2HAmTVDGNhkHD98zDnJxQWu3i1FL1aFYeh9wiQTNu4pDCgps","/ip4/52.52.127.117/udp/1234/p2p/16Uiu2HAkzRjxK2gorngB1Xq84qDrT4hSVznYDHj6BkbaE4SGx9oS","/ip4/8.154.47.100/udp/1234/p2p/16Uiu2HAm2k6ua2mGgvZ8rTMV8GhpW71aVzkQWy7D37TTDuLCpgmX"]
|
network_boot_nodes = ["/ip4/47.251.117.133/udp/1234/p2p/16Uiu2HAmTVDGNhkHD98zDnJxQWu3i1FL1aFYeh9wiQTNu4pDCgps","/ip4/47.76.61.226/udp/1234/p2p/16Uiu2HAm2k6ua2mGgvZ8rTMV8GhpW71aVzkQWy7D37TTDuLCpgmX"]
|
||||||
|
|
||||||
# List of libp2p nodes to initially connect to.
|
# List of libp2p nodes to initially connect to.
|
||||||
# network_libp2p_nodes = []
|
# network_libp2p_nodes = []
|
||||||
|
@ -1 +1 @@
|
|||||||
a0b536c6acff24b5d4bf20d9db4e95c399e61196
|
bea58429e436e4952ae69235d9079cfc4ac5f3b3
|
||||||
|
File diff suppressed because one or more lines are too long
@ -25,10 +25,23 @@
|
|||||||
"outputs": [],
|
"outputs": [],
|
||||||
"stateMutability": "nonpayable",
|
"stateMutability": "nonpayable",
|
||||||
"type": "function"
|
"type": "function"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"inputs": [],
|
||||||
|
"name": "pricePerSector",
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"internalType": "uint256",
|
||||||
|
"name": "",
|
||||||
|
"type": "uint256"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"stateMutability": "pure",
|
||||||
|
"type": "function"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"bytecode": "0x6080604052348015600f57600080fd5b5060a08061001e6000396000f3fe6080604052348015600f57600080fd5b506004361060285760003560e01c8063da6eb36a14602d575b600080fd5b603d6038366004603f565b505050565b005b600080600060608486031215605357600080fd5b50508135936020830135935060409092013591905056fea2646970667358221220fba54ab16c6496385cdd933e87b05b9e545a857b82ffa918f0d0e4a34ae41d7164736f6c63430008100033",
|
"bytecode": "0x608060405234801561001057600080fd5b5060be8061001f6000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
|
||||||
"deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060285760003560e01c8063da6eb36a14602d575b600080fd5b603d6038366004603f565b505050565b005b600080600060608486031215605357600080fd5b50508135936020830135935060409092013591905056fea2646970667358221220fba54ab16c6496385cdd933e87b05b9e545a857b82ffa918f0d0e4a34ae41d7164736f6c63430008100033",
|
"deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
|
||||||
"linkReferences": {},
|
"linkReferences": {},
|
||||||
"deployedLinkReferences": {}
|
"deployedLinkReferences": {}
|
||||||
}
|
}
|
||||||
|
@ -70,8 +70,8 @@
|
|||||||
"type": "function"
|
"type": "function"
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ebb4f7274983bea96e7fd68a63e91f4ad67260ff76111312d8c8559b9b5b621064736f6c63430008100033",
|
"bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
|
||||||
"deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ebb4f7274983bea96e7fd68a63e91f4ad67260ff76111312d8c8559b9b5b621064736f6c63430008100033",
|
"deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
|
||||||
"linkReferences": {},
|
"linkReferences": {},
|
||||||
"deployedLinkReferences": {}
|
"deployedLinkReferences": {}
|
||||||
}
|
}
|
||||||
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
43
tests/node_cache_test.py
Executable file
43
tests/node_cache_test.py
Executable file
@ -0,0 +1,43 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from test_framework.test_framework import TestFramework
|
||||||
|
from utility.submission import create_submission, submit_data
|
||||||
|
from utility.utils import wait_until
|
||||||
|
|
||||||
|
|
||||||
|
class NodeCacheTest(TestFramework):
|
||||||
|
def setup_params(self):
|
||||||
|
self.zgs_node_configs[0] = {
|
||||||
|
"merkle_node_cache_capacity": 1024,
|
||||||
|
}
|
||||||
|
|
||||||
|
def run_test(self):
|
||||||
|
client = self.nodes[0]
|
||||||
|
|
||||||
|
chunk_data = b"\x02" * 256 * 1024 * 1024 * 3
|
||||||
|
submissions, data_root = create_submission(chunk_data)
|
||||||
|
self.contract.submit(submissions)
|
||||||
|
wait_until(lambda: self.contract.num_submissions() == 1)
|
||||||
|
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
|
||||||
|
|
||||||
|
segment = submit_data(client, chunk_data)
|
||||||
|
self.log.info("segment: %s", len(segment))
|
||||||
|
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
|
||||||
|
|
||||||
|
self.stop_storage_node(0)
|
||||||
|
self.start_storage_node(0)
|
||||||
|
self.nodes[0].wait_for_rpc_connection()
|
||||||
|
|
||||||
|
chunk_data = b"\x03" * 256 * (1024 * 765 + 5)
|
||||||
|
submissions, data_root = create_submission(chunk_data)
|
||||||
|
self.contract.submit(submissions)
|
||||||
|
wait_until(lambda: self.contract.num_submissions() == 2)
|
||||||
|
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
|
||||||
|
|
||||||
|
segment = submit_data(client, chunk_data)
|
||||||
|
self.log.info("segment: %s", len(segment))
|
||||||
|
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
NodeCacheTest().main()
|
81
tests/shard_submission_test.py
Normal file
81
tests/shard_submission_test.py
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import time
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import random
|
||||||
|
from test_framework.test_framework import TestFramework
|
||||||
|
from utility.submission import ENTRY_SIZE, submit_data
|
||||||
|
from utility.submission import create_submission
|
||||||
|
from utility.utils import (
|
||||||
|
assert_equal,
|
||||||
|
wait_until,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ShardSubmitTest(TestFramework):
|
||||||
|
|
||||||
|
def setup_params(self):
|
||||||
|
self.num_blockchain_nodes = 1
|
||||||
|
self.num_nodes = 4
|
||||||
|
self.zgs_node_configs[0] = {
|
||||||
|
"db_max_num_sectors": 2 ** 30,
|
||||||
|
"shard_position": "0/4"
|
||||||
|
}
|
||||||
|
self.zgs_node_configs[1] = {
|
||||||
|
"db_max_num_sectors": 2 ** 30,
|
||||||
|
"shard_position": "1/4"
|
||||||
|
}
|
||||||
|
self.zgs_node_configs[2] = {
|
||||||
|
"db_max_num_sectors": 2 ** 30,
|
||||||
|
"shard_position": "2/4"
|
||||||
|
}
|
||||||
|
self.zgs_node_configs[3] = {
|
||||||
|
"db_max_num_sectors": 2 ** 30,
|
||||||
|
"shard_position": "3/4"
|
||||||
|
}
|
||||||
|
|
||||||
|
def run_test(self):
|
||||||
|
data_size = [
|
||||||
|
256*960,
|
||||||
|
256*1024,
|
||||||
|
2,
|
||||||
|
255,
|
||||||
|
256*960,
|
||||||
|
256*120,
|
||||||
|
256,
|
||||||
|
257,
|
||||||
|
1023,
|
||||||
|
1024,
|
||||||
|
1025,
|
||||||
|
256 * 1023,
|
||||||
|
256 * 1023 + 1,
|
||||||
|
256 * 1024,
|
||||||
|
256 * 1024 + 1,
|
||||||
|
256 * 1025,
|
||||||
|
256 * 2048 - 1,
|
||||||
|
256 * 2048,
|
||||||
|
256 * 16385,
|
||||||
|
256 * 1024 * 256,
|
||||||
|
]
|
||||||
|
|
||||||
|
for i, v in enumerate(data_size):
|
||||||
|
self.submission_data(v, i + 1, True)
|
||||||
|
|
||||||
|
def submission_data(self, size, submission_index, rand_data=True):
|
||||||
|
self.log.info("file size: %d", size)
|
||||||
|
chunk_data = random.randbytes(size) if rand_data else b"\x10" * size
|
||||||
|
|
||||||
|
submissions, data_root = create_submission(chunk_data)
|
||||||
|
self.log.info("data root: %s, submissions: %s", data_root, submissions)
|
||||||
|
self.contract.submit(submissions)
|
||||||
|
|
||||||
|
wait_until(lambda: self.contract.num_submissions() == submission_index)
|
||||||
|
|
||||||
|
for i in range(4):
|
||||||
|
client = self.nodes[i]
|
||||||
|
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
|
||||||
|
submit_data(client, chunk_data)
|
||||||
|
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
ShardSubmitTest().main()
|
@ -194,13 +194,16 @@ def generate_merkle_tree_by_batch(data):
|
|||||||
|
|
||||||
|
|
||||||
def submit_data(client, data):
|
def submit_data(client, data):
|
||||||
|
# NOTE: we assume the data is unique in this function, otherwise zgs_getFileInfo will only get the information of the first data with same root
|
||||||
shard_config = client.rpc.zgs_getShardConfig()
|
shard_config = client.rpc.zgs_getShardConfig()
|
||||||
shard_id = int(shard_config["shardId"])
|
shard_id = int(shard_config["shardId"])
|
||||||
num_shard = int(shard_config["numShard"])
|
num_shard = int(shard_config["numShard"])
|
||||||
|
|
||||||
segments = data_to_segments(data)
|
segments = data_to_segments(data)
|
||||||
|
file_info = client.zgs_get_file_info(segments[0]["root"])
|
||||||
|
start_seg_index = file_info["tx"]["startEntryIndex"] // 1024
|
||||||
for index, segment in enumerate(segments):
|
for index, segment in enumerate(segments):
|
||||||
if index % num_shard == shard_id:
|
if (start_seg_index + index) % num_shard == shard_id:
|
||||||
client.zgs_upload_segment(segment)
|
client.zgs_upload_segment(segment)
|
||||||
return segments
|
return segments
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user