Compare commits

...

12 Commits

Author SHA1 Message Date
0xroy
2a5a230f21
Merge fd9c033176 into 8f17a7ad72 2024-10-23 08:53:44 -07:00
Bo QIU
8f17a7ad72
Fix metrics config deser (#245)
2024-10-22 14:14:33 +08:00
Joel Liu
2947cb7ac6
Optimizing recover perf by reducing sync progress events (#244)
* Optimizing recover perf by reducing sync progress events

* add log

* add log
2024-10-21 21:24:50 +08:00
peilun-conflux
39efb721c5
Remove sender from contract call. (#242)
This allows the RPC services to cache the results.
2024-10-21 16:58:50 +08:00
Joel Liu
9fe5a2c18b
Stop recovery when sending via the channel fails (#240)
2024-10-18 16:08:35 +08:00
MiniFrenchBread
80b4d63cba
fix: error handling (#235)
2024-10-15 19:27:22 +08:00
bruno-valante
b2a70501c2
Test flow root consistency (#230) 2024-10-15 14:24:56 +08:00
Bo QIU
e701c8fdbd
Supports custom public ip to announce file (#233)
* Supports custom public ip to announce file

* Fix comment
2024-10-14 14:57:42 +08:00
0g-peterzhb
a4b02a21b7
add retry (#232) 2024-10-14 14:19:05 +08:00
MiniFrenchBread
3fc1543fb4
chore: update abi (#234)
2024-10-14 12:38:13 +08:00
bruno-valante
82fd29968b
Support shard in case the mining is not enabled (#231)
2024-10-12 17:03:47 +08:00
peilun-conflux
45fa344564
Check the local flow root against the contract state. (#229)
* Check the local flow root against the contract state.

* Check zero contract root.

* Fix wrong root before the first segment.

* Update contracts.

* Fix proof insertion.
2024-10-12 16:50:31 +08:00
34 changed files with 325 additions and 87 deletions

Cargo.lock generated
View File

@@ -4715,9 +4715,10 @@ dependencies = [
 [[package]]
 name = "metrics"
 version = "0.1.0"
-source = "git+https://github.com/Conflux-Chain/conflux-rust.git?rev=992ebc5483d937c8f6b883e266f8ed2a67a7fa9a#992ebc5483d937c8f6b883e266f8ed2a67a7fa9a"
+source = "git+https://github.com/Conflux-Chain/conflux-rust.git?rev=c4734e337c66d38e6396742cd5117b596e8d2603#c4734e337c66d38e6396742cd5117b596e8d2603"
 dependencies = [
  "chrono",
+ "duration-str",
  "futures",
  "influx_db_client",
  "lazy_static",

View File

@@ -28,7 +28,7 @@ members = [
 resolver = "2"

 [workspace.dependencies]
-metrics = { git = "https://github.com/Conflux-Chain/conflux-rust.git", rev = "992ebc5483d937c8f6b883e266f8ed2a67a7fa9a" }
+metrics = { git = "https://github.com/Conflux-Chain/conflux-rust.git", rev = "c4734e337c66d38e6396742cd5117b596e8d2603" }

 [patch.crates-io]
 discv5 = { path = "version-meld/discv5" }

View File

@@ -226,18 +226,18 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
         &mut self,
         proof: RangeProof<E>,
     ) -> Result<Vec<(usize, usize, E)>> {
-        self.fill_with_proof(
-            proof
-                .left_proof
-                .proof_nodes_in_tree()
-                .split_off(self.leaf_height),
-        )?;
-        self.fill_with_proof(
-            proof
-                .right_proof
-                .proof_nodes_in_tree()
-                .split_off(self.leaf_height),
-        )
+        let mut updated_nodes = Vec::new();
+        let mut left_nodes = proof.left_proof.proof_nodes_in_tree();
+        if left_nodes.len() >= self.leaf_height {
+            updated_nodes
+                .append(&mut self.fill_with_proof(left_nodes.split_off(self.leaf_height))?);
+        }
+        let mut right_nodes = proof.right_proof.proof_nodes_in_tree();
+        if right_nodes.len() >= self.leaf_height {
+            updated_nodes
+                .append(&mut self.fill_with_proof(right_nodes.split_off(self.leaf_height))?);
+        }
+        Ok(updated_nodes)
     }

     pub fn fill_with_file_proof(
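
Context for the guard added above: Vec::split_off(at) panics when at exceeds the vector's length, so a proof carrying fewer than leaf_height nodes would previously have aborted here. A minimal standalone sketch of that standard-library behavior (not repo code):

    fn main() {
        let mut nodes = vec![1u32, 2, 3];
        let upper = nodes.split_off(2); // fine: 2 <= nodes.len()
        assert_eq!(upper, vec![3]);
        assert_eq!(nodes, vec![1, 2]);
        // nodes.split_off(10) would panic with "at > len"
    }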

View File

@@ -9,5 +9,5 @@ exit-future = "0.2.0"
 futures = "0.3.21"
 lazy_static = "1.4.0"
 lighthouse_metrics = { path = "../lighthouse_metrics" }
-tokio = { version = "1.19.2", features = ["rt"] }
+tokio = { version = "1.38.0", features = ["full"] }
 tracing = "0.1.35"

View File

@@ -222,7 +222,7 @@ impl LogEntryFetcher {
     ) -> UnboundedReceiver<LogFetchProgress> {
         let provider = self.provider.clone();
         let (recover_tx, recover_rx) = tokio::sync::mpsc::unbounded_channel();
-        let contract = ZgsFlow::new(self.contract_address, provider.clone());
+        let contract = self.flow_contract();
        let log_page_size = self.log_page_size;

         executor.spawn(
@@ -236,22 +236,29 @@ impl LogEntryFetcher {
                     .filter;
                 let mut stream = LogQuery::new(&provider, &filter, log_query_delay)
                     .with_page_size(log_page_size);
-                debug!(
+                info!(
                     "start_recover starts, start={} end={}",
                     start_block_number, end_block_number
                 );
+                let (mut block_hash_sent, mut block_number_sent) = (None, None);
                 while let Some(maybe_log) = stream.next().await {
                     match maybe_log {
                         Ok(log) => {
                             let sync_progress =
                                 if log.block_hash.is_some() && log.block_number.is_some() {
-                                    let synced_block = LogFetchProgress::SyncedBlock((
-                                        log.block_number.unwrap().as_u64(),
-                                        log.block_hash.unwrap(),
-                                        None,
-                                    ));
-                                    progress = log.block_number.unwrap().as_u64();
-                                    Some(synced_block)
+                                    if block_hash_sent != log.block_hash
+                                        || block_number_sent != log.block_number
+                                    {
+                                        let synced_block = LogFetchProgress::SyncedBlock((
+                                            log.block_number.unwrap().as_u64(),
+                                            log.block_hash.unwrap(),
+                                            None,
+                                        ));
+                                        progress = log.block_number.unwrap().as_u64();
+                                        Some(synced_block)
+                                    } else {
+                                        None
+                                    }
                                 } else {
                                     None
                                 };
@@ -268,11 +275,17 @@ impl LogEntryFetcher {
                                     log.block_number.expect("block number exist").as_u64(),
                                 ))
                                 .and_then(|_| match sync_progress {
-                                    Some(b) => recover_tx.send(b),
+                                    Some(b) => {
+                                        recover_tx.send(b)?;
+                                        block_hash_sent = log.block_hash;
+                                        block_number_sent = log.block_number;
+                                        Ok(())
+                                    }
                                     None => Ok(()),
                                 })
                             {
                                 error!("send error: e={:?}", e);
+                                break;
                             }
                         }
                         Err(e) => {
@@ -289,6 +302,8 @@ impl LogEntryFetcher {
                         }
                     }
                 }
+
+                info!("log recover end");
             },
             "log recover",
         );
@@ -305,7 +320,7 @@ impl LogEntryFetcher {
         mut watch_progress_rx: UnboundedReceiver<u64>,
     ) -> UnboundedReceiver<LogFetchProgress> {
         let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel();
-        let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
+        let contract = self.flow_contract();
         let provider = self.provider.clone();
         let confirmation_delay = self.confirmation_delay;
         let log_page_size = self.log_page_size;
@@ -583,6 +598,10 @@ impl LogEntryFetcher {
     pub fn provider(&self) -> &Provider<RetryClient<Http>> {
         self.provider.as_ref()
     }
+
+    pub fn flow_contract(&self) -> ZgsFlow<Provider<RetryClient<Http>>> {
+        ZgsFlow::new(self.contract_address, self.provider.clone())
+    }
 }

 async fn check_watch_process(
@@ -658,17 +677,24 @@ async fn check_watch_process(
                     "get block hash for block {} from RPC, assume there is no org",
                     *progress - 1
                 );
-                match provider.get_block(*progress - 1).await {
-                    Ok(Some(v)) => {
-                        break v.hash.expect("parent block hash expect exist");
-                    }
-                    Ok(None) => {
-                        panic!("parent block {} expect exist", *progress - 1);
-                    }
-                    Err(e) => {
-                        panic!("parent block {} expect exist, error {}", *progress - 1, e);
-                    }
-                }
+                let hash = loop {
+                    match provider.get_block(*progress - 1).await {
+                        Ok(Some(v)) => {
+                            break v.hash.expect("parent block hash expect exist");
+                        }
+                        Ok(None) => {
+                            panic!("parent block {} expect exist", *progress - 1);
+                        }
+                        Err(e) => {
+                            if e.to_string().contains("server is too busy") {
+                                warn!("server busy, wait for parent block {}", *progress - 1);
+                            } else {
+                                panic!("parent block {} expect exist, error {}", *progress - 1, e);
+                            }
+                        }
+                    }
+                };
+                break hash;
             }
         };
     }
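
The recovery speedup in #244 boils down to remembering the last (block number, block hash) pair that was sent and skipping duplicate SyncedBlock events for logs from the same block. A standalone sketch of that pattern (illustrative types and values, assuming a tokio dependency):

    use tokio::sync::mpsc;

    #[tokio::main(flavor = "current_thread")]
    async fn main() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let mut last_sent: Option<(u64, &str)> = None;
        // Three logs from two blocks produce two events, not three.
        for event in [(1u64, "0xaa"), (1, "0xaa"), (2, "0xbb")] {
            if last_sent != Some(event) {
                tx.send(event).unwrap();
                last_sent = Some(event);
            }
        }
        drop(tx);
        let mut received = Vec::new();
        while let Some(e) = rx.recv().await {
            received.push(e);
        }
        assert_eq!(received, vec![(1, "0xaa"), (2, "0xbb")]);
    }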

View File

@@ -510,6 +510,41 @@ impl LogSyncManager {
             }
             self.data_cache.garbage_collect(self.next_tx_seq);
             self.next_tx_seq += 1;
+
+            // Check if the computed data root matches on-chain state.
+            // If the call fails, we won't check the root here and return `true` directly.
+            let flow_contract = self.log_fetcher.flow_contract();
+            match flow_contract
+                .get_flow_root_by_tx_seq(tx.seq.into())
+                .call()
+                .await
+            {
+                Ok(contract_root_bytes) => {
+                    let contract_root = H256::from_slice(&contract_root_bytes);
+                    // contract_root is zero for tx submitted before upgrading.
+                    if !contract_root.is_zero() {
+                        match self.store.get_context() {
+                            Ok((local_root, _)) => {
+                                if contract_root != local_root {
+                                    error!(
+                                        ?contract_root,
+                                        ?local_root,
+                                        "local flow root and on-chain flow root mismatch"
+                                    );
+                                    return false;
+                                }
+                            }
+                            Err(e) => {
+                                warn!(?e, "fail to read the local flow root");
+                            }
+                        }
+                    }
+                }
+                Err(e) => {
+                    warn!(?e, "fail to read the on-chain flow root");
+                }
+            }
+
             true
         }
     }
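
One detail worth noting in this check: get_flow_root_by_tx_seq returns raw bytes, and a root recorded before the contract upgrade reads back as all zeros, which is why the comparison is gated on is_zero. A tiny sketch of the gate using the ethereum-types crate already in use here:

    use ethereum_types::H256;

    fn main() {
        // Pre-upgrade txs read back a zero root: skip the comparison.
        let pre_upgrade = H256::from_slice(&[0u8; 32]);
        assert!(pre_upgrade.is_zero());
        // Any real root is compared against the local tree.
        let real_root = H256::from_slice(&[0x11u8; 32]);
        assert!(!real_root.is_zero());
    }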

View File

@@ -67,8 +67,8 @@ impl MinerConfig {
         })
     }

-    pub(crate) async fn make_provider(&self) -> Result<MineServiceMiddleware, String> {
-        let provider = Arc::new(Provider::new(
+    pub(crate) fn make_provider(&self) -> Result<Arc<Provider<RetryClient<Http>>>, String> {
+        Ok(Arc::new(Provider::new(
             RetryClientBuilder::default()
                 .rate_limit_retries(self.rate_limit_retries)
                 .timeout_retries(self.timeout_retries)
@@ -78,7 +78,11 @@ impl MinerConfig {
                 .map_err(|e| format!("Cannot parse blockchain endpoint: {:?}", e))?,
                 Box::new(HttpRateLimitRetryPolicy),
             ),
-        ));
+        )))
+    }
+
+    pub(crate) async fn make_signing_provider(&self) -> Result<MineServiceMiddleware, String> {
+        let provider = self.make_provider()?;
         let chain_id = provider
             .get_chainid()
             .await
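
Splitting make_provider (read-only) from make_signing_provider lines up with the rationale in #242: a contract call issued through a plain provider carries no sender, so RPC services can cache identical eth_call requests, while a signing middleware is only constructed where transactions are actually submitted. A simplified ethers-rs sketch of the split (plain Http transport instead of the repo's RetryClient; the hardcoded key is the well-known Hardhat/Anvil dev key, not a secret):

    use ethers::prelude::*;
    use std::sync::Arc;

    async fn make_providers() -> Result<(), Box<dyn std::error::Error>> {
        // Read-only provider: enough for queries like get_flow_root_by_tx_seq.
        let provider = Arc::new(Provider::<Http>::try_from("http://localhost:8545")?);

        // Signing middleware: only needed for submissions (e.g. PoraMine).
        let wallet: LocalWallet =
            "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse()?;
        let signing = Arc::new(SignerMiddleware::new(
            provider.clone(),
            wallet.with_chain_id(31337u64),
        ));
        let _ = (provider, signing);
        Ok(())
    }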

View File

@@ -1,6 +1,7 @@
 use std::{collections::BTreeMap, sync::Arc};

 use ethereum_types::H256;
+use ethers::prelude::{Http, Provider, RetryClient};
 use tokio::time::{sleep, Duration, Instant};

 use contract_interface::{EpochRangeWithContextDigest, ZgsFlow};
@@ -12,14 +13,14 @@ use storage_async::Store;
 use task_executor::TaskExecutor;
 use zgs_spec::SECTORS_PER_SEAL;

-use crate::config::{MineServiceMiddleware, MinerConfig};
+use crate::config::MinerConfig;

 const DB_QUERY_PERIOD_ON_NO_TASK: u64 = 1;
 const DB_QUERY_PERIOD_ON_ERROR: u64 = 5;
 const CHAIN_STATUS_QUERY_PERIOD: u64 = 5;

 pub struct Sealer {
-    flow_contract: ZgsFlow<MineServiceMiddleware>,
+    flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
     store: Arc<Store>,
     context_cache: BTreeMap<u128, EpochRangeWithContextDigest>,
     last_context_flow_length: u64,
@@ -29,7 +30,7 @@ pub struct Sealer {
 impl Sealer {
     pub fn spawn(
         executor: TaskExecutor,
-        provider: Arc<MineServiceMiddleware>,
+        provider: Arc<Provider<RetryClient<Http>>>,
         store: Arc<Store>,
         config: &MinerConfig,
         miner_id: H256,

View File

@@ -33,11 +33,13 @@ impl MineService {
         config: MinerConfig,
         store: Arc<Store>,
     ) -> Result<broadcast::Sender<MinerMessage>, String> {
-        let provider = Arc::new(config.make_provider().await?);
+        let provider = config.make_provider()?;
+        let signing_provider = Arc::new(config.make_signing_provider().await?);

         let (msg_send, msg_recv) = broadcast::channel(1024);

-        let miner_id = check_and_request_miner_id(&config, store.as_ref(), &provider).await?;
+        let miner_id =
+            check_and_request_miner_id(&config, store.as_ref(), &signing_provider).await?;
         debug!("miner id setting complete.");

         let mine_context_receiver = MineContextWatcher::spawn(
@@ -61,6 +63,7 @@ impl MineService {
             mine_answer_receiver,
             mine_context_receiver,
             provider.clone(),
+            signing_provider,
             store.clone(),
             &config,
         );

View File

@@ -2,6 +2,7 @@ use contract_interface::PoraAnswer;
 use contract_interface::{PoraMine, ZgsFlow};
 use ethereum_types::U256;
 use ethers::contract::ContractCall;
+use ethers::prelude::{Http, Provider, RetryClient};
 use ethers::providers::PendingTransaction;
 use hex::ToHex;
 use shared_types::FlowRangeProof;
@@ -24,7 +25,7 @@ pub struct Submitter {
     mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
     mine_context_receiver: broadcast::Receiver<MineContextMessage>,
     mine_contract: PoraMine<MineServiceMiddleware>,
-    flow_contract: ZgsFlow<MineServiceMiddleware>,
+    flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
     default_gas_limit: Option<U256>,
     store: Arc<Store>,
 }
@@ -34,11 +35,12 @@ impl Submitter {
         executor: TaskExecutor,
         mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
         mine_context_receiver: broadcast::Receiver<MineContextMessage>,
-        provider: Arc<MineServiceMiddleware>,
+        provider: Arc<Provider<RetryClient<Http>>>,
+        signing_provider: Arc<MineServiceMiddleware>,
         store: Arc<Store>,
         config: &MinerConfig,
     ) {
-        let mine_contract = PoraMine::new(config.mine_address, provider.clone());
+        let mine_contract = PoraMine::new(config.mine_address, signing_provider);
         let flow_contract = ZgsFlow::new(config.flow_address, provider);
         let default_gas_limit = config.submission_gas;

View File

@@ -14,13 +14,13 @@ use tokio::{
     try_join,
 };

-use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
+use ethers::prelude::{Http, RetryClient};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
 use std::{ops::DerefMut, str::FromStr};

+use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
+
 pub type MineContextMessage = Option<PoraPuzzle>;

 lazy_static! {
@@ -29,9 +29,9 @@ lazy_static! {
 }

 pub struct MineContextWatcher {
-    provider: Arc<MineServiceMiddleware>,
-    flow_contract: ZgsFlow<MineServiceMiddleware>,
-    mine_contract: PoraMine<MineServiceMiddleware>,
+    provider: Arc<Provider<RetryClient<Http>>>,
+    flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
+    mine_contract: PoraMine<Provider<RetryClient<Http>>>,

     mine_context_sender: broadcast::Sender<MineContextMessage>,
     last_report: MineContextMessage,
@@ -44,7 +44,7 @@ impl MineContextWatcher {
     pub fn spawn(
         executor: TaskExecutor,
         msg_recv: broadcast::Receiver<MinerMessage>,
-        provider: Arc<MineServiceMiddleware>,
+        provider: Arc<Provider<RetryClient<Http>>>,
         config: &MinerConfig,
     ) -> broadcast::Receiver<MineContextMessage> {
         let mine_contract = PoraMine::new(config.mine_address, provider.clone());

View File

@@ -10,7 +10,7 @@ mod service;
 use duration_str::deserialize_duration;
 use network::Multiaddr;
 use serde::Deserialize;
-use std::time::Duration;
+use std::{net::IpAddr, time::Duration};

 pub use crate::service::RouterService;
@@ -26,6 +26,7 @@ pub struct Config {
     pub libp2p_nodes: Vec<Multiaddr>,
     pub private_ip_enabled: bool,
     pub check_announced_ip: bool,
+    pub public_address: Option<IpAddr>,

     // batcher
     /// Timeout to publish messages in batch
@@ -47,6 +48,7 @@ impl Default for Config {
             libp2p_nodes: vec![],
             private_ip_enabled: false,
             check_announced_ip: false,
+            public_address: None,
             batcher_timeout: Duration::from_secs(1),
             batcher_file_capacity: 1,

View File

@@ -348,17 +348,26 @@ impl Libp2pEventHandler {
         }
     }

-    async fn get_listen_addr_or_add(&self) -> Option<Multiaddr> {
+    async fn construct_announced_ip(&self) -> Option<Multiaddr> {
+        // public address configured
+        if let Some(ip) = self.config.public_address {
+            let mut addr = Multiaddr::empty();
+            addr.push(ip.into());
+            addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
+            return Some(addr);
+        }
+
+        // public listen address
         if let Some(addr) = self.get_listen_addr() {
             return Some(addr);
         }

+        // auto detect public IP address
         let ipv4_addr = public_ip::addr_v4().await?;

         let mut addr = Multiaddr::empty();
         addr.push(Protocol::Ip4(ipv4_addr));
         addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
-        addr.push(Protocol::P2p(self.network_globals.local_peer_id().into()));

         self.network_globals
             .listen_multiaddrs
@@ -420,7 +429,7 @@ impl Libp2pEventHandler {
         let peer_id = *self.network_globals.peer_id.read();
-        let addr = self.get_listen_addr_or_add().await?;
+        let addr = self.construct_announced_ip().await?;
         let timestamp = timestamp_now();

         let shard_config = self.store.get_store().get_shard_config();
@@ -452,7 +461,7 @@ impl Libp2pEventHandler {
         shard_config: ShardConfig,
     ) -> Option<PubsubMessage> {
         let peer_id = *self.network_globals.peer_id.read();
-        let addr = self.get_listen_addr_or_add().await?;
+        let addr = self.construct_announced_ip().await?;
         let timestamp = timestamp_now();

         let msg = AnnounceShardConfig {
@@ -528,7 +537,7 @@ impl Libp2pEventHandler {
         index_end: u64,
     ) -> Option<PubsubMessage> {
         let peer_id = *self.network_globals.peer_id.read();
-        let addr = self.get_listen_addr_or_add().await?;
+        let addr = self.construct_announced_ip().await?;
         let timestamp = timestamp_now();

         let msg = AnnounceChunks {
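
For reference, the configured-address branch above just maps an IpAddr onto a multiaddr of the form /ip4/<ip>/tcp/<port>. A sketch against the standalone multiaddr crate (the same Multiaddr type libp2p re-exports); the address and port are made up:

    use multiaddr::{Multiaddr, Protocol};
    use std::net::IpAddr;

    fn announced(ip: IpAddr, tcp_port: u16) -> Multiaddr {
        let mut addr = Multiaddr::empty();
        addr.push(ip.into()); // becomes Ip4 or Ip6 depending on the address
        addr.push(Protocol::Tcp(tcp_port));
        addr
    }

    fn main() {
        let addr = announced("203.0.113.7".parse().unwrap(), 1234);
        assert_eq!(addr.to_string(), "/ip4/203.0.113.7/tcp/1234");
    }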

View File

@@ -2,7 +2,7 @@ use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
 use jsonrpsee::core::RpcResult;
 use jsonrpsee::proc_macros::rpc;
 use shared_types::{DataRoot, FlowProof, TxSeqOrRoot};
-use storage::config::ShardConfig;
+use storage::{config::ShardConfig, H256};

 #[rpc(server, client, namespace = "zgs")]
 pub trait Rpc {
@@ -77,4 +77,7 @@ pub trait Rpc {
         sector_index: u64,
         flow_root: Option<DataRoot>,
     ) -> RpcResult<FlowProof>;
+
+    #[method(name = "getFlowContext")]
+    async fn get_flow_context(&self) -> RpcResult<(H256, u64)>;
 }
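
The new method lives in the zgs namespace, so it is callable as zgs_getFlowContext with no parameters and returns the (flow root, flow length) pair, the same pair the root consistency test below reads. A hedged client-side sketch with jsonrpsee (endpoint URL and client feature flags are assumptions; the root arrives as a hex string):

    use jsonrpsee::core::client::ClientT;
    use jsonrpsee::http_client::HttpClientBuilder;
    use jsonrpsee::rpc_params;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let client = HttpClientBuilder::default().build("http://127.0.0.1:5678")?;
        // The (H256, u64) tuple serializes as a two-element JSON array.
        let (root, length): (String, u64) =
            client.request("zgs_getFlowContext", rpc_params![]).await?;
        println!("flow root {root}, flow length {length}");
        Ok(())
    }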

View File

@@ -8,7 +8,7 @@ use jsonrpsee::core::RpcResult;
 use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
 use std::fmt::{Debug, Formatter, Result};
 use storage::config::ShardConfig;
-use storage::try_option;
+use storage::{try_option, H256};

 pub struct RpcServerImpl {
     pub ctx: Context,
@@ -198,6 +198,10 @@ impl RpcServer for RpcServerImpl {
         assert_eq!(proof.left_proof, proof.right_proof);
         Ok(proof.right_proof)
     }
+
+    async fn get_flow_context(&self) -> RpcResult<(H256, u64)> {
+        Ok(self.ctx.log_store.get_context().await?)
+    }
 }

 impl RpcServerImpl {
impl RpcServerImpl { impl RpcServerImpl {

View File

@@ -2,7 +2,7 @@ use super::{Client, RuntimeContext};
 use chunk_pool::{ChunkPoolMessage, Config as ChunkPoolConfig, MemoryChunkPool};
 use file_location_cache::FileLocationCache;
 use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
-use miner::{MineService, MinerConfig, MinerMessage};
+use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
 use network::{
     self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId,
     Service as LibP2PService,
@@ -216,6 +216,16 @@ impl ClientBuilder {
         Ok(self)
     }

+    pub async fn with_shard(self, config: ShardConfig) -> Result<Self, String> {
+        self.async_store
+            .as_ref()
+            .unwrap()
+            .update_shard_config(config)
+            .await;
+        Ok(self)
+    }
+
     /// Starts the networking stack.
     pub fn with_router(mut self, router_config: router::Config) -> Result<Self, String> {
         let executor = require!("router", self, runtime_context).clone().executor;

View File

@@ -200,6 +200,13 @@ impl ZgsConfig {
     pub fn router_config(&self, network_config: &NetworkConfig) -> Result<router::Config, String> {
         let mut router_config = self.router.clone();
         router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec();
+
+        if router_config.public_address.is_none() {
+            if let Some(addr) = &self.network_enr_address {
+                router_config.public_address = Some(addr.parse().unwrap());
+            }
+        }
+
         Ok(router_config)
     }

@@ -228,7 +235,7 @@ impl ZgsConfig {
         }
     }

-    fn shard_config(&self) -> Result<ShardConfig, String> {
+    pub fn shard_config(&self) -> Result<ShardConfig, String> {
         self.shard_position.clone().try_into()
     }
 }

View File

@@ -17,6 +17,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
     let miner_config = config.mine_config()?;
     let router_config = config.router_config(&network_config)?;
     let pruner_config = config.pruner_config()?;
+    let shard_config = config.shard_config()?;

     ClientBuilder::default()
         .with_runtime_context(context)
@@ -30,6 +31,8 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
         .await?
         .with_miner(miner_config)
         .await?
+        .with_shard(shard_config)
+        .await?
         .with_pruner(pruner_config)
         .await?
         .with_rpc(config.rpc, config.chunk_pool_config()?)

View File

@@ -29,7 +29,7 @@ itertools = "0.13.0"
 serde = { version = "1.0.197", features = ["derive"] }
 parking_lot = "0.12.3"
 serde_json = "1.0.127"
-tokio = { version = "1.10.0", features = ["sync"] }
+tokio = { version = "1.38.0", features = ["full"] }
 task_executor = { path = "../../common/task_executor" }

 [dev-dependencies]

View File

@@ -712,7 +712,7 @@ impl LogManager {
                 tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
             }
             // Initialize
-            None => Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None),
+            None => Merkle::new_with_depth(vec![], 1, None),
         };

         debug!(
@@ -761,6 +761,10 @@ impl LogManager {
             .merkle
             .write()
             .try_initialize(&log_manager.flow_store)?;
+        info!(
+            "Log manager initialized, state={:?}",
+            log_manager.get_context()?
+        );
         Ok(log_manager)
     }

@@ -788,7 +792,8 @@ impl LogManager {
                         .unwrap();
                 }
                 std::result::Result::Err(_) => {
-                    error!("Receiver error");
+                    debug!("Log manager inner channel closed");
+                    break;
                 }
             };
         }
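
Both here and in transaction_store.rs below, the initial tree depth drops from log2_pow2(PORA_CHUNK_SIZE) + 1 to 1, so a tree holding nothing (or only the zero sentinel) no longer starts at full-chunk depth, which matches the commit note "Fix wrong root before the first segment" in #229. A sketch of what log2_pow2 computes, with an assumed implementation and an assumed chunk size for illustration:

    // Assumed implementation: exact log2 of a power of two.
    fn log2_pow2(x: u64) -> u64 {
        x.trailing_zeros() as u64
    }

    fn main() {
        // If PORA_CHUNK_SIZE were 1024, the old initial depth was 10 + 1 = 11.
        assert_eq!(log2_pow2(1024) + 1, 11);
        // The fix starts at depth 1 and lets the tree grow with appends.
    }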

View File

@@ -335,11 +335,7 @@ impl TransactionStore {
         }
         let mut merkle = if last_chunk_start_index == 0 {
             // The first entry hash is initialized as zero.
-            AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
-                vec![H256::zero()],
-                log2_pow2(PORA_CHUNK_SIZE) + 1,
-                None,
-            )
+            AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(vec![H256::zero()], 1, None)
         } else {
             AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
                 vec![],

View File

@@ -324,7 +324,7 @@ auto_sync_enabled = true
 # enabled = false

 # Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
-# report_interval = ""
+# report_interval = "10s"

 # File name to output metrics periodically.
 # file_report_output = ""
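
The sample value changes from "" to "10s" here and in the two templates below. With the duration-str based deserialization this change set wires in (see the duration-str entry added to Cargo.lock and the deserialize_duration import in the router config), an empty string is not a parsable duration. A sketch of the parsing behavior (illustrative struct; assumes serde, toml, and duration-str as dependencies):

    use serde::Deserialize;
    use std::time::Duration;

    #[derive(Deserialize)]
    struct MetricsConfig {
        #[serde(deserialize_with = "duration_str::deserialize_duration")]
        report_interval: Duration,
    }

    fn main() {
        let cfg: MetricsConfig = toml::from_str(r#"report_interval = "10s""#).unwrap();
        assert_eq!(cfg.report_interval, Duration::from_secs(10));
        // report_interval = "" would fail to deserialize here.
    }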

View File

@@ -336,7 +336,7 @@ auto_sync_enabled = true
 # enabled = false

 # Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
-# report_interval = ""
+# report_interval = "10s"

 # File name to output metrics periodically.
 # file_report_output = ""

View File

@@ -338,7 +338,7 @@
 # enabled = false

 # Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
-# report_interval = ""
+# report_interval = "10s"

 # File name to output metrics periodically.
 # file_report_output = ""

View File

@@ -1 +1 @@
-75c251804a29ab22adced50d92478cf0baf834bc
+bea58429e436e4952ae69235d9079cfc4ac5f3b3

View File

@@ -40,8 +40,8 @@
       "type": "function"
     }
   ],
-  "bytecode": "0x608060405234801561001057600080fd5b5060be8061001f6000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122044ebf96fcad90f0bbc521513843d64fbc182c5c913a8210a4d638393793be63064736f6c63430008100033",
-  "deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122044ebf96fcad90f0bbc521513843d64fbc182c5c913a8210a4d638393793be63064736f6c63430008100033",
+  "bytecode": "0x608060405234801561001057600080fd5b5060be8061001f6000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
+  "deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
   "linkReferences": {},
   "deployedLinkReferences": {}
 }

View File

@@ -70,8 +70,8 @@
       "type": "function"
     }
   ],
-  "bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ce57385afc7714a4000e530d1e1154d214fc1c0e2392abde201018635be1a2ab64736f6c63430008100033",
-  "deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ce57385afc7714a4000e530d1e1154d214fc1c0e2392abde201018635be1a2ab64736f6c63430008100033",
+  "bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
+  "deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
   "linkReferences": {},
   "deployedLinkReferences": {}
 }

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

tests/root_consistency_test.py Executable file
View File

@@ -0,0 +1,55 @@
#!/usr/bin/env python3

from test_framework.test_framework import TestFramework
from config.node_config import MINER_ID, GENESIS_PRIV_KEY
from utility.submission import create_submission, submit_data
from utility.utils import wait_until, assert_equal
from test_framework.blockchain_node import BlockChainNodeType


class RootConsistencyTest(TestFramework):
    def setup_params(self):
        self.num_blockchain_nodes = 1
        self.num_nodes = 1

    def submit_data(self, item, size):
        submissions_before = self.contract.num_submissions()
        client = self.nodes[0]
        chunk_data = item * 256 * size
        submissions, data_root = create_submission(chunk_data)
        self.contract.submit(submissions)
        wait_until(lambda: self.contract.num_submissions() == submissions_before + 1)
        wait_until(lambda: client.zgs_get_file_info(data_root) is not None)

        segment = submit_data(client, chunk_data)
        wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])

    def assert_flow_status(self, expected_length):
        contract_root = self.contract.get_flow_root().hex()
        contract_length = self.contract.get_flow_length()
        (node_root, node_length) = tuple(self.nodes[0].zgs_getFlowContext())

        assert_equal(contract_length, node_length)
        assert_equal(contract_length, expected_length)
        assert_equal(contract_root, node_root[2:])

    def run_test(self):
        self.assert_flow_status(1)

        self.submit_data(b"\x11", 1)
        self.assert_flow_status(2)

        self.submit_data(b"\x11", 8 + 4 + 2)
        self.assert_flow_status(16 + 4 + 2)

        self.submit_data(b"\x12", 128 + 64)
        self.assert_flow_status(256 + 64)

        self.submit_data(b"\x13", 512 + 256)
        self.assert_flow_status(1024 + 512 + 256)


if __name__ == "__main__":
    RootConsistencyTest().main()

View File

@@ -91,7 +91,12 @@ class FlowContractProxy(ContractProxy):
     def get_mine_context(self, node_idx=0):
         return self._call("makeContextWithResult", node_idx)

+    def get_flow_root(self, node_idx=0):
+        return self._call("computeFlowRoot", node_idx)
+
+    def get_flow_length(self, node_idx=0):
+        return self._call("tree", node_idx)[0]
+

 class MineContractProxy(ContractProxy):
     def last_mined_epoch(self, node_idx=0):
View File

@@ -100,6 +100,9 @@ class ZgsNode(TestNode):
     def zgs_get_file_info_by_tx_seq(self, tx_seq):
         return self.rpc.zgs_getFileInfoByTxSeq([tx_seq])

+    def zgs_get_flow_context(self, tx_seq):
+        return self.rpc.zgs_getFlowContext([tx_seq])
+
     def shutdown(self):
         self.rpc.admin_shutdown()