Compare commits

..

3 Commits

Author SHA1 Message Date
peilun-conflux
40104de891
Return the first finalized tx by data root if possible. (#278)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
* Only use tx seq for tx status.

* Return the first finalized tx by data root if possible.

This index is used for upload/download segments and file status check.
In all the cases, if there is a finalized transaction, we should use it.
2024-11-19 11:59:50 +08:00
0g-peterzhb
0da3c374db
add snapshot test (#276)
* add snapshot test
2024-11-19 11:18:58 +08:00
Bo QIU
2a24bbde18
Fix protocol version issue (#277) 2024-11-19 09:56:59 +08:00
12 changed files with 79 additions and 51 deletions

View File

@ -96,7 +96,8 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F
/// Defines the current P2P protocol version. /// Defines the current P2P protocol version.
/// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused network too heavey. /// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused network too heavey.
/// - v2: Publish NewFile to neighbors only and announce file via RPC message. /// - v2: Publish NewFile to neighbors only and announce file via RPC message.
pub const PROTOCOL_VERSION: [u8; 3] = [0, 2, 0]; pub const PROTOCOL_VERSION_V1: [u8; 3] = [0, 1, 0];
pub const PROTOCOL_VERSION_V2: [u8; 3] = [0, 2, 0];
/// Application level requests sent to the network. /// Application level requests sent to the network.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]

View File

@ -246,12 +246,7 @@ impl RpcServerImpl {
} }
async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult<FileInfo> { async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult<FileInfo> {
let (finalized, pruned) = match self let (finalized, pruned) = match self.ctx.log_store.get_store().get_tx_status(tx.seq)? {
.ctx
.log_store
.get_store()
.get_tx_status(TxSeqOrRoot::TxSeq(tx.seq))?
{
Some(TxStatus::Finalized) => (true, false), Some(TxStatus::Finalized) => (true, false),
Some(TxStatus::Pruned) => (false, true), Some(TxStatus::Pruned) => (false, true),
None => (false, false), None => (false, false),

View File

@ -39,13 +39,18 @@ impl ZgsConfig {
.await .await
.map_err(|e| format!("Unable to get chain id: {:?}", e))? .map_err(|e| format!("Unable to get chain id: {:?}", e))?
.as_u64(); .as_u64();
let network_protocol_version = if self.sync.neighbors_only {
network::PROTOCOL_VERSION_V2
} else {
network::PROTOCOL_VERSION_V1
};
let local_network_id = NetworkIdentity { let local_network_id = NetworkIdentity {
chain_id, chain_id,
flow_address, flow_address,
p2p_protocol_version: ProtocolVersion { p2p_protocol_version: ProtocolVersion {
major: network::PROTOCOL_VERSION[0], major: network_protocol_version[0],
minor: network::PROTOCOL_VERSION[1], minor: network_protocol_version[1],
build: network::PROTOCOL_VERSION[2], build: network_protocol_version[2],
}, },
}; };
network_config.network_id = local_network_id.clone(); network_config.network_id = local_network_id.clone();

View File

@ -21,7 +21,6 @@ use rayon::prelude::ParallelSlice;
use shared_types::{ use shared_types::{
bytes_to_chunks, compute_padded_chunk_size, compute_segment_size, Chunk, ChunkArray, bytes_to_chunks, compute_padded_chunk_size, compute_segment_size, Chunk, ChunkArray,
ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
TxSeqOrRoot,
}; };
use std::cmp::Ordering; use std::cmp::Ordering;
@ -538,7 +537,15 @@ impl LogStoreRead for LogManager {
} }
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result<Option<u64>> { fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result<Option<u64>> {
self.tx_store.get_first_tx_seq_by_data_root(data_root) let seq_list = self.tx_store.get_tx_seq_list_by_data_root(data_root)?;
for tx_seq in &seq_list {
if self.tx_store.check_tx_completed(*tx_seq)? {
// Return the first finalized tx if possible.
return Ok(Some(*tx_seq));
}
}
// No tx is finalized, return the first one.
Ok(seq_list.first().cloned())
} }
fn get_chunk_with_proof_by_tx_and_index( fn get_chunk_with_proof_by_tx_and_index(
@ -582,14 +589,7 @@ impl LogStoreRead for LogManager {
})) }))
} }
fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>> { fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> {
let tx_seq = match tx_seq_or_data_root {
TxSeqOrRoot::TxSeq(v) => v,
TxSeqOrRoot::Root(root) => {
try_option!(self.tx_store.get_first_tx_seq_by_data_root(&root)?)
}
};
self.tx_store.get_tx_status(tx_seq) self.tx_store.get_tx_status(tx_seq)
} }

View File

@ -4,7 +4,7 @@ use ethereum_types::H256;
use flow_store::PadPair; use flow_store::PadPair;
use shared_types::{ use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
Transaction, TxSeqOrRoot, Transaction,
}; };
use zgs_spec::{BYTES_PER_SEAL, SEALS_PER_LOAD}; use zgs_spec::{BYTES_PER_SEAL, SEALS_PER_LOAD};
@ -31,8 +31,12 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn get_tx_by_seq_number(&self, seq: u64) -> Result<Option<Transaction>>; fn get_tx_by_seq_number(&self, seq: u64) -> Result<Option<Transaction>>;
/// Get a transaction by the data root of its data. /// Get a transaction by the data root of its data.
/// If all txs are not finalized, return the first one.
/// Otherwise, return the first finalized tx.
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>>; fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>>;
/// If all txs are not finalized, return the first one.
/// Otherwise, return the first finalized tx.
fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result<Option<Transaction>> { fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result<Option<Transaction>> {
match self.get_tx_seq_by_data_root(data_root)? { match self.get_tx_seq_by_data_root(data_root)? {
Some(seq) => self.get_tx_by_seq_number(seq), Some(seq) => self.get_tx_by_seq_number(seq),
@ -58,7 +62,7 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>; fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>>; fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>>;
fn next_tx_seq(&self) -> u64; fn next_tx_seq(&self) -> u64;

View File

@ -181,14 +181,6 @@ impl TransactionStore {
Ok(Vec::<u64>::from_ssz_bytes(&value).map_err(Error::from)?) Ok(Vec::<u64>::from_ssz_bytes(&value).map_err(Error::from)?)
} }
pub fn get_first_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>> {
let value = try_option!(self
.kvdb
.get(COL_TX_DATA_ROOT_INDEX, data_root.as_bytes())?);
let seq_list = Vec::<u64>::from_ssz_bytes(&value).map_err(Error::from)?;
Ok(seq_list.first().cloned())
}
#[instrument(skip(self))] #[instrument(skip(self))]
pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> { pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
Ok(self.kvdb.put( Ok(self.kvdb.put(

View File

@ -1,7 +1,6 @@
use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender}; use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use shared_types::TxSeqOrRoot;
use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration}; use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration};
use storage_async::Store; use storage_async::Store;
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -86,11 +85,7 @@ impl Batcher {
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> { async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
// file already finalized or even pruned // file already finalized or even pruned
if let Some(tx_status) = self if let Some(tx_status) = self.store.get_store().get_tx_status(tx_seq)? {
.store
.get_store()
.get_tx_status(TxSeqOrRoot::TxSeq(tx_seq))?
{
let num_terminated: usize = self.terminate_file_sync(tx_seq, false).await; let num_terminated: usize = self.terminate_file_sync(tx_seq, false).await;
if num_terminated > 0 { if num_terminated > 0 {
info!(%tx_seq, %num_terminated, ?tx_status, "Terminate file sync due to file already completed in db"); info!(%tx_seq, %num_terminated, ?tx_status, "Terminate file sync due to file already completed in db");

View File

@ -87,12 +87,7 @@ impl HistoricalTxWriter {
} }
// write tx in sync store if not finalized or pruned // write tx in sync store if not finalized or pruned
if self if self.store.get_store().get_tx_status(next_tx_seq)?.is_none() {
.store
.get_store()
.get_tx_status(shared_types::TxSeqOrRoot::TxSeq(next_tx_seq))?
.is_none()
{
self.sync_store.insert(next_tx_seq, Queue::Ready).await?; self.sync_store.insert(next_tx_seq, Queue::Ready).await?;
} }

View File

@ -1,10 +1,10 @@
jsonrpcclient==4.0.3 jsonrpcclient==4.0.3
pyyaml==6.0.1 pyyaml==6.0.1
pysha3==1.0.2 safe-pysha3==1.0.4
coincurve==18.0.0 coincurve==20.0.0
eth-utils==3.0.0 eth-utils==5.1.0
py-ecc==7.0.0 py-ecc==7.0.0
web3==6.14.0 web3==7.5.0
eth_tester eth_tester
cffi==1.16.0 cffi==1.16.0
rtoml==0.10.0 rtoml==0.11.0

41
tests/snapshot_test.py Normal file
View File

@ -0,0 +1,41 @@
#!/usr/bin/env python3
import os
import shutil
from test_framework.test_framework import TestFramework
from utility.utils import wait_until
class SnapshotTask(TestFramework):
def setup_params(self):
self.num_nodes = 2
# Enable random auto sync only
for i in range(self.num_nodes):
self.zgs_node_configs[i] = {
"sync": {
"auto_sync_enabled": True,
"max_sequential_workers": 3,
"max_random_workers": 3,
"neighbors_only": True,
}
}
def run_test(self):
# Submit and upload files on node 0
data_root_1 = self.__upload_file__(0, 256 * 1024)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"])
# Start the last node to verify historical file sync
self.nodes[1].shutdown()
shutil.rmtree(os.path.join(self.nodes[1].data_dir, 'db/data_db'))
self.start_storage_node(1)
self.nodes[1].wait_for_rpc_connection()
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"])
if __name__ == "__main__":
SnapshotTask().main()

View File

@ -4,7 +4,7 @@ import tempfile
import time import time
from web3 import Web3, HTTPProvider from web3 import Web3, HTTPProvider
from web3.middleware import construct_sign_and_send_raw_middleware from web3.middleware import SignAndSendRawMiddlewareBuilder
from enum import Enum, unique from enum import Enum, unique
from config.node_config import ( from config.node_config import (
GENESIS_PRIV_KEY, GENESIS_PRIV_KEY,
@ -262,7 +262,7 @@ class BlockchainNode(TestNode):
account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY) account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY)
account2 = w3.eth.account.from_key(GENESIS_PRIV_KEY1) account2 = w3.eth.account.from_key(GENESIS_PRIV_KEY1)
w3.middleware_onion.add( w3.middleware_onion.add(
construct_sign_and_send_raw_middleware([account1, account2]) SignAndSendRawMiddlewareBuilder.build([account1, account2])
) )
# account = w3.eth.account.from_key(GENESIS_PRIV_KEY1) # account = w3.eth.account.from_key(GENESIS_PRIV_KEY1)
# w3.middleware_onion.add(construct_sign_and_send_raw_middleware(account)) # w3.middleware_onion.add(construct_sign_and_send_raw_middleware(account))
@ -365,7 +365,7 @@ class BlockchainNode(TestNode):
account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY) account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY)
account2 = w3.eth.account.from_key(GENESIS_PRIV_KEY1) account2 = w3.eth.account.from_key(GENESIS_PRIV_KEY1)
w3.middleware_onion.add( w3.middleware_onion.add(
construct_sign_and_send_raw_middleware([account1, account2]) SignAndSendRawMiddlewareBuilder.build([account1, account2])
) )
contract_interface = load_contract_metadata(self.contract_path, "Flow") contract_interface = load_contract_metadata(self.contract_path, "Flow")

View File

@ -44,7 +44,7 @@ class TestFramework:
self.num_blockchain_nodes = 1 self.num_blockchain_nodes = 1
self.num_nodes = 1 self.num_nodes = 1
self.blockchain_nodes = [] self.blockchain_nodes = []
self.nodes = [] self.nodes: list[ZgsNode] = []
self.contract = None self.contract = None
self.blockchain_node_configs = {} self.blockchain_node_configs = {}
self.zgs_node_configs = {} self.zgs_node_configs = {}