diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index c42dc07..1ab2301 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -5,7 +5,7 @@ use std::{ops::Neg, sync::Arc}; use chunk_pool::ChunkPoolMessage; use file_location_cache::FileLocationCache; use network::multiaddr::Protocol; -use network::types::{AnnounceShardConfig, SignedAnnounceShardConfig}; +use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig}; use network::{ rpc::StatusMessage, types::{ @@ -29,6 +29,7 @@ use crate::peer_manager::PeerManager; use crate::Config; lazy_static::lazy_static! { + pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); @@ -316,8 +317,10 @@ impl Libp2pEventHandler { match message { PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, - // TODO qbit: handle the NewFile pubsub message - PubsubMessage::NewFile(_) => todo!(), + PubsubMessage::NewFile(msg) => { + metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1); + self.on_new_file(source, msg).await + } PubsubMessage::FindFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1); self.on_find_file(msg).await @@ -350,6 +353,59 @@ impl Libp2pEventHandler { } } + async fn on_new_file(&self, from: PeerId, msg: NewFile) -> MessageAcceptance { + // verify timestamp + let d = duration_since( + msg.timestamp, + metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY.clone(), + ); + if d < TOLERABLE_DRIFT.neg() || d > *NEW_FILE_TIMEOUT { + debug!(?d, ?msg, "Invalid timestamp, ignoring NewFile message"); + metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT.mark(1); + self.send_to_network(NetworkMessage::ReportPeer { + peer_id: from, + action: PeerAction::LowToleranceError, + source: ReportSource::Gossipsub, + msg: "Received out of date NewFile message", + }); + return MessageAcceptance::Ignore; + } + + // verify announced shard config + let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) { + Ok(v) => v, + Err(_) => return MessageAcceptance::Reject, + }; + + // ignore if already exists + match self.store.check_tx_completed(msg.tx_id.seq).await { + Ok(true) => return MessageAcceptance::Ignore, + Ok(false) => {} + Err(err) => { + warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx completed"); + return MessageAcceptance::Ignore; + } + } + + // ignore if already pruned + match self.store.check_tx_pruned(msg.tx_id.seq).await { + Ok(true) => return MessageAcceptance::Ignore, + Ok(false) => {} + Err(err) => { + warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx pruned"); + return MessageAcceptance::Ignore; + } + } + + // notify sync layer if shard config matches + let my_shard_config = self.store.get_store().get_shard_config(); + if my_shard_config.intersect(&announced_shard_config) { + self.send_to_sync(SyncMessage::NewFile { from, msg }); + } + + MessageAcceptance::Ignore + } + async fn construct_announced_ip(&self) -> Option { // public address configured if let Some(ip) = self.config.public_address { diff --git a/node/router/src/metrics.rs b/node/router/src/metrics.rs index 03c3081..141d034 100644 --- a/node/router/src/metrics.rs +++ b/node/router/src/metrics.rs @@ -44,6 +44,11 @@ lazy_static::lazy_static! { pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc = register_meter_with_group("router_libp2p_handle_response_error", "qps"); pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024); + // libp2p_event_handler: new file + pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "qps"); + pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_new_file", "latency", 1024); + pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT: Arc = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "timeout"); + // libp2p_event_handler: find & announce file pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "qps"); pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_find_file", "latency", 1024); diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index fee3f52..56fb896 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -8,7 +8,7 @@ use anyhow::{anyhow, bail, Result}; use file_location_cache::FileLocationCache; use libp2p::swarm::DialError; use log_entry_sync::LogSyncEvent; -use network::types::{AnnounceChunks, FindFile}; +use network::types::{AnnounceChunks, FindFile, NewFile}; use network::PubsubMessage; use network::{ rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId, @@ -70,6 +70,10 @@ pub enum SyncMessage { AnnounceChunksGossip { msg: AnnounceChunks, }, + NewFile { + from: PeerId, + msg: NewFile, + }, } #[derive(Debug)] @@ -265,6 +269,7 @@ impl SyncService { SyncMessage::AnnounceShardConfig { .. } => { // FIXME: Check if controllers need to be reset? } + SyncMessage::NewFile { from, msg } => todo!(), } }