diff --git a/node/network/src/peer_manager/config.rs b/node/network/src/peer_manager/config.rs index 5520601..56607d1 100644 --- a/node/network/src/peer_manager/config.rs +++ b/node/network/src/peer_manager/config.rs @@ -1,6 +1,7 @@ -use std::time::Duration; +use std::{fmt::Debug, sync::Arc, time::Duration}; use duration_str::deserialize_duration; +use libp2p::PeerId; use serde::{Deserialize, Serialize}; /// The time in seconds between re-status's peers. @@ -16,7 +17,7 @@ pub const DEFAULT_PING_INTERVAL_INBOUND: u64 = 20; pub const DEFAULT_TARGET_PEERS: usize = 50; /// Configurations for the PeerManager. -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct Config { /* Peer count related configurations */ @@ -40,6 +41,9 @@ pub struct Config { pub ping_interval_inbound: u64, /// Interval between PING events for peers dialed by us. pub ping_interval_outbound: u64, + + #[serde(skip)] + pub filters: Filters, } impl Default for Config { @@ -52,6 +56,29 @@ impl Default for Config { status_interval: DEFAULT_STATUS_INTERVAL, ping_interval_inbound: DEFAULT_PING_INTERVAL_INBOUND, ping_interval_outbound: DEFAULT_PING_INTERVAL_OUTBOUND, + filters: Default::default(), } } } +#[derive(Clone)] + +pub struct Filters { + /// Decide whether to dail to specified peer. + pub dail_peer_filter: Option bool + Sync + Send + 'static>>, +} + +impl Default for Filters { + fn default() -> Self { + Filters { + dail_peer_filter: None, + } + } +} + +impl Debug for Filters { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Filters") + .field("dail_peer_filter", &self.dail_peer_filter.is_some()) + .finish() + } +} diff --git a/node/network/src/peer_manager/mod.rs b/node/network/src/peer_manager/mod.rs index 9ee4936..81d4806 100644 --- a/node/network/src/peer_manager/mod.rs +++ b/node/network/src/peer_manager/mod.rs @@ -69,6 +69,8 @@ pub struct PeerManager { discovery_enabled: bool, /// Keeps track if the current instance is reporting metrics or not. metrics_enabled: bool, + + filters: config::Filters, } /// The events that the `PeerManager` outputs (requests). @@ -108,6 +110,7 @@ impl PeerManager { status_interval, ping_interval_inbound, ping_interval_outbound, + filters, } = cfg; // Set up the peer manager heartbeat interval @@ -123,6 +126,7 @@ impl PeerManager { heartbeat, discovery_enabled, metrics_enabled, + filters, }) } @@ -277,6 +281,10 @@ impl PeerManager { } } + if let Some(dail_peer_filter) = self.filters.dail_peer_filter.clone() { + to_dial_peers.retain(|peer_id| dail_peer_filter(peer_id)); + } + // Queue another discovery if we need to self.maintain_peer_count(to_dial_peers.len()); diff --git a/node/src/client/builder.rs b/node/src/client/builder.rs index 7d345c8..e8f916d 100644 --- a/node/src/client/builder.rs +++ b/node/src/client/builder.rs @@ -134,11 +134,21 @@ impl ClientBuilder { } /// Starts the networking stack. - pub async fn with_network(mut self, config: &NetworkConfig) -> Result { + pub async fn with_network(mut self, mut config: NetworkConfig) -> Result { let executor = require!("network", self, runtime_context).clone().executor; + let store = require!("network", self, store).clone(); + let file_location_cache = require!("network", self, file_location_cache).clone(); + + // only dail to peers that shard config matched + config.peer_manager.filters.dail_peer_filter = Some(Arc::new(move |peer_id| { + match file_location_cache.get_peer_config(peer_id) { + Some(v) => store.get_shard_config().intersect(&v), + None => true, + } + })); // construct the libp2p service context - let service_context = network::Context { config }; + let service_context = network::Context { config: &config }; // construct communication channel let (send, recv) = new_network_channel(); diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index 67346a2..a0dc345 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -110,7 +110,7 @@ impl ZgsConfig { network_config.private = self.network_private; network_config.peer_db = self.network_peer_db; - network_config.peer_manager = self.network_peer_manager; + network_config.peer_manager = self.network_peer_manager.clone(); network_config.disable_enr_network_id = self.discv5_disable_enr_network_id; Ok(network_config) diff --git a/node/src/main.rs b/node/src/main.rs index 29db29e..9c42046 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -26,7 +26,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result