From e92778b750e90a218884bda740a466ce2957aa64 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 22 Nov 2024 19:02:31 +0800 Subject: [PATCH] disconnect peer instead of ban peer if shard config mismatch --- node/network/src/lib.rs | 4 +++- node/network/src/peer_manager/mod.rs | 2 +- node/router/src/libp2p_event_handler.rs | 22 +++++++++---------- node/router/src/metrics.rs | 2 -- node/router/src/service.rs | 29 +++++++++++-------------- 5 files changed, 27 insertions(+), 32 deletions(-) diff --git a/node/network/src/lib.rs b/node/network/src/lib.rs index 7b176c4..29a503f 100644 --- a/node/network/src/lib.rs +++ b/node/network/src/lib.rs @@ -152,6 +152,8 @@ pub enum NetworkMessage { }, /// Start dialing a new peer. DialPeer { address: Multiaddr, peer_id: PeerId }, + /// Disconnect a peer. + DisconnectPeer { peer_id: PeerId }, /// Notify that new file stored in db. AnnounceLocalFile { tx_id: TxID }, /// Called if a known external TCP socket address has been updated. @@ -167,5 +169,5 @@ pub type NetworkSender = channel::metrics::Sender; pub type NetworkReceiver = channel::metrics::Receiver; pub fn new_network_channel() -> (NetworkSender, NetworkReceiver) { - channel::metrics::unbounded_channel("network") + channel::metrics::unbounded_channel("network_channel") } diff --git a/node/network/src/peer_manager/mod.rs b/node/network/src/peer_manager/mod.rs index 81d4806..d8b1211 100644 --- a/node/network/src/peer_manager/mod.rs +++ b/node/network/src/peer_manager/mod.rs @@ -701,7 +701,7 @@ impl PeerManager { } // Gracefully disconnects a peer without banning them. - fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { + pub fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { self.events .push(PeerManagerEvent::DisconnectPeer(peer_id, reason)); self.network_globals diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index c225999..146d1fe 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -257,15 +257,8 @@ impl Libp2pEventHandler { let network_id = self.network_globals.network_id(); let shard_config = self.store.get_store().get_shard_config(); - - if !self.verify_status_message(peer_id, status, network_id.clone(), &shard_config) { - return; - } - - self.send_to_sync(SyncMessage::PeerConnected { peer_id }); - let status_message = StatusMessage { - data: network_id, + data: network_id.clone(), num_shard: shard_config.num_shard, shard_id: shard_config.shard_id, }; @@ -276,6 +269,10 @@ impl Libp2pEventHandler { id: request_id, response: Response::Status(status_message), }); + + if self.verify_status_message(peer_id, status, network_id, &shard_config) { + self.send_to_sync(SyncMessage::PeerConnected { peer_id }); + } } fn on_status_response(&self, peer_id: PeerId, status: StatusMessage) { @@ -996,20 +993,21 @@ impl Libp2pEventHandler { } }; + self.file_location_cache + .insert_peer_config(peer_id, peer_shard_config); + if !peer_shard_config.intersect(shard_config) { info!(%peer_id, ?shard_config, ?status, "Report peer with mismatched shard config"); self.send_to_network(NetworkMessage::ReportPeer { peer_id, - action: PeerAction::Fatal, + action: PeerAction::LowToleranceError, source: ReportSource::RPC, msg: "Shard config mismatch in StatusMessage", }); + self.send_to_network(NetworkMessage::DisconnectPeer { peer_id }); return false; } - self.file_location_cache - .insert_peer_config(peer_id, peer_shard_config); - true } diff --git a/node/router/src/metrics.rs b/node/router/src/metrics.rs index 141d034..3e34466 100644 --- a/node/router/src/metrics.rs +++ b/node/router/src/metrics.rs @@ -19,8 +19,6 @@ lazy_static::lazy_static! { pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc = register_meter("router_service_route_network_message_upnp"); pub static ref SERVICE_EXPIRED_PEERS: Arc = Sample::ExpDecay(0.015).register("router_service_expired_peers", 1024); - pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_OK: Arc = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_ok", 1024); - pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL: Arc = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_fail", 1024); // libp2p_event_handler diff --git a/node/router/src/service.rs b/node/router/src/service.rs index 2b53153..ae1bfbf 100644 --- a/node/router/src/service.rs +++ b/node/router/src/service.rs @@ -5,6 +5,8 @@ use chunk_pool::ChunkPoolMessage; use file_location_cache::FileLocationCache; use futures::{channel::mpsc::Sender, prelude::*}; use miner::MinerMessage; +use network::rpc::GoodbyeReason; +use network::PeerId; use network::{ types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm, @@ -330,6 +332,9 @@ impl RouterService { }; } } + NetworkMessage::DisconnectPeer { peer_id } => { + self.disconnect_peer(peer_id); + } NetworkMessage::AnnounceLocalFile { tx_id } => { let shard_config = self.store.get_shard_config(); let msg = PubsubMessage::NewFile(NewFile { @@ -399,24 +404,16 @@ impl RouterService { debug!(%num_expired_peers, "Heartbeat, remove expired peers") } - let mut num_succeeded = 0; - let mut num_failed = 0; for peer_id in expired_peers { - // async operation, once peer disconnected, swarm event `PeerDisconnected` - // will be polled to handle in advance. - match self.libp2p.swarm.disconnect_peer_id(peer_id) { - Ok(_) => { - debug!(%peer_id, "Peer expired and disconnect it"); - num_succeeded += 1; - } - Err(_) => { - debug!(%peer_id, "Peer expired but failed to disconnect"); - num_failed += 1; - } - } + self.disconnect_peer(peer_id); + } + } + + fn disconnect_peer(&mut self, peer_id: PeerId) { + let pm = self.libp2p.swarm.behaviour_mut().peer_manager_mut(); + if pm.is_connected(&peer_id) { + pm.disconnect_peer(peer_id, GoodbyeReason::IrrelevantNetwork); } - metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_OK.update(num_succeeded); - metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL.update(num_failed); } }