diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index 717fe0c..569860a 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -127,6 +127,7 @@ pub struct FindFile { pub tx_id: TxID, pub num_shard: usize, pub shard_id: usize, + pub neighbors_only: bool, pub timestamp: u32, } diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 5c28991..2922bae 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -5,6 +5,7 @@ use std::{ops::Neg, sync::Arc}; use chunk_pool::ChunkPoolMessage; use file_location_cache::FileLocationCache; use network::multiaddr::Protocol; +use network::rpc::methods::FileAnnouncement; use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig}; use network::{ rpc::StatusMessage, @@ -30,7 +31,7 @@ use crate::Config; lazy_static::lazy_static! { pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); - pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); + pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref TOLERABLE_DRIFT: chrono::Duration = chrono::Duration::seconds(10); @@ -326,11 +327,11 @@ impl Libp2pEventHandler { PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, PubsubMessage::NewFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1); - self.on_new_file(propagation_source, msg).await + self.on_new_file(source, msg).await } PubsubMessage::FindFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1); - self.on_find_file(msg).await + self.on_find_file(source, msg).await } PubsubMessage::FindChunks(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1); @@ -550,7 +551,7 @@ impl Libp2pEventHandler { Some(PubsubMessage::AnnounceShardConfig(signed)) } - async fn on_find_file(&self, msg: FindFile) -> MessageAcceptance { + async fn on_find_file(&self, peer_id: PeerId, msg: FindFile) -> MessageAcceptance { let FindFile { tx_id, timestamp, .. } = msg; @@ -563,6 +564,14 @@ impl Libp2pEventHandler { if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT { debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message"); metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1); + if msg.neighbors_only { + self.send_to_network(NetworkMessage::ReportPeer { + peer_id, + action: PeerAction::LowToleranceError, + source: ReportSource::Gossipsub, + msg: "Received out of date FindFile message", + }); + } return MessageAcceptance::Ignore; } @@ -572,20 +581,33 @@ impl Libp2pEventHandler { Err(_) => return MessageAcceptance::Reject, }; - // propagate FindFile query to other nodes if shard mismatch + // ignore if shard config mismatch let my_shard_config = self.store.get_store().get_shard_config(); if !my_shard_config.intersect(&announced_shard_config) { - metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_FORWARD.mark(1); - return MessageAcceptance::Accept; + return if msg.neighbors_only { + MessageAcceptance::Ignore + } else { + MessageAcceptance::Accept + }; } + // update peer shard config in cache + self.file_location_cache + .insert_peer_config(peer_id, announced_shard_config); + // check if we have it if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) { if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await { if tx.id() == tx_id { trace!(?tx_id, "Found file locally, responding to FindFile query"); - if self.publish_file(tx_id).await.is_some() { + if msg.neighbors_only { + self.send_to_network(NetworkMessage::SendRequest { + peer_id, + request: Request::AnnounceFile(FileAnnouncement { tx_id }), + request_id: RequestId::Router(Instant::now()), + }); + } else if self.publish_file(tx_id).await.is_some() { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE.mark(1); return MessageAcceptance::Ignore; } @@ -593,6 +615,11 @@ impl Libp2pEventHandler { } } + if msg.neighbors_only { + // do not forward to other peers anymore + return MessageAcceptance::Ignore + } + // try from cache if let Some(mut msg) = self.file_location_cache.get_one(tx_id) { trace!(?tx_id, "Found file in cache, responding to FindFile query"); @@ -914,7 +941,7 @@ impl Libp2pEventHandler { } } - pub async fn publish_file(&self, tx_id: TxID) -> Option { + async fn publish_file(&self, tx_id: TxID) -> Option { match self.file_batcher.write().await.add(tx_id) { Some(batch) => { let announcement = self.construct_announce_file_message(batch).await?; @@ -1287,6 +1314,7 @@ mod tests { tx_id, num_shard: 1, shard_id: 0, + neighbors_only: false, timestamp, }); handler.on_pubsub_message(alice, bob, &id, message).await diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index d4072bb..44f093e 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -204,6 +204,7 @@ impl SerialSyncController { tx_id: self.tx_id, num_shard: shard_config.num_shard, shard_id: shard_config.shard_id, + neighbors_only: false, timestamp: timestamp_now(), })); diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 353b3ea..c577e09 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -601,6 +601,7 @@ impl SyncService { tx_id: tx.id(), num_shard: shard_config.num_shard, shard_id: shard_config.shard_id, + neighbors_only: false, timestamp: timestamp_now(), })); Ok(())