From cebc3c8247d19506eb76ecf39b258ebc601e9bda Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 19:29:08 +0800 Subject: [PATCH] Mark peer connected if FileAnnouncement RPC message received --- node/network/src/rpc/methods.rs | 2 + node/router/src/libp2p_event_handler.rs | 33 ++++++++++++---- node/sync/src/controllers/serial.rs | 52 +++++++++++++++++++------ node/sync/src/service.rs | 22 ++++++----- 4 files changed, 81 insertions(+), 28 deletions(-) diff --git a/node/network/src/rpc/methods.rs b/node/network/src/rpc/methods.rs index 8d8e381..554eb8b 100644 --- a/node/network/src/rpc/methods.rs +++ b/node/network/src/rpc/methods.rs @@ -182,6 +182,8 @@ pub struct DataByHashRequest { #[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)] pub struct FileAnnouncement { pub tx_id: TxID, + pub num_shard: usize, + pub shard_id: usize, } /// Request a chunk array from a peer. diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 2922bae..57fa79d 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -222,11 +222,23 @@ impl Libp2pEventHandler { metrics::LIBP2P_HANDLE_GET_CHUNKS_REQUEST.mark(1); } Request::AnnounceFile(announcement) => { - self.send_to_sync(SyncMessage::AnnounceFile { - peer_id, - request_id, - announcement, - }); + match ShardConfig::new(announcement.shard_id, announcement.num_shard) { + Ok(v) => { + self.file_location_cache.insert_peer_config(peer_id, v); + + self.send_to_sync(SyncMessage::AnnounceFile { + peer_id, + request_id, + announcement, + }); + } + Err(_) => self.send_to_network(NetworkMessage::ReportPeer { + peer_id, + action: PeerAction::Fatal, + source: ReportSource::RPC, + msg: "Invalid shard config in FileAnnouncement", + }), + } } Request::DataByHash(_) => { // ignore @@ -411,6 +423,9 @@ impl Libp2pEventHandler { self.send_to_sync(SyncMessage::NewFile { from, msg }); } + self.file_location_cache + .insert_peer_config(from, announced_shard_config); + MessageAcceptance::Ignore } @@ -604,7 +619,11 @@ impl Libp2pEventHandler { if msg.neighbors_only { self.send_to_network(NetworkMessage::SendRequest { peer_id, - request: Request::AnnounceFile(FileAnnouncement { tx_id }), + request: Request::AnnounceFile(FileAnnouncement { + tx_id, + num_shard: my_shard_config.num_shard, + shard_id: my_shard_config.shard_id, + }), request_id: RequestId::Router(Instant::now()), }); } else if self.publish_file(tx_id).await.is_some() { @@ -617,7 +636,7 @@ impl Libp2pEventHandler { if msg.neighbors_only { // do not forward to other peers anymore - return MessageAcceptance::Ignore + return MessageAcceptance::Ignore; } // try from cache diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 44f093e..33c61ef 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -14,12 +14,13 @@ use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE}; use ssz::Encode; use std::{sync::Arc, time::Instant}; use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE}; -use storage_async::Store; +use storage_async::{ShardConfig, Store}; #[derive(Clone, Debug, PartialEq, Eq)] pub enum FailureReason { DBError(String), TxReverted(TxID), + TimeoutFindFile, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -88,6 +89,9 @@ pub struct SerialSyncController { /// Cache for storing and serving gossip messages. file_location_cache: Arc, + + /// Whether to find files from neighbors only. + neighbors_only: bool, } impl SerialSyncController { @@ -114,6 +118,7 @@ impl SerialSyncController { ctx, store, file_location_cache, + neighbors_only: true, } } @@ -159,11 +164,14 @@ impl SerialSyncController { /// Find more peers to sync chunks. Return whether `FindFile` pubsub message published, fn try_find_peers(&mut self) { - let (published, num_new_peers) = if self.goal.is_all_chunks() { - self.publish_find_file() - } else { + let (published, num_new_peers) = if !self.goal.is_all_chunks() { self.publish_find_chunks(); (true, 0) + } else if self.neighbors_only { + self.do_publish_find_file(); + (true, 0) + } else { + self.publish_find_file() }; info!(%self.tx_seq, %published, %num_new_peers, "Finding peers"); @@ -199,16 +207,21 @@ impl SerialSyncController { return (false, num_new_peers); } + self.do_publish_find_file(); + + (true, num_new_peers) + } + + fn do_publish_find_file(&self) { let shard_config = self.store.get_store().get_shard_config(); + self.ctx.publish(PubsubMessage::FindFile(FindFile { tx_id: self.tx_id, num_shard: shard_config.num_shard, shard_id: shard_config.shard_id, - neighbors_only: false, + neighbors_only: self.neighbors_only, timestamp: timestamp_now(), })); - - (true, num_new_peers) } fn publish_find_chunks(&self) { @@ -341,6 +354,14 @@ impl SerialSyncController { } } + /// Triggered when any peer (TCP connected) announced file via RPC message. + pub fn on_peer_announced(&mut self, peer_id: PeerId, shard_config: ShardConfig) { + self.peers + .add_new_peer_with_config(peer_id, Multiaddr::empty(), shard_config); + self.peers + .update_state_force(&peer_id, PeerState::Connected); + } + pub fn on_dail_failed(&mut self, peer_id: PeerId, err: &DialError) { match err { DialError::ConnectionLimit(_) => { @@ -643,12 +664,19 @@ impl SerialSyncController { { self.state = SyncState::FoundPeers; } else { - // storage node may not have the specific file when `FindFile` - // gossip message received. In this case, just broadcast the - // `FindFile` message again. + // FindFile timeout if since.elapsed() >= self.config.peer_find_timeout { - debug!(%self.tx_seq, "Finding peer timeout and try to find peers again"); - self.try_find_peers(); + if self.neighbors_only { + self.state = SyncState::Failed { + reason: FailureReason::TimeoutFindFile, + }; + } else { + // storage node may not have the specific file when `FindFile` + // gossip message received. In this case, just broadcast the + // `FindFile` message again. + debug!(%self.tx_seq, "Finding peer timeout and try to find peers again"); + self.try_find_peers(); + } } completed = true; diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index c577e09..22296bb 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -779,22 +779,26 @@ impl SyncService { debug!(%from, ?msg, "Received NewFile gossip"); if let Some(controller) = self.controllers.get_mut(&msg.tx_id.seq) { - // Notify new peer found if file already in sync - // TODO qbit: do not require remote address since already TCP connected - // controller.on_peer_found(from, addr); - controller.transition(); + // Notify new peer announced if file already in sync + if let Ok(shard_config) = ShardConfig::new(msg.shard_id, msg.num_shard) { + controller.on_peer_announced(from, shard_config); + controller.transition(); + } } else if let Some(manager) = &self.auto_sync_manager { let _ = manager.new_file_send.send(msg.tx_id.seq); } } /// Handle on AnnounceFile RPC message received. - async fn on_announce_file(&mut self, _peer_id: PeerId, announcement: FileAnnouncement) { + async fn on_announce_file(&mut self, peer_id: PeerId, announcement: FileAnnouncement) { if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) { - // Notify new peer found if file already in sync - // TODO qbit: do not require remote address since already TCP connected - // controller.on_peer_found(from, addr); - controller.transition(); + // Notify new peer announced if file already in sync + if let Ok(shard_config) = + ShardConfig::new(announcement.shard_id, announcement.num_shard) + { + controller.on_peer_announced(peer_id, shard_config); + controller.transition(); + } } }