From e06936f381475dcb57edf1439eda0305c8df6c46 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Wed, 23 Oct 2024 16:39:59 +0800 Subject: [PATCH 01/19] Add new P2P protocol NewFile --- node/network/src/behaviour/gossip_cache.rs | 13 +++++++++++++ node/network/src/behaviour/mod.rs | 3 +++ node/network/src/types/mod.rs | 2 +- node/network/src/types/pubsub.rs | 17 +++++++++++++++++ node/network/src/types/topics.rs | 8 +++++++- node/router/src/libp2p_event_handler.rs | 2 ++ 6 files changed, 43 insertions(+), 2 deletions(-) diff --git a/node/network/src/behaviour/gossip_cache.rs b/node/network/src/behaviour/gossip_cache.rs index acaa3c7..78b38d3 100644 --- a/node/network/src/behaviour/gossip_cache.rs +++ b/node/network/src/behaviour/gossip_cache.rs @@ -20,6 +20,8 @@ pub struct GossipCache { topic_msgs: HashMap, Key>>, /// Timeout for Example messages. example: Option, + /// Timeout for NewFile messages. + new_file: Option, /// Timeout for FindFile messages. find_file: Option, /// Timeout for FindChunks messages. @@ -37,6 +39,8 @@ pub struct GossipCacheBuilder { default_timeout: Option, /// Timeout for Example messages. example: Option, + /// Timeout for NewFile messges. + new_file: Option, /// Timeout for blocks FindFile messages. find_file: Option, /// Timeout for blocks FindChunks messages. @@ -64,6 +68,12 @@ impl GossipCacheBuilder { self } + /// Timeout for NewFile messages. + pub fn new_file_timeout(mut self, timeout: Duration) -> Self { + self.new_file = Some(timeout); + self + } + /// Timeout for FindFile messages. 
pub fn find_file_timeout(mut self, timeout: Duration) -> Self { self.find_file = Some(timeout); @@ -98,6 +108,7 @@ impl GossipCacheBuilder { let GossipCacheBuilder { default_timeout, example, + new_file, find_file, find_chunks, announce_file, @@ -109,6 +120,7 @@ impl GossipCacheBuilder { expirations: DelayQueue::default(), topic_msgs: HashMap::default(), example: example.or(default_timeout), + new_file: new_file.or(default_timeout), find_file: find_file.or(default_timeout), find_chunks: find_chunks.or(default_timeout), announce_file: announce_file.or(default_timeout), @@ -129,6 +141,7 @@ impl GossipCache { pub fn insert(&mut self, topic: GossipTopic, data: Vec) { let expire_timeout = match topic.kind() { GossipKind::Example => self.example, + GossipKind::NewFile => self.new_file, GossipKind::FindFile => self.find_file, GossipKind::FindChunks => self.find_chunks, GossipKind::AnnounceFile => self.announce_file, diff --git a/node/network/src/behaviour/mod.rs b/node/network/src/behaviour/mod.rs index d06c692..f3c6c26 100644 --- a/node/network/src/behaviour/mod.rs +++ b/node/network/src/behaviour/mod.rs @@ -232,6 +232,9 @@ impl Behaviour { let topic: Topic = GossipTopic::new(kind, GossipEncoding::default()).into(); topic.hash() }; + params + .topics + .insert(get_hash(GossipKind::NewFile), TopicScoreParams::default()); params .topics .insert(get_hash(GossipKind::FindFile), TopicScoreParams::default()); diff --git a/node/network/src/types/mod.rs b/node/network/src/types/mod.rs index bb0a661..c8f93a7 100644 --- a/node/network/src/types/mod.rs +++ b/node/network/src/types/mod.rs @@ -7,7 +7,7 @@ pub type Enr = discv5::enr::Enr; pub use globals::NetworkGlobals; pub use pubsub::{ - AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature, + AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature, NewFile, PubsubMessage, SignedAnnounceChunks, SignedAnnounceFile, SignedAnnounceShardConfig, SignedMessage, 
SnappyTransform, }; diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index 13f6a60..700878b 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -114,6 +114,14 @@ impl ssz::Decode for WrappedPeerId { } } +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] +pub struct NewFile { + pub tx_id: TxID, + pub num_shard: usize, + pub shard_id: usize, + pub timestamp: u32, +} + #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] pub struct FindFile { pub tx_id: TxID, @@ -205,6 +213,7 @@ type SignedAnnounceFiles = Vec; #[derive(Debug, Clone, PartialEq, Eq)] pub enum PubsubMessage { ExampleMessage(u64), + NewFile(NewFile), FindFile(FindFile), FindChunks(FindChunks), AnnounceFile(Vec), @@ -283,6 +292,7 @@ impl PubsubMessage { pub fn kind(&self) -> GossipKind { match self { PubsubMessage::ExampleMessage(_) => GossipKind::Example, + PubsubMessage::NewFile(_) => GossipKind::NewFile, PubsubMessage::FindFile(_) => GossipKind::FindFile, PubsubMessage::FindChunks(_) => GossipKind::FindChunks, PubsubMessage::AnnounceFile(_) => GossipKind::AnnounceFile, @@ -309,6 +319,9 @@ impl PubsubMessage { GossipKind::Example => Ok(PubsubMessage::ExampleMessage( u64::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, )), + GossipKind::NewFile => Ok(PubsubMessage::NewFile( + NewFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, + )), GossipKind::FindFile => Ok(PubsubMessage::FindFile( FindFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, )), @@ -341,6 +354,7 @@ impl PubsubMessage { // messages for us. 
match &self { PubsubMessage::ExampleMessage(data) => data.as_ssz_bytes(), + PubsubMessage::NewFile(data) => data.as_ssz_bytes(), PubsubMessage::FindFile(data) => data.as_ssz_bytes(), PubsubMessage::FindChunks(data) => data.as_ssz_bytes(), PubsubMessage::AnnounceFile(data) => data.as_ssz_bytes(), @@ -356,6 +370,9 @@ impl std::fmt::Display for PubsubMessage { PubsubMessage::ExampleMessage(msg) => { write!(f, "Example message: {}", msg) } + PubsubMessage::NewFile(msg) => { + write!(f, "NewFile message: {:?}", msg) + } PubsubMessage::FindFile(msg) => { write!(f, "FindFile message: {:?}", msg) } diff --git a/node/network/src/types/topics.rs b/node/network/src/types/topics.rs index e9dfba8..a4fc80b 100644 --- a/node/network/src/types/topics.rs +++ b/node/network/src/types/topics.rs @@ -8,13 +8,15 @@ use strum::AsRefStr; pub const TOPIC_PREFIX: &str = "eth2"; pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy"; pub const EXAMPLE_TOPIC: &str = "example"; +pub const NEW_FILE_TOPIC: &str = "new_file"; pub const FIND_FILE_TOPIC: &str = "find_file"; pub const FIND_CHUNKS_TOPIC: &str = "find_chunks"; pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file"; pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks"; pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config"; -pub const CORE_TOPICS: [GossipKind; 4] = [ +pub const CORE_TOPICS: [GossipKind; 5] = [ + GossipKind::NewFile, GossipKind::FindFile, GossipKind::FindChunks, GossipKind::AnnounceFile, @@ -37,6 +39,7 @@ pub struct GossipTopic { #[strum(serialize_all = "snake_case")] pub enum GossipKind { Example, + NewFile, FindFile, FindChunks, AnnounceFile, @@ -77,6 +80,7 @@ impl GossipTopic { let kind = match topic_parts[2] { EXAMPLE_TOPIC => GossipKind::Example, + NEW_FILE_TOPIC => GossipKind::NewFile, FIND_FILE_TOPIC => GossipKind::FindFile, FIND_CHUNKS_TOPIC => GossipKind::FindChunks, ANNOUNCE_FILE_TOPIC => GossipKind::AnnounceFile, @@ -106,6 +110,7 @@ impl From for String { let kind = match topic.kind { 
GossipKind::Example => EXAMPLE_TOPIC, + GossipKind::NewFile => NEW_FILE_TOPIC, GossipKind::FindFile => FIND_FILE_TOPIC, GossipKind::FindChunks => FIND_CHUNKS_TOPIC, GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC, @@ -125,6 +130,7 @@ impl std::fmt::Display for GossipTopic { let kind = match self.kind { GossipKind::Example => EXAMPLE_TOPIC, + GossipKind::NewFile => NEW_FILE_TOPIC, GossipKind::FindFile => FIND_FILE_TOPIC, GossipKind::FindChunks => FIND_CHUNKS_TOPIC, GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC, diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index d1b09cc..c42dc07 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -316,6 +316,8 @@ impl Libp2pEventHandler { match message { PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, + // TODO qbit: handle the NewFile pubsub message + PubsubMessage::NewFile(_) => todo!(), PubsubMessage::FindFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1); self.on_find_file(msg).await From d12bf66129a9754e1264dd48442eacc72994c8f9 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Wed, 23 Oct 2024 17:05:55 +0800 Subject: [PATCH 02/19] Publish NewFile message when any file finalized --- node/router/src/service.rs | 26 ++++++++++++++++---------- node/sync/src/controllers/serial.rs | 2 ++ node/sync/src/service.rs | 5 ++++- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/node/router/src/service.rs b/node/router/src/service.rs index 938c2c3..94bbeb5 100644 --- a/node/router/src/service.rs +++ b/node/router/src/service.rs @@ -5,11 +5,14 @@ use chunk_pool::ChunkPoolMessage; use file_location_cache::FileLocationCache; use futures::{channel::mpsc::Sender, prelude::*}; use miner::MinerMessage; +use network::types::NewFile; +use network::PubsubMessage; use network::{ BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, RequestId, Service as LibP2PService, Swarm, }; use 
pruner::PrunerMessage; +use shared_types::timestamp_now; use std::sync::Arc; use storage::log_store::Store as LogStore; use storage_async::Store; @@ -44,6 +47,8 @@ pub struct RouterService { /// Stores potentially created UPnP mappings to be removed on shutdown. (TCP port and UDP /// port). upnp_mappings: (Option, Option), + + store: Arc, } impl RouterService { @@ -63,7 +68,6 @@ impl RouterService { local_keypair: Keypair, config: Config, ) { - let store = Store::new(store, executor.clone()); let peers = Arc::new(RwLock::new(PeerManager::new(config.clone()))); // create the network service and spawn the task @@ -81,11 +85,12 @@ impl RouterService { sync_send, chunk_pool_send, local_keypair, - store, + Store::new(store.clone(), executor.clone()), file_location_cache, peers, ), upnp_mappings: (None, None), + store, }; // spawn service @@ -328,14 +333,15 @@ impl RouterService { } } NetworkMessage::AnnounceLocalFile { tx_id } => { - if self - .libp2p_event_handler - .publish_file(tx_id) - .await - .is_some() - { - metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1); - } + let shard_config = self.store.get_shard_config(); + let msg = PubsubMessage::NewFile(NewFile { + tx_id, + num_shard: shard_config.num_shard, + shard_id: shard_config.shard_id, + timestamp: timestamp_now(), + }); + self.libp2p.swarm.behaviour_mut().publish(vec![msg]); + metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1); } NetworkMessage::UPnPMappingEstablished { tcp_socket, diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index dcf724a..6b4f5e4 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -545,6 +545,8 @@ impl SerialSyncController { info!(%self.tx_seq, "Succeeded to finalize file"); self.state = SyncState::Completed; metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0); + self.ctx + .send(NetworkMessage::AnnounceLocalFile { tx_id: self.tx_id }); } Ok(false) => { 
warn!(?self.tx_id, %self.tx_seq, "Transaction reverted during finalize_tx"); diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 7efba4e..fee3f52 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -642,7 +642,10 @@ impl SyncService { Some(s) => s, None => { debug!(%tx.seq, "No more data needed"); - self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await?; + if self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await? { + self.ctx + .send(NetworkMessage::AnnounceLocalFile { tx_id: tx.id() }); + } return Ok(()); } }; From 364640adc871498b1c8ae07943789759b980960c Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Wed, 23 Oct 2024 18:09:52 +0800 Subject: [PATCH 03/19] handle NewFile message in router --- node/router/src/libp2p_event_handler.rs | 62 +++++++++++++++++++++++-- node/router/src/metrics.rs | 5 ++ node/sync/src/service.rs | 7 ++- 3 files changed, 70 insertions(+), 4 deletions(-) diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index c42dc07..1ab2301 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -5,7 +5,7 @@ use std::{ops::Neg, sync::Arc}; use chunk_pool::ChunkPoolMessage; use file_location_cache::FileLocationCache; use network::multiaddr::Protocol; -use network::types::{AnnounceShardConfig, SignedAnnounceShardConfig}; +use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig}; use network::{ rpc::StatusMessage, types::{ @@ -29,6 +29,7 @@ use crate::peer_manager::PeerManager; use crate::Config; lazy_static::lazy_static! 
{ + pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); @@ -316,8 +317,10 @@ impl Libp2pEventHandler { match message { PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, - // TODO qbit: handle the NewFile pubsub message - PubsubMessage::NewFile(_) => todo!(), + PubsubMessage::NewFile(msg) => { + metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1); + self.on_new_file(source, msg).await + } PubsubMessage::FindFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1); self.on_find_file(msg).await @@ -350,6 +353,59 @@ impl Libp2pEventHandler { } } + async fn on_new_file(&self, from: PeerId, msg: NewFile) -> MessageAcceptance { + // verify timestamp + let d = duration_since( + msg.timestamp, + metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY.clone(), + ); + if d < TOLERABLE_DRIFT.neg() || d > *NEW_FILE_TIMEOUT { + debug!(?d, ?msg, "Invalid timestamp, ignoring NewFile message"); + metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT.mark(1); + self.send_to_network(NetworkMessage::ReportPeer { + peer_id: from, + action: PeerAction::LowToleranceError, + source: ReportSource::Gossipsub, + msg: "Received out of date NewFile message", + }); + return MessageAcceptance::Ignore; + } + + // verify announced shard config + let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) { + Ok(v) => v, + Err(_) => return MessageAcceptance::Reject, + }; + + // ignore if already exists + match self.store.check_tx_completed(msg.tx_id.seq).await { + Ok(true) => return MessageAcceptance::Ignore, + Ok(false) => {} + Err(err) => { + warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx completed"); + return MessageAcceptance::Ignore; + } + } + + // ignore if already pruned + 
match self.store.check_tx_pruned(msg.tx_id.seq).await { + Ok(true) => return MessageAcceptance::Ignore, + Ok(false) => {} + Err(err) => { + warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx pruned"); + return MessageAcceptance::Ignore; + } + } + + // notify sync layer if shard config matches + let my_shard_config = self.store.get_store().get_shard_config(); + if my_shard_config.intersect(&announced_shard_config) { + self.send_to_sync(SyncMessage::NewFile { from, msg }); + } + + MessageAcceptance::Ignore + } + async fn construct_announced_ip(&self) -> Option { // public address configured if let Some(ip) = self.config.public_address { diff --git a/node/router/src/metrics.rs b/node/router/src/metrics.rs index 03c3081..141d034 100644 --- a/node/router/src/metrics.rs +++ b/node/router/src/metrics.rs @@ -44,6 +44,11 @@ lazy_static::lazy_static! { pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc = register_meter_with_group("router_libp2p_handle_response_error", "qps"); pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024); + // libp2p_event_handler: new file + pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "qps"); + pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_new_file", "latency", 1024); + pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT: Arc = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "timeout"); + // libp2p_event_handler: find & announce file pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "qps"); pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_find_file", "latency", 1024); diff --git a/node/sync/src/service.rs 
b/node/sync/src/service.rs index fee3f52..56fb896 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -8,7 +8,7 @@ use anyhow::{anyhow, bail, Result}; use file_location_cache::FileLocationCache; use libp2p::swarm::DialError; use log_entry_sync::LogSyncEvent; -use network::types::{AnnounceChunks, FindFile}; +use network::types::{AnnounceChunks, FindFile, NewFile}; use network::PubsubMessage; use network::{ rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId, @@ -70,6 +70,10 @@ pub enum SyncMessage { AnnounceChunksGossip { msg: AnnounceChunks, }, + NewFile { + from: PeerId, + msg: NewFile, + }, } #[derive(Debug)] @@ -265,6 +269,7 @@ impl SyncService { SyncMessage::AnnounceShardConfig { .. } => { // FIXME: Check if controllers need to be reset? } + SyncMessage::NewFile { from, msg } => todo!(), } } From 245ee0d903461f6a9a97a21c2c858d14dc492cc7 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Wed, 23 Oct 2024 19:00:47 +0800 Subject: [PATCH 04/19] handle NewFile in sync servic to write in db --- node/sync/src/auto_sync/manager.rs | 28 ++++++++++++++++++++++++---- node/sync/src/service.rs | 15 ++++++++++++++- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/node/sync/src/auto_sync/manager.rs b/node/sync/src/auto_sync/manager.rs index c880ffb..6a87cdf 100644 --- a/node/sync/src/auto_sync/manager.rs +++ b/node/sync/src/auto_sync/manager.rs @@ -15,12 +15,17 @@ use tokio::sync::{ use crate::{Config, SyncSender}; -use super::{batcher_random::RandomBatcher, batcher_serial::SerialBatcher, sync_store::SyncStore}; +use super::{ + batcher_random::RandomBatcher, + batcher_serial::SerialBatcher, + sync_store::{Queue, SyncStore}, +}; pub struct AutoSyncManager { pub serial: SerialBatcher, pub random: RandomBatcher, pub file_announcement_send: UnboundedSender, + pub new_file_send: UnboundedSender, pub catched_up: Arc, } @@ -33,10 +38,24 @@ impl AutoSyncManager { log_sync_recv: broadcast::Receiver, 
catch_up_end_recv: oneshot::Receiver<()>, ) -> Result { - let (send, recv) = unbounded_channel(); + let (file_announcement_send, file_announcement_recv) = unbounded_channel(); + let (new_file_send, mut new_file_recv) = unbounded_channel(); let sync_store = Arc::new(SyncStore::new(store.clone())); let catched_up = Arc::new(AtomicBool::new(false)); + // handle new file + let sync_store_cloned = sync_store.clone(); + executor.spawn( + async move { + while let Some(tx_seq) = new_file_recv.recv().await { + if let Err(err) = sync_store_cloned.insert(tx_seq, Queue::Ready).await { + warn!(?err, %tx_seq, "Failed to insert new file to ready queue"); + } + } + }, + "auto_sync_handle_new_file", + ); + // sync in sequence let serial = SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) @@ -44,7 +63,7 @@ impl AutoSyncManager { executor.spawn( serial .clone() - .start(recv, log_sync_recv, catched_up.clone()), + .start(file_announcement_recv, log_sync_recv, catched_up.clone()), "auto_sync_serial", ); @@ -67,7 +86,8 @@ impl AutoSyncManager { Ok(Self { serial, random, - file_announcement_send: send, + file_announcement_send, + new_file_send, catched_up, }) } diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 56fb896..66c4a48 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -269,7 +269,7 @@ impl SyncService { SyncMessage::AnnounceShardConfig { .. } => { // FIXME: Check if controllers need to be reset? 
} - SyncMessage::NewFile { from, msg } => todo!(), + SyncMessage::NewFile { from, msg } => self.on_new_file_gossip(from, msg).await, } } @@ -756,6 +756,19 @@ impl SyncService { } } + async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) { + debug!(%from, ?msg, "Received NewFile gossip"); + + if let Some(controller) = self.controllers.get_mut(&msg.tx_id.seq) { + // Notify new peer found if file already in sync + // TODO qbit: do not require remote address since already TCP connected + // controller.on_peer_found(from, addr); + controller.transition(); + } else if let Some(manager) = &self.auto_sync_manager { + let _ = manager.new_file_send.send(msg.tx_id.seq); + } + } + /// Terminate file sync of `min_tx_seq`. /// If `is_reverted` is `true` (means confirmed transactions reverted), /// also terminate `tx_seq` greater than `min_tx_seq` From 98ec24b863dbf5d402aefb6648e3d1ece245b849 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Wed, 23 Oct 2024 19:08:48 +0800 Subject: [PATCH 05/19] use propagation source to handle NewFile message --- node/router/src/libp2p_event_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 1ab2301..bccfb2b 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -319,7 +319,7 @@ impl Libp2pEventHandler { PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, PubsubMessage::NewFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1); - self.on_new_file(source, msg).await + self.on_new_file(propagation_source, msg).await } PubsubMessage::FindFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1); From ceb165d79b33b268812dfbd8dc92142cd2794894 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 15:30:07 +0800 Subject: [PATCH 06/19] Disable sequential sync and store new file in v2 sync store --- 
node/sync/src/auto_sync/batcher_random.rs | 13 ++-- node/sync/src/auto_sync/manager.rs | 75 +++++++++++++---------- node/sync/src/auto_sync/sync_store.rs | 8 +++ node/sync/src/service.rs | 5 +- 4 files changed, 62 insertions(+), 39 deletions(-) diff --git a/node/sync/src/auto_sync/batcher_random.rs b/node/sync/src/auto_sync/batcher_random.rs index 16cd5f2..3f872a6 100644 --- a/node/sync/src/auto_sync/batcher_random.rs +++ b/node/sync/src/auto_sync/batcher_random.rs @@ -59,14 +59,13 @@ impl RandomBatcher { pub async fn start(mut self, catched_up: Arc) { info!("Start to sync files"); - loop { - // disable file sync until catched up - if !catched_up.load(Ordering::Relaxed) { - trace!("Cannot sync file in catch-up phase"); - sleep(self.config.auto_sync_idle_interval).await; - continue; - } + // wait for log entry sync catched up + while !catched_up.load(Ordering::Relaxed) { + trace!("Cannot sync file in catch-up phase"); + sleep(self.config.auto_sync_idle_interval).await; + } + loop { if let Ok(state) = self.get_state().await { metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64); metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64); diff --git a/node/sync/src/auto_sync/manager.rs b/node/sync/src/auto_sync/manager.rs index 6a87cdf..187f6af 100644 --- a/node/sync/src/auto_sync/manager.rs +++ b/node/sync/src/auto_sync/manager.rs @@ -9,7 +9,7 @@ use storage_async::Store; use task_executor::TaskExecutor; use tokio::sync::{ broadcast, - mpsc::{unbounded_channel, UnboundedSender}, + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, oneshot, }; @@ -22,7 +22,7 @@ use super::{ }; pub struct AutoSyncManager { - pub serial: SerialBatcher, + pub serial: Option, pub random: RandomBatcher, pub file_announcement_send: UnboundedSender, pub new_file_send: UnboundedSender, @@ -35,60 +35,73 @@ impl AutoSyncManager { executor: &TaskExecutor, store: Store, sync_send: SyncSender, - log_sync_recv: broadcast::Receiver, + _log_sync_recv: 
broadcast::Receiver, catch_up_end_recv: oneshot::Receiver<()>, ) -> Result { - let (file_announcement_send, file_announcement_recv) = unbounded_channel(); - let (new_file_send, mut new_file_recv) = unbounded_channel(); - let sync_store = Arc::new(SyncStore::new(store.clone())); + let (file_announcement_send, _file_announcement_recv) = unbounded_channel(); + let (new_file_send, new_file_recv) = unbounded_channel(); + // use v2 db to avoid reading v1 files that announced from the whole network instead of neighbors + let sync_store = Arc::new(SyncStore::new_with_name( + store.clone(), + "pendingv2", + "readyv2", + )); let catched_up = Arc::new(AtomicBool::new(false)); // handle new file - let sync_store_cloned = sync_store.clone(); executor.spawn( - async move { - while let Some(tx_seq) = new_file_recv.recv().await { - if let Err(err) = sync_store_cloned.insert(tx_seq, Queue::Ready).await { - warn!(?err, %tx_seq, "Failed to insert new file to ready queue"); - } - } - }, + Self::handle_new_file(new_file_recv, sync_store.clone()), "auto_sync_handle_new_file", ); // sync in sequence - let serial = - SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) - .await?; - executor.spawn( - serial - .clone() - .start(file_announcement_recv, log_sync_recv, catched_up.clone()), - "auto_sync_serial", - ); + // let serial = + // SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) + // .await?; + // executor.spawn( + // serial + // .clone() + // .start(file_announcement_recv, log_sync_recv, catched_up.clone()), + // "auto_sync_serial", + // ); // sync randomly let random = RandomBatcher::new(config, store, sync_send, sync_store); executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random"); // handle on catched up notification - let catched_up_cloned = catched_up.clone(); executor.spawn( - async move { - if catch_up_end_recv.await.is_ok() { - info!("log entry catched up"); - catched_up_cloned.store(true, 
Ordering::Relaxed); - } - }, + Self::listen_catch_up(catch_up_end_recv, catched_up.clone()), "auto_sync_wait_for_catchup", ); Ok(Self { - serial, + serial: None, random, file_announcement_send, new_file_send, catched_up, }) } + + async fn handle_new_file( + mut new_file_recv: UnboundedReceiver, + sync_store: Arc, + ) { + while let Some(tx_seq) = new_file_recv.recv().await { + if let Err(err) = sync_store.insert(tx_seq, Queue::Ready).await { + warn!(?err, %tx_seq, "Failed to insert new file to ready queue"); + } + } + } + + async fn listen_catch_up( + catch_up_end_recv: oneshot::Receiver<()>, + catched_up: Arc, + ) { + if catch_up_end_recv.await.is_ok() { + info!("log entry catched up"); + catched_up.store(true, Ordering::Relaxed); + } + } } diff --git a/node/sync/src/auto_sync/sync_store.rs b/node/sync/src/auto_sync/sync_store.rs index 796dc1a..825b858 100644 --- a/node/sync/src/auto_sync/sync_store.rs +++ b/node/sync/src/auto_sync/sync_store.rs @@ -42,6 +42,14 @@ impl SyncStore { } } + pub fn new_with_name(store: Store, pending: &'static str, ready: &'static str) -> Self { + Self { + store: Arc::new(RwLock::new(store)), + pending_txs: TxStore::new(pending), + ready_txs: TxStore::new(ready), + } + } + /// Returns the number of pending txs and ready txs. 
pub async fn stat(&self) -> Result<(usize, usize)> { let async_store = self.store.read().await; diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 66c4a48..d7de9c0 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -284,7 +284,10 @@ impl SyncService { Some(manager) => SyncServiceState { num_syncing: self.controllers.len(), catched_up: Some(manager.catched_up.load(Ordering::Relaxed)), - auto_sync_serial: Some(manager.serial.get_state().await), + auto_sync_serial: match &manager.serial { + Some(v) => Some(v.get_state().await), + None => None, + }, auto_sync_random: manager.random.get_state().await.ok(), }, None => SyncServiceState { From 09b34fbf07f84f758724f3e79cd00e7b1ace0d7f Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 15:43:28 +0800 Subject: [PATCH 07/19] Add shard config in FindFile --- node/network/src/types/pubsub.rs | 2 ++ node/router/src/libp2p_event_handler.rs | 24 ++++++++++++++++++++++-- node/sync/src/controllers/serial.rs | 3 +++ node/sync/src/service.rs | 3 +++ 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index 700878b..717fe0c 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -125,6 +125,8 @@ pub struct NewFile { #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] pub struct FindFile { pub tx_id: TxID, + pub num_shard: usize, + pub shard_id: usize, pub timestamp: u32, } diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index bccfb2b..9dff174 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -544,7 +544,9 @@ impl Libp2pEventHandler { } async fn on_find_file(&self, msg: FindFile) -> MessageAcceptance { - let FindFile { tx_id, timestamp } = msg; + let FindFile { + tx_id, timestamp, .. 
+ } = msg; // verify timestamp let d = duration_since( @@ -557,6 +559,19 @@ impl Libp2pEventHandler { return MessageAcceptance::Ignore; } + // verify announced shard config + let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) { + Ok(v) => v, + Err(_) => return MessageAcceptance::Reject, + }; + + // propagate FindFile query to other nodes if shard mismatch + let my_shard_config = self.store.get_store().get_shard_config(); + if !my_shard_config.intersect(&announced_shard_config) { + metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_FORWARD.mark(1); + return MessageAcceptance::Accept; + } + // check if we have it if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) { if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await { @@ -1261,7 +1276,12 @@ mod tests { ) -> MessageAcceptance { let (alice, bob) = (PeerId::random(), PeerId::random()); let id = MessageId::new(b"dummy message"); - let message = PubsubMessage::FindFile(FindFile { tx_id, timestamp }); + let message = PubsubMessage::FindFile(FindFile { + tx_id, + num_shard: 1, + shard_id: 0, + timestamp, + }); handler.on_pubsub_message(alice, bob, &id, message).await } diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 6b4f5e4..d4072bb 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -199,8 +199,11 @@ impl SerialSyncController { return (false, num_new_peers); } + let shard_config = self.store.get_store().get_shard_config(); self.ctx.publish(PubsubMessage::FindFile(FindFile { tx_id: self.tx_id, + num_shard: shard_config.num_shard, + shard_id: shard_config.shard_id, timestamp: timestamp_now(), })); diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index d7de9c0..9c164a3 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -585,8 +585,11 @@ impl SyncService { Some(tx) => tx, None => bail!("Transaction not found"), }; + let shard_config = 
self.store.get_store().get_shard_config(); self.ctx.publish(PubsubMessage::FindFile(FindFile { tx_id: tx.id(), + num_shard: shard_config.num_shard, + shard_id: shard_config.shard_id, timestamp: timestamp_now(), })); Ok(()) From 35c4760724dc9a4d50d92a19af417f03d336df6a Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 18:01:24 +0800 Subject: [PATCH 08/19] Add AnnounceFile RPC message in network layer --- node/network/src/behaviour/mod.rs | 10 ++++++++++ node/network/src/peer_manager/mod.rs | 3 +++ node/network/src/rpc/codec/ssz_snappy.rs | 8 ++++++++ node/network/src/rpc/methods.rs | 6 ++++++ node/network/src/rpc/mod.rs | 1 + node/network/src/rpc/outbound.rs | 12 ++++++++++++ node/network/src/rpc/protocol.rs | 21 +++++++++++++++++++++ node/network/src/rpc/rate_limiter.rs | 11 +++++++++++ node/router/src/libp2p_event_handler.rs | 7 +++++++ node/sync/src/service.rs | 22 ++++++++++++++++++++++ 10 files changed, 101 insertions(+) diff --git a/node/network/src/behaviour/mod.rs b/node/network/src/behaviour/mod.rs index f3c6c26..fb562ac 100644 --- a/node/network/src/behaviour/mod.rs +++ b/node/network/src/behaviour/mod.rs @@ -6,6 +6,7 @@ use crate::peer_manager::{ ConnectionDirection, PeerManager, PeerManagerEvent, }; use crate::rpc::methods::DataByHashRequest; +use crate::rpc::methods::FileAnnouncement; use crate::rpc::methods::GetChunksRequest; use crate::rpc::*; use crate::service::Context as ServiceContext; @@ -546,6 +547,9 @@ impl Behaviour { Request::DataByHash { .. } => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["data_by_hash"]) } + Request::AnnounceFile { .. } => { + metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["announce_file"]) + } Request::GetChunks { .. 
} => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["get_chunks"]) } @@ -758,6 +762,9 @@ where InboundRequest::DataByHash(req) => { self.propagate_request(peer_request_id, peer_id, Request::DataByHash(req)) } + InboundRequest::AnnounceFile(req) => { + self.propagate_request(peer_request_id, peer_id, Request::AnnounceFile(req)) + } InboundRequest::GetChunks(req) => { self.propagate_request(peer_request_id, peer_id, Request::GetChunks(req)) } @@ -972,6 +979,8 @@ pub enum Request { Status(StatusMessage), /// A data by hash request. DataByHash(DataByHashRequest), + /// An AnnounceFile message. + AnnounceFile(FileAnnouncement), /// A GetChunks request. GetChunks(GetChunksRequest), } @@ -981,6 +990,7 @@ impl std::convert::From for OutboundRequest { match req { Request::Status(s) => OutboundRequest::Status(s), Request::DataByHash(r) => OutboundRequest::DataByHash(r), + Request::AnnounceFile(r) => OutboundRequest::AnnounceFile(r), Request::GetChunks(r) => OutboundRequest::GetChunks(r), } } diff --git a/node/network/src/peer_manager/mod.rs b/node/network/src/peer_manager/mod.rs index f7b0f76..e9b33d0 100644 --- a/node/network/src/peer_manager/mod.rs +++ b/node/network/src/peer_manager/mod.rs @@ -460,6 +460,7 @@ impl PeerManager { Protocol::Goodbye => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, Protocol::DataByHash => PeerAction::MidToleranceError, + Protocol::AnnounceFile => PeerAction::MidToleranceError, Protocol::GetChunks => PeerAction::MidToleranceError, }, }, @@ -474,6 +475,7 @@ impl PeerManager { Protocol::Goodbye => return, Protocol::Status => PeerAction::LowToleranceError, Protocol::DataByHash => return, + Protocol::AnnounceFile => return, Protocol::GetChunks => return, } } @@ -488,6 +490,7 @@ impl PeerManager { Protocol::Goodbye => return, Protocol::Status => return, Protocol::DataByHash => PeerAction::MidToleranceError, + Protocol::AnnounceFile => PeerAction::MidToleranceError, Protocol::GetChunks => 
PeerAction::MidToleranceError, }, }, diff --git a/node/network/src/rpc/codec/ssz_snappy.rs b/node/network/src/rpc/codec/ssz_snappy.rs index 14ecec8..d0e9370 100644 --- a/node/network/src/rpc/codec/ssz_snappy.rs +++ b/node/network/src/rpc/codec/ssz_snappy.rs @@ -159,6 +159,7 @@ impl Encoder for SSZSnappyOutboundCodec { OutboundRequest::Goodbye(req) => req.as_ssz_bytes(), OutboundRequest::Ping(req) => req.as_ssz_bytes(), OutboundRequest::DataByHash(req) => req.hashes.as_ssz_bytes(), + OutboundRequest::AnnounceFile(req) => req.as_ssz_bytes(), OutboundRequest::GetChunks(req) => req.as_ssz_bytes(), }; // SSZ encoded bytes should be within `max_packet_size` @@ -346,6 +347,9 @@ fn handle_v1_request( Protocol::DataByHash => Ok(Some(InboundRequest::DataByHash(DataByHashRequest { hashes: VariableList::from_ssz_bytes(decoded_buffer)?, }))), + Protocol::AnnounceFile => Ok(Some(InboundRequest::AnnounceFile( + FileAnnouncement::from_ssz_bytes(decoded_buffer)?, + ))), Protocol::GetChunks => Ok(Some(InboundRequest::GetChunks( GetChunksRequest::from_ssz_bytes(decoded_buffer)?, ))), @@ -373,6 +377,10 @@ fn handle_v1_response( Protocol::DataByHash => Ok(Some(RPCResponse::DataByHash(Box::new( ZgsData::from_ssz_bytes(decoded_buffer)?, )))), + // This case should be unreachable as `AnnounceFile` has no response. + Protocol::AnnounceFile => Err(RPCError::InvalidData( + "AnnounceFile RPC message has no valid response".to_string(), + )), Protocol::GetChunks => Ok(Some(RPCResponse::Chunks( ChunkArrayWithProof::from_ssz_bytes(decoded_buffer)?, ))), diff --git a/node/network/src/rpc/methods.rs b/node/network/src/rpc/methods.rs index e2ec11e..8d8e381 100644 --- a/node/network/src/rpc/methods.rs +++ b/node/network/src/rpc/methods.rs @@ -178,6 +178,12 @@ pub struct DataByHashRequest { pub hashes: VariableList, } +// The message of `AnnounceFile` RPC message. 
+#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)] +pub struct FileAnnouncement { + pub tx_id: TxID, +} + /// Request a chunk array from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)] pub struct GetChunksRequest { diff --git a/node/network/src/rpc/mod.rs b/node/network/src/rpc/mod.rs index 18e66d6..4907876 100644 --- a/node/network/src/rpc/mod.rs +++ b/node/network/src/rpc/mod.rs @@ -118,6 +118,7 @@ impl RPC { .n_every(Protocol::Status, 5, Duration::from_secs(15)) .one_every(Protocol::Goodbye, Duration::from_secs(10)) .n_every(Protocol::DataByHash, 128, Duration::from_secs(10)) + .n_every(Protocol::AnnounceFile, 256, Duration::from_secs(10)) .n_every(Protocol::GetChunks, 4096, Duration::from_secs(10)) .build() .expect("Configuration parameters are valid"); diff --git a/node/network/src/rpc/outbound.rs b/node/network/src/rpc/outbound.rs index 30a6bdc..6ba1f4a 100644 --- a/node/network/src/rpc/outbound.rs +++ b/node/network/src/rpc/outbound.rs @@ -34,6 +34,7 @@ pub enum OutboundRequest { Goodbye(GoodbyeReason), Ping(Ping), DataByHash(DataByHashRequest), + AnnounceFile(FileAnnouncement), GetChunks(GetChunksRequest), } @@ -72,6 +73,11 @@ impl OutboundRequest { Version::V1, Encoding::SSZSnappy, )], + OutboundRequest::AnnounceFile(_) => vec![ProtocolId::new( + Protocol::AnnounceFile, + Version::V1, + Encoding::SSZSnappy, + )], OutboundRequest::GetChunks(_) => vec![ProtocolId::new( Protocol::GetChunks, Version::V1, @@ -89,6 +95,7 @@ impl OutboundRequest { OutboundRequest::Goodbye(_) => 0, OutboundRequest::Ping(_) => 1, OutboundRequest::DataByHash(req) => req.hashes.len() as u64, + OutboundRequest::AnnounceFile(_) => 0, OutboundRequest::GetChunks(_) => 1, } } @@ -100,6 +107,7 @@ impl OutboundRequest { OutboundRequest::Goodbye(_) => Protocol::Goodbye, OutboundRequest::Ping(_) => Protocol::Ping, OutboundRequest::DataByHash(_) => Protocol::DataByHash, + OutboundRequest::AnnounceFile(_) => Protocol::AnnounceFile, OutboundRequest::GetChunks(_) => 
Protocol::GetChunks, } } @@ -114,6 +122,7 @@ impl OutboundRequest { OutboundRequest::Status(_) => unreachable!(), OutboundRequest::Goodbye(_) => unreachable!(), OutboundRequest::Ping(_) => unreachable!(), + OutboundRequest::AnnounceFile(_) => unreachable!(), OutboundRequest::GetChunks(_) => unreachable!(), } } @@ -170,6 +179,9 @@ impl std::fmt::Display for OutboundRequest { OutboundRequest::DataByHash(req) => { write!(f, "Data by hash: {:?}", req) } + OutboundRequest::AnnounceFile(req) => { + write!(f, "AnnounceFile: {:?}", req) + } OutboundRequest::GetChunks(req) => { write!(f, "GetChunks: {:?}", req) } diff --git a/node/network/src/rpc/protocol.rs b/node/network/src/rpc/protocol.rs index 53de8df..a9bdf41 100644 --- a/node/network/src/rpc/protocol.rs +++ b/node/network/src/rpc/protocol.rs @@ -91,6 +91,8 @@ pub enum Protocol { /// TODO DataByHash, + /// The file announce protocol. + AnnounceFile, /// The Chunk sync protocol. GetChunks, } @@ -115,6 +117,7 @@ impl std::fmt::Display for Protocol { Protocol::Goodbye => "goodbye", Protocol::Ping => "ping", Protocol::DataByHash => "data_by_hash", + Protocol::AnnounceFile => "announce_file", Protocol::GetChunks => "get_chunks", }; f.write_str(repr) @@ -155,6 +158,7 @@ impl UpgradeInfo for RPCProtocol { ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::DataByHash, Version::V1, Encoding::SSZSnappy), + ProtocolId::new(Protocol::AnnounceFile, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::GetChunks, Version::V1, Encoding::SSZSnappy), ] } @@ -216,6 +220,10 @@ impl ProtocolId { // TODO RpcLimits::new(1, *DATA_BY_HASH_REQUEST_MAX) } + Protocol::AnnounceFile => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), Protocol::GetChunks => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -243,6 +251,7 @@ impl ProtocolId { ::ssz_fixed_len(), ), + Protocol::AnnounceFile => RpcLimits::new(0, 
0), // AnnounceFile request has no response Protocol::GetChunks => RpcLimits::new(*CHUNKS_RESPONSE_MIN, *CHUNKS_RESPONSE_MAX), } } @@ -325,6 +334,7 @@ pub enum InboundRequest { Goodbye(GoodbyeReason), Ping(Ping), DataByHash(DataByHashRequest), + AnnounceFile(FileAnnouncement), GetChunks(GetChunksRequest), } @@ -363,6 +373,11 @@ impl InboundRequest { Version::V1, Encoding::SSZSnappy, )], + InboundRequest::AnnounceFile(_) => vec![ProtocolId::new( + Protocol::AnnounceFile, + Version::V1, + Encoding::SSZSnappy, + )], InboundRequest::GetChunks(_) => vec![ProtocolId::new( Protocol::GetChunks, Version::V1, @@ -380,6 +395,7 @@ impl InboundRequest { InboundRequest::Goodbye(_) => 0, InboundRequest::DataByHash(req) => req.hashes.len() as u64, InboundRequest::Ping(_) => 1, + InboundRequest::AnnounceFile(_) => 0, InboundRequest::GetChunks(_) => 1, } } @@ -391,6 +407,7 @@ impl InboundRequest { InboundRequest::Goodbye(_) => Protocol::Goodbye, InboundRequest::Ping(_) => Protocol::Ping, InboundRequest::DataByHash(_) => Protocol::DataByHash, + InboundRequest::AnnounceFile(_) => Protocol::AnnounceFile, InboundRequest::GetChunks(_) => Protocol::GetChunks, } } @@ -405,6 +422,7 @@ impl InboundRequest { InboundRequest::Status(_) => unreachable!(), InboundRequest::Goodbye(_) => unreachable!(), InboundRequest::Ping(_) => unreachable!(), + InboundRequest::AnnounceFile(_) => unreachable!(), InboundRequest::GetChunks(_) => unreachable!(), } } @@ -523,6 +541,9 @@ impl std::fmt::Display for InboundRequest { InboundRequest::DataByHash(req) => { write!(f, "Data by hash: {:?}", req) } + InboundRequest::AnnounceFile(req) => { + write!(f, "Announce File: {:?}", req) + } InboundRequest::GetChunks(req) => { write!(f, "Get Chunks: {:?}", req) } diff --git a/node/network/src/rpc/rate_limiter.rs b/node/network/src/rpc/rate_limiter.rs index c213146..9c95282 100644 --- a/node/network/src/rpc/rate_limiter.rs +++ b/node/network/src/rpc/rate_limiter.rs @@ -68,6 +68,8 @@ pub struct RPCRateLimiter { status_rl: 
Limiter, /// DataByHash rate limiter. data_by_hash_rl: Limiter, + /// AnnounceFile rate limiter. + announce_file_rl: Limiter, /// GetChunks rate limiter. get_chunks_rl: Limiter, } @@ -91,6 +93,8 @@ pub struct RPCRateLimiterBuilder { status_quota: Option, /// Quota for the DataByHash protocol. data_by_hash_quota: Option, + /// Quota for the AnnounceFile protocol. + announce_file_quota: Option, /// Quota for the GetChunks protocol. get_chunks_quota: Option, } @@ -109,6 +113,7 @@ impl RPCRateLimiterBuilder { Protocol::Status => self.status_quota = q, Protocol::Goodbye => self.goodbye_quota = q, Protocol::DataByHash => self.data_by_hash_quota = q, + Protocol::AnnounceFile => self.announce_file_quota = q, Protocol::GetChunks => self.get_chunks_quota = q, } self @@ -145,6 +150,9 @@ impl RPCRateLimiterBuilder { let data_by_hash_quota = self .data_by_hash_quota .ok_or("DataByHash quota not specified")?; + let announce_file_quota = self + .announce_file_quota + .ok_or("AnnounceFile quota not specified")?; let get_chunks_quota = self .get_chunks_quota .ok_or("GetChunks quota not specified")?; @@ -154,6 +162,7 @@ impl RPCRateLimiterBuilder { let status_rl = Limiter::from_quota(status_quota)?; let goodbye_rl = Limiter::from_quota(goodbye_quota)?; let data_by_hash_rl = Limiter::from_quota(data_by_hash_quota)?; + let announce_file_rl = Limiter::from_quota(announce_file_quota)?; let get_chunks_rl = Limiter::from_quota(get_chunks_quota)?; // check for peers to prune every 30 seconds, starting in 30 seconds @@ -166,6 +175,7 @@ impl RPCRateLimiterBuilder { status_rl, goodbye_rl, data_by_hash_rl, + announce_file_rl, get_chunks_rl, init_time: Instant::now(), }) @@ -210,6 +220,7 @@ impl RPCRateLimiter { Protocol::Status => &mut self.status_rl, Protocol::Goodbye => &mut self.goodbye_rl, Protocol::DataByHash => &mut self.data_by_hash_rl, + Protocol::AnnounceFile => &mut self.announce_file_rl, Protocol::GetChunks => &mut self.get_chunks_rl, }; check(limiter) diff --git 
a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 9dff174..5c28991 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -220,6 +220,13 @@ impl Libp2pEventHandler { }); metrics::LIBP2P_HANDLE_GET_CHUNKS_REQUEST.mark(1); } + Request::AnnounceFile(announcement) => { + self.send_to_sync(SyncMessage::AnnounceFile { + peer_id, + request_id, + announcement, + }); + } Request::DataByHash(_) => { // ignore } diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 9c164a3..353b3ea 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -8,6 +8,7 @@ use anyhow::{anyhow, bail, Result}; use file_location_cache::FileLocationCache; use libp2p::swarm::DialError; use log_entry_sync::LogSyncEvent; +use network::rpc::methods::FileAnnouncement; use network::types::{AnnounceChunks, FindFile, NewFile}; use network::PubsubMessage; use network::{ @@ -74,6 +75,11 @@ pub enum SyncMessage { from: PeerId, msg: NewFile, }, + AnnounceFile { + peer_id: PeerId, + request_id: PeerRequestId, + announcement: FileAnnouncement, + }, } #[derive(Debug)] @@ -270,6 +276,11 @@ impl SyncService { // FIXME: Check if controllers need to be reset? } SyncMessage::NewFile { from, msg } => self.on_new_file_gossip(from, msg).await, + SyncMessage::AnnounceFile { + peer_id, + announcement, + .. + } => self.on_announce_file(peer_id, announcement).await, } } @@ -762,6 +773,7 @@ impl SyncService { } } + /// Handle on NewFile gossip message received. async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) { debug!(%from, ?msg, "Received NewFile gossip"); @@ -775,6 +787,16 @@ impl SyncService { } } + /// Handle on AnnounceFile RPC message received. 
+ async fn on_announce_file(&mut self, _peer_id: PeerId, announcement: FileAnnouncement) { + if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) { + // Notify new peer found if file already in sync + // TODO qbit: do not require remote address since already TCP connected + // controller.on_peer_found(from, addr); + controller.transition(); + } + } + /// Terminate file sync of `min_tx_seq`. /// If `is_reverted` is `true` (means confirmed transactions reverted), /// also terminate `tx_seq` greater than `min_tx_seq` From 25524bd9d2dfdb55486e2d7c635e45fdbc827861 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 18:38:41 +0800 Subject: [PATCH 09/19] do not propagate FindFile to whole network --- node/network/src/types/pubsub.rs | 1 + node/router/src/libp2p_event_handler.rs | 46 ++++++++++++++++++++----- node/sync/src/controllers/serial.rs | 1 + node/sync/src/service.rs | 1 + 4 files changed, 40 insertions(+), 9 deletions(-) diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index 717fe0c..569860a 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -127,6 +127,7 @@ pub struct FindFile { pub tx_id: TxID, pub num_shard: usize, pub shard_id: usize, + pub neighbors_only: bool, pub timestamp: u32, } diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 5c28991..2922bae 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -5,6 +5,7 @@ use std::{ops::Neg, sync::Arc}; use chunk_pool::ChunkPoolMessage; use file_location_cache::FileLocationCache; use network::multiaddr::Protocol; +use network::rpc::methods::FileAnnouncement; use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig}; use network::{ rpc::StatusMessage, @@ -30,7 +31,7 @@ use crate::Config; lazy_static::lazy_static! 
{ pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); - pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); + pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref TOLERABLE_DRIFT: chrono::Duration = chrono::Duration::seconds(10); @@ -326,11 +327,11 @@ impl Libp2pEventHandler { PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, PubsubMessage::NewFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1); - self.on_new_file(propagation_source, msg).await + self.on_new_file(source, msg).await } PubsubMessage::FindFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1); - self.on_find_file(msg).await + self.on_find_file(source, msg).await } PubsubMessage::FindChunks(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1); @@ -550,7 +551,7 @@ impl Libp2pEventHandler { Some(PubsubMessage::AnnounceShardConfig(signed)) } - async fn on_find_file(&self, msg: FindFile) -> MessageAcceptance { + async fn on_find_file(&self, peer_id: PeerId, msg: FindFile) -> MessageAcceptance { let FindFile { tx_id, timestamp, .. 
} = msg; @@ -563,6 +564,14 @@ impl Libp2pEventHandler { if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT { debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message"); metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1); + if msg.neighbors_only { + self.send_to_network(NetworkMessage::ReportPeer { + peer_id, + action: PeerAction::LowToleranceError, + source: ReportSource::Gossipsub, + msg: "Received out of date FindFile message", + }); + } return MessageAcceptance::Ignore; } @@ -572,20 +581,33 @@ impl Libp2pEventHandler { Err(_) => return MessageAcceptance::Reject, }; - // propagate FindFile query to other nodes if shard mismatch + // ignore if shard config mismatch let my_shard_config = self.store.get_store().get_shard_config(); if !my_shard_config.intersect(&announced_shard_config) { - metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_FORWARD.mark(1); - return MessageAcceptance::Accept; + return if msg.neighbors_only { + MessageAcceptance::Ignore + } else { + MessageAcceptance::Accept + }; } + // update peer shard config in cache + self.file_location_cache + .insert_peer_config(peer_id, announced_shard_config); + // check if we have it if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) { if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await { if tx.id() == tx_id { trace!(?tx_id, "Found file locally, responding to FindFile query"); - if self.publish_file(tx_id).await.is_some() { + if msg.neighbors_only { + self.send_to_network(NetworkMessage::SendRequest { + peer_id, + request: Request::AnnounceFile(FileAnnouncement { tx_id }), + request_id: RequestId::Router(Instant::now()), + }); + } else if self.publish_file(tx_id).await.is_some() { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE.mark(1); return MessageAcceptance::Ignore; } @@ -593,6 +615,11 @@ impl Libp2pEventHandler { } } + if msg.neighbors_only { + // do not forward to other peers anymore + return MessageAcceptance::Ignore + } + // try from cache if let Some(mut 
msg) = self.file_location_cache.get_one(tx_id) { trace!(?tx_id, "Found file in cache, responding to FindFile query"); @@ -914,7 +941,7 @@ impl Libp2pEventHandler { } } - pub async fn publish_file(&self, tx_id: TxID) -> Option { + async fn publish_file(&self, tx_id: TxID) -> Option { match self.file_batcher.write().await.add(tx_id) { Some(batch) => { let announcement = self.construct_announce_file_message(batch).await?; @@ -1287,6 +1314,7 @@ mod tests { tx_id, num_shard: 1, shard_id: 0, + neighbors_only: false, timestamp, }); handler.on_pubsub_message(alice, bob, &id, message).await diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index d4072bb..44f093e 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -204,6 +204,7 @@ impl SerialSyncController { tx_id: self.tx_id, num_shard: shard_config.num_shard, shard_id: shard_config.shard_id, + neighbors_only: false, timestamp: timestamp_now(), })); diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 353b3ea..c577e09 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -601,6 +601,7 @@ impl SyncService { tx_id: tx.id(), num_shard: shard_config.num_shard, shard_id: shard_config.shard_id, + neighbors_only: false, timestamp: timestamp_now(), })); Ok(()) From cebc3c8247d19506eb76ecf39b258ebc601e9bda Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 19:29:08 +0800 Subject: [PATCH 10/19] Mark peer connected if FileAnnouncement RPC message received --- node/network/src/rpc/methods.rs | 2 + node/router/src/libp2p_event_handler.rs | 33 ++++++++++++---- node/sync/src/controllers/serial.rs | 52 +++++++++++++++++++------ node/sync/src/service.rs | 22 ++++++----- 4 files changed, 81 insertions(+), 28 deletions(-) diff --git a/node/network/src/rpc/methods.rs b/node/network/src/rpc/methods.rs index 8d8e381..554eb8b 100644 --- a/node/network/src/rpc/methods.rs +++ b/node/network/src/rpc/methods.rs 
@@ -182,6 +182,8 @@ pub struct DataByHashRequest { #[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)] pub struct FileAnnouncement { pub tx_id: TxID, + pub num_shard: usize, + pub shard_id: usize, } /// Request a chunk array from a peer. diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 2922bae..57fa79d 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -222,11 +222,23 @@ impl Libp2pEventHandler { metrics::LIBP2P_HANDLE_GET_CHUNKS_REQUEST.mark(1); } Request::AnnounceFile(announcement) => { - self.send_to_sync(SyncMessage::AnnounceFile { - peer_id, - request_id, - announcement, - }); + match ShardConfig::new(announcement.shard_id, announcement.num_shard) { + Ok(v) => { + self.file_location_cache.insert_peer_config(peer_id, v); + + self.send_to_sync(SyncMessage::AnnounceFile { + peer_id, + request_id, + announcement, + }); + } + Err(_) => self.send_to_network(NetworkMessage::ReportPeer { + peer_id, + action: PeerAction::Fatal, + source: ReportSource::RPC, + msg: "Invalid shard config in FileAnnouncement", + }), + } } Request::DataByHash(_) => { // ignore @@ -411,6 +423,9 @@ impl Libp2pEventHandler { self.send_to_sync(SyncMessage::NewFile { from, msg }); } + self.file_location_cache + .insert_peer_config(from, announced_shard_config); + MessageAcceptance::Ignore } @@ -604,7 +619,11 @@ impl Libp2pEventHandler { if msg.neighbors_only { self.send_to_network(NetworkMessage::SendRequest { peer_id, - request: Request::AnnounceFile(FileAnnouncement { tx_id }), + request: Request::AnnounceFile(FileAnnouncement { + tx_id, + num_shard: my_shard_config.num_shard, + shard_id: my_shard_config.shard_id, + }), request_id: RequestId::Router(Instant::now()), }); } else if self.publish_file(tx_id).await.is_some() { @@ -617,7 +636,7 @@ impl Libp2pEventHandler { if msg.neighbors_only { // do not forward to other peers anymore - return MessageAcceptance::Ignore + return 
MessageAcceptance::Ignore; } // try from cache diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 44f093e..33c61ef 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -14,12 +14,13 @@ use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE}; use ssz::Encode; use std::{sync::Arc, time::Instant}; use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE}; -use storage_async::Store; +use storage_async::{ShardConfig, Store}; #[derive(Clone, Debug, PartialEq, Eq)] pub enum FailureReason { DBError(String), TxReverted(TxID), + TimeoutFindFile, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -88,6 +89,9 @@ pub struct SerialSyncController { /// Cache for storing and serving gossip messages. file_location_cache: Arc, + + /// Whether to find files from neighbors only. + neighbors_only: bool, } impl SerialSyncController { @@ -114,6 +118,7 @@ impl SerialSyncController { ctx, store, file_location_cache, + neighbors_only: true, } } @@ -159,11 +164,14 @@ impl SerialSyncController { /// Find more peers to sync chunks. 
Return whether `FindFile` pubsub message published, fn try_find_peers(&mut self) { - let (published, num_new_peers) = if self.goal.is_all_chunks() { - self.publish_find_file() - } else { + let (published, num_new_peers) = if !self.goal.is_all_chunks() { self.publish_find_chunks(); (true, 0) + } else if self.neighbors_only { + self.do_publish_find_file(); + (true, 0) + } else { + self.publish_find_file() }; info!(%self.tx_seq, %published, %num_new_peers, "Finding peers"); @@ -199,16 +207,21 @@ impl SerialSyncController { return (false, num_new_peers); } + self.do_publish_find_file(); + + (true, num_new_peers) + } + + fn do_publish_find_file(&self) { let shard_config = self.store.get_store().get_shard_config(); + self.ctx.publish(PubsubMessage::FindFile(FindFile { tx_id: self.tx_id, num_shard: shard_config.num_shard, shard_id: shard_config.shard_id, - neighbors_only: false, + neighbors_only: self.neighbors_only, timestamp: timestamp_now(), })); - - (true, num_new_peers) } fn publish_find_chunks(&self) { @@ -341,6 +354,14 @@ impl SerialSyncController { } } + /// Triggered when any peer (TCP connected) announced file via RPC message. + pub fn on_peer_announced(&mut self, peer_id: PeerId, shard_config: ShardConfig) { + self.peers + .add_new_peer_with_config(peer_id, Multiaddr::empty(), shard_config); + self.peers + .update_state_force(&peer_id, PeerState::Connected); + } + pub fn on_dail_failed(&mut self, peer_id: PeerId, err: &DialError) { match err { DialError::ConnectionLimit(_) => { @@ -643,12 +664,19 @@ impl SerialSyncController { { self.state = SyncState::FoundPeers; } else { - // storage node may not have the specific file when `FindFile` - // gossip message received. In this case, just broadcast the - // `FindFile` message again. 
+ // FindFile timeout if since.elapsed() >= self.config.peer_find_timeout { - debug!(%self.tx_seq, "Finding peer timeout and try to find peers again"); - self.try_find_peers(); + if self.neighbors_only { + self.state = SyncState::Failed { + reason: FailureReason::TimeoutFindFile, + }; + } else { + // storage node may not have the specific file when `FindFile` + // gossip message received. In this case, just broadcast the + // `FindFile` message again. + debug!(%self.tx_seq, "Finding peer timeout and try to find peers again"); + self.try_find_peers(); + } } completed = true; diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index c577e09..22296bb 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -779,22 +779,26 @@ impl SyncService { debug!(%from, ?msg, "Received NewFile gossip"); if let Some(controller) = self.controllers.get_mut(&msg.tx_id.seq) { - // Notify new peer found if file already in sync - // TODO qbit: do not require remote address since already TCP connected - // controller.on_peer_found(from, addr); - controller.transition(); + // Notify new peer announced if file already in sync + if let Ok(shard_config) = ShardConfig::new(msg.shard_id, msg.num_shard) { + controller.on_peer_announced(from, shard_config); + controller.transition(); + } } else if let Some(manager) = &self.auto_sync_manager { let _ = manager.new_file_send.send(msg.tx_id.seq); } } /// Handle on AnnounceFile RPC message received. 
- async fn on_announce_file(&mut self, _peer_id: PeerId, announcement: FileAnnouncement) { + async fn on_announce_file(&mut self, peer_id: PeerId, announcement: FileAnnouncement) { if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) { - // Notify new peer found if file already in sync - // TODO qbit: do not require remote address since already TCP connected - // controller.on_peer_found(from, addr); - controller.transition(); + // Notify new peer announced if file already in sync + if let Ok(shard_config) = + ShardConfig::new(announcement.shard_id, announcement.num_shard) + { + controller.on_peer_announced(peer_id, shard_config); + controller.transition(); + } } } From 0f93b035f1ff9dce93ac64e33ab6867b5ef9b97e Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 20:01:57 +0800 Subject: [PATCH 11/19] fix unit test failures --- node/sync/src/controllers/serial.rs | 11 ++++------- node/sync/src/lib.rs | 2 ++ node/sync/src/service.rs | 4 +++- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 33c61ef..acb5bff 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -89,9 +89,6 @@ pub struct SerialSyncController { /// Cache for storing and serving gossip messages. file_location_cache: Arc, - - /// Whether to find files from neighbors only. 
- neighbors_only: bool, } impl SerialSyncController { @@ -118,7 +115,6 @@ impl SerialSyncController { ctx, store, file_location_cache, - neighbors_only: true, } } @@ -167,7 +163,7 @@ impl SerialSyncController { let (published, num_new_peers) = if !self.goal.is_all_chunks() { self.publish_find_chunks(); (true, 0) - } else if self.neighbors_only { + } else if self.config.neighbors_only { self.do_publish_find_file(); (true, 0) } else { @@ -219,7 +215,7 @@ impl SerialSyncController { tx_id: self.tx_id, num_shard: shard_config.num_shard, shard_id: shard_config.shard_id, - neighbors_only: self.neighbors_only, + neighbors_only: self.config.neighbors_only, timestamp: timestamp_now(), })); } @@ -666,7 +662,7 @@ impl SerialSyncController { } else { // FindFile timeout if since.elapsed() >= self.config.peer_find_timeout { - if self.neighbors_only { + if self.config.neighbors_only { self.state = SyncState::Failed { reason: FailureReason::TimeoutFindFile, }; @@ -1547,6 +1543,7 @@ mod tests { controller.on_response(peer_id, chunks).await; assert_eq!(*controller.get_status(), SyncState::Completed); + assert!(matches!(network_recv.try_recv().unwrap(), NetworkMessage::AnnounceLocalFile { .. 
})); assert!(network_recv.try_recv().is_err()); } diff --git a/node/sync/src/lib.rs b/node/sync/src/lib.rs index 3673589..b7067a3 100644 --- a/node/sync/src/lib.rs +++ b/node/sync/src/lib.rs @@ -21,6 +21,7 @@ use std::{ #[serde(default)] pub struct Config { // sync service config + pub neighbors_only: bool, #[serde(deserialize_with = "deserialize_duration")] pub heartbeat_interval: Duration, pub auto_sync_enabled: bool, @@ -64,6 +65,7 @@ impl Default for Config { fn default() -> Self { Self { // sync service config + neighbors_only: false, heartbeat_interval: Duration::from_secs(5), auto_sync_enabled: false, max_sync_files: 8, diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 22296bb..e119f5e 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -1558,6 +1558,7 @@ mod tests { .await; wait_for_tx_finalized(runtime.store.clone(), tx_seq).await; + assert!(matches!(runtime.network_recv.try_recv().unwrap(), NetworkMessage::AnnounceLocalFile { .. })); assert!(!runtime.store.check_tx_completed(0).unwrap()); @@ -1567,7 +1568,7 @@ mod tests { .request(SyncRequest::SyncFile { tx_seq }) .await .unwrap(); - + receive_dial(&mut runtime, &sync_send).await; receive_chunk_request( @@ -1582,6 +1583,7 @@ mod tests { .await; wait_for_tx_finalized(runtime.store, tx_seq).await; + assert!(matches!(runtime.network_recv.try_recv().unwrap(), NetworkMessage::AnnounceLocalFile { .. 
})); sync_send .notify(SyncMessage::PeerDisconnected { From 43afc79a4a3f9c4eac0f0943a2f1b69b2979a380 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 10:53:16 +0800 Subject: [PATCH 12/19] Change P2P protocol version --- node/network/src/lib.rs | 5 ++++- node/router/src/libp2p_event_handler.rs | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/node/network/src/lib.rs b/node/network/src/lib.rs index c6a21d8..09a0dfb 100644 --- a/node/network/src/lib.rs +++ b/node/network/src/lib.rs @@ -93,7 +93,10 @@ pub use peer_manager::{ }; pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME}; -pub const PROTOCOL_VERSION: [u8; 3] = [0, 1, 0]; +/// Defines the current P2P protocol version. +/// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused too heavy network traffic. +/// - v2: Publish NewFile to neighbors only and announce file via RPC message. +pub const PROTOCOL_VERSION: [u8; 3] = [0, 2, 0]; /// Application level requests sent to the network. 
#[derive(Debug, Clone, Copy)] diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 57fa79d..fa62a5f 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -397,6 +397,9 @@ impl Libp2pEventHandler { Err(_) => return MessageAcceptance::Reject, }; + // update shard config cache + self.file_location_cache.insert_peer_config(from, announced_shard_config); + // ignore if already exists match self.store.check_tx_completed(msg.tx_id.seq).await { Ok(true) => return MessageAcceptance::Ignore, @@ -423,9 +426,6 @@ impl Libp2pEventHandler { self.send_to_sync(SyncMessage::NewFile { from, msg }); } - self.file_location_cache - .insert_peer_config(from, announced_shard_config); - MessageAcceptance::Ignore } From 3e46e10a72584d6ddb701229593e8402600790cd Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 10:53:49 +0800 Subject: [PATCH 13/19] Ignore py tests of sequential auto sync --- ...uto_sequential_test.py => sync_auto_sequential_test_ignore.py} | 0 tests/{sync_auto_test.py => sync_auto_test_ignore.py} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename tests/{sync_auto_sequential_test.py => sync_auto_sequential_test_ignore.py} (100%) rename tests/{sync_auto_test.py => sync_auto_test_ignore.py} (100%) diff --git a/tests/sync_auto_sequential_test.py b/tests/sync_auto_sequential_test_ignore.py similarity index 100% rename from tests/sync_auto_sequential_test.py rename to tests/sync_auto_sequential_test_ignore.py diff --git a/tests/sync_auto_test.py b/tests/sync_auto_test_ignore.py similarity index 100% rename from tests/sync_auto_test.py rename to tests/sync_auto_test_ignore.py From 247b1aaf8f2a61de110a24d0ab3bdee0284d8cfc Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 11:09:30 +0800 Subject: [PATCH 14/19] Add py test for auto sync v2 --- tests/sync_auto_random_v2_test.py | 36 +++++++++++++++++++++++++++++++ 1 file 
changed, 36 insertions(+) create mode 100644 tests/sync_auto_random_v2_test.py diff --git a/tests/sync_auto_random_v2_test.py b/tests/sync_auto_random_v2_test.py new file mode 100644 index 0000000..6411177 --- /dev/null +++ b/tests/sync_auto_random_v2_test.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +from test_framework.test_framework import TestFramework +from utility.utils import wait_until + +class AutoRandomSyncV2Test(TestFramework): + def setup_params(self): + self.num_nodes = 4 + + # Enable random auto sync v2 + for i in range(self.num_nodes): + self.zgs_node_configs[i] = { + "sync": { + "auto_sync_enabled": True, + "max_sequential_workers": 0, + "max_random_workers": 3, + "neighbors_only": True, + } + } + + def run_test(self): + # Submit and upload files on node 0 + data_root_1 = self.__upload_file__(0, 256 * 1024) + data_root_2 = self.__upload_file__(0, 256 * 1024) + + # Files should be available on other nodes via auto sync + for i in range(1, self.num_nodes): + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1) is not None) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"]) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"]) + + assert 1 > 2 + +if __name__ == "__main__": + AutoRandomSyncV2Test().main() From 98438fd0e5250d80067bf044a140e13952611432 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 11:31:54 +0800 Subject: [PATCH 15/19] fmt code --- node/router/src/libp2p_event_handler.rs | 3 ++- node/sync/src/controllers/serial.rs | 5 ++++- node/sync/src/service.rs | 12 +++++++++--- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index fa62a5f..ef6cf02 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -398,7 +398,8 @@ impl 
Libp2pEventHandler { }; // update shard config cache - self.file_location_cache.insert_peer_config(from, announced_shard_config); + self.file_location_cache + .insert_peer_config(from, announced_shard_config); // ignore if already exists match self.store.check_tx_completed(msg.tx_id.seq).await { diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index acb5bff..3c75082 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -1543,7 +1543,10 @@ mod tests { controller.on_response(peer_id, chunks).await; assert_eq!(*controller.get_status(), SyncState::Completed); - assert!(matches!(network_recv.try_recv().unwrap(), NetworkMessage::AnnounceLocalFile { .. })); + assert!(matches!( + network_recv.try_recv().unwrap(), + NetworkMessage::AnnounceLocalFile { .. } + )); assert!(network_recv.try_recv().is_err()); } diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index e119f5e..d155b46 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -1558,7 +1558,10 @@ mod tests { .await; wait_for_tx_finalized(runtime.store.clone(), tx_seq).await; - assert!(matches!(runtime.network_recv.try_recv().unwrap(), NetworkMessage::AnnounceLocalFile { .. })); + assert!(matches!( + runtime.network_recv.try_recv().unwrap(), + NetworkMessage::AnnounceLocalFile { .. } + )); assert!(!runtime.store.check_tx_completed(0).unwrap()); @@ -1568,7 +1571,7 @@ mod tests { .request(SyncRequest::SyncFile { tx_seq }) .await .unwrap(); - + receive_dial(&mut runtime, &sync_send).await; receive_chunk_request( @@ -1583,7 +1586,10 @@ mod tests { .await; wait_for_tx_finalized(runtime.store, tx_seq).await; - assert!(matches!(runtime.network_recv.try_recv().unwrap(), NetworkMessage::AnnounceLocalFile { .. })); + assert!(matches!( + runtime.network_recv.try_recv().unwrap(), + NetworkMessage::AnnounceLocalFile { .. 
} + )); sync_send .notify(SyncMessage::PeerDisconnected { From da903fefe72b584624ec91feb231da06f1a0621e Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 11:56:21 +0800 Subject: [PATCH 16/19] remove dummy code in py test --- tests/sync_auto_random_v2_test.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/sync_auto_random_v2_test.py b/tests/sync_auto_random_v2_test.py index 6411177..0196327 100644 --- a/tests/sync_auto_random_v2_test.py +++ b/tests/sync_auto_random_v2_test.py @@ -30,7 +30,5 @@ class AutoRandomSyncV2Test(TestFramework): wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None) wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"]) - assert 1 > 2 - if __name__ == "__main__": AutoRandomSyncV2Test().main() From 98c62b314e0140941cc02810ad6efa10a241031a Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 12:23:08 +0800 Subject: [PATCH 17/19] fix random test failure --- node/sync/src/auto_sync/manager.rs | 46 ++++++++++++++++++------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/node/sync/src/auto_sync/manager.rs b/node/sync/src/auto_sync/manager.rs index 187f6af..68e3da9 100644 --- a/node/sync/src/auto_sync/manager.rs +++ b/node/sync/src/auto_sync/manager.rs @@ -35,17 +35,21 @@ impl AutoSyncManager { executor: &TaskExecutor, store: Store, sync_send: SyncSender, - _log_sync_recv: broadcast::Receiver, + log_sync_recv: broadcast::Receiver, catch_up_end_recv: oneshot::Receiver<()>, ) -> Result { - let (file_announcement_send, _file_announcement_recv) = unbounded_channel(); + let (file_announcement_send, file_announcement_recv) = unbounded_channel(); let (new_file_send, new_file_recv) = unbounded_channel(); - // use v2 db to avoid reading v1 files that announced from the whole network instead of neighbors - let sync_store = Arc::new(SyncStore::new_with_name( - store.clone(), - "pendingv2", - "readyv2", - )); + let sync_store = if 
config.neighbors_only { + // use v2 db to avoid reading v1 files that announced from the whole network instead of neighbors + Arc::new(SyncStore::new_with_name( + store.clone(), + "pendingv2", + "readyv2", + )) + } else { + Arc::new(SyncStore::new(store.clone())) + }; let catched_up = Arc::new(AtomicBool::new(false)); // handle new file @@ -55,15 +59,21 @@ impl AutoSyncManager { ); // sync in sequence - // let serial = - // SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) - // .await?; - // executor.spawn( - // serial - // .clone() - // .start(file_announcement_recv, log_sync_recv, catched_up.clone()), - // "auto_sync_serial", - // ); + let serial = if config.neighbors_only { + None + } else { + let serial = + SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) + .await?; + executor.spawn( + serial + .clone() + .start(file_announcement_recv, log_sync_recv, catched_up.clone()), + "auto_sync_serial", + ); + + Some(serial) + }; // sync randomly let random = RandomBatcher::new(config, store, sync_send, sync_store); @@ -76,7 +86,7 @@ impl AutoSyncManager { ); Ok(Self { - serial: None, + serial, random, file_announcement_send, new_file_send, From 51a2305c2dea869c38463d83e328f5dc8acd4332 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 14:57:30 +0800 Subject: [PATCH 18/19] Add comments --- node/network/src/behaviour/gossip_cache.rs | 2 +- node/network/src/types/pubsub.rs | 2 + node/router/src/libp2p_event_handler.rs | 50 ++++++++++++---------- node/sync/src/controllers/serial.rs | 1 + node/sync/src/lib.rs | 3 ++ node/sync/src/service.rs | 10 ++--- 6 files changed, 40 insertions(+), 28 deletions(-) diff --git a/node/network/src/behaviour/gossip_cache.rs b/node/network/src/behaviour/gossip_cache.rs index 78b38d3..eeafd62 100644 --- a/node/network/src/behaviour/gossip_cache.rs +++ b/node/network/src/behaviour/gossip_cache.rs @@ -39,7 +39,7 @@ pub struct GossipCacheBuilder { 
default_timeout: Option, /// Timeout for Example messages. example: Option, - /// Timeout for NewFile messges. + /// Timeout for NewFile messages. new_file: Option, /// Timeout for blocks FindFile messages. find_file: Option, diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index 569860a..d5dd4e1 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -114,6 +114,7 @@ impl ssz::Decode for WrappedPeerId { } } +/// Published when file uploaded or completed to sync from other peers. #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] pub struct NewFile { pub tx_id: TxID, @@ -127,6 +128,7 @@ pub struct FindFile { pub tx_id: TxID, pub num_shard: usize, pub shard_id: usize, + /// Indicates whether to publish to neighbor nodes only. pub neighbors_only: bool, pub timestamp: u32, } diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index ef6cf02..439dafa 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -30,8 +30,12 @@ use crate::peer_manager::PeerManager; use crate::Config; lazy_static::lazy_static! { + /// Timeout to publish NewFile message to neighbor nodes. pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); - pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); + /// Timeout to publish FindFile message to neighbor nodes. + pub static ref FIND_FILE_NEIGHBORS_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); + /// Timeout to publish FindFile message in the whole network.
+ pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref TOLERABLE_DRIFT: chrono::Duration = chrono::Duration::seconds(10); @@ -236,7 +240,7 @@ impl Libp2pEventHandler { peer_id, action: PeerAction::Fatal, source: ReportSource::RPC, - msg: "Invalid shard config in FileAnnouncement", + msg: "Invalid shard config in AnnounceFile RPC message", }), } } @@ -339,11 +343,11 @@ impl Libp2pEventHandler { PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, PubsubMessage::NewFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1); - self.on_new_file(source, msg).await + self.on_new_file(propagation_source, msg).await } PubsubMessage::FindFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1); - self.on_find_file(source, msg).await + self.on_find_file(propagation_source, msg).await } PubsubMessage::FindChunks(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1); @@ -373,6 +377,7 @@ impl Libp2pEventHandler { } } + /// Handle NewFile pubsub message `msg` that published by `from` peer. 
async fn on_new_file(&self, from: PeerId, msg: NewFile) -> MessageAcceptance { // verify timestamp let d = duration_since( @@ -397,9 +402,11 @@ impl Libp2pEventHandler { Err(_) => return MessageAcceptance::Reject, }; - // update shard config cache - self.file_location_cache - .insert_peer_config(from, announced_shard_config); + // ignore if shard config mismatch + let my_shard_config = self.store.get_store().get_shard_config(); + if !my_shard_config.intersect(&announced_shard_config) { + return MessageAcceptance::Ignore; + } // ignore if already exists match self.store.check_tx_completed(msg.tx_id.seq).await { @@ -421,11 +428,8 @@ impl Libp2pEventHandler { } } - // notify sync layer if shard config matches - let my_shard_config = self.store.get_store().get_shard_config(); - if my_shard_config.intersect(&announced_shard_config) { - self.send_to_sync(SyncMessage::NewFile { from, msg }); - } + // notify sync layer to handle in advance + self.send_to_sync(SyncMessage::NewFile { from, msg }); MessageAcceptance::Ignore } @@ -567,7 +571,7 @@ impl Libp2pEventHandler { Some(PubsubMessage::AnnounceShardConfig(signed)) } - async fn on_find_file(&self, peer_id: PeerId, msg: FindFile) -> MessageAcceptance { + async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance { let FindFile { tx_id, timestamp, .. 
} = msg; @@ -577,12 +581,17 @@ impl Libp2pEventHandler { timestamp, metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(), ); - if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT { + let timeout = if msg.neighbors_only { + *FIND_FILE_NEIGHBORS_TIMEOUT + } else { + *FIND_FILE_TIMEOUT + }; + if d < TOLERABLE_DRIFT.neg() || d > timeout { debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message"); metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1); if msg.neighbors_only { self.send_to_network(NetworkMessage::ReportPeer { - peer_id, + peer_id: from, action: PeerAction::LowToleranceError, source: ReportSource::Gossipsub, msg: "Received out of date FindFile message", @@ -597,7 +606,7 @@ impl Libp2pEventHandler { Err(_) => return MessageAcceptance::Reject, }; - // ignore if shard config mismatch + // handle on shard config mismatch let my_shard_config = self.store.get_store().get_shard_config(); if !my_shard_config.intersect(&announced_shard_config) { return if msg.neighbors_only { @@ -607,10 +616,6 @@ impl Libp2pEventHandler { }; } - // update peer shard config in cache - self.file_location_cache - .insert_peer_config(peer_id, announced_shard_config); - // check if we have it if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) { if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await { @@ -618,8 +623,9 @@ impl Libp2pEventHandler { trace!(?tx_id, "Found file locally, responding to FindFile query"); if msg.neighbors_only { + // announce file via RPC to avoid flooding pubsub message self.send_to_network(NetworkMessage::SendRequest { - peer_id, + peer_id: from, request: Request::AnnounceFile(FileAnnouncement { tx_id, num_shard: my_shard_config.num_shard, @@ -635,8 +641,8 @@ impl Libp2pEventHandler { } } + // do not forward to whole network if only find file from neighbor nodes if msg.neighbors_only { - // do not forward to other peers anymore return MessageAcceptance::Ignore; } diff --git 
a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 3c75082..876f6d7 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -566,6 +566,7 @@ impl SerialSyncController { info!(%self.tx_seq, "Succeeded to finalize file"); self.state = SyncState::Completed; metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0); + // notify neighbor nodes about new file completed to sync self.ctx .send(NetworkMessage::AnnounceLocalFile { tx_id: self.tx_id }); } diff --git a/node/sync/src/lib.rs b/node/sync/src/lib.rs index b7067a3..5f1a60e 100644 --- a/node/sync/src/lib.rs +++ b/node/sync/src/lib.rs @@ -21,6 +21,9 @@ use std::{ #[serde(default)] pub struct Config { // sync service config + /// Indicates whether to sync file from neighbor nodes only. + /// This is to avoid flooding file announcements in the whole network, + /// which leads to high latency or event timeout to sync files. pub neighbors_only: bool, #[serde(deserialize_with = "deserialize_duration")] pub heartbeat_interval: Duration, diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index d155b46..3fd12f6 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -774,7 +774,7 @@ impl SyncService { } } - /// Handle on NewFile gossip message received. + /// Handle on `NewFile` gossip message received. async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) { debug!(%from, ?msg, "Received NewFile gossip"); @@ -789,14 +789,14 @@ impl SyncService { } } - /// Handle on AnnounceFile RPC message received. - async fn on_announce_file(&mut self, peer_id: PeerId, announcement: FileAnnouncement) { + /// Handle on `AnnounceFile` RPC message received. 
+ async fn on_announce_file(&mut self, from: PeerId, announcement: FileAnnouncement) { + // Notify new peer announced if file already in sync if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) { - // Notify new peer announced if file already in sync if let Ok(shard_config) = ShardConfig::new(announcement.shard_id, announcement.num_shard) { - controller.on_peer_announced(peer_id, shard_config); + controller.on_peer_announced(from, shard_config); controller.transition(); } } From cc6cf0fdb23a384a007a0c342ed352cf49c110e6 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 15:01:32 +0800 Subject: [PATCH 19/19] Enable file sync protocol v2 in config file by default --- node/sync/src/lib.rs | 2 +- run/config-testnet-standard.toml | 4 ++++ run/config-testnet-turbo.toml | 4 ++++ run/config.toml | 4 ++++ 4 files changed, 13 insertions(+), 1 deletion(-) diff --git a/node/sync/src/lib.rs b/node/sync/src/lib.rs index 5f1a60e..3f08d85 100644 --- a/node/sync/src/lib.rs +++ b/node/sync/src/lib.rs @@ -23,7 +23,7 @@ pub struct Config { // sync service config /// Indicates whether to sync file from neighbor nodes only. /// This is to avoid flooding file announcements in the whole network, - /// which leads to high latency or event timeout to sync files. + /// which leads to high latency or even timeout to sync files. pub neighbors_only: bool, #[serde(deserialize_with = "deserialize_duration")] pub heartbeat_interval: Duration, diff --git a/run/config-testnet-standard.toml b/run/config-testnet-standard.toml index 0aa2166..2a88284 100644 --- a/run/config-testnet-standard.toml +++ b/run/config-testnet-standard.toml @@ -232,6 +232,10 @@ batcher_announcement_capacity = 100 # all files, and sufficient disk space is required. auto_sync_enabled = true +# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file +# announcements in the whole network, which leads to high latency or even timeout to sync files. 
+neighbors_only = true + # Maximum number of files in sync from other peers simultaneously. # max_sync_files = 8 diff --git a/run/config-testnet-turbo.toml b/run/config-testnet-turbo.toml index c969bb4..d46951f 100644 --- a/run/config-testnet-turbo.toml +++ b/run/config-testnet-turbo.toml @@ -244,6 +244,10 @@ batcher_announcement_capacity = 100 # all files, and sufficient disk space is required. auto_sync_enabled = true +# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file +# announcements in the whole network, which leads to high latency or even timeout to sync files. +neighbors_only = true + # Maximum number of files in sync from other peers simultaneously. # max_sync_files = 8 diff --git a/run/config.toml b/run/config.toml index 6d0f4c8..193f989 100644 --- a/run/config.toml +++ b/run/config.toml @@ -246,6 +246,10 @@ # all files, and sufficient disk space is required. # auto_sync_enabled = false +# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file +# announcements in the whole network, which leads to high latency or even timeout to sync files. +neighbors_only = true + # Maximum number of files in sync from other peers simultaneously. # max_sync_files = 8