mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-12 08:05:17 +00:00
disconnect peer instead of ban peer if shard config mismatch
This commit is contained in:
parent
922ffbe0e6
commit
e92778b750
@ -152,6 +152,8 @@ pub enum NetworkMessage {
|
||||
},
|
||||
/// Start dialing a new peer.
|
||||
DialPeer { address: Multiaddr, peer_id: PeerId },
|
||||
/// Disconnect a peer.
|
||||
DisconnectPeer { peer_id: PeerId },
|
||||
/// Notify that new file stored in db.
|
||||
AnnounceLocalFile { tx_id: TxID },
|
||||
/// Called if a known external TCP socket address has been updated.
|
||||
@ -167,5 +169,5 @@ pub type NetworkSender = channel::metrics::Sender<NetworkMessage>;
|
||||
pub type NetworkReceiver = channel::metrics::Receiver<NetworkMessage>;
|
||||
|
||||
pub fn new_network_channel() -> (NetworkSender, NetworkReceiver) {
|
||||
channel::metrics::unbounded_channel("network")
|
||||
channel::metrics::unbounded_channel("network_channel")
|
||||
}
|
||||
|
@ -701,7 +701,7 @@ impl PeerManager {
|
||||
}
|
||||
|
||||
// Gracefully disconnects a peer without banning them.
|
||||
fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
|
||||
pub fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
|
||||
self.events
|
||||
.push(PeerManagerEvent::DisconnectPeer(peer_id, reason));
|
||||
self.network_globals
|
||||
|
@ -257,15 +257,8 @@ impl Libp2pEventHandler {
|
||||
|
||||
let network_id = self.network_globals.network_id();
|
||||
let shard_config = self.store.get_store().get_shard_config();
|
||||
|
||||
if !self.verify_status_message(peer_id, status, network_id.clone(), &shard_config) {
|
||||
return;
|
||||
}
|
||||
|
||||
self.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
||||
|
||||
let status_message = StatusMessage {
|
||||
data: network_id,
|
||||
data: network_id.clone(),
|
||||
num_shard: shard_config.num_shard,
|
||||
shard_id: shard_config.shard_id,
|
||||
};
|
||||
@ -276,6 +269,10 @@ impl Libp2pEventHandler {
|
||||
id: request_id,
|
||||
response: Response::Status(status_message),
|
||||
});
|
||||
|
||||
if self.verify_status_message(peer_id, status, network_id, &shard_config) {
|
||||
self.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
||||
}
|
||||
}
|
||||
|
||||
fn on_status_response(&self, peer_id: PeerId, status: StatusMessage) {
|
||||
@ -996,20 +993,21 @@ impl Libp2pEventHandler {
|
||||
}
|
||||
};
|
||||
|
||||
self.file_location_cache
|
||||
.insert_peer_config(peer_id, peer_shard_config);
|
||||
|
||||
if !peer_shard_config.intersect(shard_config) {
|
||||
info!(%peer_id, ?shard_config, ?status, "Report peer with mismatched shard config");
|
||||
self.send_to_network(NetworkMessage::ReportPeer {
|
||||
peer_id,
|
||||
action: PeerAction::Fatal,
|
||||
action: PeerAction::LowToleranceError,
|
||||
source: ReportSource::RPC,
|
||||
msg: "Shard config mismatch in StatusMessage",
|
||||
});
|
||||
self.send_to_network(NetworkMessage::DisconnectPeer { peer_id });
|
||||
return false;
|
||||
}
|
||||
|
||||
self.file_location_cache
|
||||
.insert_peer_config(peer_id, peer_shard_config);
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
|
@ -19,8 +19,6 @@ lazy_static::lazy_static! {
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
|
||||
|
||||
pub static ref SERVICE_EXPIRED_PEERS: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers", 1024);
|
||||
pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_OK: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_ok", 1024);
|
||||
pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_fail", 1024);
|
||||
|
||||
// libp2p_event_handler
|
||||
|
||||
|
@ -5,6 +5,8 @@ use chunk_pool::ChunkPoolMessage;
|
||||
use file_location_cache::FileLocationCache;
|
||||
use futures::{channel::mpsc::Sender, prelude::*};
|
||||
use miner::MinerMessage;
|
||||
use network::rpc::GoodbyeReason;
|
||||
use network::PeerId;
|
||||
use network::{
|
||||
types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage,
|
||||
NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm,
|
||||
@ -330,6 +332,9 @@ impl RouterService {
|
||||
};
|
||||
}
|
||||
}
|
||||
NetworkMessage::DisconnectPeer { peer_id } => {
|
||||
self.disconnect_peer(peer_id);
|
||||
}
|
||||
NetworkMessage::AnnounceLocalFile { tx_id } => {
|
||||
let shard_config = self.store.get_shard_config();
|
||||
let msg = PubsubMessage::NewFile(NewFile {
|
||||
@ -399,24 +404,16 @@ impl RouterService {
|
||||
debug!(%num_expired_peers, "Heartbeat, remove expired peers")
|
||||
}
|
||||
|
||||
let mut num_succeeded = 0;
|
||||
let mut num_failed = 0;
|
||||
for peer_id in expired_peers {
|
||||
// async operation, once peer disconnected, swarm event `PeerDisconnected`
|
||||
// will be polled to handle in advance.
|
||||
match self.libp2p.swarm.disconnect_peer_id(peer_id) {
|
||||
Ok(_) => {
|
||||
debug!(%peer_id, "Peer expired and disconnect it");
|
||||
num_succeeded += 1;
|
||||
}
|
||||
Err(_) => {
|
||||
debug!(%peer_id, "Peer expired but failed to disconnect");
|
||||
num_failed += 1;
|
||||
self.disconnect_peer(peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
fn disconnect_peer(&mut self, peer_id: PeerId) {
|
||||
let pm = self.libp2p.swarm.behaviour_mut().peer_manager_mut();
|
||||
if pm.is_connected(&peer_id) {
|
||||
pm.disconnect_peer(peer_id, GoodbyeReason::IrrelevantNetwork);
|
||||
}
|
||||
metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_OK.update(num_succeeded);
|
||||
metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL.update(num_failed);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user