From b9e6431a4d0de2197360dd004427c433c55d357e Mon Sep 17 00:00:00 2001 From: Bo QIU <35757521+boqiu@users.noreply.github.com> Date: Mon, 25 Nov 2024 10:15:30 +0800 Subject: [PATCH] Add shard config in STATUS message and only dail to shard config matched peers (#285) * Add shard config in status message * verify shard config for status message * Notify peer connected to sync layer after status message exchanged * Do not dial to shard config mismatched peers * Upgrade network protocol version * disconnect peer instead of ban peer if shard config mismatch * Add python test for TCP connection by shard config --- node/network/src/lib.rs | 6 +- node/network/src/peer_manager/config.rs | 31 +++++++++- node/network/src/peer_manager/mod.rs | 10 +++- node/network/src/peer_manager/peerdb.rs | 6 +- node/network/src/rpc/codec/ssz_snappy.rs | 9 +-- node/network/src/rpc/methods.rs | 6 +- node/network/tests/rpc_tests.rs | 8 +-- node/router/src/libp2p_event_handler.rs | 65 ++++++++++++++++---- node/router/src/metrics.rs | 10 ++-- node/router/src/service.rs | 39 ++++++------ node/src/client/builder.rs | 14 ++++- node/src/config/convert.rs | 17 +++--- node/src/main.rs | 2 +- node/sync/src/controllers/peers.rs | 2 +- node/sync/src/controllers/serial.rs | 12 ++-- node/sync/src/service.rs | 12 ++-- run/config-testnet-standard.toml | 2 +- run/config-testnet-turbo.toml | 2 +- run/config.toml | 2 +- tests/network_tcp_shard_test.py | 76 ++++++++++++++++++++++++ 20 files changed, 244 insertions(+), 87 deletions(-) create mode 100644 tests/network_tcp_shard_test.py diff --git a/node/network/src/lib.rs b/node/network/src/lib.rs index 426303b..29a503f 100644 --- a/node/network/src/lib.rs +++ b/node/network/src/lib.rs @@ -96,8 +96,10 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F /// Defines the current P2P protocol version. /// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused network too heavey. /// - v2: Publish NewFile to neighbors only and announce file via RPC message. +/// - v3: Add shard config in Status message. pub const PROTOCOL_VERSION_V1: [u8; 3] = [0, 1, 1]; pub const PROTOCOL_VERSION_V2: [u8; 3] = [0, 2, 1]; +pub const PROTOCOL_VERSION_V3: [u8; 3] = [0, 3, 0]; /// Application level requests sent to the network. #[derive(Debug, Clone, Copy)] @@ -150,6 +152,8 @@ pub enum NetworkMessage { }, /// Start dialing a new peer. DialPeer { address: Multiaddr, peer_id: PeerId }, + /// Disconnect a peer. + DisconnectPeer { peer_id: PeerId }, /// Notify that new file stored in db. AnnounceLocalFile { tx_id: TxID }, /// Called if a known external TCP socket address has been updated. @@ -165,5 +169,5 @@ pub type NetworkSender = channel::metrics::Sender; pub type NetworkReceiver = channel::metrics::Receiver; pub fn new_network_channel() -> (NetworkSender, NetworkReceiver) { - channel::metrics::unbounded_channel("network") + channel::metrics::unbounded_channel("network_channel") } diff --git a/node/network/src/peer_manager/config.rs b/node/network/src/peer_manager/config.rs index 5520601..b6e4489 100644 --- a/node/network/src/peer_manager/config.rs +++ b/node/network/src/peer_manager/config.rs @@ -1,6 +1,7 @@ -use std::time::Duration; +use std::{fmt::Debug, sync::Arc, time::Duration}; use duration_str::deserialize_duration; +use libp2p::PeerId; use serde::{Deserialize, Serialize}; /// The time in seconds between re-status's peers. @@ -16,7 +17,7 @@ pub const DEFAULT_PING_INTERVAL_INBOUND: u64 = 20; pub const DEFAULT_TARGET_PEERS: usize = 50; /// Configurations for the PeerManager. -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct Config { /* Peer count related configurations */ @@ -40,6 +41,9 @@ pub struct Config { pub ping_interval_inbound: u64, /// Interval between PING events for peers dialed by us. pub ping_interval_outbound: u64, + + #[serde(skip)] + pub filters: Filters, } impl Default for Config { @@ -52,6 +56,29 @@ impl Default for Config { status_interval: DEFAULT_STATUS_INTERVAL, ping_interval_inbound: DEFAULT_PING_INTERVAL_INBOUND, ping_interval_outbound: DEFAULT_PING_INTERVAL_OUTBOUND, + filters: Default::default(), } } } +#[derive(Clone)] + +pub struct Filters { + /// Decide whether to dial to specified peer. + pub dial_peer_filter: Option bool + Sync + Send + 'static>>, +} + +impl Default for Filters { + fn default() -> Self { + Filters { + dial_peer_filter: None, + } + } +} + +impl Debug for Filters { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Filters") + .field("dial_peer_filter", &self.dial_peer_filter.is_some()) + .finish() + } +} diff --git a/node/network/src/peer_manager/mod.rs b/node/network/src/peer_manager/mod.rs index 9ee4936..7da167f 100644 --- a/node/network/src/peer_manager/mod.rs +++ b/node/network/src/peer_manager/mod.rs @@ -69,6 +69,8 @@ pub struct PeerManager { discovery_enabled: bool, /// Keeps track if the current instance is reporting metrics or not. metrics_enabled: bool, + + filters: config::Filters, } /// The events that the `PeerManager` outputs (requests). @@ -108,6 +110,7 @@ impl PeerManager { status_interval, ping_interval_inbound, ping_interval_outbound, + filters, } = cfg; // Set up the peer manager heartbeat interval @@ -123,6 +126,7 @@ impl PeerManager { heartbeat, discovery_enabled, metrics_enabled, + filters, }) } @@ -277,6 +281,10 @@ impl PeerManager { } } + if let Some(dial_peer_filter) = self.filters.dial_peer_filter.clone() { + to_dial_peers.retain(|peer_id| dial_peer_filter(peer_id)); + } + // Queue another discovery if we need to self.maintain_peer_count(to_dial_peers.len()); @@ -693,7 +701,7 @@ impl PeerManager { } // Gracefully disconnects a peer without banning them. - fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { + pub fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { self.events .push(PeerManagerEvent::DisconnectPeer(peer_id, reason)); self.network_globals diff --git a/node/network/src/peer_manager/peerdb.rs b/node/network/src/peer_manager/peerdb.rs index 04e0803..7d03021 100644 --- a/node/network/src/peer_manager/peerdb.rs +++ b/node/network/src/peer_manager/peerdb.rs @@ -35,7 +35,7 @@ pub struct PeerDBConfig { pub allowed_negative_gossipsub_factor: f32, /// The time we allow peers to be in the dialing state in our PeerDb before we revert them to a disconnected state. #[serde(deserialize_with = "deserialize_duration")] - pub dail_timeout: Duration, + pub dial_timeout: Duration, } impl Default for PeerDBConfig { @@ -45,7 +45,7 @@ impl Default for PeerDBConfig { max_banned_peers: 1000, banned_peers_per_ip_threshold: 5, allowed_negative_gossipsub_factor: 0.1, - dail_timeout: Duration::from_secs(15), + dial_timeout: Duration::from_secs(15), } } } @@ -339,7 +339,7 @@ impl PeerDB { .iter() .filter_map(|(peer_id, info)| { if let PeerConnectionStatus::Dialing { since } = info.connection_status() { - if (*since) + self.config.dail_timeout < std::time::Instant::now() { + if (*since) + self.config.dial_timeout < std::time::Instant::now() { return Some(*peer_id); } } diff --git a/node/network/src/rpc/codec/ssz_snappy.rs b/node/network/src/rpc/codec/ssz_snappy.rs index d0e9370..e35c197 100644 --- a/node/network/src/rpc/codec/ssz_snappy.rs +++ b/node/network/src/rpc/codec/ssz_snappy.rs @@ -399,9 +399,7 @@ mod tests { use std::io::Write; fn status_message() -> StatusMessage { - StatusMessage { - data: Default::default(), - } + Default::default() } fn ping_message() -> Ping { @@ -570,10 +568,7 @@ mod tests { assert_eq!(stream_identifier.len(), 10); // Status message is 84 bytes uncompressed. `max_compressed_len` is 32 + 84 + 84/6 = 130. - let status_message_bytes = StatusMessage { - data: Default::default(), - } - .as_ssz_bytes(); + let status_message_bytes = StatusMessage::default().as_ssz_bytes(); let mut uvi_codec: Uvi = Uvi::default(); let mut dst = BytesMut::with_capacity(1024); diff --git a/node/network/src/rpc/methods.rs b/node/network/src/rpc/methods.rs index 554eb8b..8322492 100644 --- a/node/network/src/rpc/methods.rs +++ b/node/network/src/rpc/methods.rs @@ -69,9 +69,13 @@ impl ToString for ErrorType { /* Requests */ /// The STATUS request/response handshake message. -#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)] +#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq, Default)] pub struct StatusMessage { pub data: NetworkIdentity, + + // shard config + pub num_shard: usize, + pub shard_id: usize, } /// The PING request/response message. diff --git a/node/network/tests/rpc_tests.rs b/node/network/tests/rpc_tests.rs index 6c7feba..cb14c35 100644 --- a/node/network/tests/rpc_tests.rs +++ b/node/network/tests/rpc_tests.rs @@ -23,14 +23,10 @@ fn test_status_rpc() { let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt)).await; // Dummy STATUS RPC message - let rpc_request = Request::Status(StatusMessage { - data: Default::default(), - }); + let rpc_request = Request::Status(Default::default()); // Dummy STATUS RPC message - let rpc_response = Response::Status(StatusMessage { - data: Default::default(), - }); + let rpc_response = Response::Status(Default::default()); // build the sender future let sender_future = async { diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 9a69dee..146d1fe 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -172,8 +172,11 @@ impl Libp2pEventHandler { } pub fn send_status(&self, peer_id: PeerId) { + let shard_config = self.store.get_store().get_shard_config(); let status_message = StatusMessage { data: self.network_globals.network_id(), + num_shard: shard_config.num_shard, + shard_id: shard_config.shard_id, }; debug!(%peer_id, ?status_message, "Sending Status request"); @@ -191,7 +194,6 @@ impl Libp2pEventHandler { if outgoing { self.send_status(peer_id); - self.send_to_sync(SyncMessage::PeerConnected { peer_id }); metrics::LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING.mark(1); } else { metrics::LIBP2P_HANDLE_PEER_CONNECTED_INCOMING.mark(1); @@ -254,8 +256,11 @@ impl Libp2pEventHandler { debug!(%peer_id, ?status, "Received Status request"); let network_id = self.network_globals.network_id(); + let shard_config = self.store.get_store().get_shard_config(); let status_message = StatusMessage { data: network_id.clone(), + num_shard: shard_config.num_shard, + shard_id: shard_config.shard_id, }; debug!(%peer_id, ?status_message, "Sending Status response"); @@ -264,12 +269,18 @@ impl Libp2pEventHandler { id: request_id, response: Response::Status(status_message), }); - self.on_status_message(peer_id, status, network_id); + + if self.verify_status_message(peer_id, status, network_id, &shard_config) { + self.send_to_sync(SyncMessage::PeerConnected { peer_id }); + } } fn on_status_response(&self, peer_id: PeerId, status: StatusMessage) { let network_id = self.network_globals.network_id(); - self.on_status_message(peer_id, status, network_id); + let shard_config = self.store.get_store().get_shard_config(); + if self.verify_status_message(peer_id, status, network_id, &shard_config) { + self.send_to_sync(SyncMessage::PeerConnected { peer_id }); + } } pub async fn on_rpc_response( @@ -950,21 +961,54 @@ impl Libp2pEventHandler { MessageAcceptance::Accept } - fn on_status_message( + fn verify_status_message( &self, peer_id: PeerId, status: StatusMessage, network_id: NetworkIdentity, - ) { + shard_config: &ShardConfig, + ) -> bool { if status.data != network_id { warn!(%peer_id, ?network_id, ?status.data, "Report peer with incompatible network id"); self.send_to_network(NetworkMessage::ReportPeer { peer_id, action: PeerAction::Fatal, - source: ReportSource::Gossipsub, + source: ReportSource::RPC, msg: "Incompatible network id in StatusMessage", - }) + }); + return false; } + + let peer_shard_config = match ShardConfig::new(status.shard_id, status.num_shard) { + Ok(v) => v, + Err(err) => { + warn!(%peer_id, ?status, ?err, "Report peer with invalid shard config"); + self.send_to_network(NetworkMessage::ReportPeer { + peer_id, + action: PeerAction::Fatal, + source: ReportSource::RPC, + msg: "Invalid shard config in StatusMessage", + }); + return false; + } + }; + + self.file_location_cache + .insert_peer_config(peer_id, peer_shard_config); + + if !peer_shard_config.intersect(shard_config) { + info!(%peer_id, ?shard_config, ?status, "Report peer with mismatched shard config"); + self.send_to_network(NetworkMessage::ReportPeer { + peer_id, + action: PeerAction::LowToleranceError, + source: ReportSource::RPC, + msg: "Shard config mismatch in StatusMessage", + }); + self.send_to_network(NetworkMessage::DisconnectPeer { peer_id }); + return false; + } + + true } async fn publish_file(&self, tx_id: TxID) -> Option { @@ -1184,10 +1228,7 @@ mod tests { assert_eq!(handler.peers.read().await.size(), 1); ctx.assert_status_request(alice); - assert!(matches!( - ctx.sync_recv.try_recv(), - Ok(Notification(SyncMessage::PeerConnected {peer_id})) if peer_id == alice - )); + assert!(matches!(ctx.sync_recv.try_recv(), Err(TryRecvError::Empty))); } #[tokio::test] @@ -1216,6 +1257,8 @@ mod tests { let req_id = (ConnectionId::new(4), SubstreamId(12)); let request = Request::Status(StatusMessage { data: Default::default(), + num_shard: 1, + shard_id: 0, }); handler.on_rpc_request(alice, req_id, request).await; diff --git a/node/router/src/metrics.rs b/node/router/src/metrics.rs index 141d034..979388c 100644 --- a/node/router/src/metrics.rs +++ b/node/router/src/metrics.rs @@ -11,16 +11,14 @@ lazy_static::lazy_static! { pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc = register_meter("router_service_route_network_message_publish"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc = register_meter("router_service_route_network_message_report_peer"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc = register_meter("router_service_route_network_message_goodbye_peer"); - pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc = register_meter_with_group("router_service_route_network_message_dail_peer", "all"); - pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc = register_meter_with_group("router_service_route_network_message_dail_peer", "already"); - pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc = register_meter_with_group("router_service_route_network_message_dail_peer", "ok"); - pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc = register_meter_with_group("router_service_route_network_message_dail_peer", "fail"); + pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER: Arc = register_meter_with_group("router_service_route_network_message_dial_peer", "all"); + pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY: Arc = register_meter_with_group("router_service_route_network_message_dial_peer", "already"); + pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK: Arc = register_meter_with_group("router_service_route_network_message_dial_peer", "ok"); + pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL: Arc = register_meter_with_group("router_service_route_network_message_dial_peer", "fail"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc = register_meter("router_service_route_network_message_announce_local_file"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc = register_meter("router_service_route_network_message_upnp"); pub static ref SERVICE_EXPIRED_PEERS: Arc = Sample::ExpDecay(0.015).register("router_service_expired_peers", 1024); - pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_OK: Arc = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_ok", 1024); - pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL: Arc = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_fail", 1024); // libp2p_event_handler diff --git a/node/router/src/service.rs b/node/router/src/service.rs index 2b53153..8b46a41 100644 --- a/node/router/src/service.rs +++ b/node/router/src/service.rs @@ -5,6 +5,8 @@ use chunk_pool::ChunkPoolMessage; use file_location_cache::FileLocationCache; use futures::{channel::mpsc::Sender, prelude::*}; use miner::MinerMessage; +use network::rpc::GoodbyeReason; +use network::PeerId; use network::{ types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm, @@ -309,27 +311,30 @@ impl RouterService { metrics::SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER.mark(1); } NetworkMessage::DialPeer { address, peer_id } => { - metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER.mark(1); + metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER.mark(1); if self.libp2p.swarm.is_connected(&peer_id) { self.libp2p_event_handler .send_to_sync(SyncMessage::PeerConnected { peer_id }); - metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY.mark(1); + metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY.mark(1); } else { match Swarm::dial(&mut self.libp2p.swarm, address.clone()) { Ok(()) => { debug!(%address, "Dialing libp2p peer"); - metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK.mark(1); + metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK.mark(1); } Err(err) => { info!(%address, error = ?err, "Failed to dial peer"); self.libp2p_event_handler - .send_to_sync(SyncMessage::DailFailed { peer_id, err }); - metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL.mark(1); + .send_to_sync(SyncMessage::DialFailed { peer_id, err }); + metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL.mark(1); } }; } } + NetworkMessage::DisconnectPeer { peer_id } => { + self.disconnect_peer(peer_id); + } NetworkMessage::AnnounceLocalFile { tx_id } => { let shard_config = self.store.get_shard_config(); let msg = PubsubMessage::NewFile(NewFile { @@ -399,24 +404,16 @@ impl RouterService { debug!(%num_expired_peers, "Heartbeat, remove expired peers") } - let mut num_succeeded = 0; - let mut num_failed = 0; for peer_id in expired_peers { - // async operation, once peer disconnected, swarm event `PeerDisconnected` - // will be polled to handle in advance. - match self.libp2p.swarm.disconnect_peer_id(peer_id) { - Ok(_) => { - debug!(%peer_id, "Peer expired and disconnect it"); - num_succeeded += 1; - } - Err(_) => { - debug!(%peer_id, "Peer expired but failed to disconnect"); - num_failed += 1; - } - } + self.disconnect_peer(peer_id); + } + } + + fn disconnect_peer(&mut self, peer_id: PeerId) { + let pm = self.libp2p.swarm.behaviour_mut().peer_manager_mut(); + if pm.is_connected(&peer_id) { + pm.disconnect_peer(peer_id, GoodbyeReason::IrrelevantNetwork); } - metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_OK.update(num_succeeded); - metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL.update(num_failed); } } diff --git a/node/src/client/builder.rs b/node/src/client/builder.rs index 7d345c8..2544238 100644 --- a/node/src/client/builder.rs +++ b/node/src/client/builder.rs @@ -134,11 +134,21 @@ impl ClientBuilder { } /// Starts the networking stack. - pub async fn with_network(mut self, config: &NetworkConfig) -> Result { + pub async fn with_network(mut self, mut config: NetworkConfig) -> Result { let executor = require!("network", self, runtime_context).clone().executor; + let store = require!("network", self, store).clone(); + let file_location_cache = require!("network", self, file_location_cache).clone(); + + // only dial to peers that shard config matched + config.peer_manager.filters.dial_peer_filter = Some(Arc::new(move |peer_id| { + match file_location_cache.get_peer_config(peer_id) { + Some(v) => store.get_shard_config().intersect(&v), + None => true, + } + })); // construct the libp2p service context - let service_context = network::Context { config }; + let service_context = network::Context { config: &config }; // construct communication channel let (send, recv) = new_network_channel(); diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index 67346a2..660fdfd 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -39,18 +39,17 @@ impl ZgsConfig { .await .map_err(|e| format!("Unable to get chain id: {:?}", e))? .as_u64(); - let network_protocol_version = if self.sync.neighbors_only { - network::PROTOCOL_VERSION_V2 - } else { - network::PROTOCOL_VERSION_V1 - }; let local_network_id = NetworkIdentity { chain_id, flow_address, p2p_protocol_version: ProtocolVersion { - major: network_protocol_version[0], - minor: network_protocol_version[1], - build: network_protocol_version[2], + major: network::PROTOCOL_VERSION_V3[0], + minor: network::PROTOCOL_VERSION_V3[1], + build: if self.sync.neighbors_only { + network::PROTOCOL_VERSION_V3[2] + 1 + } else { + network::PROTOCOL_VERSION_V3[2] + }, }, }; network_config.network_id = local_network_id.clone(); @@ -110,7 +109,7 @@ impl ZgsConfig { network_config.private = self.network_private; network_config.peer_db = self.network_peer_db; - network_config.peer_manager = self.network_peer_manager; + network_config.peer_manager = self.network_peer_manager.clone(); network_config.disable_enr_network_id = self.discv5_disable_enr_network_id; Ok(network_config) diff --git a/node/src/main.rs b/node/src/main.rs index 29db29e..9c42046 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -26,7 +26,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result { if let Some(true) = self.peers .update_state(&peer_id, PeerState::Connecting, PeerState::Found) { - info!(%self.tx_seq, %peer_id, "Failed to dail peer due to outgoing connection limitation"); + info!(%self.tx_seq, %peer_id, "Failed to dial peer due to outgoing connection limitation"); self.state = SyncState::AwaitingOutgoingConnection { since: Instant::now().into(), }; @@ -377,7 +377,7 @@ impl SerialSyncController { PeerState::Connecting, PeerState::Disconnected, ) { - info!(%self.tx_seq, %peer_id, %err, "Failed to dail peer"); + info!(%self.tx_seq, %peer_id, %err, "Failed to dial peer"); self.state = SyncState::Idle; } } diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 35bed45..a913426 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -33,7 +33,7 @@ pub type SyncReceiver = channel::Receiver { - self.on_dail_failed(peer_id, err); + SyncMessage::DialFailed { peer_id, err } => { + self.on_dial_failed(peer_id, err); } SyncMessage::PeerConnected { peer_id } => { self.on_peer_connected(peer_id); @@ -369,11 +369,11 @@ impl SyncService { } } - fn on_dail_failed(&mut self, peer_id: PeerId, err: DialError) { - info!(%peer_id, ?err, "Dail to peer failed"); + fn on_dial_failed(&mut self, peer_id: PeerId, err: DialError) { + info!(%peer_id, ?err, "Dial to peer failed"); for controller in self.controllers.values_mut() { - controller.on_dail_failed(peer_id, &err); + controller.on_dial_failed(peer_id, &err); controller.transition(); } } diff --git a/run/config-testnet-standard.toml b/run/config-testnet-standard.toml index fce26c6..d39b57d 100644 --- a/run/config-testnet-standard.toml +++ b/run/config-testnet-standard.toml @@ -245,7 +245,7 @@ neighbors_only = true # Maximum number of continous failures to terminate a file sync. # max_request_failures = 5 -# Timeout to dail peers. +# Timeout to dial peers. # peer_connect_timeout = "15s" # Timeout to disconnect peers. diff --git a/run/config-testnet-turbo.toml b/run/config-testnet-turbo.toml index 8db9aab..f3083cc 100644 --- a/run/config-testnet-turbo.toml +++ b/run/config-testnet-turbo.toml @@ -257,7 +257,7 @@ neighbors_only = true # Maximum number of continous failures to terminate a file sync. # max_request_failures = 5 -# Timeout to dail peers. +# Timeout to dial peers. # peer_connect_timeout = "15s" # Timeout to disconnect peers. diff --git a/run/config.toml b/run/config.toml index 193f989..fe7ab12 100644 --- a/run/config.toml +++ b/run/config.toml @@ -259,7 +259,7 @@ neighbors_only = true # Maximum number of continous failures to terminate a file sync. # max_request_failures = 5 -# Timeout to dail peers. +# Timeout to dial peers. # peer_connect_timeout = "15s" # Timeout to disconnect peers. diff --git a/tests/network_tcp_shard_test.py b/tests/network_tcp_shard_test.py new file mode 100644 index 0000000..8c3896b --- /dev/null +++ b/tests/network_tcp_shard_test.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 + +import os +import time + +from config.node_config import ZGS_KEY_FILE, ZGS_NODEID +from test_framework.test_framework import TestFramework +from utility.utils import p2p_port + +class NetworkTcpShardTest(TestFramework): + """ + This is to test TCP connection for shard config mismatched peers of UDP discovery. + """ + + def setup_params(self): + # 1 bootnode and 2 community nodes + self.num_nodes = 3 + + # setup for node 0 as bootnode + self.zgs_node_key_files = [ZGS_KEY_FILE] + bootnode_port = p2p_port(0) + self.zgs_node_configs[0] = { + # enable UDP discovery relevant configs + "network_enr_address": "127.0.0.1", + "network_enr_tcp_port": bootnode_port, + "network_enr_udp_port": bootnode_port, + + # disable trusted nodes + "network_libp2p_nodes": [], + + # custom shard config + "shard_position": "0/4" + } + + # setup node 1 & 2 as community nodes + bootnodes = [f"/ip4/127.0.0.1/udp/{bootnode_port}/p2p/{ZGS_NODEID}"] + for i in range(1, self.num_nodes): + self.zgs_node_configs[i] = { + # enable UDP discovery relevant configs + "network_enr_address": "127.0.0.1", + "network_enr_tcp_port": p2p_port(i), + "network_enr_udp_port": p2p_port(i), + + # disable trusted nodes and enable bootnodes + "network_libp2p_nodes": [], + "network_boot_nodes": bootnodes, + + # custom shard config + "shard_position": f"{i}/4" + } + + def run_test(self): + timeout_secs = 10 + + for iter in range(timeout_secs): + time.sleep(1) + self.log.info("==================================== iter %s", iter) + + for i in range(self.num_nodes): + info = self.nodes[i].rpc.admin_getNetworkInfo() + self.log.info( + "Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)", + i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"], + ) + + if i == timeout_secs - 1: + assert info["totalPeers"] == self.num_nodes - 1 + assert info["bannedPeers"] == 0 + assert info["disconnectedPeers"] == self.num_nodes - 1 + assert info["connectedPeers"] == 0 + + self.log.info("====================================") + self.log.info("All nodes discovered but not connected for each other") + +if __name__ == "__main__": + NetworkTcpShardTest().main()