Merge branch 'main' into cache_mpt

Peilun Li 2024-10-15 01:24:09 +08:00
commit 6a4b246e8b
28 changed files with 484 additions and 134 deletions

View File

@@ -245,22 +245,16 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
     ) -> Result<Vec<(usize, usize, E)>> {
         self.node_manager.start_transaction();
         let mut updated_nodes = Vec::new();
-        updated_nodes.append(
-            &mut self.fill_with_proof(
-                proof
-                    .left_proof
-                    .proof_nodes_in_tree()
-                    .split_off(self.leaf_height),
-            )?,
-        );
-        updated_nodes.append(
-            &mut self.fill_with_proof(
-                proof
-                    .right_proof
-                    .proof_nodes_in_tree()
-                    .split_off(self.leaf_height),
-            )?,
-        );
+        let mut left_nodes = proof.left_proof.proof_nodes_in_tree();
+        if left_nodes.len() >= self.leaf_height {
+            updated_nodes
+                .append(&mut self.fill_with_proof(left_nodes.split_off(self.leaf_height))?);
+        }
+        let mut right_nodes = proof.right_proof.proof_nodes_in_tree();
+        if right_nodes.len() >= self.leaf_height {
+            updated_nodes
+                .append(&mut self.fill_with_proof(right_nodes.split_off(self.leaf_height))?);
+        }
         self.node_manager.commit();
         Ok(updated_nodes)
     }
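Note on the rewrite above: `Vec::split_off(at)` panics when `at` exceeds the vector's length, so proofs with fewer than `leaf_height` nodes must be skipped rather than split. A minimal, self-contained sketch of that guard (hypothetical helper, not the repo's code):

    fn upper_nodes(mut nodes: Vec<u32>, leaf_height: usize) -> Vec<u32> {
        if nodes.len() >= leaf_height {
            // Tail starting at `leaf_height`; the lower nodes stay in `nodes`.
            nodes.split_off(leaf_height)
        } else {
            // Without this branch, `split_off` would panic on short proofs.
            Vec::new()
        }
    }

    fn main() {
        assert_eq!(upper_nodes(vec![1, 2, 3, 4], 2), vec![3, 4]);
        assert!(upper_nodes(vec![1], 2).is_empty());
    }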

View File

@@ -9,5 +9,5 @@ exit-future = "0.2.0"
 futures = "0.3.21"
 lazy_static = "1.4.0"
 lighthouse_metrics = { path = "../lighthouse_metrics" }
-tokio = { version = "1.19.2", features = ["rt"] }
+tokio = { version = "1.38.0", features = ["full"] }
 tracing = "0.1.35"

View File

@@ -178,7 +178,10 @@ impl LogEntryFetcher {
         if let Some(finalized_block_number) = finalized_block_number {
             let safe_block_number = std::cmp::min(
-                std::cmp::min(log_latest_block_number, finalized_block_number),
+                std::cmp::min(
+                    log_latest_block_number.saturating_sub(1),
+                    finalized_block_number,
+                ),
                 processed_block_number,
             );
             let mut pending_keys = vec![];
@@ -219,7 +222,7 @@ impl LogEntryFetcher {
     ) -> UnboundedReceiver<LogFetchProgress> {
         let provider = self.provider.clone();
         let (recover_tx, recover_rx) = tokio::sync::mpsc::unbounded_channel();
-        let contract = ZgsFlow::new(self.contract_address, provider.clone());
+        let contract = self.flow_contract();
         let log_page_size = self.log_page_size;

         executor.spawn(
@@ -302,7 +305,7 @@ impl LogEntryFetcher {
         mut watch_progress_rx: UnboundedReceiver<u64>,
     ) -> UnboundedReceiver<LogFetchProgress> {
         let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel();
-        let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
+        let contract = self.flow_contract();
         let provider = self.provider.clone();
         let confirmation_delay = self.confirmation_delay;
         let log_page_size = self.log_page_size;
@@ -580,6 +583,10 @@ impl LogEntryFetcher {
     pub fn provider(&self) -> &Provider<RetryClient<Http>> {
         self.provider.as_ref()
     }
+
+    pub fn flow_contract(&self) -> ZgsFlow<Provider<RetryClient<Http>>> {
+        ZgsFlow::new(self.contract_address, self.provider.clone())
+    }
 }

 async fn check_watch_process(
@@ -655,19 +662,26 @@ async fn check_watch_process(
                 "get block hash for block {} from RPC, assume there is no org",
                 *progress - 1
             );
+            let hash = loop {
                 match provider.get_block(*progress - 1).await {
                     Ok(Some(v)) => {
-                        v.hash.expect("parent block hash expect exist");
+                        break v.hash.expect("parent block hash expect exist");
                     }
                     Ok(None) => {
                         panic!("parent block {} expect exist", *progress - 1);
                     }
                     Err(e) => {
+                        if e.to_string().contains("server is too busy") {
+                            warn!("server busy, wait for parent block {}", *progress - 1);
+                        } else {
                             panic!("parent block {} expect exist, error {}", *progress - 1, e);
+                        }
                     }
                 }
             };
+            break hash;
         }
         progress_reset_history.retain(|k, _| k + 1000 >= *progress);
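Two behavioral notes on the hunks above: the safe block number now uses `log_latest_block_number.saturating_sub(1)`, and the parent-block lookup retries instead of panicking while the RPC endpoint reports "server is too busy". The `saturating_sub` choice matters at the genesis edge; a tiny standalone demonstration of the standard-library semantics it relies on:

    fn main() {
        // `saturating_sub` clamps at zero instead of panicking (debug builds)
        // or wrapping (release builds) when the subtraction would underflow.
        let log_latest_block_number: u64 = 0;
        assert_eq!(log_latest_block_number.saturating_sub(1), 0);
        assert_eq!(42u64.saturating_sub(1), 41);
    }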

View File

@@ -510,6 +510,41 @@ impl LogSyncManager {
         }
         self.data_cache.garbage_collect(self.next_tx_seq);
         self.next_tx_seq += 1;
+
+        // Check if the computed data root matches on-chain state.
+        // If the call fails, we won't check the root here and return `true` directly.
+        let flow_contract = self.log_fetcher.flow_contract();
+        match flow_contract
+            .get_flow_root_by_tx_seq(tx.seq.into())
+            .call()
+            .await
+        {
+            Ok(contract_root_bytes) => {
+                let contract_root = H256::from_slice(&contract_root_bytes);
+                // contract_root is zero for tx submitted before upgrading.
+                if !contract_root.is_zero() {
+                    match self.store.get_context() {
+                        Ok((local_root, _)) => {
+                            if contract_root != local_root {
+                                error!(
+                                    ?contract_root,
+                                    ?local_root,
+                                    "local flow root and on-chain flow root mismatch"
+                                );
+                                return false;
+                            }
+                        }
+                        Err(e) => {
+                            warn!(?e, "fail to read the local flow root");
+                        }
+                    }
+                }
+            }
+            Err(e) => {
+                warn!(?e, "fail to read the on-chain flow root");
+            }
+        }
+
         true
     }
 }
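The added check fails a tx only on a provable mismatch: a zero on-chain root (txs submitted before the contract upgrade) or any read error merely warns. A hedged, self-contained restatement of that decision table, with stand-in types rather than the repo's `H256` and store handles:

    #[derive(PartialEq)]
    struct Root([u8; 32]);

    impl Root {
        fn is_zero(&self) -> bool {
            self.0 == [0u8; 32]
        }
    }

    // Returns false only when both roots are readable and provably different.
    fn roots_consistent(on_chain: Result<Root, ()>, local: Result<Root, ()>) -> bool {
        match on_chain {
            Ok(contract_root) if !contract_root.is_zero() => match local {
                Ok(local_root) => contract_root == local_root,
                Err(_) => true, // cannot read the local root: warn and accept
            },
            _ => true, // zero root (pre-upgrade tx) or RPC failure: skip the check
        }
    }

    fn main() {
        assert!(roots_consistent(Ok(Root([0; 32])), Ok(Root([1; 32]))));
        assert!(!roots_consistent(Ok(Root([2; 32])), Ok(Root([1; 32]))));
        assert!(roots_consistent(Err(()), Ok(Root([1; 32]))));
    }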

View File

@@ -10,7 +10,7 @@ mod service;

 use duration_str::deserialize_duration;
 use network::Multiaddr;
 use serde::Deserialize;
-use std::time::Duration;
+use std::{net::IpAddr, time::Duration};

 pub use crate::service::RouterService;

@@ -26,6 +26,7 @@ pub struct Config {
     pub libp2p_nodes: Vec<Multiaddr>,
     pub private_ip_enabled: bool,
     pub check_announced_ip: bool,
+    pub public_address: Option<IpAddr>,

     // batcher
     /// Timeout to publish messages in batch
@@ -47,6 +48,7 @@ impl Default for Config {
             libp2p_nodes: vec![],
             private_ip_enabled: false,
             check_announced_ip: false,
+            public_address: None,

             batcher_timeout: Duration::from_secs(1),
             batcher_file_capacity: 1,

View File

@@ -348,17 +348,26 @@ impl Libp2pEventHandler {
         }
     }

-    async fn get_listen_addr_or_add(&self) -> Option<Multiaddr> {
+    async fn construct_announced_ip(&self) -> Option<Multiaddr> {
+        // public address configured
+        if let Some(ip) = self.config.public_address {
+            let mut addr = Multiaddr::empty();
+            addr.push(ip.into());
+            addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
+            return Some(addr);
+        }
+
+        // public listen address
         if let Some(addr) = self.get_listen_addr() {
             return Some(addr);
         }

+        // auto detect public IP address
         let ipv4_addr = public_ip::addr_v4().await?;
         let mut addr = Multiaddr::empty();
         addr.push(Protocol::Ip4(ipv4_addr));
         addr.push(Protocol::Tcp(self.network_globals.listen_port_tcp()));
-        addr.push(Protocol::P2p(self.network_globals.local_peer_id().into()));

         self.network_globals
             .listen_multiaddrs
@@ -420,7 +429,7 @@ impl Libp2pEventHandler {
         let peer_id = *self.network_globals.peer_id.read();
-        let addr = self.get_listen_addr_or_add().await?;
+        let addr = self.construct_announced_ip().await?;
         let timestamp = timestamp_now();

         let shard_config = self.store.get_store().get_shard_config();
@@ -452,7 +461,7 @@ impl Libp2pEventHandler {
         shard_config: ShardConfig,
     ) -> Option<PubsubMessage> {
         let peer_id = *self.network_globals.peer_id.read();
-        let addr = self.get_listen_addr_or_add().await?;
+        let addr = self.construct_announced_ip().await?;
         let timestamp = timestamp_now();

         let msg = AnnounceShardConfig {
@@ -528,7 +537,7 @@ impl Libp2pEventHandler {
         index_end: u64,
     ) -> Option<PubsubMessage> {
         let peer_id = *self.network_globals.peer_id.read();
-        let addr = self.get_listen_addr_or_add().await?;
+        let addr = self.construct_announced_ip().await?;
         let timestamp = timestamp_now();

         let msg = AnnounceChunks {
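`construct_announced_ip` now resolves the announced address in priority order: operator-configured `public_address`, then a public listen address, then auto-detected IPv4. A sketch of the first branch against the `multiaddr` crate; the repo goes through libp2p's re-exported types, so treat the crate name and version as assumptions:

    use multiaddr::{Multiaddr, Protocol};
    use std::net::{IpAddr, Ipv4Addr};

    // Highest-priority branch: configured public IP plus the TCP listen port.
    fn configured_addr(public_address: Option<IpAddr>, tcp_port: u16) -> Option<Multiaddr> {
        let ip = public_address?;
        let mut addr = Multiaddr::empty();
        addr.push(ip.into()); // IpAddr converts to Protocol::Ip4 or Protocol::Ip6
        addr.push(Protocol::Tcp(tcp_port));
        Some(addr)
    }

    fn main() {
        let addr = configured_addr(Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))), 1234).unwrap();
        assert_eq!(addr.to_string(), "/ip4/1.2.3.4/tcp/1234");
    }

The rewrite also drops the trailing `/p2p/<peer-id>` component the old code appended to auto-detected addresses.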

View File

@@ -12,9 +12,23 @@ pub trait Rpc {
     #[method(name = "uploadSegment")]
     async fn upload_segment(&self, segment: SegmentWithProof) -> RpcResult<()>;

+    #[method(name = "uploadSegmentByTxSeq")]
+    async fn upload_segment_by_tx_seq(
+        &self,
+        segment: SegmentWithProof,
+        tx_seq: u64,
+    ) -> RpcResult<()>;
+
     #[method(name = "uploadSegments")]
     async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()>;

+    #[method(name = "uploadSegmentsByTxSeq")]
+    async fn upload_segments_by_tx_seq(
+        &self,
+        segments: Vec<SegmentWithProof>,
+        tx_seq: u64,
+    ) -> RpcResult<()>;
+
     #[method(name = "downloadSegment")]
     async fn download_segment(
         &self,
@@ -23,6 +37,14 @@ pub trait Rpc {
         end_index: usize,
     ) -> RpcResult<Option<Segment>>;

+    #[method(name = "downloadSegmentByTxSeq")]
+    async fn download_segment_by_tx_seq(
+        &self,
+        tx_seq: u64,
+        start_index: usize,
+        end_index: usize,
+    ) -> RpcResult<Option<Segment>>;
+
     #[method(name = "downloadSegmentWithProof")]
     async fn download_segment_with_proof(
         &self,
@@ -30,6 +52,13 @@ pub trait Rpc {
         index: usize,
     ) -> RpcResult<Option<SegmentWithProof>>;

+    #[method(name = "downloadSegmentWithProofByTxSeq")]
+    async fn download_segment_with_proof_by_tx_seq(
+        &self,
+        tx_seq: u64,
+        index: usize,
+    ) -> RpcResult<Option<SegmentWithProof>>;
+
     #[method(name = "checkFileFinalized")]
     async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>>;
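The `...ByTxSeq` variants address a file by its transaction sequence number instead of its data root, which stays unambiguous when the same content is uploaded more than once. Assuming the `zgs_` namespace shown in the server log lines later in this diff, and positional parameters in trait order, a download request would look roughly like:

    {"jsonrpc": "2.0", "id": 1, "method": "zgs_downloadSegmentByTxSeq", "params": [7, 0, 1024]}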

View File

@@ -42,6 +42,16 @@ impl RpcServer for RpcServerImpl {
         self.put_segment(segment).await
     }

+    async fn upload_segment_by_tx_seq(
+        &self,
+        segment: SegmentWithProof,
+        tx_seq: u64,
+    ) -> RpcResult<()> {
+        info!(tx_seq = %tx_seq, index = %segment.index, "zgs_uploadSegmentByTxSeq");
+        let maybe_tx = self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?;
+        self.put_segment_with_maybe_tx(segment, maybe_tx).await
+    }
+
     async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()> {
         let root = match segments.first() {
             None => return Ok(()),
@@ -57,6 +67,23 @@ impl RpcServer for RpcServerImpl {
         Ok(())
     }

+    async fn upload_segments_by_tx_seq(
+        &self,
+        segments: Vec<SegmentWithProof>,
+        tx_seq: u64,
+    ) -> RpcResult<()> {
+        let indices = SegmentIndexArray::new(&segments);
+        info!(%tx_seq, ?indices, "zgs_uploadSegmentsByTxSeq");
+
+        let maybe_tx = self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?;
+        for segment in segments.into_iter() {
+            self.put_segment_with_maybe_tx(segment, maybe_tx.clone())
+                .await?;
+        }
+
+        Ok(())
+    }
+
     async fn download_segment(
         &self,
         data_root: DataRoot,
@@ -65,34 +92,26 @@ impl RpcServer for RpcServerImpl {
     ) -> RpcResult<Option<Segment>> {
         info!(%data_root, %start_index, %end_index, "zgs_downloadSegment");

-        if start_index >= end_index {
-            return Err(error::invalid_params("end_index", "invalid chunk index"));
-        }
-
-        if end_index - start_index > self.ctx.config.chunks_per_segment {
-            return Err(error::invalid_params(
-                "end_index",
-                format!(
-                    "exceeds maximum chunks {}",
-                    self.ctx.config.chunks_per_segment
-                ),
-            ));
-        }
-
         let tx_seq = try_option!(
             self.ctx
                 .log_store
                 .get_tx_seq_by_data_root(&data_root)
                 .await?
         );

-        let segment = try_option!(
-            self.ctx
-                .log_store
-                .get_chunks_by_tx_and_index_range(tx_seq, start_index, end_index)
-                .await?
-        );
-
-        Ok(Some(Segment(segment.data)))
+        self.get_segment_by_tx_seq(tx_seq, start_index, end_index)
+            .await
+    }
+
+    async fn download_segment_by_tx_seq(
+        &self,
+        tx_seq: u64,
+        start_index: usize,
+        end_index: usize,
+    ) -> RpcResult<Option<Segment>> {
+        info!(%tx_seq, %start_index, %end_index, "zgs_downloadSegmentByTxSeq");
+        self.get_segment_by_tx_seq(tx_seq, start_index, end_index)
+            .await
     }

     async fn download_segment_with_proof(
@@ -104,40 +123,19 @@ impl RpcServer for RpcServerImpl {
         let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?);

-        // validate index
-        let chunks_per_segment = self.ctx.config.chunks_per_segment;
-        let (num_segments, last_segment_size) =
-            SegmentWithProof::split_file_into_segments(tx.size as usize, chunks_per_segment)?;
-        if index >= num_segments {
-            return Err(error::invalid_params("index", "index out of bound"));
-        }
-
-        // calculate chunk start and end index
-        let start_index = index * chunks_per_segment;
-        let end_index = if index == num_segments - 1 {
-            // last segment without padding chunks by flow
-            start_index + last_segment_size / CHUNK_SIZE
-        } else {
-            start_index + chunks_per_segment
-        };
-
-        let segment = try_option!(
-            self.ctx
-                .log_store
-                .get_chunks_with_proof_by_tx_and_index_range(tx.seq, start_index, end_index, None)
-                .await?
-        );
-
-        let proof = tx.compute_segment_proof(&segment, chunks_per_segment)?;
-        Ok(Some(SegmentWithProof {
-            root: data_root,
-            data: segment.chunks.data,
-            index,
-            proof,
-            file_size: tx.size as usize,
-        }))
+        self.get_segment_with_proof_by_tx(tx, index).await
+    }
+
+    async fn download_segment_with_proof_by_tx_seq(
+        &self,
+        tx_seq: u64,
+        index: usize,
+    ) -> RpcResult<Option<SegmentWithProof>> {
+        info!(%tx_seq, %index, "zgs_downloadSegmentWithProofByTxSeq");
+
+        let tx = try_option!(self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?);
+
+        self.get_segment_with_proof_by_tx(tx, index).await
     }

     async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>> {
@@ -277,15 +275,29 @@ impl RpcServerImpl {
     async fn put_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
         debug!(root = %segment.root, index = %segment.index, "putSegment");

-        self.ctx.chunk_pool.validate_segment_size(&segment.data)?;
-
         let maybe_tx = self
             .ctx
             .log_store
             .get_tx_by_data_root(&segment.root)
             .await?;
-        let mut need_cache = false;

+        self.put_segment_with_maybe_tx(segment, maybe_tx).await
+    }
+
+    async fn put_segment_with_maybe_tx(
+        &self,
+        segment: SegmentWithProof,
+        maybe_tx: Option<Transaction>,
+    ) -> RpcResult<()> {
+        self.ctx.chunk_pool.validate_segment_size(&segment.data)?;
+
+        if let Some(tx) = &maybe_tx {
+            if tx.data_merkle_root != segment.root {
+                return Err(error::internal_error("data root and tx seq not match"));
+            }
+        }
+
+        let mut need_cache = false;
         if self
             .ctx
             .chunk_pool
@@ -323,6 +335,77 @@ impl RpcServerImpl {
         }
         Ok(())
     }
+
+    async fn get_segment_by_tx_seq(
+        &self,
+        tx_seq: u64,
+        start_index: usize,
+        end_index: usize,
+    ) -> RpcResult<Option<Segment>> {
+        if start_index >= end_index {
+            return Err(error::invalid_params("end_index", "invalid chunk index"));
+        }
+
+        if end_index - start_index > self.ctx.config.chunks_per_segment {
+            return Err(error::invalid_params(
+                "end_index",
+                format!(
+                    "exceeds maximum chunks {}",
+                    self.ctx.config.chunks_per_segment
+                ),
+            ));
+        }
+
+        let segment = try_option!(
+            self.ctx
+                .log_store
+                .get_chunks_by_tx_and_index_range(tx_seq, start_index, end_index)
+                .await?
+        );
+
+        Ok(Some(Segment(segment.data)))
+    }
+
+    async fn get_segment_with_proof_by_tx(
+        &self,
+        tx: Transaction,
+        index: usize,
+    ) -> RpcResult<Option<SegmentWithProof>> {
+        // validate index
+        let chunks_per_segment = self.ctx.config.chunks_per_segment;
+        let (num_segments, last_segment_size) =
+            SegmentWithProof::split_file_into_segments(tx.size as usize, chunks_per_segment)?;
+        if index >= num_segments {
+            return Err(error::invalid_params("index", "index out of bound"));
+        }
+
+        // calculate chunk start and end index
+        let start_index = index * chunks_per_segment;
+        let end_index = if index == num_segments - 1 {
+            // last segment without padding chunks by flow
+            start_index + last_segment_size / CHUNK_SIZE
+        } else {
+            start_index + chunks_per_segment
+        };
+
+        let segment = try_option!(
+            self.ctx
+                .log_store
+                .get_chunks_with_proof_by_tx_and_index_range(tx.seq, start_index, end_index, None)
+                .await?
+        );
+
+        let proof = tx.compute_segment_proof(&segment, chunks_per_segment)?;
+        Ok(Some(SegmentWithProof {
+            root: tx.data_merkle_root,
+            data: segment.chunks.data,
+            index,
+            proof,
+            file_size: tx.size as usize,
+        }))
+    }
 }

 enum SegmentIndex {
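The extracted `get_segment_with_proof_by_tx` helper trims the last segment to the file's true size so flow padding chunks are never served. A worked example of that range arithmetic, assuming 256-byte chunks and 1024 chunks per segment (the defaults suggested elsewhere in this repo):

    const CHUNK_SIZE: usize = 256;

    fn segment_range(
        index: usize,
        num_segments: usize,
        last_segment_size: usize,
        chunks_per_segment: usize,
    ) -> (usize, usize) {
        let start_index = index * chunks_per_segment;
        let end_index = if index == num_segments - 1 {
            // last segment: exclude flow padding chunks
            start_index + last_segment_size / CHUNK_SIZE
        } else {
            start_index + chunks_per_segment
        };
        (start_index, end_index)
    }

    fn main() {
        // A file occupying 1.5 segments: the last segment holds 512 chunks.
        assert_eq!(segment_range(0, 2, 512 * CHUNK_SIZE, 1024), (0, 1024));
        assert_eq!(segment_range(1, 2, 512 * CHUNK_SIZE, 1024), (1024, 1536));
    }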

View File

@@ -113,12 +113,16 @@ impl Transaction {
         1 << (depth - 1)
     }

-    pub fn num_entries(&self) -> usize {
-        self.merkle_nodes.iter().fold(0, |size, &(depth, _)| {
+    pub fn num_entries_of_list(merkle_nodes: &[(usize, DataRoot)]) -> usize {
+        merkle_nodes.iter().fold(0, |size, &(depth, _)| {
             size + Transaction::num_entries_of_node(depth)
         })
     }

+    pub fn num_entries(&self) -> usize {
+        Self::num_entries_of_list(&self.merkle_nodes)
+    }
+
     pub fn hash(&self) -> H256 {
         let bytes = self.as_ssz_bytes();
         let mut h = Keccak::v256();
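`num_entries_of_list` is a static refactor of `num_entries`: it sums `num_entries_of_node`, i.e. `1 << (depth - 1)` entries per subtree, without needing a full `Transaction`. A tiny standalone check of the arithmetic:

    fn num_entries_of_list(merkle_nodes: &[(usize, [u8; 32])]) -> usize {
        merkle_nodes
            .iter()
            .fold(0, |size, &(depth, _)| size + (1 << (depth - 1)))
    }

    fn main() {
        // Subtrees of depth 3, 2 and 1 contribute 4 + 2 + 1 = 7 entries.
        let root = [0u8; 32];
        assert_eq!(num_entries_of_list(&[(3, root), (2, root), (1, root)]), 7);
    }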

View File

@@ -2,7 +2,7 @@ use super::{Client, RuntimeContext};
 use chunk_pool::{ChunkPoolMessage, Config as ChunkPoolConfig, MemoryChunkPool};
 use file_location_cache::FileLocationCache;
 use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
-use miner::{MineService, MinerConfig, MinerMessage};
+use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
 use network::{
     self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId,
     Service as LibP2PService,
@@ -216,6 +216,16 @@ impl ClientBuilder {
         Ok(self)
     }

+    pub async fn with_shard(self, config: ShardConfig) -> Result<Self, String> {
+        self.async_store
+            .as_ref()
+            .unwrap()
+            .update_shard_config(config)
+            .await;
+        Ok(self)
+    }
+
     /// Starts the networking stack.
     pub fn with_router(mut self, router_config: router::Config) -> Result<Self, String> {
         let executor = require!("router", self, runtime_context).clone().executor;

View File

@@ -204,6 +204,13 @@ impl ZgsConfig {
     pub fn router_config(&self, network_config: &NetworkConfig) -> Result<router::Config, String> {
         let mut router_config = self.router.clone();
         router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec();
+
+        if router_config.public_address.is_none() {
+            if let Some(addr) = &self.network_enr_address {
+                router_config.public_address = Some(addr.parse().unwrap());
+            }
+        }
+
         Ok(router_config)
     }

@@ -232,7 +239,7 @@ impl ZgsConfig {
         }
     }

-    fn shard_config(&self) -> Result<ShardConfig, String> {
+    pub fn shard_config(&self) -> Result<ShardConfig, String> {
         self.shard_position.clone().try_into()
     }
 }

View File

@@ -17,6 +17,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
     let miner_config = config.mine_config()?;
     let router_config = config.router_config(&network_config)?;
     let pruner_config = config.pruner_config()?;
+    let shard_config = config.shard_config()?;

     ClientBuilder::default()
         .with_runtime_context(context)
@@ -30,6 +31,8 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
         .await?
         .with_miner(miner_config)
         .await?
+        .with_shard(shard_config)
+        .await?
         .with_pruner(pruner_config)
         .await?
         .with_rpc(config.rpc, config.chunk_pool_config()?)

View File

@@ -29,7 +29,7 @@ itertools = "0.13.0"
 serde = { version = "1.0.197", features = ["derive"] }
 parking_lot = "0.12.3"
 serde_json = "1.0.127"
-tokio = { version = "1.10.0", features = ["sync"] }
+tokio = { version = "1.38.0", features = ["full"] }
 task_executor = { path = "../../common/task_executor" }

 [dev-dependencies]

View File

@@ -258,7 +258,7 @@ impl LogStoreWrite for LogManager {
         }
         let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned();
         // TODO(zz): Should we validate received tx?
-        self.append_subtree_list(tx.merkle_nodes.clone(), &mut merkle)?;
+        self.append_subtree_list(tx.start_entry_index, tx.merkle_nodes.clone(), &mut merkle)?;
         merkle.commit_merkle(tx.seq)?;
         debug!(
             "commit flow root: root={:?}",
@@ -694,7 +694,7 @@ impl LogManager {
                 tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
             }
             // Initialize
-            None => Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None),
+            None => Merkle::new_with_depth(vec![], 1, None),
         };

         debug!(
@@ -732,6 +732,10 @@ impl LogManager {
             .merkle
             .write()
             .try_initialize(&log_manager.flow_store)?;
+        info!(
+            "Log manager initialized, state={:?}",
+            log_manager.get_context()?
+        );
         Ok(log_manager)
     }
@@ -839,6 +843,7 @@ impl LogManager {
     #[instrument(skip(self, merkle))]
     fn append_subtree_list(
         &self,
+        tx_start_index: u64,
         merkle_list: Vec<(usize, DataRoot)>,
         merkle: &mut MerkleManager,
     ) -> Result<()> {
@@ -846,7 +851,7 @@ impl LogManager {
             return Ok(());
         }

-        self.pad_tx(1 << (merkle_list[0].0 - 1), &mut *merkle)?;
+        self.pad_tx(tx_start_index, &mut *merkle)?;

         let mut batch_root_map = BTreeMap::new();
         for (subtree_depth, subtree_root) in merkle_list {
@@ -894,18 +899,18 @@ impl LogManager {
     }

     #[instrument(skip(self, merkle))]
-    fn pad_tx(&self, first_subtree_size: u64, merkle: &mut MerkleManager) -> Result<()> {
+    fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
         // Check if we need to pad the flow.
         let mut tx_start_flow_index =
             merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
-        let extra = tx_start_flow_index % first_subtree_size;
+        let pad_size = tx_start_index - tx_start_flow_index;
         trace!(
             "before pad_tx {} {}",
             merkle.pora_chunks_merkle.leaves(),
             merkle.last_chunk_merkle.leaves()
         );
-        if extra != 0 {
-            for pad_data in Self::padding((first_subtree_size - extra) as usize) {
+        if pad_size != 0 {
+            for pad_data in Self::padding(pad_size as usize) {
                 let mut is_full_empty = true;
                 let mut root_map = BTreeMap::new();
@@ -968,12 +973,10 @@ impl LogManager {
                     // Update the flow database.
                     // This should be called before `complete_last_chunk_merkle` so that we do not save
                     // subtrees with data known.
-                    self.flow_store
-                        .append_entries(ChunkArray {
-                            data: pad_data.to_vec(),
-                            start_index: tx_start_flow_index,
-                        })
-                        .unwrap();
+                    self.flow_store.append_entries(ChunkArray {
+                        data: pad_data.to_vec(),
+                        start_index: tx_start_flow_index,
+                    })?;
                 }

                 tx_start_flow_index += data_size as u64;
View File

@@ -335,11 +335,7 @@ impl TransactionStore {
         }
         let mut merkle = if last_chunk_start_index == 0 {
             // The first entry hash is initialized as zero.
-            AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
-                vec![H256::zero()],
-                log2_pow2(PORA_CHUNK_SIZE) + 1,
-                None,
-            )
+            AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(vec![H256::zero()], 1, None)
         } else {
             AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
                 vec![],

View File

@@ -33,7 +33,7 @@

 # List of nodes to bootstrap UDP discovery. Note, `network_enr_address` should be
 # configured as well to enable UDP discovery.
-network_boot_nodes = ["/ip4/35.95.5.134/udp/1234/p2p/16Uiu2HAmFGrDV8wKToa1dd8uh6bz8bSY28n33iRP3pvfeBU6ysCw","/ip4/35.84.189.77/udp/1234/p2p/16Uiu2HAmF7t5iuRoWLMvQVfHbbJr5TFgHo2oU1CDxJm56eLdxRAY","/ip4/8.154.34.28/udp/1234/p2p/16Uiu2HAmBb7PQzvfZjHBENcF7E7mZaiHSrpBoH7mKTyNijYdqMM6"]
+network_boot_nodes = ["/ip4/47.251.88.201/udp/1234/p2p/16Uiu2HAmFGrDV8wKToa1dd8uh6bz8bSY28n33iRP3pvfeBU6ysCw","/ip4/47.76.49.188/udp/1234/p2p/16Uiu2HAmBb7PQzvfZjHBENcF7E7mZaiHSrpBoH7mKTyNijYdqMM6"]

 # List of libp2p nodes to initially connect to.
 # network_libp2p_nodes = []

View File

@@ -33,7 +33,7 @@

 # List of nodes to bootstrap UDP discovery. Note, `network_enr_address` should be
 # configured as well to enable UDP discovery.
-network_boot_nodes = ["/ip4/54.219.26.22/udp/1234/p2p/16Uiu2HAmTVDGNhkHD98zDnJxQWu3i1FL1aFYeh9wiQTNu4pDCgps","/ip4/52.52.127.117/udp/1234/p2p/16Uiu2HAkzRjxK2gorngB1Xq84qDrT4hSVznYDHj6BkbaE4SGx9oS","/ip4/8.154.47.100/udp/1234/p2p/16Uiu2HAm2k6ua2mGgvZ8rTMV8GhpW71aVzkQWy7D37TTDuLCpgmX"]
+network_boot_nodes = ["/ip4/47.251.117.133/udp/1234/p2p/16Uiu2HAmTVDGNhkHD98zDnJxQWu3i1FL1aFYeh9wiQTNu4pDCgps","/ip4/47.76.61.226/udp/1234/p2p/16Uiu2HAm2k6ua2mGgvZ8rTMV8GhpW71aVzkQWy7D37TTDuLCpgmX"]

 # List of libp2p nodes to initially connect to.
 # network_libp2p_nodes = []

View File

@@ -1 +1 @@
-a0b536c6acff24b5d4bf20d9db4e95c399e61196
+bea58429e436e4952ae69235d9079cfc4ac5f3b3

File diff suppressed because one or more lines are too long

View File

@@ -25,10 +25,23 @@
       "outputs": [],
       "stateMutability": "nonpayable",
       "type": "function"
+    },
+    {
+      "inputs": [],
+      "name": "pricePerSector",
+      "outputs": [
+        {
+          "internalType": "uint256",
+          "name": "",
+          "type": "uint256"
+        }
+      ],
+      "stateMutability": "pure",
+      "type": "function"
     }
   ],
-  "bytecode": "0x6080604052348015600f57600080fd5b5060a08061001e6000396000f3fe6080604052348015600f57600080fd5b506004361060285760003560e01c8063da6eb36a14602d575b600080fd5b603d6038366004603f565b505050565b005b600080600060608486031215605357600080fd5b50508135936020830135935060409092013591905056fea2646970667358221220fba54ab16c6496385cdd933e87b05b9e545a857b82ffa918f0d0e4a34ae41d7164736f6c63430008100033",
-  "deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060285760003560e01c8063da6eb36a14602d575b600080fd5b603d6038366004603f565b505050565b005b600080600060608486031215605357600080fd5b50508135936020830135935060409092013591905056fea2646970667358221220fba54ab16c6496385cdd933e87b05b9e545a857b82ffa918f0d0e4a34ae41d7164736f6c63430008100033",
+  "bytecode": "0x608060405234801561001057600080fd5b5060be8061001f6000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
+  "deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
   "linkReferences": {},
   "deployedLinkReferences": {}
 }

View File

@@ -70,8 +70,8 @@
       "type": "function"
     }
   ],
-  "bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ebb4f7274983bea96e7fd68a63e91f4ad67260ff76111312d8c8559b9b5b621064736f6c63430008100033",
-  "deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220ebb4f7274983bea96e7fd68a63e91f4ad67260ff76111312d8c8559b9b5b621064736f6c63430008100033",
+  "bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
+  "deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
   "linkReferences": {},
   "deployedLinkReferences": {}
 }

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3

import time
import base64
import random
from test_framework.test_framework import TestFramework
from utility.submission import ENTRY_SIZE, submit_data
from utility.submission import create_submission
from utility.utils import (
    assert_equal,
    wait_until,
)


class ShardSubmitTest(TestFramework):

    def setup_params(self):
        self.num_blockchain_nodes = 1
        self.num_nodes = 4
        self.zgs_node_configs[0] = {
            "db_max_num_sectors": 2 ** 30,
            "shard_position": "0/4"
        }
        self.zgs_node_configs[1] = {
            "db_max_num_sectors": 2 ** 30,
            "shard_position": "1/4"
        }
        self.zgs_node_configs[2] = {
            "db_max_num_sectors": 2 ** 30,
            "shard_position": "2/4"
        }
        self.zgs_node_configs[3] = {
            "db_max_num_sectors": 2 ** 30,
            "shard_position": "3/4"
        }

    def run_test(self):
        data_size = [
            256*960,
            256*1024,
            2,
            255,
            256*960,
            256*120,
            256,
            257,
            1023,
            1024,
            1025,
            256 * 1023,
            256 * 1023 + 1,
            256 * 1024,
            256 * 1024 + 1,
            256 * 1025,
            256 * 2048 - 1,
            256 * 2048,
            256 * 16385,
            256 * 1024 * 256,
        ]
        for i, v in enumerate(data_size):
            self.submission_data(v, i + 1, True)

    def submission_data(self, size, submission_index, rand_data=True):
        self.log.info("file size: %d", size)
        chunk_data = random.randbytes(size) if rand_data else b"\x10" * size

        submissions, data_root = create_submission(chunk_data)
        self.log.info("data root: %s, submissions: %s", data_root, submissions)
        self.contract.submit(submissions)

        wait_until(lambda: self.contract.num_submissions() == submission_index)

        for i in range(4):
            client = self.nodes[i]
            wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
            submit_data(client, chunk_data)
            wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])


if __name__ == "__main__":
    ShardSubmitTest().main()

View File

@@ -194,13 +194,16 @@ def generate_merkle_tree_by_batch(data):


 def submit_data(client, data):
+    # NOTE: we assume the data is unique in this function, otherwise zgs_getFileInfo will only get the information of the first data with same root
     shard_config = client.rpc.zgs_getShardConfig()
     shard_id = int(shard_config["shardId"])
     num_shard = int(shard_config["numShard"])

     segments = data_to_segments(data)
+    file_info = client.zgs_get_file_info(segments[0]["root"])
+    start_seg_index = file_info["tx"]["startEntryIndex"] // 1024
     for index, segment in enumerate(segments):
-        if index % num_shard == shard_id:
+        if (start_seg_index + index) % num_shard == shard_id:
             client.zgs_upload_segment(segment)

     return segments
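The helper now assigns a segment to a shard by its global flow position (the file's start segment, `startEntryIndex // 1024`, plus the segment's index within the file) instead of its file-local index. A compact restatement of the assignment rule, in Rust for consistency with the node code:

    fn is_mine(start_seg_index: usize, index_in_file: usize, shard_id: usize, num_shard: usize) -> bool {
        (start_seg_index + index_in_file) % num_shard == shard_id
    }

    fn main() {
        // A file starting at segment 5 in a 4-shard setup: the file's first
        // segment lands on shard (5 + 0) % 4 == 1, not on shard 0.
        assert!(is_mine(5, 0, 1, 4));
        assert!(!is_mine(5, 0, 0, 4));
    }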