From afa471e9ae53ee2346c4f9e05ac10f0841f9e5ab Mon Sep 17 00:00:00 2001 From: Bo QIU <35757521+boqiu@users.noreply.github.com> Date: Fri, 6 Dec 2024 10:04:12 +0800 Subject: [PATCH] Refactor file sync p2p protocol to NEW_FILE + ASK_FILE + ANSWER_FILE (#293) * refactor new file pubsub message * extract common struct ShardedFile * refactor file sync to ASK_FILE & ANSWER_FILE protocol * simplify code --- node/network/src/behaviour/gossip_cache.rs | 139 ++--------- node/network/src/behaviour/mod.rs | 26 +- node/network/src/peer_manager/mod.rs | 6 +- node/network/src/rpc/codec/ssz_snappy.rs | 14 +- node/network/src/rpc/methods.rs | 8 - node/network/src/rpc/mod.rs | 2 +- node/network/src/rpc/outbound.rs | 17 +- node/network/src/rpc/protocol.rs | 34 +-- node/network/src/rpc/rate_limiter.rs | 22 +- node/network/src/service.rs | 1 + node/network/src/types/mod.rs | 2 +- node/network/src/types/pubsub.rs | 28 ++- node/network/src/types/topics.rs | 7 +- node/router/src/libp2p_event_handler.rs | 263 ++++++++++++--------- node/router/src/metrics.rs | 44 ++-- node/router/src/service.rs | 16 +- node/shared_types/src/lib.rs | 9 +- node/sync/src/controllers/serial.rs | 28 ++- node/sync/src/service.rs | 42 ++-- 19 files changed, 336 insertions(+), 372 deletions(-) diff --git a/node/network/src/behaviour/gossip_cache.rs b/node/network/src/behaviour/gossip_cache.rs index eeafd62..89f7671 100644 --- a/node/network/src/behaviour/gossip_cache.rs +++ b/node/network/src/behaviour/gossip_cache.rs @@ -13,141 +13,34 @@ use tokio_util::time::delay_queue::{DelayQueue, Key}; /// messages are ignored. This behaviour can be changed using `GossipCacheBuilder::default_timeout` /// to apply the same delay to every kind. Individual timeouts for specific kinds can be set and /// will overwrite the default_timeout if present. +#[derive(Default)] pub struct GossipCache { /// Expire timeouts for each topic-msg pair. expirations: DelayQueue<(GossipTopic, Vec)>, /// Messages cached for each topic. topic_msgs: HashMap, Key>>, - /// Timeout for Example messages. - example: Option, - /// Timeout for NewFile messages. - new_file: Option, - /// Timeout for FindFile messages. - find_file: Option, - /// Timeout for FindChunks messages. - find_chunks: Option, - /// Timeout for AnnounceFile. - announce_file: Option, - /// Timeout for AnnounceChunks. - announce_chunks: Option, - /// Timeout for AnnounceShardConfig. - announce_shard_config: Option, -} -#[derive(Default)] -pub struct GossipCacheBuilder { default_timeout: Option, - /// Timeout for Example messages. - example: Option, - /// Timeout for NewFile messages. - new_file: Option, - /// Timeout for blocks FindFile messages. - find_file: Option, - /// Timeout for blocks FindChunks messages. - find_chunks: Option, - /// Timeout for AnnounceFile messages. - announce_file: Option, - /// Timeout for AnnounceChunks messages. - announce_chunks: Option, - /// Timeout for AnnounceShardConfig messages. - announce_shard_config: Option, -} - -#[allow(dead_code)] -impl GossipCacheBuilder { - /// By default, all timeouts all disabled. Setting a default timeout will enable all timeout - /// that are not already set. - pub fn default_timeout(mut self, timeout: Duration) -> Self { - self.default_timeout = Some(timeout); - self - } - - /// Timeout for Example messages. - pub fn example_timeout(mut self, timeout: Duration) -> Self { - self.example = Some(timeout); - self - } - - /// Timeout for NewFile messages. - pub fn new_file_timeout(mut self, timeout: Duration) -> Self { - self.new_file = Some(timeout); - self - } - - /// Timeout for FindFile messages. - pub fn find_file_timeout(mut self, timeout: Duration) -> Self { - self.find_file = Some(timeout); - self - } - - /// Timeout for FindChunks messages. - pub fn find_chunks_timeout(mut self, timeout: Duration) -> Self { - self.find_chunks = Some(timeout); - self - } - - /// Timeout for AnnounceFile messages. - pub fn announce_file_timeout(mut self, timeout: Duration) -> Self { - self.announce_file = Some(timeout); - self - } - - /// Timeout for AnnounceChunks messages. - pub fn announce_chunks_timeout(mut self, timeout: Duration) -> Self { - self.announce_chunks = Some(timeout); - self - } - - /// Timeout for AnnounceShardConfig messages. - pub fn announce_shard_config_timeout(mut self, timeout: Duration) -> Self { - self.announce_shard_config = Some(timeout); - self - } - - pub fn build(self) -> GossipCache { - let GossipCacheBuilder { - default_timeout, - example, - new_file, - find_file, - find_chunks, - announce_file, - announce_chunks, - announce_shard_config, - } = self; - - GossipCache { - expirations: DelayQueue::default(), - topic_msgs: HashMap::default(), - example: example.or(default_timeout), - new_file: new_file.or(default_timeout), - find_file: find_file.or(default_timeout), - find_chunks: find_chunks.or(default_timeout), - announce_file: announce_file.or(default_timeout), - announce_chunks: announce_chunks.or(default_timeout), - announce_shard_config: announce_shard_config.or(default_timeout), - } - } + /// Timeout for pubsub messages. + timeouts: HashMap, } impl GossipCache { - /// Get a builder of a `GossipCache`. Topic kinds for which no timeout is defined will be - /// ignored if added in `insert`. - pub fn builder() -> GossipCacheBuilder { - GossipCacheBuilder::default() + #[cfg(test)] + pub fn new_with_default_timeout(timeout: Duration) -> Self { + Self { + default_timeout: Some(timeout), + ..Default::default() + } } // Insert a message to be sent later. pub fn insert(&mut self, topic: GossipTopic, data: Vec) { - let expire_timeout = match topic.kind() { - GossipKind::Example => self.example, - GossipKind::NewFile => self.new_file, - GossipKind::FindFile => self.find_file, - GossipKind::FindChunks => self.find_chunks, - GossipKind::AnnounceFile => self.announce_file, - GossipKind::AnnounceChunks => self.announce_chunks, - GossipKind::AnnounceShardConfig => self.announce_shard_config, - }; + let expire_timeout = self + .timeouts + .get(topic.kind()) + .cloned() + .or(self.default_timeout); let expire_timeout = match expire_timeout { Some(expire_timeout) => expire_timeout, @@ -221,9 +114,7 @@ mod tests { #[tokio::test] async fn test_stream() { - let mut cache = GossipCache::builder() - .default_timeout(Duration::from_millis(300)) - .build(); + let mut cache = GossipCache::new_with_default_timeout(Duration::from_millis(300)); let test_topic = GossipTopic::new(GossipKind::Example, crate::types::GossipEncoding::SSZSnappy); cache.insert(test_topic, vec![]); diff --git a/node/network/src/behaviour/mod.rs b/node/network/src/behaviour/mod.rs index 4df7c6d..e18f0af 100644 --- a/node/network/src/behaviour/mod.rs +++ b/node/network/src/behaviour/mod.rs @@ -6,7 +6,6 @@ use crate::peer_manager::{ ConnectionDirection, PeerManager, PeerManagerEvent, }; use crate::rpc::methods::DataByHashRequest; -use crate::rpc::methods::FileAnnouncement; use crate::rpc::methods::GetChunksRequest; use crate::rpc::*; use crate::service::Context as ServiceContext; @@ -32,7 +31,7 @@ use libp2p::{ }, NetworkBehaviour, PeerId, }; -use shared_types::ChunkArrayWithProof; +use shared_types::{ChunkArrayWithProof, ShardedFile}; use std::{ collections::VecDeque, sync::Arc, @@ -236,6 +235,9 @@ impl Behaviour { params .topics .insert(get_hash(GossipKind::NewFile), TopicScoreParams::default()); + params + .topics + .insert(get_hash(GossipKind::AskFile), TopicScoreParams::default()); params .topics .insert(get_hash(GossipKind::FindFile), TopicScoreParams::default()); @@ -270,12 +272,10 @@ impl Behaviour { ..config.peer_manager }; - let slot_duration = std::time::Duration::from_secs(12); + // let slot_duration = std::time::Duration::from_secs(12); // let slot_duration = std::time::Duration::from_secs(ctx.chain_spec.seconds_per_slot); - let gossip_cache = GossipCache::builder() - .example_timeout(slot_duration) // TODO - .build(); + let gossip_cache = GossipCache::default(); Ok(Behaviour { // Sub-behaviours @@ -547,8 +547,8 @@ impl Behaviour { Request::DataByHash { .. } => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["data_by_hash"]) } - Request::AnnounceFile { .. } => { - metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["announce_file"]) + Request::AnswerFile { .. } => { + metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["answer_file"]) } Request::GetChunks { .. } => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["get_chunks"]) @@ -780,8 +780,8 @@ where InboundRequest::DataByHash(req) => { self.propagate_request(peer_request_id, peer_id, Request::DataByHash(req)) } - InboundRequest::AnnounceFile(req) => { - self.propagate_request(peer_request_id, peer_id, Request::AnnounceFile(req)) + InboundRequest::AnswerFile(req) => { + self.propagate_request(peer_request_id, peer_id, Request::AnswerFile(req)) } InboundRequest::GetChunks(req) => { self.propagate_request(peer_request_id, peer_id, Request::GetChunks(req)) @@ -997,8 +997,8 @@ pub enum Request { Status(StatusMessage), /// A data by hash request. DataByHash(DataByHashRequest), - /// An AnnounceFile message. - AnnounceFile(FileAnnouncement), + /// An AnswerFile message. + AnswerFile(ShardedFile), /// A GetChunks request. GetChunks(GetChunksRequest), } @@ -1008,7 +1008,7 @@ impl std::convert::From for OutboundRequest { match req { Request::Status(s) => OutboundRequest::Status(s), Request::DataByHash(r) => OutboundRequest::DataByHash(r), - Request::AnnounceFile(r) => OutboundRequest::AnnounceFile(r), + Request::AnswerFile(r) => OutboundRequest::AnswerFile(r), Request::GetChunks(r) => OutboundRequest::GetChunks(r), } } diff --git a/node/network/src/peer_manager/mod.rs b/node/network/src/peer_manager/mod.rs index 7da167f..f6b17c6 100644 --- a/node/network/src/peer_manager/mod.rs +++ b/node/network/src/peer_manager/mod.rs @@ -465,7 +465,7 @@ impl PeerManager { Protocol::Goodbye => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, Protocol::DataByHash => PeerAction::MidToleranceError, - Protocol::AnnounceFile => PeerAction::MidToleranceError, + Protocol::AnswerFile => PeerAction::MidToleranceError, Protocol::GetChunks => PeerAction::MidToleranceError, }, }, @@ -480,7 +480,7 @@ impl PeerManager { Protocol::Goodbye => return, Protocol::Status => PeerAction::LowToleranceError, Protocol::DataByHash => return, - Protocol::AnnounceFile => return, + Protocol::AnswerFile => return, Protocol::GetChunks => return, } } @@ -495,7 +495,7 @@ impl PeerManager { Protocol::Goodbye => return, Protocol::Status => return, Protocol::DataByHash => PeerAction::MidToleranceError, - Protocol::AnnounceFile => PeerAction::MidToleranceError, + Protocol::AnswerFile => PeerAction::MidToleranceError, Protocol::GetChunks => PeerAction::MidToleranceError, }, }, diff --git a/node/network/src/rpc/codec/ssz_snappy.rs b/node/network/src/rpc/codec/ssz_snappy.rs index e35c197..2e0459d 100644 --- a/node/network/src/rpc/codec/ssz_snappy.rs +++ b/node/network/src/rpc/codec/ssz_snappy.rs @@ -5,7 +5,7 @@ use crate::rpc::{ }; use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse}; use libp2p::bytes::BytesMut; -use shared_types::ChunkArrayWithProof; +use shared_types::{ChunkArrayWithProof, ShardedFile}; use snap::read::FrameDecoder; use snap::write::FrameEncoder; use ssz::{Decode, Encode}; @@ -159,7 +159,7 @@ impl Encoder for SSZSnappyOutboundCodec { OutboundRequest::Goodbye(req) => req.as_ssz_bytes(), OutboundRequest::Ping(req) => req.as_ssz_bytes(), OutboundRequest::DataByHash(req) => req.hashes.as_ssz_bytes(), - OutboundRequest::AnnounceFile(req) => req.as_ssz_bytes(), + OutboundRequest::AnswerFile(req) => req.as_ssz_bytes(), OutboundRequest::GetChunks(req) => req.as_ssz_bytes(), }; // SSZ encoded bytes should be within `max_packet_size` @@ -347,8 +347,8 @@ fn handle_v1_request( Protocol::DataByHash => Ok(Some(InboundRequest::DataByHash(DataByHashRequest { hashes: VariableList::from_ssz_bytes(decoded_buffer)?, }))), - Protocol::AnnounceFile => Ok(Some(InboundRequest::AnnounceFile( - FileAnnouncement::from_ssz_bytes(decoded_buffer)?, + Protocol::AnswerFile => Ok(Some(InboundRequest::AnswerFile( + ShardedFile::from_ssz_bytes(decoded_buffer)?, ))), Protocol::GetChunks => Ok(Some(InboundRequest::GetChunks( GetChunksRequest::from_ssz_bytes(decoded_buffer)?, @@ -377,9 +377,9 @@ fn handle_v1_response( Protocol::DataByHash => Ok(Some(RPCResponse::DataByHash(Box::new( ZgsData::from_ssz_bytes(decoded_buffer)?, )))), - // This case should be unreachable as `AnnounceFile` has no response. - Protocol::AnnounceFile => Err(RPCError::InvalidData( - "AnnounceFile RPC message has no valid response".to_string(), + // This case should be unreachable as `AnswerFile` has no response. + Protocol::AnswerFile => Err(RPCError::InvalidData( + "AnswerFile RPC message has no valid response".to_string(), )), Protocol::GetChunks => Ok(Some(RPCResponse::Chunks( ChunkArrayWithProof::from_ssz_bytes(decoded_buffer)?, diff --git a/node/network/src/rpc/methods.rs b/node/network/src/rpc/methods.rs index 8322492..35e95c5 100644 --- a/node/network/src/rpc/methods.rs +++ b/node/network/src/rpc/methods.rs @@ -182,14 +182,6 @@ pub struct DataByHashRequest { pub hashes: VariableList, } -// The message of `AnnounceFile` RPC message. -#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)] -pub struct FileAnnouncement { - pub tx_id: TxID, - pub num_shard: usize, - pub shard_id: usize, -} - /// Request a chunk array from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)] pub struct GetChunksRequest { diff --git a/node/network/src/rpc/mod.rs b/node/network/src/rpc/mod.rs index 4907876..d72b0da 100644 --- a/node/network/src/rpc/mod.rs +++ b/node/network/src/rpc/mod.rs @@ -118,7 +118,7 @@ impl RPC { .n_every(Protocol::Status, 5, Duration::from_secs(15)) .one_every(Protocol::Goodbye, Duration::from_secs(10)) .n_every(Protocol::DataByHash, 128, Duration::from_secs(10)) - .n_every(Protocol::AnnounceFile, 256, Duration::from_secs(10)) + .n_every(Protocol::AnswerFile, 256, Duration::from_secs(10)) .n_every(Protocol::GetChunks, 4096, Duration::from_secs(10)) .build() .expect("Configuration parameters are valid"); diff --git a/node/network/src/rpc/outbound.rs b/node/network/src/rpc/outbound.rs index 6ba1f4a..e60595b 100644 --- a/node/network/src/rpc/outbound.rs +++ b/node/network/src/rpc/outbound.rs @@ -12,6 +12,7 @@ use futures::future::BoxFuture; use futures::prelude::{AsyncRead, AsyncWrite}; use futures::{FutureExt, SinkExt}; use libp2p::core::{OutboundUpgrade, UpgradeInfo}; +use shared_types::ShardedFile; use tokio_util::{ codec::Framed, compat::{Compat, FuturesAsyncReadCompatExt}, @@ -34,7 +35,7 @@ pub enum OutboundRequest { Goodbye(GoodbyeReason), Ping(Ping), DataByHash(DataByHashRequest), - AnnounceFile(FileAnnouncement), + AnswerFile(ShardedFile), GetChunks(GetChunksRequest), } @@ -73,8 +74,8 @@ impl OutboundRequest { Version::V1, Encoding::SSZSnappy, )], - OutboundRequest::AnnounceFile(_) => vec![ProtocolId::new( - Protocol::AnnounceFile, + OutboundRequest::AnswerFile(_) => vec![ProtocolId::new( + Protocol::AnswerFile, Version::V1, Encoding::SSZSnappy, )], @@ -95,7 +96,7 @@ impl OutboundRequest { OutboundRequest::Goodbye(_) => 0, OutboundRequest::Ping(_) => 1, OutboundRequest::DataByHash(req) => req.hashes.len() as u64, - OutboundRequest::AnnounceFile(_) => 0, + OutboundRequest::AnswerFile(_) => 0, OutboundRequest::GetChunks(_) => 1, } } @@ -107,7 +108,7 @@ impl OutboundRequest { OutboundRequest::Goodbye(_) => Protocol::Goodbye, OutboundRequest::Ping(_) => Protocol::Ping, OutboundRequest::DataByHash(_) => Protocol::DataByHash, - OutboundRequest::AnnounceFile(_) => Protocol::AnnounceFile, + OutboundRequest::AnswerFile(_) => Protocol::AnswerFile, OutboundRequest::GetChunks(_) => Protocol::GetChunks, } } @@ -122,7 +123,7 @@ impl OutboundRequest { OutboundRequest::Status(_) => unreachable!(), OutboundRequest::Goodbye(_) => unreachable!(), OutboundRequest::Ping(_) => unreachable!(), - OutboundRequest::AnnounceFile(_) => unreachable!(), + OutboundRequest::AnswerFile(_) => unreachable!(), OutboundRequest::GetChunks(_) => unreachable!(), } } @@ -179,8 +180,8 @@ impl std::fmt::Display for OutboundRequest { OutboundRequest::DataByHash(req) => { write!(f, "Data by hash: {:?}", req) } - OutboundRequest::AnnounceFile(req) => { - write!(f, "AnnounceFile: {:?}", req) + OutboundRequest::AnswerFile(req) => { + write!(f, "AnswerFile: {:?}", req) } OutboundRequest::GetChunks(req) => { write!(f, "GetChunks: {:?}", req) diff --git a/node/network/src/rpc/protocol.rs b/node/network/src/rpc/protocol.rs index a9bdf41..f553060 100644 --- a/node/network/src/rpc/protocol.rs +++ b/node/network/src/rpc/protocol.rs @@ -8,7 +8,7 @@ use futures::future::BoxFuture; use futures::prelude::{AsyncRead, AsyncWrite}; use futures::{FutureExt, StreamExt}; use libp2p::core::{InboundUpgrade, ProtocolName, UpgradeInfo}; -use shared_types::{ChunkArray, ChunkArrayWithProof, FlowRangeProof}; +use shared_types::{ChunkArray, ChunkArrayWithProof, FlowRangeProof, ShardedFile}; use ssz::Encode; use ssz_types::VariableList; use std::io; @@ -91,8 +91,8 @@ pub enum Protocol { /// TODO DataByHash, - /// The file announce protocol. - AnnounceFile, + /// The file answer protocol. + AnswerFile, /// The Chunk sync protocol. GetChunks, } @@ -117,7 +117,7 @@ impl std::fmt::Display for Protocol { Protocol::Goodbye => "goodbye", Protocol::Ping => "ping", Protocol::DataByHash => "data_by_hash", - Protocol::AnnounceFile => "announce_file", + Protocol::AnswerFile => "answer_file", Protocol::GetChunks => "get_chunks", }; f.write_str(repr) @@ -158,7 +158,7 @@ impl UpgradeInfo for RPCProtocol { ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::DataByHash, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::AnnounceFile, Version::V1, Encoding::SSZSnappy), + ProtocolId::new(Protocol::AnswerFile, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::GetChunks, Version::V1, Encoding::SSZSnappy), ] } @@ -220,9 +220,9 @@ impl ProtocolId { // TODO RpcLimits::new(1, *DATA_BY_HASH_REQUEST_MAX) } - Protocol::AnnounceFile => RpcLimits::new( - ::ssz_fixed_len(), - ::ssz_fixed_len(), + Protocol::AnswerFile => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), ), Protocol::GetChunks => RpcLimits::new( ::ssz_fixed_len(), @@ -251,7 +251,7 @@ impl ProtocolId { ::ssz_fixed_len(), ), - Protocol::AnnounceFile => RpcLimits::new(0, 0), // AnnounceFile request has no response + Protocol::AnswerFile => RpcLimits::new(0, 0), // AnswerFile request has no response Protocol::GetChunks => RpcLimits::new(*CHUNKS_RESPONSE_MIN, *CHUNKS_RESPONSE_MAX), } } @@ -334,7 +334,7 @@ pub enum InboundRequest { Goodbye(GoodbyeReason), Ping(Ping), DataByHash(DataByHashRequest), - AnnounceFile(FileAnnouncement), + AnswerFile(ShardedFile), GetChunks(GetChunksRequest), } @@ -373,8 +373,8 @@ impl InboundRequest { Version::V1, Encoding::SSZSnappy, )], - InboundRequest::AnnounceFile(_) => vec![ProtocolId::new( - Protocol::AnnounceFile, + InboundRequest::AnswerFile(_) => vec![ProtocolId::new( + Protocol::AnswerFile, Version::V1, Encoding::SSZSnappy, )], @@ -395,7 +395,7 @@ impl InboundRequest { InboundRequest::Goodbye(_) => 0, InboundRequest::DataByHash(req) => req.hashes.len() as u64, InboundRequest::Ping(_) => 1, - InboundRequest::AnnounceFile(_) => 0, + InboundRequest::AnswerFile(_) => 0, InboundRequest::GetChunks(_) => 1, } } @@ -407,7 +407,7 @@ impl InboundRequest { InboundRequest::Goodbye(_) => Protocol::Goodbye, InboundRequest::Ping(_) => Protocol::Ping, InboundRequest::DataByHash(_) => Protocol::DataByHash, - InboundRequest::AnnounceFile(_) => Protocol::AnnounceFile, + InboundRequest::AnswerFile(_) => Protocol::AnswerFile, InboundRequest::GetChunks(_) => Protocol::GetChunks, } } @@ -422,7 +422,7 @@ impl InboundRequest { InboundRequest::Status(_) => unreachable!(), InboundRequest::Goodbye(_) => unreachable!(), InboundRequest::Ping(_) => unreachable!(), - InboundRequest::AnnounceFile(_) => unreachable!(), + InboundRequest::AnswerFile(_) => unreachable!(), InboundRequest::GetChunks(_) => unreachable!(), } } @@ -541,8 +541,8 @@ impl std::fmt::Display for InboundRequest { InboundRequest::DataByHash(req) => { write!(f, "Data by hash: {:?}", req) } - InboundRequest::AnnounceFile(req) => { - write!(f, "Announce File: {:?}", req) + InboundRequest::AnswerFile(req) => { + write!(f, "Answer File: {:?}", req) } InboundRequest::GetChunks(req) => { write!(f, "Get Chunks: {:?}", req) diff --git a/node/network/src/rpc/rate_limiter.rs b/node/network/src/rpc/rate_limiter.rs index 9c95282..7560958 100644 --- a/node/network/src/rpc/rate_limiter.rs +++ b/node/network/src/rpc/rate_limiter.rs @@ -68,8 +68,8 @@ pub struct RPCRateLimiter { status_rl: Limiter, /// DataByHash rate limiter. data_by_hash_rl: Limiter, - /// AnnounceFile rate limiter. - announce_file_rl: Limiter, + /// AnswerFile rate limiter. + answer_file_rl: Limiter, /// GetChunks rate limiter. get_chunks_rl: Limiter, } @@ -93,8 +93,8 @@ pub struct RPCRateLimiterBuilder { status_quota: Option, /// Quota for the DataByHash protocol. data_by_hash_quota: Option, - /// Quota for the AnnounceFile protocol. - announce_file_quota: Option, + /// Quota for the AnswerFile protocol. + answer_file_quota: Option, /// Quota for the GetChunks protocol. get_chunks_quota: Option, } @@ -113,7 +113,7 @@ impl RPCRateLimiterBuilder { Protocol::Status => self.status_quota = q, Protocol::Goodbye => self.goodbye_quota = q, Protocol::DataByHash => self.data_by_hash_quota = q, - Protocol::AnnounceFile => self.announce_file_quota = q, + Protocol::AnswerFile => self.answer_file_quota = q, Protocol::GetChunks => self.get_chunks_quota = q, } self @@ -150,9 +150,9 @@ impl RPCRateLimiterBuilder { let data_by_hash_quota = self .data_by_hash_quota .ok_or("DataByHash quota not specified")?; - let announce_file_quota = self - .announce_file_quota - .ok_or("AnnounceFile quota not specified")?; + let answer_file_quota = self + .answer_file_quota + .ok_or("AnswerFile quota not specified")?; let get_chunks_quota = self .get_chunks_quota .ok_or("GetChunks quota not specified")?; @@ -162,7 +162,7 @@ impl RPCRateLimiterBuilder { let status_rl = Limiter::from_quota(status_quota)?; let goodbye_rl = Limiter::from_quota(goodbye_quota)?; let data_by_hash_rl = Limiter::from_quota(data_by_hash_quota)?; - let announce_file_rl = Limiter::from_quota(announce_file_quota)?; + let answer_file_rl = Limiter::from_quota(answer_file_quota)?; let get_chunks_rl = Limiter::from_quota(get_chunks_quota)?; // check for peers to prune every 30 seconds, starting in 30 seconds @@ -175,7 +175,7 @@ impl RPCRateLimiterBuilder { status_rl, goodbye_rl, data_by_hash_rl, - announce_file_rl, + answer_file_rl, get_chunks_rl, init_time: Instant::now(), }) @@ -220,7 +220,7 @@ impl RPCRateLimiter { Protocol::Status => &mut self.status_rl, Protocol::Goodbye => &mut self.goodbye_rl, Protocol::DataByHash => &mut self.data_by_hash_rl, - Protocol::AnnounceFile => &mut self.announce_file_rl, + Protocol::AnswerFile => &mut self.answer_file_rl, Protocol::GetChunks => &mut self.get_chunks_rl, }; check(limiter) diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 3de768e..dcca522 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -245,6 +245,7 @@ impl Service { let mut topics = vec![ GossipKind::NewFile, + GossipKind::AskFile, GossipKind::FindFile, GossipKind::AnnounceFile, GossipKind::AnnounceShardConfig, diff --git a/node/network/src/types/mod.rs b/node/network/src/types/mod.rs index 8a19851..87058fe 100644 --- a/node/network/src/types/mod.rs +++ b/node/network/src/types/mod.rs @@ -7,7 +7,7 @@ pub type Enr = discv5::enr::Enr; pub use globals::NetworkGlobals; pub use pubsub::{ - AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, NewFile, PubsubMessage, + AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, PubsubMessage, SignedAnnounceChunks, SignedAnnounceFile, SignedMessage, SnappyTransform, TimedMessage, }; pub use topics::{GossipEncoding, GossipKind, GossipTopic}; diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index b516ef7..2dc2cc9 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -6,7 +6,7 @@ use libp2p::{ gossipsub::{DataTransform, GossipsubMessage, RawGossipsubMessage}, Multiaddr, PeerId, }; -use shared_types::{timestamp_now, ShardConfig, TxID}; +use shared_types::{timestamp_now, ShardConfig, ShardedFile, TxID}; use snap::raw::{decompress_len, Decoder, Encoder}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; @@ -114,15 +114,6 @@ impl ssz::Decode for WrappedPeerId { } } -/// Published when file uploaded or completed to sync from other peers. -#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] -pub struct NewFile { - pub tx_id: TxID, - pub num_shard: usize, - pub shard_id: usize, - pub timestamp: u32, -} - #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] pub struct FindFile { pub tx_id: TxID, @@ -231,7 +222,10 @@ type SignedAnnounceFiles = Vec; #[derive(Debug, Clone, PartialEq, Eq)] pub enum PubsubMessage { ExampleMessage(u64), - NewFile(NewFile), + /// Published to neighbors when new file uploaded or completed to sync file. + NewFile(TimedMessage), + /// Published to neighbors for file sync, and answered by `AnswerFile` RPC. + AskFile(TimedMessage), FindFile(FindFile), FindChunks(FindChunks), AnnounceFile(Vec), @@ -311,6 +305,7 @@ impl PubsubMessage { match self { PubsubMessage::ExampleMessage(_) => GossipKind::Example, PubsubMessage::NewFile(_) => GossipKind::NewFile, + PubsubMessage::AskFile(_) => GossipKind::AskFile, PubsubMessage::FindFile(_) => GossipKind::FindFile, PubsubMessage::FindChunks(_) => GossipKind::FindChunks, PubsubMessage::AnnounceFile(_) => GossipKind::AnnounceFile, @@ -338,7 +333,12 @@ impl PubsubMessage { u64::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, )), GossipKind::NewFile => Ok(PubsubMessage::NewFile( - NewFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, + TimedMessage::::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, + )), + GossipKind::AskFile => Ok(PubsubMessage::AskFile( + TimedMessage::::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, )), GossipKind::FindFile => Ok(PubsubMessage::FindFile( FindFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, @@ -373,6 +373,7 @@ impl PubsubMessage { match &self { PubsubMessage::ExampleMessage(data) => data.as_ssz_bytes(), PubsubMessage::NewFile(data) => data.as_ssz_bytes(), + PubsubMessage::AskFile(data) => data.as_ssz_bytes(), PubsubMessage::FindFile(data) => data.as_ssz_bytes(), PubsubMessage::FindChunks(data) => data.as_ssz_bytes(), PubsubMessage::AnnounceFile(data) => data.as_ssz_bytes(), @@ -391,6 +392,9 @@ impl std::fmt::Display for PubsubMessage { PubsubMessage::NewFile(msg) => { write!(f, "NewFile message: {:?}", msg) } + PubsubMessage::AskFile(msg) => { + write!(f, "AskFile message: {:?}", msg) + } PubsubMessage::FindFile(msg) => { write!(f, "FindFile message: {:?}", msg) } diff --git a/node/network/src/types/topics.rs b/node/network/src/types/topics.rs index 888193b..9552773 100644 --- a/node/network/src/types/topics.rs +++ b/node/network/src/types/topics.rs @@ -8,7 +8,8 @@ use strum::AsRefStr; pub const TOPIC_PREFIX: &str = "eth2"; pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy"; pub const EXAMPLE_TOPIC: &str = "example"; -pub const NEW_FILE_TOPIC: &str = "new_file"; +pub const NEW_FILE_TOPIC: &str = "new_file_v2"; +pub const ASK_FILE_TOPIC: &str = "ask_file"; pub const FIND_FILE_TOPIC: &str = "find_file"; pub const FIND_CHUNKS_TOPIC: &str = "find_chunks"; pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file"; @@ -32,6 +33,7 @@ pub struct GossipTopic { pub enum GossipKind { Example, NewFile, + AskFile, FindFile, FindChunks, AnnounceFile, @@ -73,6 +75,7 @@ impl GossipTopic { let kind = match topic_parts[2] { EXAMPLE_TOPIC => GossipKind::Example, NEW_FILE_TOPIC => GossipKind::NewFile, + ASK_FILE_TOPIC => GossipKind::AskFile, FIND_FILE_TOPIC => GossipKind::FindFile, FIND_CHUNKS_TOPIC => GossipKind::FindChunks, ANNOUNCE_FILE_TOPIC => GossipKind::AnnounceFile, @@ -103,6 +106,7 @@ impl From for String { let kind = match topic.kind { GossipKind::Example => EXAMPLE_TOPIC, GossipKind::NewFile => NEW_FILE_TOPIC, + GossipKind::AskFile => ASK_FILE_TOPIC, GossipKind::FindFile => FIND_FILE_TOPIC, GossipKind::FindChunks => FIND_CHUNKS_TOPIC, GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC, @@ -123,6 +127,7 @@ impl std::fmt::Display for GossipTopic { let kind = match self.kind { GossipKind::Example => EXAMPLE_TOPIC, GossipKind::NewFile => NEW_FILE_TOPIC, + GossipKind::AskFile => ASK_FILE_TOPIC, GossipKind::FindFile => FIND_FILE_TOPIC, GossipKind::FindChunks => FIND_CHUNKS_TOPIC, GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC, diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index b4ee95f..feed888 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -5,8 +5,7 @@ use std::{ops::Neg, sync::Arc}; use chunk_pool::ChunkPoolMessage; use file_location_cache::FileLocationCache; use network::multiaddr::Protocol; -use network::rpc::methods::FileAnnouncement; -use network::types::{NewFile, TimedMessage}; +use network::types::TimedMessage; use network::{ rpc::StatusMessage, types::{ @@ -17,7 +16,7 @@ use network::{ PublicKey, PubsubMessage, Request, RequestId, Response, }; use network::{Multiaddr, NetworkSender, PeerAction, ReportSource}; -use shared_types::{bytes_to_chunks, timestamp_now, NetworkIdentity, TxID}; +use shared_types::{bytes_to_chunks, timestamp_now, NetworkIdentity, ShardedFile, TxID}; use storage::config::ShardConfig; use storage_async::Store; use sync::{SyncMessage, SyncSender}; @@ -25,36 +24,64 @@ use tokio::sync::mpsc::UnboundedSender; use tokio::sync::{mpsc, RwLock}; use crate::batcher::Batcher; -use crate::metrics; +use crate::metrics::{self, PubsubMsgHandleMetrics}; use crate::peer_manager::PeerManager; use crate::Config; lazy_static::lazy_static! { - /// Timeout to publish NewFile message to neighbor nodes. - pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); - /// Timeout to publish FindFile message to neighbor nodes. - pub static ref FIND_FILE_NEIGHBORS_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30); - /// Timeout to publish FindFile message in the whole network. - pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); - pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); - pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5); + /// Timeout to publish message to neighbor nodes. + pub static ref PUBSUB_TIMEOUT_NEIGHBORS: chrono::Duration = chrono::Duration::seconds(30); + /// Timeout to publish message to network. + pub static ref PUBSUB_TIMEOUT_NETWORK: chrono::Duration = chrono::Duration::minutes(5); pub static ref TOLERABLE_DRIFT: chrono::Duration = chrono::Duration::seconds(10); } -fn duration_since(timestamp: u32, metric: Arc) -> chrono::Duration { +fn duration_since(timestamp: u32, latency_ms: Arc) -> chrono::Duration { let timestamp = i64::from(timestamp); let timestamp = chrono::DateTime::from_timestamp(timestamp, 0).expect("should fit"); let now = chrono::Utc::now(); let duration = now.signed_duration_since(timestamp); - let num_secs = duration.num_seconds(); - if num_secs > 0 { - metric.update(num_secs as u64); + let num_millis = duration.num_milliseconds(); + if num_millis > 0 { + latency_ms.update(num_millis as u64); } duration } +impl PubsubMsgHandleMetrics { + pub fn verify_timestamp( + &self, + from: PeerId, + timestamp: u32, + timeout: chrono::Duration, + sender: Option<&NetworkSender>, + ) -> bool { + self.qps.mark(1); + + let d = duration_since(timestamp, self.latency_ms.clone()); + if d >= TOLERABLE_DRIFT.neg() && d <= timeout { + return true; + } + + debug!(%from, ?timestamp, ?d, topic=%self.topic_name, "Ignore out of date pubsub message"); + + self.timeout.mark(1); + + if let Some(sender) = sender { + let _ = sender.send(NetworkMessage::ReportPeer { + peer_id: from, + action: PeerAction::LowToleranceError, + source: ReportSource::Gossipsub, + msg: "Received out of date pubsub message", + }); + } + + false + } +} + fn peer_id_to_public_key(peer_id: &PeerId) -> Result { // A libp2p peer id byte representation should be 2 length bytes + 4 protobuf bytes + compressed pk bytes // if generated from a PublicKey with Identity multihash. @@ -227,25 +254,19 @@ impl Libp2pEventHandler { }); metrics::LIBP2P_HANDLE_GET_CHUNKS_REQUEST.mark(1); } - Request::AnnounceFile(announcement) => { - match ShardConfig::new(announcement.shard_id, announcement.num_shard) { - Ok(v) => { - self.file_location_cache.insert_peer_config(peer_id, v); + Request::AnswerFile(file) => match ShardConfig::try_from(file.shard_config) { + Ok(v) => { + self.file_location_cache.insert_peer_config(peer_id, v); - self.send_to_sync(SyncMessage::AnnounceFile { - peer_id, - request_id, - announcement, - }); - } - Err(_) => self.send_to_network(NetworkMessage::ReportPeer { - peer_id, - action: PeerAction::Fatal, - source: ReportSource::RPC, - msg: "Invalid shard config in AnnounceFile RPC message", - }), + self.send_to_sync(SyncMessage::AnswerFile { peer_id, file }); } - } + Err(_) => self.send_to_network(NetworkMessage::ReportPeer { + peer_id, + action: PeerAction::Fatal, + source: ReportSource::RPC, + msg: "Invalid shard config in AnswerFile RPC message", + }), + }, Request::DataByHash(_) => { // ignore } @@ -352,18 +373,13 @@ impl Libp2pEventHandler { match message { PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, - PubsubMessage::NewFile(msg) => { - metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1); - self.on_new_file(propagation_source, msg).await - } + PubsubMessage::NewFile(msg) => self.on_new_file(propagation_source, msg).await, + PubsubMessage::AskFile(msg) => self.on_ask_file(propagation_source, msg).await, PubsubMessage::FindFile(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1); self.on_find_file(propagation_source, msg).await } - PubsubMessage::FindChunks(msg) => { - metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1); - self.on_find_chunks(msg).await - } + PubsubMessage::FindChunks(msg) => self.on_find_chunks(propagation_source, msg).await, PubsubMessage::AnnounceFile(msgs) => { metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE.mark(1); @@ -377,38 +393,27 @@ impl Libp2pEventHandler { MessageAcceptance::Accept } - PubsubMessage::AnnounceChunks(msg) => { - metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS.mark(1); - self.on_announce_chunks(propagation_source, msg) - } + PubsubMessage::AnnounceChunks(msg) => self.on_announce_chunks(propagation_source, msg), PubsubMessage::AnnounceShardConfig(msg) => { - metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD.mark(1); self.on_announce_shard_config(propagation_source, source, msg) } } } /// Handle NewFile pubsub message `msg` that published by `from` peer. - async fn on_new_file(&self, from: PeerId, msg: NewFile) -> MessageAcceptance { + async fn on_new_file(&self, from: PeerId, msg: TimedMessage) -> MessageAcceptance { // verify timestamp - let d = duration_since( + if !metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.verify_timestamp( + from, msg.timestamp, - metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY.clone(), - ); - if d < TOLERABLE_DRIFT.neg() || d > *NEW_FILE_TIMEOUT { - debug!(?d, ?msg, "Invalid timestamp, ignoring NewFile message"); - metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT.mark(1); - self.send_to_network(NetworkMessage::ReportPeer { - peer_id: from, - action: PeerAction::LowToleranceError, - source: ReportSource::Gossipsub, - msg: "Received out of date NewFile message", - }); + *PUBSUB_TIMEOUT_NEIGHBORS, + Some(&self.network_send), + ) { return MessageAcceptance::Ignore; } // verify announced shard config - let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) { + let announced_shard_config = match ShardConfig::try_from(msg.shard_config) { Ok(v) => v, Err(_) => return MessageAcceptance::Reject, }; @@ -419,28 +424,65 @@ impl Libp2pEventHandler { return MessageAcceptance::Ignore; } - // ignore if already exists - match self.store.check_tx_completed(msg.tx_id.seq).await { - Ok(true) => return MessageAcceptance::Ignore, - Ok(false) => {} + // ignore if already pruned or exists + match self.store.get_store().get_tx_status(msg.tx_id.seq) { + Ok(Some(_)) => return MessageAcceptance::Ignore, + Ok(None) => {} Err(err) => { - warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx completed"); - return MessageAcceptance::Ignore; - } - } - - // ignore if already pruned - match self.store.check_tx_pruned(msg.tx_id.seq).await { - Ok(true) => return MessageAcceptance::Ignore, - Ok(false) => {} - Err(err) => { - warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx pruned"); + warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to get tx status"); return MessageAcceptance::Ignore; } } // notify sync layer to handle in advance - self.send_to_sync(SyncMessage::NewFile { from, msg }); + self.send_to_sync(SyncMessage::NewFile { + from, + file: msg.inner, + }); + + MessageAcceptance::Ignore + } + + async fn on_ask_file(&self, from: PeerId, msg: TimedMessage) -> MessageAcceptance { + // verify timestamp + if !metrics::LIBP2P_HANDLE_PUBSUB_ASK_FILE.verify_timestamp( + from, + msg.timestamp, + *PUBSUB_TIMEOUT_NEIGHBORS, + Some(&self.network_send), + ) { + return MessageAcceptance::Ignore; + } + + // verify announced shard config + let announced_shard_config = match ShardConfig::try_from(msg.shard_config) { + Ok(v) => v, + Err(_) => return MessageAcceptance::Reject, + }; + + // handle on shard config mismatch + let my_shard_config = self.store.get_store().get_shard_config(); + if !my_shard_config.intersect(&announced_shard_config) { + return MessageAcceptance::Ignore; + } + + // check if we have it + if matches!(self.store.check_tx_completed(msg.tx_id.seq).await, Ok(true)) { + if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(msg.tx_id.seq).await { + if tx.id() == msg.tx_id { + trace!(?msg.tx_id, "Found file locally, responding to FindFile query"); + + self.send_to_network(NetworkMessage::SendRequest { + peer_id: from, + request: Request::AnswerFile(ShardedFile { + tx_id: msg.tx_id, + shard_config: my_shard_config.into(), + }), + request_id: RequestId::Router(Instant::now()), + }); + } + } + } MessageAcceptance::Ignore } @@ -564,9 +606,9 @@ impl Libp2pEventHandler { metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(), ); let timeout = if msg.neighbors_only { - *FIND_FILE_NEIGHBORS_TIMEOUT + *PUBSUB_TIMEOUT_NEIGHBORS } else { - *FIND_FILE_TIMEOUT + *PUBSUB_TIMEOUT_NETWORK }; if d < TOLERABLE_DRIFT.neg() || d > timeout { debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message"); @@ -608,10 +650,9 @@ impl Libp2pEventHandler { // announce file via RPC to avoid flooding pubsub message self.send_to_network(NetworkMessage::SendRequest { peer_id: from, - request: Request::AnnounceFile(FileAnnouncement { + request: Request::AnswerFile(ShardedFile { tx_id, - num_shard: my_shard_config.num_shard, - shard_id: my_shard_config.shard_id, + shard_config: my_shard_config.into(), }), request_id: RequestId::Router(Instant::now()), }); @@ -677,23 +718,27 @@ impl Libp2pEventHandler { Some(PubsubMessage::AnnounceChunks(signed)) } - async fn on_find_chunks(&self, msg: FindChunks) -> MessageAcceptance { + async fn on_find_chunks( + &self, + propagation_source: PeerId, + msg: FindChunks, + ) -> MessageAcceptance { + // verify timestamp + if !metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.verify_timestamp( + propagation_source, + msg.timestamp, + *PUBSUB_TIMEOUT_NETWORK, + None, + ) { + return MessageAcceptance::Ignore; + } + // validate message if msg.index_start >= msg.index_end { debug!(?msg, "Invalid chunk index range"); return MessageAcceptance::Reject; } - // verify timestamp - let d = duration_since( - msg.timestamp, - metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS_LATENCY.clone(), - ); - if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT { - debug!(%msg.timestamp, ?d, "Invalid timestamp, ignoring FindChunks message"); - return MessageAcceptance::Ignore; - } - // check if we have specified chunks even file not finalized yet // validate end index let tx = match self.store.get_tx_by_seq_number(msg.tx_id.seq).await { @@ -816,7 +861,7 @@ impl Libp2pEventHandler { msg.resend_timestamp, metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_LATENCY.clone(), ); - if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_FILE_TIMEOUT { + if d < TOLERABLE_DRIFT.neg() || d > *PUBSUB_TIMEOUT_NETWORK { debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceFile message"); metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_TIMEOUT.mark(1); return MessageAcceptance::Ignore; @@ -847,12 +892,12 @@ impl Libp2pEventHandler { msg: TimedMessage, ) -> MessageAcceptance { // validate timestamp - let d = duration_since( + if !metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD.verify_timestamp( + propagation_source, msg.timestamp, - metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD_LATENCY.clone(), - ); - if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_SHARD_CONFIG_TIMEOUT { - debug!(?d, %propagation_source, %source, ?msg, "Invalid timestamp, ignoring AnnounceShardConfig message"); + *PUBSUB_TIMEOUT_NETWORK, + None, + ) { return MessageAcceptance::Ignore; } @@ -879,6 +924,16 @@ impl Libp2pEventHandler { propagation_source: PeerId, msg: SignedAnnounceChunks, ) -> MessageAcceptance { + // verify timestamp + if !metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS.verify_timestamp( + propagation_source, + msg.timestamp, + *PUBSUB_TIMEOUT_NETWORK, + None, + ) { + return MessageAcceptance::Ignore; + } + // verify message signature if !verify_signature(&msg, &msg.peer_id, propagation_source) { return MessageAcceptance::Reject; @@ -898,16 +953,6 @@ impl Libp2pEventHandler { return MessageAcceptance::Reject; } - // propagate gossip to peers - let d = duration_since( - msg.resend_timestamp, - metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS_LATENCY.clone(), - ); - if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_FILE_TIMEOUT { - debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceChunks message"); - return MessageAcceptance::Ignore; - } - // notify sync layer self.send_to_sync(SyncMessage::AnnounceChunksGossip { msg: msg.inner }); @@ -1361,7 +1406,7 @@ mod tests { let result = handle_find_file_msg( &handler, TxID::random_hash(412), - timestamp_now() - 10 - FIND_FILE_TIMEOUT.num_seconds() as u32, + timestamp_now() - 10 - PUBSUB_TIMEOUT_NETWORK.num_seconds() as u32, ) .await; assert!(matches!(result, MessageAcceptance::Ignore)); diff --git a/node/router/src/metrics.rs b/node/router/src/metrics.rs index 979388c..a6cf06c 100644 --- a/node/router/src/metrics.rs +++ b/node/router/src/metrics.rs @@ -2,6 +2,30 @@ use std::sync::Arc; use metrics::{register_meter, register_meter_with_group, Histogram, Meter, Sample}; +pub struct PubsubMsgHandleMetrics { + pub(crate) topic_name: &'static str, + pub(crate) qps: Arc, + pub(crate) latency_ms: Arc, + pub(crate) timeout: Arc, +} + +impl PubsubMsgHandleMetrics { + pub fn new(topic_name: &'static str) -> Self { + let group_name = format!("router_libp2p_handle_pubsub_{}", topic_name); + + Self { + topic_name, + qps: register_meter_with_group(group_name.as_str(), "qps"), + latency_ms: Sample::ExpDecay(0.015).register_with_group( + group_name.as_str(), + "latency_ms", + 1024, + ), + timeout: register_meter_with_group(group_name.as_str(), "timeout"), + } + } +} + lazy_static::lazy_static! { // service pub static ref SERVICE_ROUTE_NETWORK_MESSAGE: Arc = register_meter("router_service_route_network_message"); @@ -42,10 +66,12 @@ lazy_static::lazy_static! { pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc = register_meter_with_group("router_libp2p_handle_response_error", "qps"); pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024); - // libp2p_event_handler: new file - pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "qps"); - pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_new_file", "latency", 1024); - pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT: Arc = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "timeout"); + // libp2p_event_handler: pubsub messages + pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE: PubsubMsgHandleMetrics = PubsubMsgHandleMetrics::new("new_file"); + pub static ref LIBP2P_HANDLE_PUBSUB_ASK_FILE: PubsubMsgHandleMetrics = PubsubMsgHandleMetrics::new("ask_file"); + pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS: PubsubMsgHandleMetrics = PubsubMsgHandleMetrics::new("find_chunks"); + pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS: PubsubMsgHandleMetrics = PubsubMsgHandleMetrics::new("announce_chunks"); + pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD: PubsubMsgHandleMetrics = PubsubMsgHandleMetrics::new("announce_shard"); // libp2p_event_handler: find & announce file pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "qps"); @@ -60,16 +86,6 @@ lazy_static::lazy_static! { pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_ANNOUNCEMENTS: Arc = register_meter_with_group("router_libp2p_handle_pubsub_announce_file", "announcements"); pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_FILES: Arc = register_meter_with_group("router_libp2p_handle_pubsub_announce_file", "files"); - // libp2p_event_handler: find & announce chunks - pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_chunks", "qps"); - pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_find_chunks", "latency", 1024); - pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS: Arc = register_meter_with_group("router_libp2p_handle_pubsub_announce_chunks", "qps"); - pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_announce_chunks", "latency", 1024); - - // libp2p_event_handler: announce shard config - pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD: Arc = register_meter_with_group("router_libp2p_handle_pubsub_announce_shard", "qps"); - pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_announce_shard", "latency", 1024); - // libp2p_event_handler: verify IP address pub static ref LIBP2P_VERIFY_ANNOUNCED_IP: Arc = register_meter("router_libp2p_verify_announced_ip"); pub static ref LIBP2P_VERIFY_ANNOUNCED_IP_UNSEEN: Arc = register_meter("router_libp2p_verify_announced_ip_unseen"); diff --git a/node/router/src/service.rs b/node/router/src/service.rs index 29efccf..190ff0c 100644 --- a/node/router/src/service.rs +++ b/node/router/src/service.rs @@ -8,11 +8,11 @@ use miner::MinerMessage; use network::rpc::GoodbyeReason; use network::PeerId; use network::{ - types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, - NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm, + BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, NetworkReceiver, + NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm, }; use pruner::PrunerMessage; -use shared_types::timestamp_now; +use shared_types::ShardedFile; use std::sync::Arc; use storage::log_store::Store as LogStore; use storage_async::Store; @@ -336,13 +336,11 @@ impl RouterService { self.disconnect_peer(peer_id); } NetworkMessage::AnnounceLocalFile { tx_id } => { - let shard_config = self.store.get_shard_config(); - let msg = PubsubMessage::NewFile(NewFile { + let new_file = ShardedFile { tx_id, - num_shard: shard_config.num_shard, - shard_id: shard_config.shard_id, - timestamp: timestamp_now(), - }); + shard_config: self.store.get_shard_config().into(), + }; + let msg = PubsubMessage::NewFile(new_file.into()); self.libp2p.swarm.behaviour_mut().publish(vec![msg]); metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1); } diff --git a/node/shared_types/src/lib.rs b/node/shared_types/src/lib.rs index f0dbd98..18f75f9 100644 --- a/node/shared_types/src/lib.rs +++ b/node/shared_types/src/lib.rs @@ -403,13 +403,18 @@ pub enum TxSeqOrRoot { Root(DataRoot), } -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, DeriveEncode, DeriveDecode)] -#[serde(rename_all = "camelCase")] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, DeriveEncode, DeriveDecode)] pub struct ShardConfig { pub num_shard: usize, pub shard_id: usize, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, DeriveEncode, DeriveDecode)] +pub struct ShardedFile { + pub tx_id: TxID, + pub shard_config: ShardConfig, +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index fef2101..7c4f885 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -10,7 +10,7 @@ use network::{ PeerAction, PeerId, PubsubMessage, SyncId as RequestId, }; use rand::Rng; -use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE}; +use shared_types::{timestamp_now, ChunkArrayWithProof, ShardedFile, TxID, CHUNK_SIZE}; use ssz::Encode; use std::{sync::Arc, time::Instant}; use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE}; @@ -211,13 +211,25 @@ impl SerialSyncController { fn do_publish_find_file(&self) { let shard_config = self.store.get_store().get_shard_config(); - self.ctx.publish(PubsubMessage::FindFile(FindFile { - tx_id: self.tx_id, - num_shard: shard_config.num_shard, - shard_id: shard_config.shard_id, - neighbors_only: self.config.neighbors_only, - timestamp: timestamp_now(), - })); + let msg = if self.config.neighbors_only { + PubsubMessage::AskFile( + ShardedFile { + tx_id: self.tx_id, + shard_config: shard_config.into(), + } + .into(), + ) + } else { + PubsubMessage::FindFile(FindFile { + tx_id: self.tx_id, + num_shard: shard_config.num_shard, + shard_id: shard_config.shard_id, + neighbors_only: self.config.neighbors_only, + timestamp: timestamp_now(), + }) + }; + + self.ctx.publish(msg); } fn publish_find_chunks(&self) { diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 6a6c09e..15f18ba 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -8,13 +8,14 @@ use anyhow::{anyhow, bail, Result}; use file_location_cache::FileLocationCache; use libp2p::swarm::DialError; use log_entry_sync::LogSyncEvent; -use network::rpc::methods::FileAnnouncement; -use network::types::{AnnounceChunks, FindFile, NewFile}; +use network::types::{AnnounceChunks, FindFile}; use network::{ rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, NetworkSender, PeerId, PeerRequestId, PubsubMessage, SyncId as RequestId, }; -use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, Transaction, TxID}; +use shared_types::{ + bytes_to_chunks, timestamp_now, ChunkArrayWithProof, ShardedFile, Transaction, TxID, +}; use std::sync::atomic::Ordering; use std::{ cmp, @@ -71,12 +72,11 @@ pub enum SyncMessage { }, NewFile { from: PeerId, - msg: NewFile, + file: ShardedFile, }, - AnnounceFile { + AnswerFile { peer_id: PeerId, - request_id: PeerRequestId, - announcement: FileAnnouncement, + file: ShardedFile, }, } @@ -273,12 +273,8 @@ impl SyncService { SyncMessage::AnnounceShardConfig { .. } => { // FIXME: Check if controllers need to be reset? } - SyncMessage::NewFile { from, msg } => self.on_new_file_gossip(from, msg).await, - SyncMessage::AnnounceFile { - peer_id, - announcement, - .. - } => self.on_announce_file(peer_id, announcement).await, + SyncMessage::NewFile { from, file } => self.on_new_file_gossip(from, file).await, + SyncMessage::AnswerFile { peer_id, file } => self.on_answer_file(peer_id, file).await, } } @@ -773,27 +769,25 @@ impl SyncService { } /// Handle on `NewFile` gossip message received. - async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) { - debug!(%from, ?msg, "Received NewFile gossip"); + async fn on_new_file_gossip(&mut self, from: PeerId, file: ShardedFile) { + debug!(%from, ?file, "Received NewFile gossip"); - if let Some(controller) = self.controllers.get_mut(&msg.tx_id.seq) { + if let Some(controller) = self.controllers.get_mut(&file.tx_id.seq) { // Notify new peer announced if file already in sync - if let Ok(shard_config) = ShardConfig::new(msg.shard_id, msg.num_shard) { + if let Ok(shard_config) = ShardConfig::try_from(file.shard_config) { controller.on_peer_announced(from, shard_config); controller.transition(); } } else if let Some(manager) = &self.auto_sync_manager { - let _ = manager.new_file_send.send(msg.tx_id.seq); + let _ = manager.new_file_send.send(file.tx_id.seq); } } - /// Handle on `AnnounceFile` RPC message received. - async fn on_announce_file(&mut self, from: PeerId, announcement: FileAnnouncement) { + /// Handle on `AnswerFile` RPC message received. + async fn on_answer_file(&mut self, from: PeerId, file: ShardedFile) { // Notify new peer announced if file already in sync - if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) { - if let Ok(shard_config) = - ShardConfig::new(announcement.shard_id, announcement.num_shard) - { + if let Some(controller) = self.controllers.get_mut(&file.tx_id.seq) { + if let Ok(shard_config) = ShardConfig::try_from(file.shard_config) { controller.on_peer_announced(from, shard_config); controller.transition(); }