mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Compare commits
12 Commits
0cd07d8903
...
2a5a230f21
Author | SHA1 | Date | |
---|---|---|---|
![]() |
2a5a230f21 | ||
![]() |
8f17a7ad72 | ||
![]() |
2947cb7ac6 | ||
![]() |
39efb721c5 | ||
![]() |
9fe5a2c18b | ||
![]() |
80b4d63cba | ||
![]() |
b2a70501c2 | ||
![]() |
e701c8fdbd | ||
![]() |
a4b02a21b7 | ||
![]() |
3fc1543fb4 | ||
![]() |
82fd29968b | ||
![]() |
45fa344564 |
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -4715,9 +4715,10 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "metrics"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/Conflux-Chain/conflux-rust.git?rev=992ebc5483d937c8f6b883e266f8ed2a67a7fa9a#992ebc5483d937c8f6b883e266f8ed2a67a7fa9a"
|
||||
source = "git+https://github.com/Conflux-Chain/conflux-rust.git?rev=c4734e337c66d38e6396742cd5117b596e8d2603#c4734e337c66d38e6396742cd5117b596e8d2603"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"duration-str",
|
||||
"futures",
|
||||
"influx_db_client",
|
||||
"lazy_static",
|
||||
|
@ -28,7 +28,7 @@ members = [
|
||||
resolver = "2"
|
||||
|
||||
[workspace.dependencies]
|
||||
metrics = { git = "https://github.com/Conflux-Chain/conflux-rust.git", rev = "992ebc5483d937c8f6b883e266f8ed2a67a7fa9a" }
|
||||
metrics = { git = "https://github.com/Conflux-Chain/conflux-rust.git", rev = "c4734e337c66d38e6396742cd5117b596e8d2603" }
|
||||
|
||||
[patch.crates-io]
|
||||
discv5 = { path = "version-meld/discv5" }
|
||||
|
@ -226,18 +226,18 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
||||
&mut self,
|
||||
proof: RangeProof<E>,
|
||||
) -> Result<Vec<(usize, usize, E)>> {
|
||||
self.fill_with_proof(
|
||||
proof
|
||||
.left_proof
|
||||
.proof_nodes_in_tree()
|
||||
.split_off(self.leaf_height),
|
||||
)?;
|
||||
self.fill_with_proof(
|
||||
proof
|
||||
.right_proof
|
||||
.proof_nodes_in_tree()
|
||||
.split_off(self.leaf_height),
|
||||
)
|
||||
let mut updated_nodes = Vec::new();
|
||||
let mut left_nodes = proof.left_proof.proof_nodes_in_tree();
|
||||
if left_nodes.len() >= self.leaf_height {
|
||||
updated_nodes
|
||||
.append(&mut self.fill_with_proof(left_nodes.split_off(self.leaf_height))?);
|
||||
}
|
||||
let mut right_nodes = proof.right_proof.proof_nodes_in_tree();
|
||||
if right_nodes.len() >= self.leaf_height {
|
||||
updated_nodes
|
||||
.append(&mut self.fill_with_proof(right_nodes.split_off(self.leaf_height))?);
|
||||
}
|
||||
Ok(updated_nodes)
|
||||
}
|
||||
|
||||
pub fn fill_with_file_proof(
|
||||
|
@ -9,5 +9,5 @@ exit-future = "0.2.0"
|
||||
futures = "0.3.21"
|
||||
lazy_static = "1.4.0"
|
||||
lighthouse_metrics = { path = "../lighthouse_metrics" }
|
||||
tokio = { version = "1.19.2", features = ["rt"] }
|
||||
tokio = { version = "1.38.0", features = ["full"] }
|
||||
tracing = "0.1.35"
|
||||
|
@ -222,7 +222,7 @@ impl LogEntryFetcher {
|
||||
) -> UnboundedReceiver<LogFetchProgress> {
|
||||
let provider = self.provider.clone();
|
||||
let (recover_tx, recover_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let contract = ZgsFlow::new(self.contract_address, provider.clone());
|
||||
let contract = self.flow_contract();
|
||||
let log_page_size = self.log_page_size;
|
||||
|
||||
executor.spawn(
|
||||
@ -236,15 +236,19 @@ impl LogEntryFetcher {
|
||||
.filter;
|
||||
let mut stream = LogQuery::new(&provider, &filter, log_query_delay)
|
||||
.with_page_size(log_page_size);
|
||||
debug!(
|
||||
info!(
|
||||
"start_recover starts, start={} end={}",
|
||||
start_block_number, end_block_number
|
||||
);
|
||||
let (mut block_hash_sent, mut block_number_sent) = (None, None);
|
||||
while let Some(maybe_log) = stream.next().await {
|
||||
match maybe_log {
|
||||
Ok(log) => {
|
||||
let sync_progress =
|
||||
if log.block_hash.is_some() && log.block_number.is_some() {
|
||||
if block_hash_sent != log.block_hash
|
||||
|| block_number_sent != log.block_number
|
||||
{
|
||||
let synced_block = LogFetchProgress::SyncedBlock((
|
||||
log.block_number.unwrap().as_u64(),
|
||||
log.block_hash.unwrap(),
|
||||
@ -254,6 +258,9 @@ impl LogEntryFetcher {
|
||||
Some(synced_block)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
debug!("recover: progress={:?}", sync_progress);
|
||||
|
||||
@ -268,11 +275,17 @@ impl LogEntryFetcher {
|
||||
log.block_number.expect("block number exist").as_u64(),
|
||||
))
|
||||
.and_then(|_| match sync_progress {
|
||||
Some(b) => recover_tx.send(b),
|
||||
Some(b) => {
|
||||
recover_tx.send(b)?;
|
||||
block_hash_sent = log.block_hash;
|
||||
block_number_sent = log.block_number;
|
||||
Ok(())
|
||||
}
|
||||
None => Ok(()),
|
||||
})
|
||||
{
|
||||
error!("send error: e={:?}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@ -289,6 +302,8 @@ impl LogEntryFetcher {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("log recover end");
|
||||
},
|
||||
"log recover",
|
||||
);
|
||||
@ -305,7 +320,7 @@ impl LogEntryFetcher {
|
||||
mut watch_progress_rx: UnboundedReceiver<u64>,
|
||||
) -> UnboundedReceiver<LogFetchProgress> {
|
||||
let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
|
||||
let contract = self.flow_contract();
|
||||
let provider = self.provider.clone();
|
||||
let confirmation_delay = self.confirmation_delay;
|
||||
let log_page_size = self.log_page_size;
|
||||
@ -583,6 +598,10 @@ impl LogEntryFetcher {
|
||||
pub fn provider(&self) -> &Provider<RetryClient<Http>> {
|
||||
self.provider.as_ref()
|
||||
}
|
||||
|
||||
pub fn flow_contract(&self) -> ZgsFlow<Provider<RetryClient<Http>>> {
|
||||
ZgsFlow::new(self.contract_address, self.provider.clone())
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_watch_process(
|
||||
@ -658,6 +677,7 @@ async fn check_watch_process(
|
||||
"get block hash for block {} from RPC, assume there is no org",
|
||||
*progress - 1
|
||||
);
|
||||
let hash = loop {
|
||||
match provider.get_block(*progress - 1).await {
|
||||
Ok(Some(v)) => {
|
||||
break v.hash.expect("parent block hash expect exist");
|
||||
@ -666,11 +686,17 @@ async fn check_watch_process(
|
||||
panic!("parent block {} expect exist", *progress - 1);
|
||||
}
|
||||
Err(e) => {
|
||||
if e.to_string().contains("server is too busy") {
|
||||
warn!("server busy, wait for parent block {}", *progress - 1);
|
||||
} else {
|
||||
panic!("parent block {} expect exist, error {}", *progress - 1, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
break hash;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
progress_reset_history.retain(|k, _| k + 1000 >= *progress);
|
||||
|
@ -510,6 +510,41 @@ impl LogSyncManager {
|
||||
}
|
||||
self.data_cache.garbage_collect(self.next_tx_seq);
|
||||
self.next_tx_seq += 1;
|
||||
|
||||
// Check if the computed data root matches on-chain state.
|
||||
// If the call fails, we won't check the root here and return `true` directly.
|
||||
let flow_contract = self.log_fetcher.flow_contract();
|
||||
match flow_contract
|
||||
.get_flow_root_by_tx_seq(tx.seq.into())
|
||||
.call()
|
||||
.await
|
||||
{
|
||||
Ok(contract_root_bytes) => {
|
||||
let contract_root = H256::from_slice(&contract_root_bytes);
|
||||
// contract_root is zero for tx submitted before upgrading.
|
||||
if !contract_root.is_zero() {
|
||||
match self.store.get_context() {
|
||||
Ok((local_root, _)) => {
|
||||
if contract_root != local_root {
|
||||
error!(
|
||||
?contract_root,
|
||||
?local_root,
|
||||
"local flow root and on-chain flow root mismatch"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the local flow root");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the on-chain flow root");
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
|
@ -67,8 +67,8 @@ impl MinerConfig {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn make_provider(&self) -> Result<MineServiceMiddleware, String> {
|
||||
let provider = Arc::new(Provider::new(
|
||||
pub(crate) fn make_provider(&self) -> Result<Arc<Provider<RetryClient<Http>>>, String> {
|
||||
Ok(Arc::new(Provider::new(
|
||||
RetryClientBuilder::default()
|
||||
.rate_limit_retries(self.rate_limit_retries)
|
||||
.timeout_retries(self.timeout_retries)
|
||||
@ -78,7 +78,11 @@ impl MinerConfig {
|
||||
.map_err(|e| format!("Cannot parse blockchain endpoint: {:?}", e))?,
|
||||
Box::new(HttpRateLimitRetryPolicy),
|
||||
),
|
||||
));
|
||||
)))
|
||||
}
|
||||
|
||||
pub(crate) async fn make_signing_provider(&self) -> Result<MineServiceMiddleware, String> {
|
||||
let provider = self.make_provider()?;
|
||||
let chain_id = provider
|
||||
.get_chainid()
|
||||
.await
|
||||
|
@ -1,6 +1,7 @@
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
use ethereum_types::H256;
|
||||
use ethers::prelude::{Http, Provider, RetryClient};
|
||||
use tokio::time::{sleep, Duration, Instant};
|
||||
|
||||
use contract_interface::{EpochRangeWithContextDigest, ZgsFlow};
|
||||
@ -12,14 +13,14 @@ use storage_async::Store;
|
||||
use task_executor::TaskExecutor;
|
||||
use zgs_spec::SECTORS_PER_SEAL;
|
||||
|
||||
use crate::config::{MineServiceMiddleware, MinerConfig};
|
||||
use crate::config::MinerConfig;
|
||||
|
||||
const DB_QUERY_PERIOD_ON_NO_TASK: u64 = 1;
|
||||
const DB_QUERY_PERIOD_ON_ERROR: u64 = 5;
|
||||
const CHAIN_STATUS_QUERY_PERIOD: u64 = 5;
|
||||
|
||||
pub struct Sealer {
|
||||
flow_contract: ZgsFlow<MineServiceMiddleware>,
|
||||
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
|
||||
store: Arc<Store>,
|
||||
context_cache: BTreeMap<u128, EpochRangeWithContextDigest>,
|
||||
last_context_flow_length: u64,
|
||||
@ -29,7 +30,7 @@ pub struct Sealer {
|
||||
impl Sealer {
|
||||
pub fn spawn(
|
||||
executor: TaskExecutor,
|
||||
provider: Arc<MineServiceMiddleware>,
|
||||
provider: Arc<Provider<RetryClient<Http>>>,
|
||||
store: Arc<Store>,
|
||||
config: &MinerConfig,
|
||||
miner_id: H256,
|
||||
|
@ -33,11 +33,13 @@ impl MineService {
|
||||
config: MinerConfig,
|
||||
store: Arc<Store>,
|
||||
) -> Result<broadcast::Sender<MinerMessage>, String> {
|
||||
let provider = Arc::new(config.make_provider().await?);
|
||||
let provider = config.make_provider()?;
|
||||
let signing_provider = Arc::new(config.make_signing_provider().await?);
|
||||
|
||||
let (msg_send, msg_recv) = broadcast::channel(1024);
|
||||
|
||||
let miner_id = check_and_request_miner_id(&config, store.as_ref(), &provider).await?;
|
||||
let miner_id =
|
||||
check_and_request_miner_id(&config, store.as_ref(), &signing_provider).await?;
|
||||
debug!("miner id setting complete.");
|
||||
|
||||
let mine_context_receiver = MineContextWatcher::spawn(
|
||||
@ -61,6 +63,7 @@ impl MineService {
|
||||
mine_answer_receiver,
|
||||
mine_context_receiver,
|
||||
provider.clone(),
|
||||
signing_provider,
|
||||
store.clone(),
|
||||
&config,
|
||||
);
|
||||
|
@ -2,6 +2,7 @@ use contract_interface::PoraAnswer;
|
||||
use contract_interface::{PoraMine, ZgsFlow};
|
||||
use ethereum_types::U256;
|
||||
use ethers::contract::ContractCall;
|
||||
use ethers::prelude::{Http, Provider, RetryClient};
|
||||
use ethers::providers::PendingTransaction;
|
||||
use hex::ToHex;
|
||||
use shared_types::FlowRangeProof;
|
||||
@ -24,7 +25,7 @@ pub struct Submitter {
|
||||
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
|
||||
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
|
||||
mine_contract: PoraMine<MineServiceMiddleware>,
|
||||
flow_contract: ZgsFlow<MineServiceMiddleware>,
|
||||
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
|
||||
default_gas_limit: Option<U256>,
|
||||
store: Arc<Store>,
|
||||
}
|
||||
@ -34,11 +35,12 @@ impl Submitter {
|
||||
executor: TaskExecutor,
|
||||
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
|
||||
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
|
||||
provider: Arc<MineServiceMiddleware>,
|
||||
provider: Arc<Provider<RetryClient<Http>>>,
|
||||
signing_provider: Arc<MineServiceMiddleware>,
|
||||
store: Arc<Store>,
|
||||
config: &MinerConfig,
|
||||
) {
|
||||
let mine_contract = PoraMine::new(config.mine_address, provider.clone());
|
||||
let mine_contract = PoraMine::new(config.mine_address, signing_provider);
|
||||
let flow_contract = ZgsFlow::new(config.flow_address, provider);
|
||||
let default_gas_limit = config.submission_gas;
|
||||
|
||||
|
@ -14,13 +14,13 @@ use tokio::{
|
||||
try_join,
|
||||
};
|
||||
|
||||
use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
|
||||
use ethers::prelude::{Http, RetryClient};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{ops::DerefMut, str::FromStr};
|
||||
|
||||
use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
|
||||
|
||||
pub type MineContextMessage = Option<PoraPuzzle>;
|
||||
|
||||
lazy_static! {
|
||||
@ -29,9 +29,9 @@ lazy_static! {
|
||||
}
|
||||
|
||||
pub struct MineContextWatcher {
|
||||
provider: Arc<MineServiceMiddleware>,
|
||||
flow_contract: ZgsFlow<MineServiceMiddleware>,
|
||||
mine_contract: PoraMine<MineServiceMiddleware>,
|
||||
provider: Arc<Provider<RetryClient<Http>>>,
|
||||
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
|
||||
mine_contract: PoraMine<Provider<RetryClient<Http>>>,
|
||||
|
||||
mine_context_sender: broadcast::Sender<MineContextMessage>,
|
||||
last_report: MineContextMessage,
|
||||
@ -44,7 +44,7 @@ impl MineContextWatcher {
|
||||
pub fn spawn(
|
||||
executor: TaskExecutor,
|
||||
msg_recv: broadcast::Receiver<MinerMessage>,
|
||||
provider: Arc<MineServiceMiddleware>,
|
||||
provider: Arc<Provider<RetryClient<Http>>>,
|
||||
config: &MinerConfig,
|
||||
) -> broadcast::Receiver<MineContextMessage> {
|
||||
let mine_contract = PoraMine::new(config.mine_address, provider.clone());
|
||||
|
@ -10,7 +10,7 @@ mod service;
|
||||
use duration_str::deserialize_duration;
|
||||
use network::Multiaddr;
|
||||
use serde::Deserialize;
|
||||
use std::time::Duration;
|
||||
use std::{net::IpAddr, time::Duration};
|
||||
|
||||
pub use crate::service::RouterService;
|
||||
|
||||
@ -26,6 +26,7 @@ pub struct Config {
|
||||
pub libp2p_nodes: Vec<Multiaddr>,
|
||||
pub private_ip_enabled: bool,
|
||||
pub check_announced_ip: bool,
|
||||
pub public_address: Option<IpAddr>,
|
||||
|
||||
// batcher
|
||||
/// Timeout to publish messages in batch
|
||||
@ -47,6 +48,7 @@ impl Default for Config {
|
||||
libp2p_nodes: vec![],
|
||||
private_ip_enabled: false,
|
||||
check_announced_ip: false,
|
||||
public_address: None,
|
||||
|
||||
batcher_timeout: Duration::from_secs(1),
|
||||
batcher_file_capacity: 1,
|
||||
|
@ -348,17 +348,26 @@ impl Libp2pEventHandler {
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_listen_addr_or_add(&self) -> Option<Multiaddr> {
|
||||
async fn construct_announced_ip(&self) -> Option<Multiaddr> {
|
||||
// public address configured
|
||||
if let Some(ip) = self.config.public_address {
|
||||
let mut addr = Multiaddr::empty();
|
||||
addr.push(ip.into());
|
||||
addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
|
||||
return Some(addr);
|
||||
}
|
||||
|
||||
// public listen address
|
||||
if let Some(addr) = self.get_listen_addr() {
|
||||
return Some(addr);
|
||||
}
|
||||
|
||||
// auto detect public IP address
|
||||
let ipv4_addr = public_ip::addr_v4().await?;
|
||||
|
||||
let mut addr = Multiaddr::empty();
|
||||
addr.push(Protocol::Ip4(ipv4_addr));
|
||||
addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
|
||||
addr.push(Protocol::P2p(self.network_globals.local_peer_id().into()));
|
||||
|
||||
self.network_globals
|
||||
.listen_multiaddrs
|
||||
@ -420,7 +429,7 @@ impl Libp2pEventHandler {
|
||||
|
||||
let peer_id = *self.network_globals.peer_id.read();
|
||||
|
||||
let addr = self.get_listen_addr_or_add().await?;
|
||||
let addr = self.construct_announced_ip().await?;
|
||||
|
||||
let timestamp = timestamp_now();
|
||||
let shard_config = self.store.get_store().get_shard_config();
|
||||
@ -452,7 +461,7 @@ impl Libp2pEventHandler {
|
||||
shard_config: ShardConfig,
|
||||
) -> Option<PubsubMessage> {
|
||||
let peer_id = *self.network_globals.peer_id.read();
|
||||
let addr = self.get_listen_addr_or_add().await?;
|
||||
let addr = self.construct_announced_ip().await?;
|
||||
let timestamp = timestamp_now();
|
||||
|
||||
let msg = AnnounceShardConfig {
|
||||
@ -528,7 +537,7 @@ impl Libp2pEventHandler {
|
||||
index_end: u64,
|
||||
) -> Option<PubsubMessage> {
|
||||
let peer_id = *self.network_globals.peer_id.read();
|
||||
let addr = self.get_listen_addr_or_add().await?;
|
||||
let addr = self.construct_announced_ip().await?;
|
||||
let timestamp = timestamp_now();
|
||||
|
||||
let msg = AnnounceChunks {
|
||||
|
@ -2,7 +2,7 @@ use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
|
||||
use jsonrpsee::core::RpcResult;
|
||||
use jsonrpsee::proc_macros::rpc;
|
||||
use shared_types::{DataRoot, FlowProof, TxSeqOrRoot};
|
||||
use storage::config::ShardConfig;
|
||||
use storage::{config::ShardConfig, H256};
|
||||
|
||||
#[rpc(server, client, namespace = "zgs")]
|
||||
pub trait Rpc {
|
||||
@ -77,4 +77,7 @@ pub trait Rpc {
|
||||
sector_index: u64,
|
||||
flow_root: Option<DataRoot>,
|
||||
) -> RpcResult<FlowProof>;
|
||||
|
||||
#[method(name = "getFlowContext")]
|
||||
async fn get_flow_context(&self) -> RpcResult<(H256, u64)>;
|
||||
}
|
||||
|
@ -8,7 +8,7 @@ use jsonrpsee::core::RpcResult;
|
||||
use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
|
||||
use std::fmt::{Debug, Formatter, Result};
|
||||
use storage::config::ShardConfig;
|
||||
use storage::try_option;
|
||||
use storage::{try_option, H256};
|
||||
|
||||
pub struct RpcServerImpl {
|
||||
pub ctx: Context,
|
||||
@ -198,6 +198,10 @@ impl RpcServer for RpcServerImpl {
|
||||
assert_eq!(proof.left_proof, proof.right_proof);
|
||||
Ok(proof.right_proof)
|
||||
}
|
||||
|
||||
async fn get_flow_context(&self) -> RpcResult<(H256, u64)> {
|
||||
Ok(self.ctx.log_store.get_context().await?)
|
||||
}
|
||||
}
|
||||
|
||||
impl RpcServerImpl {
|
||||
|
@ -2,7 +2,7 @@ use super::{Client, RuntimeContext};
|
||||
use chunk_pool::{ChunkPoolMessage, Config as ChunkPoolConfig, MemoryChunkPool};
|
||||
use file_location_cache::FileLocationCache;
|
||||
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
|
||||
use miner::{MineService, MinerConfig, MinerMessage};
|
||||
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
|
||||
use network::{
|
||||
self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId,
|
||||
Service as LibP2PService,
|
||||
@ -216,6 +216,16 @@ impl ClientBuilder {
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub async fn with_shard(self, config: ShardConfig) -> Result<Self, String> {
|
||||
self.async_store
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.update_shard_config(config)
|
||||
.await;
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Starts the networking stack.
|
||||
pub fn with_router(mut self, router_config: router::Config) -> Result<Self, String> {
|
||||
let executor = require!("router", self, runtime_context).clone().executor;
|
||||
|
@ -200,6 +200,13 @@ impl ZgsConfig {
|
||||
pub fn router_config(&self, network_config: &NetworkConfig) -> Result<router::Config, String> {
|
||||
let mut router_config = self.router.clone();
|
||||
router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec();
|
||||
|
||||
if router_config.public_address.is_none() {
|
||||
if let Some(addr) = &self.network_enr_address {
|
||||
router_config.public_address = Some(addr.parse().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(router_config)
|
||||
}
|
||||
|
||||
@ -228,7 +235,7 @@ impl ZgsConfig {
|
||||
}
|
||||
}
|
||||
|
||||
fn shard_config(&self) -> Result<ShardConfig, String> {
|
||||
pub fn shard_config(&self) -> Result<ShardConfig, String> {
|
||||
self.shard_position.clone().try_into()
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
|
||||
let miner_config = config.mine_config()?;
|
||||
let router_config = config.router_config(&network_config)?;
|
||||
let pruner_config = config.pruner_config()?;
|
||||
let shard_config = config.shard_config()?;
|
||||
|
||||
ClientBuilder::default()
|
||||
.with_runtime_context(context)
|
||||
@ -30,6 +31,8 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
|
||||
.await?
|
||||
.with_miner(miner_config)
|
||||
.await?
|
||||
.with_shard(shard_config)
|
||||
.await?
|
||||
.with_pruner(pruner_config)
|
||||
.await?
|
||||
.with_rpc(config.rpc, config.chunk_pool_config()?)
|
||||
|
@ -29,7 +29,7 @@ itertools = "0.13.0"
|
||||
serde = { version = "1.0.197", features = ["derive"] }
|
||||
parking_lot = "0.12.3"
|
||||
serde_json = "1.0.127"
|
||||
tokio = { version = "1.10.0", features = ["sync"] }
|
||||
tokio = { version = "1.38.0", features = ["full"] }
|
||||
task_executor = { path = "../../common/task_executor" }
|
||||
|
||||
[dev-dependencies]
|
||||
|
@ -712,7 +712,7 @@ impl LogManager {
|
||||
tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
|
||||
}
|
||||
// Initialize
|
||||
None => Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None),
|
||||
None => Merkle::new_with_depth(vec![], 1, None),
|
||||
};
|
||||
|
||||
debug!(
|
||||
@ -761,6 +761,10 @@ impl LogManager {
|
||||
.merkle
|
||||
.write()
|
||||
.try_initialize(&log_manager.flow_store)?;
|
||||
info!(
|
||||
"Log manager initialized, state={:?}",
|
||||
log_manager.get_context()?
|
||||
);
|
||||
Ok(log_manager)
|
||||
}
|
||||
|
||||
@ -788,7 +792,8 @@ impl LogManager {
|
||||
.unwrap();
|
||||
}
|
||||
std::result::Result::Err(_) => {
|
||||
error!("Receiver error");
|
||||
debug!("Log manager inner channel closed");
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -335,11 +335,7 @@ impl TransactionStore {
|
||||
}
|
||||
let mut merkle = if last_chunk_start_index == 0 {
|
||||
// The first entry hash is initialized as zero.
|
||||
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
|
||||
vec![H256::zero()],
|
||||
log2_pow2(PORA_CHUNK_SIZE) + 1,
|
||||
None,
|
||||
)
|
||||
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(vec![H256::zero()], 1, None)
|
||||
} else {
|
||||
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
|
||||
vec![],
|
||||
|
@ -324,7 +324,7 @@ auto_sync_enabled = true
|
||||
# enabled = false
|
||||
|
||||
# Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
|
||||
# report_interval = ""
|
||||
# report_interval = "10s"
|
||||
|
||||
# File name to output metrics periodically.
|
||||
# file_report_output = ""
|
||||
|
@ -336,7 +336,7 @@ auto_sync_enabled = true
|
||||
# enabled = false
|
||||
|
||||
# Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
|
||||
# report_interval = ""
|
||||
# report_interval = "10s"
|
||||
|
||||
# File name to output metrics periodically.
|
||||
# file_report_output = ""
|
||||
|
@ -338,7 +338,7 @@
|
||||
# enabled = false
|
||||
|
||||
# Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
|
||||
# report_interval = ""
|
||||
# report_interval = "10s"
|
||||
|
||||
# File name to output metrics periodically.
|
||||
# file_report_output = ""
|
||||
|
@ -1 +1 @@
|
||||
75c251804a29ab22adced50d92478cf0baf834bc
|
||||
bea58429e436e4952ae69235d9079cfc4ac5f3b3
|
||||
|
@ -40,8 +40,8 @@
|
||||
"type": "function"
|
||||
}
|
||||
],
|
||||
"bytecode": "0x608060405234801561001057600080fd5b5060be8061001f6000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122044ebf96fcad90f0bbc521513843d64fbc182c5c913a8210a4d638393793be63064736f6c63430008100033",
|
||||
"deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122044ebf96fcad90f0bbc521513843d64fbc182c5c913a8210a4d638393793be63064736f6c63430008100033",
|
||||
"bytecode": "0x608060405234801561001057600080fd5b5060be8061001f6000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
|
||||
"deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
|
||||
"linkReferences": {},
|
||||
"deployedLinkReferences": {}
|
||||
}
|
||||
|
@ -70,8 +70,8 @@
|
||||
"type": "function"
|
||||
}
|
||||
],
|
||||
"bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ce57385afc7714a4000e530d1e1154d214fc1c0e2392abde201018635be1a2ab64736f6c63430008100033",
|
||||
"deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ce57385afc7714a4000e530d1e1154d214fc1c0e2392abde201018635be1a2ab64736f6c63430008100033",
|
||||
"bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
|
||||
"deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
|
||||
"linkReferences": {},
|
||||
"deployedLinkReferences": {}
|
||||
}
|
||||
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
55
tests/root_consistency_test.py
Executable file
55
tests/root_consistency_test.py
Executable file
@ -0,0 +1,55 @@
|
||||
#!/usr/bin/env python3
|
||||
from test_framework.test_framework import TestFramework
|
||||
from config.node_config import MINER_ID, GENESIS_PRIV_KEY
|
||||
from utility.submission import create_submission, submit_data
|
||||
from utility.utils import wait_until, assert_equal
|
||||
from test_framework.blockchain_node import BlockChainNodeType
|
||||
|
||||
|
||||
class RootConsistencyTest(TestFramework):
|
||||
def setup_params(self):
|
||||
self.num_blockchain_nodes = 1
|
||||
self.num_nodes = 1
|
||||
|
||||
def submit_data(self, item, size):
|
||||
submissions_before = self.contract.num_submissions()
|
||||
client = self.nodes[0]
|
||||
chunk_data = item * 256 * size
|
||||
submissions, data_root = create_submission(chunk_data)
|
||||
self.contract.submit(submissions)
|
||||
wait_until(lambda: self.contract.num_submissions() == submissions_before + 1)
|
||||
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
|
||||
|
||||
segment = submit_data(client, chunk_data)
|
||||
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
|
||||
|
||||
def assert_flow_status(self, expected_length):
|
||||
contract_root = self.contract.get_flow_root().hex()
|
||||
contract_length = self.contract.get_flow_length()
|
||||
(node_root, node_length) = tuple(self.nodes[0].zgs_getFlowContext())
|
||||
|
||||
assert_equal(contract_length, node_length)
|
||||
assert_equal(contract_length, expected_length)
|
||||
assert_equal(contract_root, node_root[2:])
|
||||
|
||||
|
||||
|
||||
def run_test(self):
|
||||
self.assert_flow_status(1)
|
||||
|
||||
self.submit_data(b"\x11", 1)
|
||||
self.assert_flow_status(2)
|
||||
|
||||
self.submit_data(b"\x11", 8 + 4 + 2)
|
||||
self.assert_flow_status(16 + 4 + 2)
|
||||
|
||||
self.submit_data(b"\x12", 128 + 64)
|
||||
self.assert_flow_status(256 + 64)
|
||||
|
||||
self.submit_data(b"\x13", 512 + 256)
|
||||
self.assert_flow_status(1024 + 512 + 256)
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
RootConsistencyTest().main()
|
@ -92,6 +92,11 @@ class FlowContractProxy(ContractProxy):
|
||||
def get_mine_context(self, node_idx=0):
|
||||
return self._call("makeContextWithResult", node_idx)
|
||||
|
||||
def get_flow_root(self, node_idx=0):
|
||||
return self._call("computeFlowRoot", node_idx)
|
||||
|
||||
def get_flow_length(self, node_idx=0):
|
||||
return self._call("tree", node_idx)[0]
|
||||
|
||||
class MineContractProxy(ContractProxy):
|
||||
def last_mined_epoch(self, node_idx=0):
|
||||
|
@ -101,6 +101,9 @@ class ZgsNode(TestNode):
|
||||
def zgs_get_file_info_by_tx_seq(self, tx_seq):
|
||||
return self.rpc.zgs_getFileInfoByTxSeq([tx_seq])
|
||||
|
||||
def zgs_get_flow_context(self, tx_seq):
|
||||
return self.rpc.zgs_getFlowContext([tx_seq])
|
||||
|
||||
def shutdown(self):
|
||||
self.rpc.admin_shutdown()
|
||||
self.wait_until_stopped()
|
||||
|
Loading…
Reference in New Issue
Block a user