Do not dail to shard config mismatched peers

This commit is contained in:
boqiu 2024-11-22 17:00:35 +08:00
parent 2be910cd39
commit 35860e73f1
5 changed files with 51 additions and 6 deletions

View File

@ -1,6 +1,7 @@
use std::time::Duration;
use std::{fmt::Debug, sync::Arc, time::Duration};
use duration_str::deserialize_duration;
use libp2p::PeerId;
use serde::{Deserialize, Serialize};
/// The time in seconds between re-status's peers.
@ -16,7 +17,7 @@ pub const DEFAULT_PING_INTERVAL_INBOUND: u64 = 20;
pub const DEFAULT_TARGET_PEERS: usize = 50;
/// Configurations for the PeerManager.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
/* Peer count related configurations */
@ -40,6 +41,9 @@ pub struct Config {
pub ping_interval_inbound: u64,
/// Interval between PING events for peers dialed by us.
pub ping_interval_outbound: u64,
#[serde(skip)]
pub filters: Filters,
}
impl Default for Config {
@ -52,6 +56,29 @@ impl Default for Config {
status_interval: DEFAULT_STATUS_INTERVAL,
ping_interval_inbound: DEFAULT_PING_INTERVAL_INBOUND,
ping_interval_outbound: DEFAULT_PING_INTERVAL_OUTBOUND,
filters: Default::default(),
}
}
}
#[derive(Clone)]
pub struct Filters {
/// Decide whether to dail to specified peer.
pub dail_peer_filter: Option<Arc<dyn Fn(&PeerId) -> bool + Sync + Send + 'static>>,
}
impl Default for Filters {
fn default() -> Self {
Filters {
dail_peer_filter: None,
}
}
}
impl Debug for Filters {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Filters")
.field("dail_peer_filter", &self.dail_peer_filter.is_some())
.finish()
}
}

View File

@ -69,6 +69,8 @@ pub struct PeerManager {
discovery_enabled: bool,
/// Keeps track if the current instance is reporting metrics or not.
metrics_enabled: bool,
filters: config::Filters,
}
/// The events that the `PeerManager` outputs (requests).
@ -108,6 +110,7 @@ impl PeerManager {
status_interval,
ping_interval_inbound,
ping_interval_outbound,
filters,
} = cfg;
// Set up the peer manager heartbeat interval
@ -123,6 +126,7 @@ impl PeerManager {
heartbeat,
discovery_enabled,
metrics_enabled,
filters,
})
}
@ -277,6 +281,10 @@ impl PeerManager {
}
}
if let Some(dail_peer_filter) = self.filters.dail_peer_filter.clone() {
to_dial_peers.retain(|peer_id| dail_peer_filter(peer_id));
}
// Queue another discovery if we need to
self.maintain_peer_count(to_dial_peers.len());

View File

@ -134,11 +134,21 @@ impl ClientBuilder {
}
/// Starts the networking stack.
pub async fn with_network(mut self, config: &NetworkConfig) -> Result<Self, String> {
pub async fn with_network(mut self, mut config: NetworkConfig) -> Result<Self, String> {
let executor = require!("network", self, runtime_context).clone().executor;
let store = require!("network", self, store).clone();
let file_location_cache = require!("network", self, file_location_cache).clone();
// only dail to peers that shard config matched
config.peer_manager.filters.dail_peer_filter = Some(Arc::new(move |peer_id| {
match file_location_cache.get_peer_config(peer_id) {
Some(v) => store.get_shard_config().intersect(&v),
None => true,
}
}));
// construct the libp2p service context
let service_context = network::Context { config };
let service_context = network::Context { config: &config };
// construct communication channel
let (send, recv) = new_network_channel();

View File

@ -110,7 +110,7 @@ impl ZgsConfig {
network_config.private = self.network_private;
network_config.peer_db = self.network_peer_db;
network_config.peer_manager = self.network_peer_manager;
network_config.peer_manager = self.network_peer_manager.clone();
network_config.disable_enr_network_id = self.discv5_disable_enr_network_id;
Ok(network_config)

View File

@ -26,7 +26,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
.with_log_sync(log_sync_config)
.await?
.with_file_location_cache(config.file_location_cache)
.with_network(&network_config)
.with_network(network_config)
.await?
.with_chunk_pool(chunk_pool_config)
.await?