Mirror of https://github.com/0glabs/0g-storage-node.git (synced 2025-04-04 15:35:18 +00:00)

Compare commits: 4 commits
065b436fba ... be5b997c89

Commits in this range:
- be5b997c89
- 80b4d63cba
- b2a70501c2
- 005e28266a
@@ -59,10 +59,12 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
                     },
                 );
             }
+            merkle.node_manager.commit();
             return merkle;
         }
         // Reconstruct the whole tree.
         merkle.recompute(0, 0, None);
+        merkle.node_manager.commit();
         // Commit the first version in memory.
         // TODO(zz): Check when the roots become available.
         merkle.commit(start_tx_seq);
@@ -116,6 +118,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
                     },
                 );
             }
+            merkle.node_manager.commit();
             merkle
         } else {
             let mut merkle = Self {
@@ -134,6 +137,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
             }
             // Reconstruct the whole tree.
             merkle.recompute(0, 0, None);
+            merkle.node_manager.commit();
             // Commit the first version in memory.
             merkle.commit(start_tx_seq);
             merkle
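Note: all three construction paths above now end with merkle.node_manager.commit() before the tree is returned. The diff does not show NodeManager itself; the sketch below is a hypothetical illustration of the staging behaviour this implies (node writes accumulate in an open batch and only persist on commit()), not the real implementation.

    use std::collections::HashMap;

    // Hypothetical stand-in for the real NodeManager, which is backed by a
    // database transaction rather than a Vec.
    #[derive(Default)]
    struct NodeManagerSketch {
        persisted: HashMap<(usize, usize), [u8; 32]>, // (layer, index) -> node hash
        staged: Vec<((usize, usize), [u8; 32])>,
    }

    impl NodeManagerSketch {
        fn add_node(&mut self, layer: usize, index: usize, hash: [u8; 32]) {
            // Writes made while building or reconstructing a tree are only staged.
            self.staged.push(((layer, index), hash));
        }

        fn commit(&mut self) {
            // Without this flush, nodes written during construction would never
            // reach the backing store.
            for (key, hash) in self.staged.drain(..) {
                self.persisted.insert(key, hash);
            }
        }
    }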
@@ -115,8 +115,9 @@ impl<E: HashElement> NodeManager<E> {
     }

     pub fn start_transaction(&mut self) {
-        if self.db_tx.is_none() {
+        if self.db_tx.is_some() {
             error!("start new tx before commit");
+            panic!("start new tx before commit");
         }
         self.db_tx = Some(self.db.start_transaction());
     }
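Note: the old guard was inverted. db_tx.is_none() is the normal state when no transaction is open, so the error fired on every legitimate call and stayed silent when a transaction was actually left pending (which would then be silently overwritten). The new check panics so a nested start_transaction fails fast. A minimal sketch of the corrected invariant, with hypothetical stand-ins for the database types:

    // Hypothetical stand-ins for the real database handle and transaction type.
    struct Db;
    struct DbTransaction;

    impl Db {
        fn start_transaction(&self) -> DbTransaction {
            DbTransaction
        }
    }

    struct NodeManagerSketch {
        db: Db,
        db_tx: Option<DbTransaction>,
    }

    impl NodeManagerSketch {
        fn start_transaction(&mut self) {
            // An already-open transaction means the previous batch was never
            // committed; overwriting it would silently drop those writes.
            if self.db_tx.is_some() {
                panic!("start new tx before commit");
            }
            self.db_tx = Some(self.db.start_transaction());
        }
    }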
@@ -2,7 +2,7 @@ use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
 use jsonrpsee::core::RpcResult;
 use jsonrpsee::proc_macros::rpc;
 use shared_types::{DataRoot, FlowProof, TxSeqOrRoot};
-use storage::config::ShardConfig;
+use storage::{config::ShardConfig, H256};

 #[rpc(server, client, namespace = "zgs")]
 pub trait Rpc {
@@ -77,4 +77,7 @@ pub trait Rpc {
         sector_index: u64,
         flow_root: Option<DataRoot>,
     ) -> RpcResult<FlowProof>;
+
+    #[method(name = "getFlowContext")]
+    async fn get_flow_context(&self) -> RpcResult<(H256, u64)>;
 }
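Note: because the trait is annotated #[rpc(server, client, namespace = "zgs")], the new method is exposed on the wire as zgs_getFlowContext (the test-framework change further down calls it under exactly that name). A sketch of querying it with a plain jsonrpsee HTTP client; the endpoint address is an assumption, and the H256 root is read in its hex-string wire form to keep the example self-contained:

    use jsonrpsee::core::client::ClientT;
    use jsonrpsee::http_client::HttpClientBuilder;
    use jsonrpsee::rpc_params;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Assumed RPC address; substitute the node's configured listen address.
        let client = HttpClientBuilder::default().build("http://127.0.0.1:5678")?;
        // Returns (flow root, flow length); the root arrives as a 0x-prefixed hex string.
        let (flow_root, flow_length): (String, u64) =
            client.request("zgs_getFlowContext", rpc_params![]).await?;
        println!("flow root = {flow_root}, flow length = {flow_length}");
        Ok(())
    }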
@@ -8,7 +8,7 @@ use jsonrpsee::core::RpcResult;
 use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
 use std::fmt::{Debug, Formatter, Result};
 use storage::config::ShardConfig;
-use storage::try_option;
+use storage::{try_option, H256};

 pub struct RpcServerImpl {
     pub ctx: Context,
@@ -198,6 +198,10 @@ impl RpcServer for RpcServerImpl {
         assert_eq!(proof.left_proof, proof.right_proof);
         Ok(proof.right_proof)
     }
+
+    async fn get_flow_context(&self) -> RpcResult<(H256, u64)> {
+        Ok(self.ctx.log_store.get_context().await?)
+    }
 }

 impl RpcServerImpl {
@@ -651,8 +651,12 @@ impl LogManager {
             .get_tx_by_seq_number(last_tx_seq)?
             .expect("tx missing");
         let current_len = pora_chunks_merkle.leaves();
-        let expected_len =
-            sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64);
+        let expected_len = sector_to_segment(
+            last_tx.start_entry_index
+                + last_tx.num_entries() as u64
+                + PORA_CHUNK_SIZE as u64
+                - 1,
+        );
         match expected_len.cmp(&(current_len)) {
             Ordering::Less => {
                 bail!(
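Note: the reworked expected_len rounds up instead of down. If sector_to_segment is a plain integer division by PORA_CHUNK_SIZE (an assumption; the helper's body is not part of this diff), adding PORA_CHUNK_SIZE - 1 before dividing makes a partially filled last segment count as a full one, which the stricter else branch in the next hunk relies on:

    // Sketch only: the PORA_CHUNK_SIZE value and the body of sector_to_segment
    // are assumptions for illustration, not taken from this diff.
    const PORA_CHUNK_SIZE: u64 = 1024;

    fn sector_to_segment(sector: u64) -> u64 {
        sector / PORA_CHUNK_SIZE
    }

    fn expected_segments(end_sector: u64) -> u64 {
        // Ceiling division: a tail shorter than one full chunk still occupies a segment.
        sector_to_segment(end_sector + PORA_CHUNK_SIZE - 1)
    }

    fn main() {
        assert_eq!(expected_segments(1024), 1); // ends exactly on a chunk boundary
        assert_eq!(expected_segments(1025), 2); // one full chunk plus a 1-sector tail
    }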
@@ -681,6 +685,8 @@ impl LogManager {
                 );
                 if current_len > expected_len {
                     pora_chunks_merkle.revert_to_leaves(expected_len)?;
+                } else {
+                    assert_eq!(current_len, expected_len);
                 }
                 start_tx_seq = Some(previous_tx.seq);
             };
@@ -691,10 +697,20 @@ impl LogManager {

         let last_chunk_merkle = match start_tx_seq {
             Some(tx_seq) => {
-                tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
+                let tx = tx_store.get_tx_by_seq_number(tx_seq)?.expect("tx missing");
+                if (tx.start_entry_index() + tx.num_entries() as u64) % PORA_CHUNK_SIZE as u64 == 0
+                {
+                    // The last chunk should be aligned, so it's empty.
+                    Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None)
+                } else {
+                    tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves() - 1, tx_seq)?
+                }
             }
             // Initialize
-            None => Merkle::new_with_depth(vec![], 1, None),
+            None => {
+                pora_chunks_merkle.reset();
+                Merkle::new_with_depth(vec![], 1, None)
+            }
         };

         debug!(
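Note: the new branch distinguishes an aligned flow from one with a partial tail. When the last transaction ends exactly on a chunk boundary, the "last chunk" holds no sectors yet, so an empty tree of full depth is the right starting point; otherwise the partial last chunk appears to be rebuilt at the last segment position (leaves() - 1). A worked version of the alignment check, again with an assumed PORA_CHUNK_SIZE:

    // Assumed chunk size, for illustration only.
    const PORA_CHUNK_SIZE: u64 = 1024;

    // True when the flow ends exactly on a chunk boundary, i.e. the last chunk
    // is still empty and needs no rebuilt Merkle tree.
    fn last_chunk_is_empty(start_entry_index: u64, num_entries: u64) -> bool {
        (start_entry_index + num_entries) % PORA_CHUNK_SIZE == 0
    }

    fn main() {
        assert!(last_chunk_is_empty(0, 2048)); // two full chunks, nothing spills over
        assert!(!last_chunk_is_empty(0, 2049)); // one sector already sits in a third chunk
    }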
@@ -704,10 +720,10 @@ impl LogManager {
             last_chunk_merkle.leaves(),
         );
         if last_chunk_merkle.leaves() != 0 {
-            pora_chunks_merkle.append(last_chunk_merkle.root());
-            // update the merkle root
-            pora_chunks_merkle.commit(start_tx_seq);
+            pora_chunks_merkle.update_last(last_chunk_merkle.root());
         }
+        // update the merkle root
+        pora_chunks_merkle.commit(start_tx_seq);
         let merkle = RwLock::new(MerkleManager {
             pora_chunks_merkle,
             last_chunk_merkle,
@@ -763,7 +779,8 @@ impl LogManager {
                         .unwrap();
                 }
                 std::result::Result::Err(_) => {
-                    error!("Receiver error");
+                    debug!("Log manager inner channel closed");
+                    break;
                 }
             };
         }
@@ -292,6 +292,9 @@ impl TransactionStore {
         match tx.start_entry_index.cmp(&last_chunk_start_index) {
             cmp::Ordering::Greater => {
                 tx_list.push((tx_seq, tx.merkle_nodes));
+                if tx.start_entry_index >= last_chunk_start_index + PORA_CHUNK_SIZE as u64 {
+                    break;
+                }
             }
             cmp::Ordering::Equal => {
                 tx_list.push((tx_seq, tx.merkle_nodes));
tests/crash_test.py: 0 lines changed (mode: Normal file → Executable file)
tests/root_consistency_test.py: 55 lines added (new Executable file)
@@ -0,0 +1,55 @@
+#!/usr/bin/env python3
+from test_framework.test_framework import TestFramework
+from config.node_config import MINER_ID, GENESIS_PRIV_KEY
+from utility.submission import create_submission, submit_data
+from utility.utils import wait_until, assert_equal
+from test_framework.blockchain_node import BlockChainNodeType
+
+
+class RootConsistencyTest(TestFramework):
+    def setup_params(self):
+        self.num_blockchain_nodes = 1
+        self.num_nodes = 1
+
+    def submit_data(self, item, size):
+        submissions_before = self.contract.num_submissions()
+        client = self.nodes[0]
+        chunk_data = item * 256 * size
+        submissions, data_root = create_submission(chunk_data)
+        self.contract.submit(submissions)
+        wait_until(lambda: self.contract.num_submissions() == submissions_before + 1)
+        wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
+
+        segment = submit_data(client, chunk_data)
+        wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
+
+    def assert_flow_status(self, expected_length):
+        contract_root = self.contract.get_flow_root().hex()
+        contract_length = self.contract.get_flow_length()
+        (node_root, node_length) = tuple(self.nodes[0].zgs_getFlowContext())
+
+        assert_equal(contract_length, node_length)
+        assert_equal(contract_length, expected_length)
+        assert_equal(contract_root, node_root[2:])
+
+
+
+    def run_test(self):
+        self.assert_flow_status(1)
+
+        self.submit_data(b"\x11", 1)
+        self.assert_flow_status(2)
+
+        self.submit_data(b"\x11", 8 + 4 + 2)
+        self.assert_flow_status(16 + 4 + 2)
+
+        self.submit_data(b"\x12", 128 + 64)
+        self.assert_flow_status(256 + 64)
+
+        self.submit_data(b"\x13", 512 + 256)
+        self.assert_flow_status(1024 + 512 + 256)
+
+
+
+if __name__ == "__main__":
+    RootConsistencyTest().main()
@@ -92,6 +92,11 @@ class FlowContractProxy(ContractProxy):
     def get_mine_context(self, node_idx=0):
         return self._call("makeContextWithResult", node_idx)

+    def get_flow_root(self, node_idx=0):
+        return self._call("computeFlowRoot", node_idx)
+
+    def get_flow_length(self, node_idx=0):
+        return self._call("tree", node_idx)[0]

 class MineContractProxy(ContractProxy):
     def last_mined_epoch(self, node_idx=0):
@@ -101,6 +101,9 @@ class ZgsNode(TestNode):
     def zgs_get_file_info_by_tx_seq(self, tx_seq):
         return self.rpc.zgs_getFileInfoByTxSeq([tx_seq])

+    def zgs_get_flow_context(self, tx_seq):
+        return self.rpc.zgs_getFlowContext([tx_seq])
+
     def shutdown(self):
         self.rpc.admin_shutdown()
         self.wait_until_stopped()