Compare commits

1 Commit

Author          SHA1        Message                           Date
peilun-conflux  065b436fba  Merge 703d926a23 into e701c8fdbd  2024-10-14 17:52:53 +00:00
10 changed files with 11 additions and 106 deletions

View File

@@ -59,12 +59,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 },
 );
 }
-merkle.node_manager.commit();
 return merkle;
 }
 // Reconstruct the whole tree.
 merkle.recompute(0, 0, None);
-merkle.node_manager.commit();
 // Commit the first version in memory.
 // TODO(zz): Check when the roots become available.
 merkle.commit(start_tx_seq);
@@ -118,7 +116,6 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 },
 );
 }
-merkle.node_manager.commit();
 merkle
 } else {
 let mut merkle = Self {
@@ -137,7 +134,6 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 }
 // Reconstruct the whole tree.
 merkle.recompute(0, 0, None);
-merkle.node_manager.commit();
 // Commit the first version in memory.
 merkle.commit(start_tx_seq);
 merkle

View File

@@ -115,9 +115,8 @@ impl<E: HashElement> NodeManager<E> {
 }
 pub fn start_transaction(&mut self) {
-if self.db_tx.is_some() {
+if self.db_tx.is_none() {
 error!("start new tx before commit");
-panic!("start new tx before commit");
 }
 self.db_tx = Some(self.db.start_transaction());
 }

View File

@@ -2,7 +2,7 @@ use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
 use jsonrpsee::core::RpcResult;
 use jsonrpsee::proc_macros::rpc;
 use shared_types::{DataRoot, FlowProof, TxSeqOrRoot};
-use storage::{config::ShardConfig, H256};
+use storage::config::ShardConfig;
 #[rpc(server, client, namespace = "zgs")]
 pub trait Rpc {
@@ -77,7 +77,4 @@ pub trait Rpc {
 sector_index: u64,
 flow_root: Option<DataRoot>,
 ) -> RpcResult<FlowProof>;
-#[method(name = "getFlowContext")]
-async fn get_flow_context(&self) -> RpcResult<(H256, u64)>;
 }

View File

@@ -8,7 +8,7 @@ use jsonrpsee::core::RpcResult;
 use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
 use std::fmt::{Debug, Formatter, Result};
 use storage::config::ShardConfig;
-use storage::{try_option, H256};
+use storage::try_option;
 pub struct RpcServerImpl {
 pub ctx: Context,
@@ -198,10 +198,6 @@ impl RpcServer for RpcServerImpl {
 assert_eq!(proof.left_proof, proof.right_proof);
 Ok(proof.right_proof)
 }
-async fn get_flow_context(&self) -> RpcResult<(H256, u64)> {
-Ok(self.ctx.log_store.get_context().await?)
-}
 }

View File

@@ -651,12 +651,8 @@ impl LogManager {
 .get_tx_by_seq_number(last_tx_seq)?
 .expect("tx missing");
 let current_len = pora_chunks_merkle.leaves();
-let expected_len = sector_to_segment(
-last_tx.start_entry_index
-+ last_tx.num_entries() as u64
-+ PORA_CHUNK_SIZE as u64
-- 1,
-);
+let expected_len =
+sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64);
 match expected_len.cmp(&(current_len)) {
 Ordering::Less => {
 bail!(
@@ -685,8 +681,6 @@
 );
 if current_len > expected_len {
 pora_chunks_merkle.revert_to_leaves(expected_len)?;
-} else {
-assert_eq!(current_len, expected_len);
 }
 start_tx_seq = Some(previous_tx.seq);
 };
@@ -697,20 +691,10 @@
 let last_chunk_merkle = match start_tx_seq {
 Some(tx_seq) => {
-let tx = tx_store.get_tx_by_seq_number(tx_seq)?.expect("tx missing");
-if (tx.start_entry_index() + tx.num_entries() as u64) % PORA_CHUNK_SIZE as u64 == 0
-{
-// The last chunk should be aligned, so it's empty.
-Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None)
-} else {
-tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves() - 1, tx_seq)?
-}
+tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
 }
 // Initialize
-None => {
-pora_chunks_merkle.reset();
-Merkle::new_with_depth(vec![], 1, None)
-}
+None => Merkle::new_with_depth(vec![], 1, None),
 };
 debug!(
@@ -720,10 +704,10 @@
 last_chunk_merkle.leaves(),
 );
 if last_chunk_merkle.leaves() != 0 {
-pora_chunks_merkle.update_last(last_chunk_merkle.root());
-}
+pora_chunks_merkle.append(last_chunk_merkle.root());
 // update the merkle root
 pora_chunks_merkle.commit(start_tx_seq);
+}
 let merkle = RwLock::new(MerkleManager {
 pora_chunks_merkle,
 last_chunk_merkle,
@@ -779,8 +763,7 @@
 .unwrap();
 }
 std::result::Result::Err(_) => {
-debug!("Log manager inner channel closed");
-break;
+error!("Receiver error");
 }
 };
 }

View File

@@ -292,9 +292,6 @@ impl TransactionStore {
 match tx.start_entry_index.cmp(&last_chunk_start_index) {
 cmp::Ordering::Greater => {
 tx_list.push((tx_seq, tx.merkle_nodes));
-if tx.start_entry_index >= last_chunk_start_index + PORA_CHUNK_SIZE as u64 {
-break;
-}
 }
 cmp::Ordering::Equal => {
 tx_list.push((tx_seq, tx.merkle_nodes));

tests/crash_test.py Executable file → Normal file
View File

View File

@@ -1,55 +0,0 @@
-#!/usr/bin/env python3
-from test_framework.test_framework import TestFramework
-from config.node_config import MINER_ID, GENESIS_PRIV_KEY
-from utility.submission import create_submission, submit_data
-from utility.utils import wait_until, assert_equal
-from test_framework.blockchain_node import BlockChainNodeType
-class RootConsistencyTest(TestFramework):
-    def setup_params(self):
-        self.num_blockchain_nodes = 1
-        self.num_nodes = 1
-    def submit_data(self, item, size):
-        submissions_before = self.contract.num_submissions()
-        client = self.nodes[0]
-        chunk_data = item * 256 * size
-        submissions, data_root = create_submission(chunk_data)
-        self.contract.submit(submissions)
-        wait_until(lambda: self.contract.num_submissions() == submissions_before + 1)
-        wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
-        segment = submit_data(client, chunk_data)
-        wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
-    def assert_flow_status(self, expected_length):
-        contract_root = self.contract.get_flow_root().hex()
-        contract_length = self.contract.get_flow_length()
-        (node_root, node_length) = tuple(self.nodes[0].zgs_getFlowContext())
-        assert_equal(contract_length, node_length)
-        assert_equal(contract_length, expected_length)
-        assert_equal(contract_root, node_root[2:])
-    def run_test(self):
-        self.assert_flow_status(1)
-        self.submit_data(b"\x11", 1)
-        self.assert_flow_status(2)
-        self.submit_data(b"\x11", 8 + 4 + 2)
-        self.assert_flow_status(16 + 4 + 2)
-        self.submit_data(b"\x12", 128 + 64)
-        self.assert_flow_status(256 + 64)
-        self.submit_data(b"\x13", 512 + 256)
-        self.assert_flow_status(1024 + 512 + 256)
-if __name__ == "__main__":
-    RootConsistencyTest().main()

View File

@@ -92,11 +92,6 @@ class FlowContractProxy(ContractProxy):
     def get_mine_context(self, node_idx=0):
         return self._call("makeContextWithResult", node_idx)
-    def get_flow_root(self, node_idx=0):
-        return self._call("computeFlowRoot", node_idx)
-    def get_flow_length(self, node_idx=0):
-        return self._call("tree", node_idx)[0]
 class MineContractProxy(ContractProxy):
     def last_mined_epoch(self, node_idx=0):

View File

@@ -101,9 +101,6 @@ class ZgsNode(TestNode):
     def zgs_get_file_info_by_tx_seq(self, tx_seq):
         return self.rpc.zgs_getFileInfoByTxSeq([tx_seq])
-    def zgs_get_flow_context(self, tx_seq):
-        return self.rpc.zgs_getFlowContext([tx_seq])
     def shutdown(self):
         self.rpc.admin_shutdown()
         self.wait_until_stopped()