mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-23 13:36:08 +00:00
Add shard config in STATUS message and only dail to shard config matched peers (#285)
* Add shard config in status message * verify shard config for status message * Notify peer connected to sync layer after status message exchanged * Do not dial to shard config mismatched peers * Upgrade network protocol version * disconnect peer instead of ban peer if shard config mismatch * Add python test for TCP connection by shard config
This commit is contained in:
parent
bfe434972d
commit
b9e6431a4d
@ -96,8 +96,10 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F
|
||||
/// Defines the current P2P protocol version.
|
||||
/// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused network too heavey.
|
||||
/// - v2: Publish NewFile to neighbors only and announce file via RPC message.
|
||||
/// - v3: Add shard config in Status message.
|
||||
pub const PROTOCOL_VERSION_V1: [u8; 3] = [0, 1, 1];
|
||||
pub const PROTOCOL_VERSION_V2: [u8; 3] = [0, 2, 1];
|
||||
pub const PROTOCOL_VERSION_V3: [u8; 3] = [0, 3, 0];
|
||||
|
||||
/// Application level requests sent to the network.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
@ -150,6 +152,8 @@ pub enum NetworkMessage {
|
||||
},
|
||||
/// Start dialing a new peer.
|
||||
DialPeer { address: Multiaddr, peer_id: PeerId },
|
||||
/// Disconnect a peer.
|
||||
DisconnectPeer { peer_id: PeerId },
|
||||
/// Notify that new file stored in db.
|
||||
AnnounceLocalFile { tx_id: TxID },
|
||||
/// Called if a known external TCP socket address has been updated.
|
||||
@ -165,5 +169,5 @@ pub type NetworkSender = channel::metrics::Sender<NetworkMessage>;
|
||||
pub type NetworkReceiver = channel::metrics::Receiver<NetworkMessage>;
|
||||
|
||||
pub fn new_network_channel() -> (NetworkSender, NetworkReceiver) {
|
||||
channel::metrics::unbounded_channel("network")
|
||||
channel::metrics::unbounded_channel("network_channel")
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
use std::time::Duration;
|
||||
use std::{fmt::Debug, sync::Arc, time::Duration};
|
||||
|
||||
use duration_str::deserialize_duration;
|
||||
use libp2p::PeerId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// The time in seconds between re-status's peers.
|
||||
@ -16,7 +17,7 @@ pub const DEFAULT_PING_INTERVAL_INBOUND: u64 = 20;
|
||||
pub const DEFAULT_TARGET_PEERS: usize = 50;
|
||||
|
||||
/// Configurations for the PeerManager.
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct Config {
|
||||
/* Peer count related configurations */
|
||||
@ -40,6 +41,9 @@ pub struct Config {
|
||||
pub ping_interval_inbound: u64,
|
||||
/// Interval between PING events for peers dialed by us.
|
||||
pub ping_interval_outbound: u64,
|
||||
|
||||
#[serde(skip)]
|
||||
pub filters: Filters,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
@ -52,6 +56,29 @@ impl Default for Config {
|
||||
status_interval: DEFAULT_STATUS_INTERVAL,
|
||||
ping_interval_inbound: DEFAULT_PING_INTERVAL_INBOUND,
|
||||
ping_interval_outbound: DEFAULT_PING_INTERVAL_OUTBOUND,
|
||||
filters: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
#[derive(Clone)]
|
||||
|
||||
pub struct Filters {
|
||||
/// Decide whether to dial to specified peer.
|
||||
pub dial_peer_filter: Option<Arc<dyn Fn(&PeerId) -> bool + Sync + Send + 'static>>,
|
||||
}
|
||||
|
||||
impl Default for Filters {
|
||||
fn default() -> Self {
|
||||
Filters {
|
||||
dial_peer_filter: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for Filters {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Filters")
|
||||
.field("dial_peer_filter", &self.dial_peer_filter.is_some())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
@ -69,6 +69,8 @@ pub struct PeerManager {
|
||||
discovery_enabled: bool,
|
||||
/// Keeps track if the current instance is reporting metrics or not.
|
||||
metrics_enabled: bool,
|
||||
|
||||
filters: config::Filters,
|
||||
}
|
||||
|
||||
/// The events that the `PeerManager` outputs (requests).
|
||||
@ -108,6 +110,7 @@ impl PeerManager {
|
||||
status_interval,
|
||||
ping_interval_inbound,
|
||||
ping_interval_outbound,
|
||||
filters,
|
||||
} = cfg;
|
||||
|
||||
// Set up the peer manager heartbeat interval
|
||||
@ -123,6 +126,7 @@ impl PeerManager {
|
||||
heartbeat,
|
||||
discovery_enabled,
|
||||
metrics_enabled,
|
||||
filters,
|
||||
})
|
||||
}
|
||||
|
||||
@ -277,6 +281,10 @@ impl PeerManager {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(dial_peer_filter) = self.filters.dial_peer_filter.clone() {
|
||||
to_dial_peers.retain(|peer_id| dial_peer_filter(peer_id));
|
||||
}
|
||||
|
||||
// Queue another discovery if we need to
|
||||
self.maintain_peer_count(to_dial_peers.len());
|
||||
|
||||
@ -693,7 +701,7 @@ impl PeerManager {
|
||||
}
|
||||
|
||||
// Gracefully disconnects a peer without banning them.
|
||||
fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
|
||||
pub fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
|
||||
self.events
|
||||
.push(PeerManagerEvent::DisconnectPeer(peer_id, reason));
|
||||
self.network_globals
|
||||
|
@ -35,7 +35,7 @@ pub struct PeerDBConfig {
|
||||
pub allowed_negative_gossipsub_factor: f32,
|
||||
/// The time we allow peers to be in the dialing state in our PeerDb before we revert them to a disconnected state.
|
||||
#[serde(deserialize_with = "deserialize_duration")]
|
||||
pub dail_timeout: Duration,
|
||||
pub dial_timeout: Duration,
|
||||
}
|
||||
|
||||
impl Default for PeerDBConfig {
|
||||
@ -45,7 +45,7 @@ impl Default for PeerDBConfig {
|
||||
max_banned_peers: 1000,
|
||||
banned_peers_per_ip_threshold: 5,
|
||||
allowed_negative_gossipsub_factor: 0.1,
|
||||
dail_timeout: Duration::from_secs(15),
|
||||
dial_timeout: Duration::from_secs(15),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -339,7 +339,7 @@ impl PeerDB {
|
||||
.iter()
|
||||
.filter_map(|(peer_id, info)| {
|
||||
if let PeerConnectionStatus::Dialing { since } = info.connection_status() {
|
||||
if (*since) + self.config.dail_timeout < std::time::Instant::now() {
|
||||
if (*since) + self.config.dial_timeout < std::time::Instant::now() {
|
||||
return Some(*peer_id);
|
||||
}
|
||||
}
|
||||
|
@ -399,9 +399,7 @@ mod tests {
|
||||
use std::io::Write;
|
||||
|
||||
fn status_message() -> StatusMessage {
|
||||
StatusMessage {
|
||||
data: Default::default(),
|
||||
}
|
||||
Default::default()
|
||||
}
|
||||
|
||||
fn ping_message() -> Ping {
|
||||
@ -570,10 +568,7 @@ mod tests {
|
||||
assert_eq!(stream_identifier.len(), 10);
|
||||
|
||||
// Status message is 84 bytes uncompressed. `max_compressed_len` is 32 + 84 + 84/6 = 130.
|
||||
let status_message_bytes = StatusMessage {
|
||||
data: Default::default(),
|
||||
}
|
||||
.as_ssz_bytes();
|
||||
let status_message_bytes = StatusMessage::default().as_ssz_bytes();
|
||||
|
||||
let mut uvi_codec: Uvi<usize> = Uvi::default();
|
||||
let mut dst = BytesMut::with_capacity(1024);
|
||||
|
@ -69,9 +69,13 @@ impl ToString for ErrorType {
|
||||
/* Requests */
|
||||
|
||||
/// The STATUS request/response handshake message.
|
||||
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
|
||||
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq, Default)]
|
||||
pub struct StatusMessage {
|
||||
pub data: NetworkIdentity,
|
||||
|
||||
// shard config
|
||||
pub num_shard: usize,
|
||||
pub shard_id: usize,
|
||||
}
|
||||
|
||||
/// The PING request/response message.
|
||||
|
@ -23,14 +23,10 @@ fn test_status_rpc() {
|
||||
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt)).await;
|
||||
|
||||
// Dummy STATUS RPC message
|
||||
let rpc_request = Request::Status(StatusMessage {
|
||||
data: Default::default(),
|
||||
});
|
||||
let rpc_request = Request::Status(Default::default());
|
||||
|
||||
// Dummy STATUS RPC message
|
||||
let rpc_response = Response::Status(StatusMessage {
|
||||
data: Default::default(),
|
||||
});
|
||||
let rpc_response = Response::Status(Default::default());
|
||||
|
||||
// build the sender future
|
||||
let sender_future = async {
|
||||
|
@ -172,8 +172,11 @@ impl Libp2pEventHandler {
|
||||
}
|
||||
|
||||
pub fn send_status(&self, peer_id: PeerId) {
|
||||
let shard_config = self.store.get_store().get_shard_config();
|
||||
let status_message = StatusMessage {
|
||||
data: self.network_globals.network_id(),
|
||||
num_shard: shard_config.num_shard,
|
||||
shard_id: shard_config.shard_id,
|
||||
};
|
||||
debug!(%peer_id, ?status_message, "Sending Status request");
|
||||
|
||||
@ -191,7 +194,6 @@ impl Libp2pEventHandler {
|
||||
|
||||
if outgoing {
|
||||
self.send_status(peer_id);
|
||||
self.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
||||
metrics::LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING.mark(1);
|
||||
} else {
|
||||
metrics::LIBP2P_HANDLE_PEER_CONNECTED_INCOMING.mark(1);
|
||||
@ -254,8 +256,11 @@ impl Libp2pEventHandler {
|
||||
debug!(%peer_id, ?status, "Received Status request");
|
||||
|
||||
let network_id = self.network_globals.network_id();
|
||||
let shard_config = self.store.get_store().get_shard_config();
|
||||
let status_message = StatusMessage {
|
||||
data: network_id.clone(),
|
||||
num_shard: shard_config.num_shard,
|
||||
shard_id: shard_config.shard_id,
|
||||
};
|
||||
debug!(%peer_id, ?status_message, "Sending Status response");
|
||||
|
||||
@ -264,12 +269,18 @@ impl Libp2pEventHandler {
|
||||
id: request_id,
|
||||
response: Response::Status(status_message),
|
||||
});
|
||||
self.on_status_message(peer_id, status, network_id);
|
||||
|
||||
if self.verify_status_message(peer_id, status, network_id, &shard_config) {
|
||||
self.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
||||
}
|
||||
}
|
||||
|
||||
fn on_status_response(&self, peer_id: PeerId, status: StatusMessage) {
|
||||
let network_id = self.network_globals.network_id();
|
||||
self.on_status_message(peer_id, status, network_id);
|
||||
let shard_config = self.store.get_store().get_shard_config();
|
||||
if self.verify_status_message(peer_id, status, network_id, &shard_config) {
|
||||
self.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn on_rpc_response(
|
||||
@ -950,21 +961,54 @@ impl Libp2pEventHandler {
|
||||
MessageAcceptance::Accept
|
||||
}
|
||||
|
||||
fn on_status_message(
|
||||
fn verify_status_message(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
status: StatusMessage,
|
||||
network_id: NetworkIdentity,
|
||||
) {
|
||||
shard_config: &ShardConfig,
|
||||
) -> bool {
|
||||
if status.data != network_id {
|
||||
warn!(%peer_id, ?network_id, ?status.data, "Report peer with incompatible network id");
|
||||
self.send_to_network(NetworkMessage::ReportPeer {
|
||||
peer_id,
|
||||
action: PeerAction::Fatal,
|
||||
source: ReportSource::Gossipsub,
|
||||
source: ReportSource::RPC,
|
||||
msg: "Incompatible network id in StatusMessage",
|
||||
})
|
||||
});
|
||||
return false;
|
||||
}
|
||||
|
||||
let peer_shard_config = match ShardConfig::new(status.shard_id, status.num_shard) {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
warn!(%peer_id, ?status, ?err, "Report peer with invalid shard config");
|
||||
self.send_to_network(NetworkMessage::ReportPeer {
|
||||
peer_id,
|
||||
action: PeerAction::Fatal,
|
||||
source: ReportSource::RPC,
|
||||
msg: "Invalid shard config in StatusMessage",
|
||||
});
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
self.file_location_cache
|
||||
.insert_peer_config(peer_id, peer_shard_config);
|
||||
|
||||
if !peer_shard_config.intersect(shard_config) {
|
||||
info!(%peer_id, ?shard_config, ?status, "Report peer with mismatched shard config");
|
||||
self.send_to_network(NetworkMessage::ReportPeer {
|
||||
peer_id,
|
||||
action: PeerAction::LowToleranceError,
|
||||
source: ReportSource::RPC,
|
||||
msg: "Shard config mismatch in StatusMessage",
|
||||
});
|
||||
self.send_to_network(NetworkMessage::DisconnectPeer { peer_id });
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
async fn publish_file(&self, tx_id: TxID) -> Option<bool> {
|
||||
@ -1184,10 +1228,7 @@ mod tests {
|
||||
|
||||
assert_eq!(handler.peers.read().await.size(), 1);
|
||||
ctx.assert_status_request(alice);
|
||||
assert!(matches!(
|
||||
ctx.sync_recv.try_recv(),
|
||||
Ok(Notification(SyncMessage::PeerConnected {peer_id})) if peer_id == alice
|
||||
));
|
||||
assert!(matches!(ctx.sync_recv.try_recv(), Err(TryRecvError::Empty)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@ -1216,6 +1257,8 @@ mod tests {
|
||||
let req_id = (ConnectionId::new(4), SubstreamId(12));
|
||||
let request = Request::Status(StatusMessage {
|
||||
data: Default::default(),
|
||||
num_shard: 1,
|
||||
shard_id: 0,
|
||||
});
|
||||
handler.on_rpc_request(alice, req_id, request).await;
|
||||
|
||||
|
@ -11,16 +11,14 @@ lazy_static::lazy_static! {
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "all");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "already");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "ok");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "fail");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "all");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "already");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "ok");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "fail");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
|
||||
|
||||
pub static ref SERVICE_EXPIRED_PEERS: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers", 1024);
|
||||
pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_OK: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_ok", 1024);
|
||||
pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_fail", 1024);
|
||||
|
||||
// libp2p_event_handler
|
||||
|
||||
|
@ -5,6 +5,8 @@ use chunk_pool::ChunkPoolMessage;
|
||||
use file_location_cache::FileLocationCache;
|
||||
use futures::{channel::mpsc::Sender, prelude::*};
|
||||
use miner::MinerMessage;
|
||||
use network::rpc::GoodbyeReason;
|
||||
use network::PeerId;
|
||||
use network::{
|
||||
types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage,
|
||||
NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm,
|
||||
@ -309,27 +311,30 @@ impl RouterService {
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER.mark(1);
|
||||
}
|
||||
NetworkMessage::DialPeer { address, peer_id } => {
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER.mark(1);
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER.mark(1);
|
||||
|
||||
if self.libp2p.swarm.is_connected(&peer_id) {
|
||||
self.libp2p_event_handler
|
||||
.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY.mark(1);
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY.mark(1);
|
||||
} else {
|
||||
match Swarm::dial(&mut self.libp2p.swarm, address.clone()) {
|
||||
Ok(()) => {
|
||||
debug!(%address, "Dialing libp2p peer");
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK.mark(1);
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK.mark(1);
|
||||
}
|
||||
Err(err) => {
|
||||
info!(%address, error = ?err, "Failed to dial peer");
|
||||
self.libp2p_event_handler
|
||||
.send_to_sync(SyncMessage::DailFailed { peer_id, err });
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL.mark(1);
|
||||
.send_to_sync(SyncMessage::DialFailed { peer_id, err });
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL.mark(1);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
NetworkMessage::DisconnectPeer { peer_id } => {
|
||||
self.disconnect_peer(peer_id);
|
||||
}
|
||||
NetworkMessage::AnnounceLocalFile { tx_id } => {
|
||||
let shard_config = self.store.get_shard_config();
|
||||
let msg = PubsubMessage::NewFile(NewFile {
|
||||
@ -399,24 +404,16 @@ impl RouterService {
|
||||
debug!(%num_expired_peers, "Heartbeat, remove expired peers")
|
||||
}
|
||||
|
||||
let mut num_succeeded = 0;
|
||||
let mut num_failed = 0;
|
||||
for peer_id in expired_peers {
|
||||
// async operation, once peer disconnected, swarm event `PeerDisconnected`
|
||||
// will be polled to handle in advance.
|
||||
match self.libp2p.swarm.disconnect_peer_id(peer_id) {
|
||||
Ok(_) => {
|
||||
debug!(%peer_id, "Peer expired and disconnect it");
|
||||
num_succeeded += 1;
|
||||
}
|
||||
Err(_) => {
|
||||
debug!(%peer_id, "Peer expired but failed to disconnect");
|
||||
num_failed += 1;
|
||||
}
|
||||
}
|
||||
self.disconnect_peer(peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
fn disconnect_peer(&mut self, peer_id: PeerId) {
|
||||
let pm = self.libp2p.swarm.behaviour_mut().peer_manager_mut();
|
||||
if pm.is_connected(&peer_id) {
|
||||
pm.disconnect_peer(peer_id, GoodbyeReason::IrrelevantNetwork);
|
||||
}
|
||||
metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_OK.update(num_succeeded);
|
||||
metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL.update(num_failed);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -134,11 +134,21 @@ impl ClientBuilder {
|
||||
}
|
||||
|
||||
/// Starts the networking stack.
|
||||
pub async fn with_network(mut self, config: &NetworkConfig) -> Result<Self, String> {
|
||||
pub async fn with_network(mut self, mut config: NetworkConfig) -> Result<Self, String> {
|
||||
let executor = require!("network", self, runtime_context).clone().executor;
|
||||
let store = require!("network", self, store).clone();
|
||||
let file_location_cache = require!("network", self, file_location_cache).clone();
|
||||
|
||||
// only dial to peers that shard config matched
|
||||
config.peer_manager.filters.dial_peer_filter = Some(Arc::new(move |peer_id| {
|
||||
match file_location_cache.get_peer_config(peer_id) {
|
||||
Some(v) => store.get_shard_config().intersect(&v),
|
||||
None => true,
|
||||
}
|
||||
}));
|
||||
|
||||
// construct the libp2p service context
|
||||
let service_context = network::Context { config };
|
||||
let service_context = network::Context { config: &config };
|
||||
|
||||
// construct communication channel
|
||||
let (send, recv) = new_network_channel();
|
||||
|
@ -39,18 +39,17 @@ impl ZgsConfig {
|
||||
.await
|
||||
.map_err(|e| format!("Unable to get chain id: {:?}", e))?
|
||||
.as_u64();
|
||||
let network_protocol_version = if self.sync.neighbors_only {
|
||||
network::PROTOCOL_VERSION_V2
|
||||
} else {
|
||||
network::PROTOCOL_VERSION_V1
|
||||
};
|
||||
let local_network_id = NetworkIdentity {
|
||||
chain_id,
|
||||
flow_address,
|
||||
p2p_protocol_version: ProtocolVersion {
|
||||
major: network_protocol_version[0],
|
||||
minor: network_protocol_version[1],
|
||||
build: network_protocol_version[2],
|
||||
major: network::PROTOCOL_VERSION_V3[0],
|
||||
minor: network::PROTOCOL_VERSION_V3[1],
|
||||
build: if self.sync.neighbors_only {
|
||||
network::PROTOCOL_VERSION_V3[2] + 1
|
||||
} else {
|
||||
network::PROTOCOL_VERSION_V3[2]
|
||||
},
|
||||
},
|
||||
};
|
||||
network_config.network_id = local_network_id.clone();
|
||||
@ -110,7 +109,7 @@ impl ZgsConfig {
|
||||
network_config.private = self.network_private;
|
||||
|
||||
network_config.peer_db = self.network_peer_db;
|
||||
network_config.peer_manager = self.network_peer_manager;
|
||||
network_config.peer_manager = self.network_peer_manager.clone();
|
||||
network_config.disable_enr_network_id = self.discv5_disable_enr_network_id;
|
||||
|
||||
Ok(network_config)
|
||||
|
@ -26,7 +26,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
|
||||
.with_log_sync(log_sync_config)
|
||||
.await?
|
||||
.with_file_location_cache(config.file_location_cache)
|
||||
.with_network(&network_config)
|
||||
.with_network(network_config)
|
||||
.await?
|
||||
.with_chunk_pool(chunk_pool_config)
|
||||
.await?
|
||||
|
@ -194,7 +194,7 @@ impl SyncPeers {
|
||||
ctx.report_peer(
|
||||
*peer_id,
|
||||
PeerAction::LowToleranceError,
|
||||
"Dail timeout",
|
||||
"Dial timeout",
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -232,7 +232,7 @@ impl SerialSyncController {
|
||||
/// Dial to peers in `Found` state, so that `Connecting` or `Connected` peers cover
|
||||
/// data in all shards.
|
||||
fn try_connect(&mut self) {
|
||||
let mut num_peers_dailed = 0;
|
||||
let mut num_peers_dialed = 0;
|
||||
|
||||
// select a random peer
|
||||
while !self
|
||||
@ -256,10 +256,10 @@ impl SerialSyncController {
|
||||
self.peers
|
||||
.update_state(&peer_id, PeerState::Found, PeerState::Connecting);
|
||||
|
||||
num_peers_dailed += 1;
|
||||
num_peers_dialed += 1;
|
||||
}
|
||||
|
||||
info!(%self.tx_seq, %num_peers_dailed, "Connecting peers");
|
||||
info!(%self.tx_seq, %num_peers_dialed, "Connecting peers");
|
||||
|
||||
self.state = SyncState::ConnectingPeers {
|
||||
origin: self.since,
|
||||
@ -358,14 +358,14 @@ impl SerialSyncController {
|
||||
.update_state_force(&peer_id, PeerState::Connected);
|
||||
}
|
||||
|
||||
pub fn on_dail_failed(&mut self, peer_id: PeerId, err: &DialError) {
|
||||
pub fn on_dial_failed(&mut self, peer_id: PeerId, err: &DialError) {
|
||||
match err {
|
||||
DialError::ConnectionLimit(_) => {
|
||||
if let Some(true) =
|
||||
self.peers
|
||||
.update_state(&peer_id, PeerState::Connecting, PeerState::Found)
|
||||
{
|
||||
info!(%self.tx_seq, %peer_id, "Failed to dail peer due to outgoing connection limitation");
|
||||
info!(%self.tx_seq, %peer_id, "Failed to dial peer due to outgoing connection limitation");
|
||||
self.state = SyncState::AwaitingOutgoingConnection {
|
||||
since: Instant::now().into(),
|
||||
};
|
||||
@ -377,7 +377,7 @@ impl SerialSyncController {
|
||||
PeerState::Connecting,
|
||||
PeerState::Disconnected,
|
||||
) {
|
||||
info!(%self.tx_seq, %peer_id, %err, "Failed to dail peer");
|
||||
info!(%self.tx_seq, %peer_id, %err, "Failed to dial peer");
|
||||
self.state = SyncState::Idle;
|
||||
}
|
||||
}
|
||||
|
@ -33,7 +33,7 @@ pub type SyncReceiver = channel::Receiver<SyncMessage, SyncRequest, SyncResponse
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SyncMessage {
|
||||
DailFailed {
|
||||
DialFailed {
|
||||
peer_id: PeerId,
|
||||
err: DialError,
|
||||
},
|
||||
@ -227,8 +227,8 @@ impl SyncService {
|
||||
trace!("Sync received message {:?}", msg);
|
||||
|
||||
match msg {
|
||||
SyncMessage::DailFailed { peer_id, err } => {
|
||||
self.on_dail_failed(peer_id, err);
|
||||
SyncMessage::DialFailed { peer_id, err } => {
|
||||
self.on_dial_failed(peer_id, err);
|
||||
}
|
||||
SyncMessage::PeerConnected { peer_id } => {
|
||||
self.on_peer_connected(peer_id);
|
||||
@ -369,11 +369,11 @@ impl SyncService {
|
||||
}
|
||||
}
|
||||
|
||||
fn on_dail_failed(&mut self, peer_id: PeerId, err: DialError) {
|
||||
info!(%peer_id, ?err, "Dail to peer failed");
|
||||
fn on_dial_failed(&mut self, peer_id: PeerId, err: DialError) {
|
||||
info!(%peer_id, ?err, "Dial to peer failed");
|
||||
|
||||
for controller in self.controllers.values_mut() {
|
||||
controller.on_dail_failed(peer_id, &err);
|
||||
controller.on_dial_failed(peer_id, &err);
|
||||
controller.transition();
|
||||
}
|
||||
}
|
||||
|
@ -245,7 +245,7 @@ neighbors_only = true
|
||||
# Maximum number of continous failures to terminate a file sync.
|
||||
# max_request_failures = 5
|
||||
|
||||
# Timeout to dail peers.
|
||||
# Timeout to dial peers.
|
||||
# peer_connect_timeout = "15s"
|
||||
|
||||
# Timeout to disconnect peers.
|
||||
|
@ -257,7 +257,7 @@ neighbors_only = true
|
||||
# Maximum number of continous failures to terminate a file sync.
|
||||
# max_request_failures = 5
|
||||
|
||||
# Timeout to dail peers.
|
||||
# Timeout to dial peers.
|
||||
# peer_connect_timeout = "15s"
|
||||
|
||||
# Timeout to disconnect peers.
|
||||
|
@ -259,7 +259,7 @@ neighbors_only = true
|
||||
# Maximum number of continous failures to terminate a file sync.
|
||||
# max_request_failures = 5
|
||||
|
||||
# Timeout to dail peers.
|
||||
# Timeout to dial peers.
|
||||
# peer_connect_timeout = "15s"
|
||||
|
||||
# Timeout to disconnect peers.
|
||||
|
76
tests/network_tcp_shard_test.py
Normal file
76
tests/network_tcp_shard_test.py
Normal file
@ -0,0 +1,76 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import os
|
||||
import time
|
||||
|
||||
from config.node_config import ZGS_KEY_FILE, ZGS_NODEID
|
||||
from test_framework.test_framework import TestFramework
|
||||
from utility.utils import p2p_port
|
||||
|
||||
class NetworkTcpShardTest(TestFramework):
|
||||
"""
|
||||
This is to test TCP connection for shard config mismatched peers of UDP discovery.
|
||||
"""
|
||||
|
||||
def setup_params(self):
|
||||
# 1 bootnode and 2 community nodes
|
||||
self.num_nodes = 3
|
||||
|
||||
# setup for node 0 as bootnode
|
||||
self.zgs_node_key_files = [ZGS_KEY_FILE]
|
||||
bootnode_port = p2p_port(0)
|
||||
self.zgs_node_configs[0] = {
|
||||
# enable UDP discovery relevant configs
|
||||
"network_enr_address": "127.0.0.1",
|
||||
"network_enr_tcp_port": bootnode_port,
|
||||
"network_enr_udp_port": bootnode_port,
|
||||
|
||||
# disable trusted nodes
|
||||
"network_libp2p_nodes": [],
|
||||
|
||||
# custom shard config
|
||||
"shard_position": "0/4"
|
||||
}
|
||||
|
||||
# setup node 1 & 2 as community nodes
|
||||
bootnodes = [f"/ip4/127.0.0.1/udp/{bootnode_port}/p2p/{ZGS_NODEID}"]
|
||||
for i in range(1, self.num_nodes):
|
||||
self.zgs_node_configs[i] = {
|
||||
# enable UDP discovery relevant configs
|
||||
"network_enr_address": "127.0.0.1",
|
||||
"network_enr_tcp_port": p2p_port(i),
|
||||
"network_enr_udp_port": p2p_port(i),
|
||||
|
||||
# disable trusted nodes and enable bootnodes
|
||||
"network_libp2p_nodes": [],
|
||||
"network_boot_nodes": bootnodes,
|
||||
|
||||
# custom shard config
|
||||
"shard_position": f"{i}/4"
|
||||
}
|
||||
|
||||
def run_test(self):
|
||||
timeout_secs = 10
|
||||
|
||||
for iter in range(timeout_secs):
|
||||
time.sleep(1)
|
||||
self.log.info("==================================== iter %s", iter)
|
||||
|
||||
for i in range(self.num_nodes):
|
||||
info = self.nodes[i].rpc.admin_getNetworkInfo()
|
||||
self.log.info(
|
||||
"Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)",
|
||||
i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"],
|
||||
)
|
||||
|
||||
if i == timeout_secs - 1:
|
||||
assert info["totalPeers"] == self.num_nodes - 1
|
||||
assert info["bannedPeers"] == 0
|
||||
assert info["disconnectedPeers"] == self.num_nodes - 1
|
||||
assert info["connectedPeers"] == 0
|
||||
|
||||
self.log.info("====================================")
|
||||
self.log.info("All nodes discovered but not connected for each other")
|
||||
|
||||
if __name__ == "__main__":
|
||||
NetworkTcpShardTest().main()
|
Loading…
Reference in New Issue
Block a user