From 35c4760724dc9a4d50d92a19af417f03d336df6a Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 18:01:24 +0800 Subject: [PATCH] Add AnnounceFile RPC message in network layer --- node/network/src/behaviour/mod.rs | 10 ++++++++++ node/network/src/peer_manager/mod.rs | 3 +++ node/network/src/rpc/codec/ssz_snappy.rs | 8 ++++++++ node/network/src/rpc/methods.rs | 6 ++++++ node/network/src/rpc/mod.rs | 1 + node/network/src/rpc/outbound.rs | 12 ++++++++++++ node/network/src/rpc/protocol.rs | 21 +++++++++++++++++++++ node/network/src/rpc/rate_limiter.rs | 11 +++++++++++ node/router/src/libp2p_event_handler.rs | 7 +++++++ node/sync/src/service.rs | 22 ++++++++++++++++++++++ 10 files changed, 101 insertions(+) diff --git a/node/network/src/behaviour/mod.rs b/node/network/src/behaviour/mod.rs index f3c6c26..fb562ac 100644 --- a/node/network/src/behaviour/mod.rs +++ b/node/network/src/behaviour/mod.rs @@ -6,6 +6,7 @@ use crate::peer_manager::{ ConnectionDirection, PeerManager, PeerManagerEvent, }; use crate::rpc::methods::DataByHashRequest; +use crate::rpc::methods::FileAnnouncement; use crate::rpc::methods::GetChunksRequest; use crate::rpc::*; use crate::service::Context as ServiceContext; @@ -546,6 +547,9 @@ impl Behaviour { Request::DataByHash { .. } => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["data_by_hash"]) } + Request::AnnounceFile { .. } => { + metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["announce_file"]) + } Request::GetChunks { .. } => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["get_chunks"]) } @@ -758,6 +762,9 @@ where InboundRequest::DataByHash(req) => { self.propagate_request(peer_request_id, peer_id, Request::DataByHash(req)) } + InboundRequest::AnnounceFile(req) => { + self.propagate_request(peer_request_id, peer_id, Request::AnnounceFile(req)) + } InboundRequest::GetChunks(req) => { self.propagate_request(peer_request_id, peer_id, Request::GetChunks(req)) } @@ -972,6 +979,8 @@ pub enum Request { Status(StatusMessage), /// A data by hash request. DataByHash(DataByHashRequest), + /// An AnnounceFile message. + AnnounceFile(FileAnnouncement), /// A GetChunks request. GetChunks(GetChunksRequest), } @@ -981,6 +990,7 @@ impl std::convert::From for OutboundRequest { match req { Request::Status(s) => OutboundRequest::Status(s), Request::DataByHash(r) => OutboundRequest::DataByHash(r), + Request::AnnounceFile(r) => OutboundRequest::AnnounceFile(r), Request::GetChunks(r) => OutboundRequest::GetChunks(r), } } diff --git a/node/network/src/peer_manager/mod.rs b/node/network/src/peer_manager/mod.rs index f7b0f76..e9b33d0 100644 --- a/node/network/src/peer_manager/mod.rs +++ b/node/network/src/peer_manager/mod.rs @@ -460,6 +460,7 @@ impl PeerManager { Protocol::Goodbye => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, Protocol::DataByHash => PeerAction::MidToleranceError, + Protocol::AnnounceFile => PeerAction::MidToleranceError, Protocol::GetChunks => PeerAction::MidToleranceError, }, }, @@ -474,6 +475,7 @@ impl PeerManager { Protocol::Goodbye => return, Protocol::Status => PeerAction::LowToleranceError, Protocol::DataByHash => return, + Protocol::AnnounceFile => return, Protocol::GetChunks => return, } } @@ -488,6 +490,7 @@ impl PeerManager { Protocol::Goodbye => return, Protocol::Status => return, Protocol::DataByHash => PeerAction::MidToleranceError, + Protocol::AnnounceFile => PeerAction::MidToleranceError, Protocol::GetChunks => PeerAction::MidToleranceError, }, }, diff --git a/node/network/src/rpc/codec/ssz_snappy.rs b/node/network/src/rpc/codec/ssz_snappy.rs index 14ecec8..d0e9370 100644 --- a/node/network/src/rpc/codec/ssz_snappy.rs +++ b/node/network/src/rpc/codec/ssz_snappy.rs @@ -159,6 +159,7 @@ impl Encoder for SSZSnappyOutboundCodec { OutboundRequest::Goodbye(req) => req.as_ssz_bytes(), OutboundRequest::Ping(req) => req.as_ssz_bytes(), OutboundRequest::DataByHash(req) => req.hashes.as_ssz_bytes(), + OutboundRequest::AnnounceFile(req) => req.as_ssz_bytes(), OutboundRequest::GetChunks(req) => req.as_ssz_bytes(), }; // SSZ encoded bytes should be within `max_packet_size` @@ -346,6 +347,9 @@ fn handle_v1_request( Protocol::DataByHash => Ok(Some(InboundRequest::DataByHash(DataByHashRequest { hashes: VariableList::from_ssz_bytes(decoded_buffer)?, }))), + Protocol::AnnounceFile => Ok(Some(InboundRequest::AnnounceFile( + FileAnnouncement::from_ssz_bytes(decoded_buffer)?, + ))), Protocol::GetChunks => Ok(Some(InboundRequest::GetChunks( GetChunksRequest::from_ssz_bytes(decoded_buffer)?, ))), @@ -373,6 +377,10 @@ fn handle_v1_response( Protocol::DataByHash => Ok(Some(RPCResponse::DataByHash(Box::new( ZgsData::from_ssz_bytes(decoded_buffer)?, )))), + // This case should be unreachable as `AnnounceFile` has no response. + Protocol::AnnounceFile => Err(RPCError::InvalidData( + "AnnounceFile RPC message has no valid response".to_string(), + )), Protocol::GetChunks => Ok(Some(RPCResponse::Chunks( ChunkArrayWithProof::from_ssz_bytes(decoded_buffer)?, ))), diff --git a/node/network/src/rpc/methods.rs b/node/network/src/rpc/methods.rs index e2ec11e..8d8e381 100644 --- a/node/network/src/rpc/methods.rs +++ b/node/network/src/rpc/methods.rs @@ -178,6 +178,12 @@ pub struct DataByHashRequest { pub hashes: VariableList, } +// The message of `AnnounceFile` RPC message. +#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)] +pub struct FileAnnouncement { + pub tx_id: TxID, +} + /// Request a chunk array from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)] pub struct GetChunksRequest { diff --git a/node/network/src/rpc/mod.rs b/node/network/src/rpc/mod.rs index 18e66d6..4907876 100644 --- a/node/network/src/rpc/mod.rs +++ b/node/network/src/rpc/mod.rs @@ -118,6 +118,7 @@ impl RPC { .n_every(Protocol::Status, 5, Duration::from_secs(15)) .one_every(Protocol::Goodbye, Duration::from_secs(10)) .n_every(Protocol::DataByHash, 128, Duration::from_secs(10)) + .n_every(Protocol::AnnounceFile, 256, Duration::from_secs(10)) .n_every(Protocol::GetChunks, 4096, Duration::from_secs(10)) .build() .expect("Configuration parameters are valid"); diff --git a/node/network/src/rpc/outbound.rs b/node/network/src/rpc/outbound.rs index 30a6bdc..6ba1f4a 100644 --- a/node/network/src/rpc/outbound.rs +++ b/node/network/src/rpc/outbound.rs @@ -34,6 +34,7 @@ pub enum OutboundRequest { Goodbye(GoodbyeReason), Ping(Ping), DataByHash(DataByHashRequest), + AnnounceFile(FileAnnouncement), GetChunks(GetChunksRequest), } @@ -72,6 +73,11 @@ impl OutboundRequest { Version::V1, Encoding::SSZSnappy, )], + OutboundRequest::AnnounceFile(_) => vec![ProtocolId::new( + Protocol::AnnounceFile, + Version::V1, + Encoding::SSZSnappy, + )], OutboundRequest::GetChunks(_) => vec![ProtocolId::new( Protocol::GetChunks, Version::V1, @@ -89,6 +95,7 @@ impl OutboundRequest { OutboundRequest::Goodbye(_) => 0, OutboundRequest::Ping(_) => 1, OutboundRequest::DataByHash(req) => req.hashes.len() as u64, + OutboundRequest::AnnounceFile(_) => 0, OutboundRequest::GetChunks(_) => 1, } } @@ -100,6 +107,7 @@ impl OutboundRequest { OutboundRequest::Goodbye(_) => Protocol::Goodbye, OutboundRequest::Ping(_) => Protocol::Ping, OutboundRequest::DataByHash(_) => Protocol::DataByHash, + OutboundRequest::AnnounceFile(_) => Protocol::AnnounceFile, OutboundRequest::GetChunks(_) => Protocol::GetChunks, } } @@ -114,6 +122,7 @@ impl OutboundRequest { OutboundRequest::Status(_) => unreachable!(), OutboundRequest::Goodbye(_) => unreachable!(), OutboundRequest::Ping(_) => unreachable!(), + OutboundRequest::AnnounceFile(_) => unreachable!(), OutboundRequest::GetChunks(_) => unreachable!(), } } @@ -170,6 +179,9 @@ impl std::fmt::Display for OutboundRequest { OutboundRequest::DataByHash(req) => { write!(f, "Data by hash: {:?}", req) } + OutboundRequest::AnnounceFile(req) => { + write!(f, "AnnounceFile: {:?}", req) + } OutboundRequest::GetChunks(req) => { write!(f, "GetChunks: {:?}", req) } diff --git a/node/network/src/rpc/protocol.rs b/node/network/src/rpc/protocol.rs index 53de8df..a9bdf41 100644 --- a/node/network/src/rpc/protocol.rs +++ b/node/network/src/rpc/protocol.rs @@ -91,6 +91,8 @@ pub enum Protocol { /// TODO DataByHash, + /// The file announce protocol. + AnnounceFile, /// The Chunk sync protocol. GetChunks, } @@ -115,6 +117,7 @@ impl std::fmt::Display for Protocol { Protocol::Goodbye => "goodbye", Protocol::Ping => "ping", Protocol::DataByHash => "data_by_hash", + Protocol::AnnounceFile => "announce_file", Protocol::GetChunks => "get_chunks", }; f.write_str(repr) @@ -155,6 +158,7 @@ impl UpgradeInfo for RPCProtocol { ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::DataByHash, Version::V1, Encoding::SSZSnappy), + ProtocolId::new(Protocol::AnnounceFile, Version::V1, Encoding::SSZSnappy), ProtocolId::new(Protocol::GetChunks, Version::V1, Encoding::SSZSnappy), ] } @@ -216,6 +220,10 @@ impl ProtocolId { // TODO RpcLimits::new(1, *DATA_BY_HASH_REQUEST_MAX) } + Protocol::AnnounceFile => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), Protocol::GetChunks => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -243,6 +251,7 @@ impl ProtocolId { ::ssz_fixed_len(), ), + Protocol::AnnounceFile => RpcLimits::new(0, 0), // AnnounceFile request has no response Protocol::GetChunks => RpcLimits::new(*CHUNKS_RESPONSE_MIN, *CHUNKS_RESPONSE_MAX), } } @@ -325,6 +334,7 @@ pub enum InboundRequest { Goodbye(GoodbyeReason), Ping(Ping), DataByHash(DataByHashRequest), + AnnounceFile(FileAnnouncement), GetChunks(GetChunksRequest), } @@ -363,6 +373,11 @@ impl InboundRequest { Version::V1, Encoding::SSZSnappy, )], + InboundRequest::AnnounceFile(_) => vec![ProtocolId::new( + Protocol::AnnounceFile, + Version::V1, + Encoding::SSZSnappy, + )], InboundRequest::GetChunks(_) => vec![ProtocolId::new( Protocol::GetChunks, Version::V1, @@ -380,6 +395,7 @@ impl InboundRequest { InboundRequest::Goodbye(_) => 0, InboundRequest::DataByHash(req) => req.hashes.len() as u64, InboundRequest::Ping(_) => 1, + InboundRequest::AnnounceFile(_) => 0, InboundRequest::GetChunks(_) => 1, } } @@ -391,6 +407,7 @@ impl InboundRequest { InboundRequest::Goodbye(_) => Protocol::Goodbye, InboundRequest::Ping(_) => Protocol::Ping, InboundRequest::DataByHash(_) => Protocol::DataByHash, + InboundRequest::AnnounceFile(_) => Protocol::AnnounceFile, InboundRequest::GetChunks(_) => Protocol::GetChunks, } } @@ -405,6 +422,7 @@ impl InboundRequest { InboundRequest::Status(_) => unreachable!(), InboundRequest::Goodbye(_) => unreachable!(), InboundRequest::Ping(_) => unreachable!(), + InboundRequest::AnnounceFile(_) => unreachable!(), InboundRequest::GetChunks(_) => unreachable!(), } } @@ -523,6 +541,9 @@ impl std::fmt::Display for InboundRequest { InboundRequest::DataByHash(req) => { write!(f, "Data by hash: {:?}", req) } + InboundRequest::AnnounceFile(req) => { + write!(f, "Announce File: {:?}", req) + } InboundRequest::GetChunks(req) => { write!(f, "Get Chunks: {:?}", req) } diff --git a/node/network/src/rpc/rate_limiter.rs b/node/network/src/rpc/rate_limiter.rs index c213146..9c95282 100644 --- a/node/network/src/rpc/rate_limiter.rs +++ b/node/network/src/rpc/rate_limiter.rs @@ -68,6 +68,8 @@ pub struct RPCRateLimiter { status_rl: Limiter, /// DataByHash rate limiter. data_by_hash_rl: Limiter, + /// AnnounceFile rate limiter. + announce_file_rl: Limiter, /// GetChunks rate limiter. get_chunks_rl: Limiter, } @@ -91,6 +93,8 @@ pub struct RPCRateLimiterBuilder { status_quota: Option, /// Quota for the DataByHash protocol. data_by_hash_quota: Option, + /// Quota for the AnnounceFile protocol. + announce_file_quota: Option, /// Quota for the GetChunks protocol. get_chunks_quota: Option, } @@ -109,6 +113,7 @@ impl RPCRateLimiterBuilder { Protocol::Status => self.status_quota = q, Protocol::Goodbye => self.goodbye_quota = q, Protocol::DataByHash => self.data_by_hash_quota = q, + Protocol::AnnounceFile => self.announce_file_quota = q, Protocol::GetChunks => self.get_chunks_quota = q, } self @@ -145,6 +150,9 @@ impl RPCRateLimiterBuilder { let data_by_hash_quota = self .data_by_hash_quota .ok_or("DataByHash quota not specified")?; + let announce_file_quota = self + .announce_file_quota + .ok_or("AnnounceFile quota not specified")?; let get_chunks_quota = self .get_chunks_quota .ok_or("GetChunks quota not specified")?; @@ -154,6 +162,7 @@ impl RPCRateLimiterBuilder { let status_rl = Limiter::from_quota(status_quota)?; let goodbye_rl = Limiter::from_quota(goodbye_quota)?; let data_by_hash_rl = Limiter::from_quota(data_by_hash_quota)?; + let announce_file_rl = Limiter::from_quota(announce_file_quota)?; let get_chunks_rl = Limiter::from_quota(get_chunks_quota)?; // check for peers to prune every 30 seconds, starting in 30 seconds @@ -166,6 +175,7 @@ impl RPCRateLimiterBuilder { status_rl, goodbye_rl, data_by_hash_rl, + announce_file_rl, get_chunks_rl, init_time: Instant::now(), }) @@ -210,6 +220,7 @@ impl RPCRateLimiter { Protocol::Status => &mut self.status_rl, Protocol::Goodbye => &mut self.goodbye_rl, Protocol::DataByHash => &mut self.data_by_hash_rl, + Protocol::AnnounceFile => &mut self.announce_file_rl, Protocol::GetChunks => &mut self.get_chunks_rl, }; check(limiter) diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 9dff174..5c28991 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -220,6 +220,13 @@ impl Libp2pEventHandler { }); metrics::LIBP2P_HANDLE_GET_CHUNKS_REQUEST.mark(1); } + Request::AnnounceFile(announcement) => { + self.send_to_sync(SyncMessage::AnnounceFile { + peer_id, + request_id, + announcement, + }); + } Request::DataByHash(_) => { // ignore } diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 9c164a3..353b3ea 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -8,6 +8,7 @@ use anyhow::{anyhow, bail, Result}; use file_location_cache::FileLocationCache; use libp2p::swarm::DialError; use log_entry_sync::LogSyncEvent; +use network::rpc::methods::FileAnnouncement; use network::types::{AnnounceChunks, FindFile, NewFile}; use network::PubsubMessage; use network::{ @@ -74,6 +75,11 @@ pub enum SyncMessage { from: PeerId, msg: NewFile, }, + AnnounceFile { + peer_id: PeerId, + request_id: PeerRequestId, + announcement: FileAnnouncement, + }, } #[derive(Debug)] @@ -270,6 +276,11 @@ impl SyncService { // FIXME: Check if controllers need to be reset? } SyncMessage::NewFile { from, msg } => self.on_new_file_gossip(from, msg).await, + SyncMessage::AnnounceFile { + peer_id, + announcement, + .. + } => self.on_announce_file(peer_id, announcement).await, } } @@ -762,6 +773,7 @@ impl SyncService { } } + /// Handle on NewFile gossip message received. async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) { debug!(%from, ?msg, "Received NewFile gossip"); @@ -775,6 +787,16 @@ impl SyncService { } } + /// Handle on AnnounceFile RPC message received. + async fn on_announce_file(&mut self, _peer_id: PeerId, announcement: FileAnnouncement) { + if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) { + // Notify new peer found if file already in sync + // TODO qbit: do not require remote address since already TCP connected + // controller.on_peer_found(from, addr); + controller.transition(); + } + } + /// Terminate file sync of `min_tx_seq`. /// If `is_reverted` is `true` (means confirmed transactions reverted), /// also terminate `tx_seq` greater than `min_tx_seq`