Compare commits

...

12 Commits

Author SHA1 Message Date
0xroy
2a5a230f21
Merge fd9c033176 into 8f17a7ad72 2024-10-23 08:53:44 -07:00
Bo QIU
8f17a7ad72
Fix metrics config deser (#245)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
2024-10-22 14:14:33 +08:00
Joel Liu
2947cb7ac6
Optimizing recover perf by reducing sync progress events (#244)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* Optimizing recover perf by reducing sync progress events

* add log

* add log
2024-10-21 21:24:50 +08:00
peilun-conflux
39efb721c5
Remove sender from contract call. (#242)
This allows the RPC services to cache the results.
2024-10-21 16:58:50 +08:00
Joel Liu
9fe5a2c18b
Stop recovery when sending via the channel fails (#240)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
2024-10-18 16:08:35 +08:00
MiniFrenchBread
80b4d63cba
fix: error handling (#235)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
2024-10-15 19:27:22 +08:00
bruno-valante
b2a70501c2
Test flow root consistency (#230) 2024-10-15 14:24:56 +08:00
Bo QIU
e701c8fdbd
Supports custom public ip to announce file (#233)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* Supports custom public ip to announce file

* Fix comment
2024-10-14 14:57:42 +08:00
0g-peterzhb
a4b02a21b7
add retry (#232) 2024-10-14 14:19:05 +08:00
MiniFrenchBread
3fc1543fb4
chore: update abi (#234)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
2024-10-14 12:38:13 +08:00
bruno-valante
82fd29968b
Support shard in case the mining is not enabled (#231)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
2024-10-12 17:03:47 +08:00
peilun-conflux
45fa344564
Check the local flow root against the contract state. (#229)
* Check the local flow root against the contract state.

* Check zero contract root.

* Fix wrong root before the first segment.

* Update contracts.

* Fix proof insertion.
2024-10-12 16:50:31 +08:00
34 changed files with 325 additions and 87 deletions

3
Cargo.lock generated
View File

@ -4715,9 +4715,10 @@ dependencies = [
[[package]]
name = "metrics"
version = "0.1.0"
source = "git+https://github.com/Conflux-Chain/conflux-rust.git?rev=992ebc5483d937c8f6b883e266f8ed2a67a7fa9a#992ebc5483d937c8f6b883e266f8ed2a67a7fa9a"
source = "git+https://github.com/Conflux-Chain/conflux-rust.git?rev=c4734e337c66d38e6396742cd5117b596e8d2603#c4734e337c66d38e6396742cd5117b596e8d2603"
dependencies = [
"chrono",
"duration-str",
"futures",
"influx_db_client",
"lazy_static",

View File

@ -28,7 +28,7 @@ members = [
resolver = "2"
[workspace.dependencies]
metrics = { git = "https://github.com/Conflux-Chain/conflux-rust.git", rev = "992ebc5483d937c8f6b883e266f8ed2a67a7fa9a" }
metrics = { git = "https://github.com/Conflux-Chain/conflux-rust.git", rev = "c4734e337c66d38e6396742cd5117b596e8d2603" }
[patch.crates-io]
discv5 = { path = "version-meld/discv5" }

View File

@ -226,18 +226,18 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
&mut self,
proof: RangeProof<E>,
) -> Result<Vec<(usize, usize, E)>> {
self.fill_with_proof(
proof
.left_proof
.proof_nodes_in_tree()
.split_off(self.leaf_height),
)?;
self.fill_with_proof(
proof
.right_proof
.proof_nodes_in_tree()
.split_off(self.leaf_height),
)
let mut updated_nodes = Vec::new();
let mut left_nodes = proof.left_proof.proof_nodes_in_tree();
if left_nodes.len() >= self.leaf_height {
updated_nodes
.append(&mut self.fill_with_proof(left_nodes.split_off(self.leaf_height))?);
}
let mut right_nodes = proof.right_proof.proof_nodes_in_tree();
if right_nodes.len() >= self.leaf_height {
updated_nodes
.append(&mut self.fill_with_proof(right_nodes.split_off(self.leaf_height))?);
}
Ok(updated_nodes)
}
pub fn fill_with_file_proof(

View File

@ -9,5 +9,5 @@ exit-future = "0.2.0"
futures = "0.3.21"
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../lighthouse_metrics" }
tokio = { version = "1.19.2", features = ["rt"] }
tokio = { version = "1.38.0", features = ["full"] }
tracing = "0.1.35"

View File

@ -222,7 +222,7 @@ impl LogEntryFetcher {
) -> UnboundedReceiver<LogFetchProgress> {
let provider = self.provider.clone();
let (recover_tx, recover_rx) = tokio::sync::mpsc::unbounded_channel();
let contract = ZgsFlow::new(self.contract_address, provider.clone());
let contract = self.flow_contract();
let log_page_size = self.log_page_size;
executor.spawn(
@ -236,22 +236,29 @@ impl LogEntryFetcher {
.filter;
let mut stream = LogQuery::new(&provider, &filter, log_query_delay)
.with_page_size(log_page_size);
debug!(
info!(
"start_recover starts, start={} end={}",
start_block_number, end_block_number
);
let (mut block_hash_sent, mut block_number_sent) = (None, None);
while let Some(maybe_log) = stream.next().await {
match maybe_log {
Ok(log) => {
let sync_progress =
if log.block_hash.is_some() && log.block_number.is_some() {
let synced_block = LogFetchProgress::SyncedBlock((
log.block_number.unwrap().as_u64(),
log.block_hash.unwrap(),
None,
));
progress = log.block_number.unwrap().as_u64();
Some(synced_block)
if block_hash_sent != log.block_hash
|| block_number_sent != log.block_number
{
let synced_block = LogFetchProgress::SyncedBlock((
log.block_number.unwrap().as_u64(),
log.block_hash.unwrap(),
None,
));
progress = log.block_number.unwrap().as_u64();
Some(synced_block)
} else {
None
}
} else {
None
};
@ -268,11 +275,17 @@ impl LogEntryFetcher {
log.block_number.expect("block number exist").as_u64(),
))
.and_then(|_| match sync_progress {
Some(b) => recover_tx.send(b),
Some(b) => {
recover_tx.send(b)?;
block_hash_sent = log.block_hash;
block_number_sent = log.block_number;
Ok(())
}
None => Ok(()),
})
{
error!("send error: e={:?}", e);
break;
}
}
Err(e) => {
@ -289,6 +302,8 @@ impl LogEntryFetcher {
}
}
}
info!("log recover end");
},
"log recover",
);
@ -305,7 +320,7 @@ impl LogEntryFetcher {
mut watch_progress_rx: UnboundedReceiver<u64>,
) -> UnboundedReceiver<LogFetchProgress> {
let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel();
let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
let contract = self.flow_contract();
let provider = self.provider.clone();
let confirmation_delay = self.confirmation_delay;
let log_page_size = self.log_page_size;
@ -583,6 +598,10 @@ impl LogEntryFetcher {
pub fn provider(&self) -> &Provider<RetryClient<Http>> {
self.provider.as_ref()
}
pub fn flow_contract(&self) -> ZgsFlow<Provider<RetryClient<Http>>> {
ZgsFlow::new(self.contract_address, self.provider.clone())
}
}
async fn check_watch_process(
@ -658,17 +677,24 @@ async fn check_watch_process(
"get block hash for block {} from RPC, assume there is no org",
*progress - 1
);
match provider.get_block(*progress - 1).await {
Ok(Some(v)) => {
break v.hash.expect("parent block hash expect exist");
let hash = loop {
match provider.get_block(*progress - 1).await {
Ok(Some(v)) => {
break v.hash.expect("parent block hash expect exist");
}
Ok(None) => {
panic!("parent block {} expect exist", *progress - 1);
}
Err(e) => {
if e.to_string().contains("server is too busy") {
warn!("server busy, wait for parent block {}", *progress - 1);
} else {
panic!("parent block {} expect exist, error {}", *progress - 1, e);
}
}
}
Ok(None) => {
panic!("parent block {} expect exist", *progress - 1);
}
Err(e) => {
panic!("parent block {} expect exist, error {}", *progress - 1, e);
}
}
};
break hash;
}
};
}

View File

@ -510,6 +510,41 @@ impl LogSyncManager {
}
self.data_cache.garbage_collect(self.next_tx_seq);
self.next_tx_seq += 1;
// Check if the computed data root matches on-chain state.
// If the call fails, we won't check the root here and return `true` directly.
let flow_contract = self.log_fetcher.flow_contract();
match flow_contract
.get_flow_root_by_tx_seq(tx.seq.into())
.call()
.await
{
Ok(contract_root_bytes) => {
let contract_root = H256::from_slice(&contract_root_bytes);
// contract_root is zero for tx submitted before upgrading.
if !contract_root.is_zero() {
match self.store.get_context() {
Ok((local_root, _)) => {
if contract_root != local_root {
error!(
?contract_root,
?local_root,
"local flow root and on-chain flow root mismatch"
);
return false;
}
}
Err(e) => {
warn!(?e, "fail to read the local flow root");
}
}
}
}
Err(e) => {
warn!(?e, "fail to read the on-chain flow root");
}
}
true
}
}

View File

@ -67,8 +67,8 @@ impl MinerConfig {
})
}
pub(crate) async fn make_provider(&self) -> Result<MineServiceMiddleware, String> {
let provider = Arc::new(Provider::new(
pub(crate) fn make_provider(&self) -> Result<Arc<Provider<RetryClient<Http>>>, String> {
Ok(Arc::new(Provider::new(
RetryClientBuilder::default()
.rate_limit_retries(self.rate_limit_retries)
.timeout_retries(self.timeout_retries)
@ -78,7 +78,11 @@ impl MinerConfig {
.map_err(|e| format!("Cannot parse blockchain endpoint: {:?}", e))?,
Box::new(HttpRateLimitRetryPolicy),
),
));
)))
}
pub(crate) async fn make_signing_provider(&self) -> Result<MineServiceMiddleware, String> {
let provider = self.make_provider()?;
let chain_id = provider
.get_chainid()
.await

View File

@ -1,6 +1,7 @@
use std::{collections::BTreeMap, sync::Arc};
use ethereum_types::H256;
use ethers::prelude::{Http, Provider, RetryClient};
use tokio::time::{sleep, Duration, Instant};
use contract_interface::{EpochRangeWithContextDigest, ZgsFlow};
@ -12,14 +13,14 @@ use storage_async::Store;
use task_executor::TaskExecutor;
use zgs_spec::SECTORS_PER_SEAL;
use crate::config::{MineServiceMiddleware, MinerConfig};
use crate::config::MinerConfig;
const DB_QUERY_PERIOD_ON_NO_TASK: u64 = 1;
const DB_QUERY_PERIOD_ON_ERROR: u64 = 5;
const CHAIN_STATUS_QUERY_PERIOD: u64 = 5;
pub struct Sealer {
flow_contract: ZgsFlow<MineServiceMiddleware>,
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
store: Arc<Store>,
context_cache: BTreeMap<u128, EpochRangeWithContextDigest>,
last_context_flow_length: u64,
@ -29,7 +30,7 @@ pub struct Sealer {
impl Sealer {
pub fn spawn(
executor: TaskExecutor,
provider: Arc<MineServiceMiddleware>,
provider: Arc<Provider<RetryClient<Http>>>,
store: Arc<Store>,
config: &MinerConfig,
miner_id: H256,

View File

@ -33,11 +33,13 @@ impl MineService {
config: MinerConfig,
store: Arc<Store>,
) -> Result<broadcast::Sender<MinerMessage>, String> {
let provider = Arc::new(config.make_provider().await?);
let provider = config.make_provider()?;
let signing_provider = Arc::new(config.make_signing_provider().await?);
let (msg_send, msg_recv) = broadcast::channel(1024);
let miner_id = check_and_request_miner_id(&config, store.as_ref(), &provider).await?;
let miner_id =
check_and_request_miner_id(&config, store.as_ref(), &signing_provider).await?;
debug!("miner id setting complete.");
let mine_context_receiver = MineContextWatcher::spawn(
@ -61,6 +63,7 @@ impl MineService {
mine_answer_receiver,
mine_context_receiver,
provider.clone(),
signing_provider,
store.clone(),
&config,
);

View File

@ -2,6 +2,7 @@ use contract_interface::PoraAnswer;
use contract_interface::{PoraMine, ZgsFlow};
use ethereum_types::U256;
use ethers::contract::ContractCall;
use ethers::prelude::{Http, Provider, RetryClient};
use ethers::providers::PendingTransaction;
use hex::ToHex;
use shared_types::FlowRangeProof;
@ -24,7 +25,7 @@ pub struct Submitter {
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
mine_contract: PoraMine<MineServiceMiddleware>,
flow_contract: ZgsFlow<MineServiceMiddleware>,
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
default_gas_limit: Option<U256>,
store: Arc<Store>,
}
@ -34,11 +35,12 @@ impl Submitter {
executor: TaskExecutor,
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
provider: Arc<MineServiceMiddleware>,
provider: Arc<Provider<RetryClient<Http>>>,
signing_provider: Arc<MineServiceMiddleware>,
store: Arc<Store>,
config: &MinerConfig,
) {
let mine_contract = PoraMine::new(config.mine_address, provider.clone());
let mine_contract = PoraMine::new(config.mine_address, signing_provider);
let flow_contract = ZgsFlow::new(config.flow_address, provider);
let default_gas_limit = config.submission_gas;

View File

@ -14,13 +14,13 @@ use tokio::{
try_join,
};
use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
use ethers::prelude::{Http, RetryClient};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::{ops::DerefMut, str::FromStr};
use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
pub type MineContextMessage = Option<PoraPuzzle>;
lazy_static! {
@ -29,9 +29,9 @@ lazy_static! {
}
pub struct MineContextWatcher {
provider: Arc<MineServiceMiddleware>,
flow_contract: ZgsFlow<MineServiceMiddleware>,
mine_contract: PoraMine<MineServiceMiddleware>,
provider: Arc<Provider<RetryClient<Http>>>,
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
mine_contract: PoraMine<Provider<RetryClient<Http>>>,
mine_context_sender: broadcast::Sender<MineContextMessage>,
last_report: MineContextMessage,
@ -44,7 +44,7 @@ impl MineContextWatcher {
pub fn spawn(
executor: TaskExecutor,
msg_recv: broadcast::Receiver<MinerMessage>,
provider: Arc<MineServiceMiddleware>,
provider: Arc<Provider<RetryClient<Http>>>,
config: &MinerConfig,
) -> broadcast::Receiver<MineContextMessage> {
let mine_contract = PoraMine::new(config.mine_address, provider.clone());

View File

@ -10,7 +10,7 @@ mod service;
use duration_str::deserialize_duration;
use network::Multiaddr;
use serde::Deserialize;
use std::time::Duration;
use std::{net::IpAddr, time::Duration};
pub use crate::service::RouterService;
@ -26,6 +26,7 @@ pub struct Config {
pub libp2p_nodes: Vec<Multiaddr>,
pub private_ip_enabled: bool,
pub check_announced_ip: bool,
pub public_address: Option<IpAddr>,
// batcher
/// Timeout to publish messages in batch
@ -47,6 +48,7 @@ impl Default for Config {
libp2p_nodes: vec![],
private_ip_enabled: false,
check_announced_ip: false,
public_address: None,
batcher_timeout: Duration::from_secs(1),
batcher_file_capacity: 1,

View File

@ -348,17 +348,26 @@ impl Libp2pEventHandler {
}
}
async fn get_listen_addr_or_add(&self) -> Option<Multiaddr> {
async fn construct_announced_ip(&self) -> Option<Multiaddr> {
// public address configured
if let Some(ip) = self.config.public_address {
let mut addr = Multiaddr::empty();
addr.push(ip.into());
addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
return Some(addr);
}
// public listen address
if let Some(addr) = self.get_listen_addr() {
return Some(addr);
}
// auto detect public IP address
let ipv4_addr = public_ip::addr_v4().await?;
let mut addr = Multiaddr::empty();
addr.push(Protocol::Ip4(ipv4_addr));
addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
addr.push(Protocol::P2p(self.network_globals.local_peer_id().into()));
self.network_globals
.listen_multiaddrs
@ -420,7 +429,7 @@ impl Libp2pEventHandler {
let peer_id = *self.network_globals.peer_id.read();
let addr = self.get_listen_addr_or_add().await?;
let addr = self.construct_announced_ip().await?;
let timestamp = timestamp_now();
let shard_config = self.store.get_store().get_shard_config();
@ -452,7 +461,7 @@ impl Libp2pEventHandler {
shard_config: ShardConfig,
) -> Option<PubsubMessage> {
let peer_id = *self.network_globals.peer_id.read();
let addr = self.get_listen_addr_or_add().await?;
let addr = self.construct_announced_ip().await?;
let timestamp = timestamp_now();
let msg = AnnounceShardConfig {
@ -528,7 +537,7 @@ impl Libp2pEventHandler {
index_end: u64,
) -> Option<PubsubMessage> {
let peer_id = *self.network_globals.peer_id.read();
let addr = self.get_listen_addr_or_add().await?;
let addr = self.construct_announced_ip().await?;
let timestamp = timestamp_now();
let msg = AnnounceChunks {

View File

@ -2,7 +2,7 @@ use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
use jsonrpsee::core::RpcResult;
use jsonrpsee::proc_macros::rpc;
use shared_types::{DataRoot, FlowProof, TxSeqOrRoot};
use storage::config::ShardConfig;
use storage::{config::ShardConfig, H256};
#[rpc(server, client, namespace = "zgs")]
pub trait Rpc {
@ -77,4 +77,7 @@ pub trait Rpc {
sector_index: u64,
flow_root: Option<DataRoot>,
) -> RpcResult<FlowProof>;
#[method(name = "getFlowContext")]
async fn get_flow_context(&self) -> RpcResult<(H256, u64)>;
}

View File

@ -8,7 +8,7 @@ use jsonrpsee::core::RpcResult;
use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
use std::fmt::{Debug, Formatter, Result};
use storage::config::ShardConfig;
use storage::try_option;
use storage::{try_option, H256};
pub struct RpcServerImpl {
pub ctx: Context,
@ -198,6 +198,10 @@ impl RpcServer for RpcServerImpl {
assert_eq!(proof.left_proof, proof.right_proof);
Ok(proof.right_proof)
}
async fn get_flow_context(&self) -> RpcResult<(H256, u64)> {
Ok(self.ctx.log_store.get_context().await?)
}
}
impl RpcServerImpl {

View File

@ -2,7 +2,7 @@ use super::{Client, RuntimeContext};
use chunk_pool::{ChunkPoolMessage, Config as ChunkPoolConfig, MemoryChunkPool};
use file_location_cache::FileLocationCache;
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
use miner::{MineService, MinerConfig, MinerMessage};
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
use network::{
self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId,
Service as LibP2PService,
@ -216,6 +216,16 @@ impl ClientBuilder {
Ok(self)
}
pub async fn with_shard(self, config: ShardConfig) -> Result<Self, String> {
self.async_store
.as_ref()
.unwrap()
.update_shard_config(config)
.await;
Ok(self)
}
/// Starts the networking stack.
pub fn with_router(mut self, router_config: router::Config) -> Result<Self, String> {
let executor = require!("router", self, runtime_context).clone().executor;

View File

@ -200,6 +200,13 @@ impl ZgsConfig {
pub fn router_config(&self, network_config: &NetworkConfig) -> Result<router::Config, String> {
let mut router_config = self.router.clone();
router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec();
if router_config.public_address.is_none() {
if let Some(addr) = &self.network_enr_address {
router_config.public_address = Some(addr.parse().unwrap());
}
}
Ok(router_config)
}
@ -228,7 +235,7 @@ impl ZgsConfig {
}
}
fn shard_config(&self) -> Result<ShardConfig, String> {
pub fn shard_config(&self) -> Result<ShardConfig, String> {
self.shard_position.clone().try_into()
}
}

View File

@ -17,6 +17,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
let miner_config = config.mine_config()?;
let router_config = config.router_config(&network_config)?;
let pruner_config = config.pruner_config()?;
let shard_config = config.shard_config()?;
ClientBuilder::default()
.with_runtime_context(context)
@ -30,6 +31,8 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
.await?
.with_miner(miner_config)
.await?
.with_shard(shard_config)
.await?
.with_pruner(pruner_config)
.await?
.with_rpc(config.rpc, config.chunk_pool_config()?)

View File

@ -29,7 +29,7 @@ itertools = "0.13.0"
serde = { version = "1.0.197", features = ["derive"] }
parking_lot = "0.12.3"
serde_json = "1.0.127"
tokio = { version = "1.10.0", features = ["sync"] }
tokio = { version = "1.38.0", features = ["full"] }
task_executor = { path = "../../common/task_executor" }
[dev-dependencies]

View File

@ -712,7 +712,7 @@ impl LogManager {
tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
}
// Initialize
None => Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None),
None => Merkle::new_with_depth(vec![], 1, None),
};
debug!(
@ -761,6 +761,10 @@ impl LogManager {
.merkle
.write()
.try_initialize(&log_manager.flow_store)?;
info!(
"Log manager initialized, state={:?}",
log_manager.get_context()?
);
Ok(log_manager)
}
@ -788,7 +792,8 @@ impl LogManager {
.unwrap();
}
std::result::Result::Err(_) => {
error!("Receiver error");
debug!("Log manager inner channel closed");
break;
}
};
}

View File

@ -335,11 +335,7 @@ impl TransactionStore {
}
let mut merkle = if last_chunk_start_index == 0 {
// The first entry hash is initialized as zero.
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
vec![H256::zero()],
log2_pow2(PORA_CHUNK_SIZE) + 1,
None,
)
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(vec![H256::zero()], 1, None)
} else {
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
vec![],

View File

@ -324,7 +324,7 @@ auto_sync_enabled = true
# enabled = false
# Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
# report_interval = ""
# report_interval = "10s"
# File name to output metrics periodically.
# file_report_output = ""

View File

@ -336,7 +336,7 @@ auto_sync_enabled = true
# enabled = false
# Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
# report_interval = ""
# report_interval = "10s"
# File name to output metrics periodically.
# file_report_output = ""

View File

@ -338,7 +338,7 @@
# enabled = false
# Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
# report_interval = ""
# report_interval = "10s"
# File name to output metrics periodically.
# file_report_output = ""

View File

@ -1 +1 @@
75c251804a29ab22adced50d92478cf0baf834bc
bea58429e436e4952ae69235d9079cfc4ac5f3b3

View File

@ -40,8 +40,8 @@
"type": "function"
}
],
"bytecode": "0x608060405234801561001057600080fd5b5060be8061001f6000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122044ebf96fcad90f0bbc521513843d64fbc182c5c913a8210a4d638393793be63064736f6c63430008100033",
"deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122044ebf96fcad90f0bbc521513843d64fbc182c5c913a8210a4d638393793be63064736f6c63430008100033",
"bytecode": "0x608060405234801561001057600080fd5b5060be8061001f6000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
"deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
"linkReferences": {},
"deployedLinkReferences": {}
}

View File

@ -70,8 +70,8 @@
"type": "function"
}
],
"bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ce57385afc7714a4000e530d1e1154d214fc1c0e2392abde201018635be1a2ab64736f6c63430008100033",
"deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ce57385afc7714a4000e530d1e1154d214fc1c0e2392abde201018635be1a2ab64736f6c63430008100033",
"bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
"deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
"linkReferences": {},
"deployedLinkReferences": {}
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

55
tests/root_consistency_test.py Executable file
View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
from test_framework.test_framework import TestFramework
from config.node_config import MINER_ID, GENESIS_PRIV_KEY
from utility.submission import create_submission, submit_data
from utility.utils import wait_until, assert_equal
from test_framework.blockchain_node import BlockChainNodeType
class RootConsistencyTest(TestFramework):
def setup_params(self):
self.num_blockchain_nodes = 1
self.num_nodes = 1
def submit_data(self, item, size):
submissions_before = self.contract.num_submissions()
client = self.nodes[0]
chunk_data = item * 256 * size
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions)
wait_until(lambda: self.contract.num_submissions() == submissions_before + 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
segment = submit_data(client, chunk_data)
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
def assert_flow_status(self, expected_length):
contract_root = self.contract.get_flow_root().hex()
contract_length = self.contract.get_flow_length()
(node_root, node_length) = tuple(self.nodes[0].zgs_getFlowContext())
assert_equal(contract_length, node_length)
assert_equal(contract_length, expected_length)
assert_equal(contract_root, node_root[2:])
def run_test(self):
self.assert_flow_status(1)
self.submit_data(b"\x11", 1)
self.assert_flow_status(2)
self.submit_data(b"\x11", 8 + 4 + 2)
self.assert_flow_status(16 + 4 + 2)
self.submit_data(b"\x12", 128 + 64)
self.assert_flow_status(256 + 64)
self.submit_data(b"\x13", 512 + 256)
self.assert_flow_status(1024 + 512 + 256)
if __name__ == "__main__":
RootConsistencyTest().main()

View File

@ -91,7 +91,12 @@ class FlowContractProxy(ContractProxy):
def get_mine_context(self, node_idx=0):
return self._call("makeContextWithResult", node_idx)
def get_flow_root(self, node_idx=0):
return self._call("computeFlowRoot", node_idx)
def get_flow_length(self, node_idx=0):
return self._call("tree", node_idx)[0]
class MineContractProxy(ContractProxy):
def last_mined_epoch(self, node_idx=0):

View File

@ -100,6 +100,9 @@ class ZgsNode(TestNode):
def zgs_get_file_info_by_tx_seq(self, tx_seq):
return self.rpc.zgs_getFileInfoByTxSeq([tx_seq])
def zgs_get_flow_context(self, tx_seq):
return self.rpc.zgs_getFlowContext([tx_seq])
def shutdown(self):
self.rpc.admin_shutdown()