From 51a2305c2dea869c38463d83e328f5dc8acd4332 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 14:57:30 +0800 Subject: [PATCH] Add comments --- node/network/src/behaviour/gossip_cache.rs | 2 +- node/network/src/types/pubsub.rs | 2 + node/router/src/libp2p_event_handler.rs | 50 ++++++++++++---------- node/sync/src/controllers/serial.rs | 1 + node/sync/src/lib.rs | 3 ++ node/sync/src/service.rs | 10 ++--- 6 files changed, 40 insertions(+), 28 deletions(-) diff --git a/node/network/src/behaviour/gossip_cache.rs b/node/network/src/behaviour/gossip_cache.rs index 78b38d3..eeafd62 100644 --- a/node/network/src/behaviour/gossip_cache.rs +++ b/node/network/src/behaviour/gossip_cache.rs @@ -39,7 +39,7 @@ pub struct GossipCacheBuilder { default_timeout: Option, /// Timeout for Example messages. example: Option, - /// Timeout for NewFile messges. + /// Timeout for NewFile messages. new_file: Option, /// Timeout for blocks FindFile messages. find_file: Option, diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index 569860a..d5dd4e1 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -114,6 +114,7 @@ impl ssz::Decode for WrappedPeerId { } } +/// Published when a file is uploaded or has finished syncing from other peers. #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] pub struct NewFile { pub tx_id: TxID, @@ -127,6 +128,7 @@ pub struct FindFile { pub tx_id: TxID, pub num_shard: usize, pub shard_id: usize, + /// Indicates whether to publish to neighbor nodes only. pub neighbors_only: bool, pub timestamp: u32, } diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index ef6cf02..439dafa 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -30,8 +30,12 @@ use crate::peer_manager::PeerManager; use crate::Config; lazy_static::lazy_static! { + /// Timeout to publish NewFile message to neighbor nodes. 
pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); - pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); + /// Timeout to publish FindFile message to neighbor nodes. + pub static ref FIND_FILE_NEIGHBORS_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); + /// Timeout to publish FindFile message in the whole network. + pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref TOLERABLE_DRIFT: chrono::Duration = chrono::Duration::seconds(10); @@ -236,7 +240,7 @@ impl Libp2pEventHandler { peer_id, action: PeerAction::Fatal, source: ReportSource::RPC, - msg: "Invalid shard config in FileAnnouncement", + msg: "Invalid shard config in AnnounceFile RPC message", }), } } @@ -339,11 +343,11 @@ impl Libp2pEventHandler { PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, PubsubMessage::NewFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1); - self.on_new_file(source, msg).await + self.on_new_file(propagation_source, msg).await } PubsubMessage::FindFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1); - self.on_find_file(source, msg).await + self.on_find_file(propagation_source, msg).await } PubsubMessage::FindChunks(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1); @@ -373,6 +377,7 @@ impl Libp2pEventHandler { } } + /// Handle NewFile pubsub message `msg` that was published by the `from` peer. 
async fn on_new_file(&self, from: PeerId, msg: NewFile) -> MessageAcceptance { // verify timestamp let d = duration_since( @@ -397,9 +402,11 @@ impl Libp2pEventHandler { Err(_) => return MessageAcceptance::Reject, }; - // update shard config cache - self.file_location_cache - .insert_peer_config(from, announced_shard_config); + // ignore if shard config mismatch + let my_shard_config = self.store.get_store().get_shard_config(); + if !my_shard_config.intersect(&announced_shard_config) { + return MessageAcceptance::Ignore; + } // ignore if already exists match self.store.check_tx_completed(msg.tx_id.seq).await { @@ -421,11 +428,8 @@ impl Libp2pEventHandler { } } - // notify sync layer if shard config matches - let my_shard_config = self.store.get_store().get_shard_config(); - if my_shard_config.intersect(&announced_shard_config) { - self.send_to_sync(SyncMessage::NewFile { from, msg }); - } + // notify sync layer to handle in advance + self.send_to_sync(SyncMessage::NewFile { from, msg }); MessageAcceptance::Ignore } @@ -567,7 +571,7 @@ impl Libp2pEventHandler { Some(PubsubMessage::AnnounceShardConfig(signed)) } - async fn on_find_file(&self, peer_id: PeerId, msg: FindFile) -> MessageAcceptance { + async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance { let FindFile { tx_id, timestamp, .. 
} = msg; @@ -577,12 +581,17 @@ impl Libp2pEventHandler { timestamp, metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(), ); - if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT { + let timeout = if msg.neighbors_only { + *FIND_FILE_NEIGHBORS_TIMEOUT + } else { + *FIND_FILE_TIMEOUT + }; + if d < TOLERABLE_DRIFT.neg() || d > timeout { debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message"); metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1); if msg.neighbors_only { self.send_to_network(NetworkMessage::ReportPeer { - peer_id, + peer_id: from, action: PeerAction::LowToleranceError, source: ReportSource::Gossipsub, msg: "Received out of date FindFile message", @@ -597,7 +606,7 @@ impl Libp2pEventHandler { Err(_) => return MessageAcceptance::Reject, }; - // ignore if shard config mismatch + // handle on shard config mismatch let my_shard_config = self.store.get_store().get_shard_config(); if !my_shard_config.intersect(&announced_shard_config) { return if msg.neighbors_only { @@ -607,10 +616,6 @@ impl Libp2pEventHandler { }; } - // update peer shard config in cache - self.file_location_cache - .insert_peer_config(peer_id, announced_shard_config); - // check if we have it if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) { if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await { @@ -618,8 +623,9 @@ impl Libp2pEventHandler { trace!(?tx_id, "Found file locally, responding to FindFile query"); if msg.neighbors_only { + // announce file via RPC to avoid flooding pubsub message self.send_to_network(NetworkMessage::SendRequest { - peer_id, + peer_id: from, request: Request::AnnounceFile(FileAnnouncement { tx_id, num_shard: my_shard_config.num_shard, @@ -635,8 +641,8 @@ impl Libp2pEventHandler { } } + // do not forward to whole network if only find file from neighbor nodes if msg.neighbors_only { - // do not forward to other peers anymore return MessageAcceptance::Ignore; } diff --git 
a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 3c75082..876f6d7 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -566,6 +566,7 @@ impl SerialSyncController { info!(%self.tx_seq, "Succeeded to finalize file"); self.state = SyncState::Completed; metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0); + // notify neighbor nodes that a new file has finished syncing self.ctx .send(NetworkMessage::AnnounceLocalFile { tx_id: self.tx_id }); } diff --git a/node/sync/src/lib.rs b/node/sync/src/lib.rs index b7067a3..5f1a60e 100644 --- a/node/sync/src/lib.rs +++ b/node/sync/src/lib.rs @@ -21,6 +21,9 @@ use std::{ #[serde(default)] pub struct Config { // sync service config + /// Indicates whether to sync files from neighbor nodes only. + /// This is to avoid flooding file announcements in the whole network, + /// which leads to high latency or even timeouts when syncing files. pub neighbors_only: bool, #[serde(deserialize_with = "deserialize_duration")] pub heartbeat_interval: Duration, diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index d155b46..3fd12f6 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -774,7 +774,7 @@ impl SyncService { } } - /// Handle on NewFile gossip message received. + /// Handle on `NewFile` gossip message received. async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) { debug!(%from, ?msg, "Received NewFile gossip"); @@ -789,14 +789,14 @@ impl SyncService { } } - /// Handle on AnnounceFile RPC message received. - async fn on_announce_file(&mut self, peer_id: PeerId, announcement: FileAnnouncement) { + /// Handle on `AnnounceFile` RPC message received. 
+ async fn on_announce_file(&mut self, from: PeerId, announcement: FileAnnouncement) { + // Notify new peer announced if file already in sync if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) { - // Notify new peer announced if file already in sync if let Ok(shard_config) = ShardConfig::new(announcement.shard_id, announcement.num_shard) { - controller.on_peer_announced(peer_id, shard_config); + controller.on_peer_announced(from, shard_config); controller.transition(); } }