Compare commits

..

No commits in common. "b4eedb9ec8ad4bf69bed588f145c1b93c19ba62b" and "7a9a3b2ab0347a2927f207bdb74f1c096c5c1ffb" have entirely different histories.

12 changed files with 36 additions and 36 deletions

View File

@ -63,14 +63,14 @@ impl Default for Config {
#[derive(Clone)] #[derive(Clone)]
pub struct Filters { pub struct Filters {
/// Decide whether to dial to specified peer. /// Decide whether to dail to specified peer.
pub dial_peer_filter: Option<Arc<dyn Fn(&PeerId) -> bool + Sync + Send + 'static>>, pub dail_peer_filter: Option<Arc<dyn Fn(&PeerId) -> bool + Sync + Send + 'static>>,
} }
impl Default for Filters { impl Default for Filters {
fn default() -> Self { fn default() -> Self {
Filters { Filters {
dial_peer_filter: None, dail_peer_filter: None,
} }
} }
} }
@ -78,7 +78,7 @@ impl Default for Filters {
impl Debug for Filters { impl Debug for Filters {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Filters") f.debug_struct("Filters")
.field("dial_peer_filter", &self.dial_peer_filter.is_some()) .field("dail_peer_filter", &self.dail_peer_filter.is_some())
.finish() .finish()
} }
} }

View File

@ -281,8 +281,8 @@ impl PeerManager {
} }
} }
if let Some(dial_peer_filter) = self.filters.dial_peer_filter.clone() { if let Some(dail_peer_filter) = self.filters.dail_peer_filter.clone() {
to_dial_peers.retain(|peer_id| dial_peer_filter(peer_id)); to_dial_peers.retain(|peer_id| dail_peer_filter(peer_id));
} }
// Queue another discovery if we need to // Queue another discovery if we need to

View File

@ -35,7 +35,7 @@ pub struct PeerDBConfig {
pub allowed_negative_gossipsub_factor: f32, pub allowed_negative_gossipsub_factor: f32,
/// The time we allow peers to be in the dialing state in our PeerDb before we revert them to a disconnected state. /// The time we allow peers to be in the dialing state in our PeerDb before we revert them to a disconnected state.
#[serde(deserialize_with = "deserialize_duration")] #[serde(deserialize_with = "deserialize_duration")]
pub dial_timeout: Duration, pub dail_timeout: Duration,
} }
impl Default for PeerDBConfig { impl Default for PeerDBConfig {
@ -45,7 +45,7 @@ impl Default for PeerDBConfig {
max_banned_peers: 1000, max_banned_peers: 1000,
banned_peers_per_ip_threshold: 5, banned_peers_per_ip_threshold: 5,
allowed_negative_gossipsub_factor: 0.1, allowed_negative_gossipsub_factor: 0.1,
dial_timeout: Duration::from_secs(15), dail_timeout: Duration::from_secs(15),
} }
} }
} }
@ -339,7 +339,7 @@ impl PeerDB {
.iter() .iter()
.filter_map(|(peer_id, info)| { .filter_map(|(peer_id, info)| {
if let PeerConnectionStatus::Dialing { since } = info.connection_status() { if let PeerConnectionStatus::Dialing { since } = info.connection_status() {
if (*since) + self.config.dial_timeout < std::time::Instant::now() { if (*since) + self.config.dail_timeout < std::time::Instant::now() {
return Some(*peer_id); return Some(*peer_id);
} }
} }

View File

@ -11,10 +11,10 @@ lazy_static::lazy_static! {
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "all"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "all");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "already"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "already");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "ok"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "ok");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "fail"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "fail");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");

View File

@ -311,23 +311,23 @@ impl RouterService {
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER.mark(1); metrics::SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER.mark(1);
} }
NetworkMessage::DialPeer { address, peer_id } => { NetworkMessage::DialPeer { address, peer_id } => {
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER.mark(1); metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER.mark(1);
if self.libp2p.swarm.is_connected(&peer_id) { if self.libp2p.swarm.is_connected(&peer_id) {
self.libp2p_event_handler self.libp2p_event_handler
.send_to_sync(SyncMessage::PeerConnected { peer_id }); .send_to_sync(SyncMessage::PeerConnected { peer_id });
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY.mark(1); metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY.mark(1);
} else { } else {
match Swarm::dial(&mut self.libp2p.swarm, address.clone()) { match Swarm::dial(&mut self.libp2p.swarm, address.clone()) {
Ok(()) => { Ok(()) => {
debug!(%address, "Dialing libp2p peer"); debug!(%address, "Dialing libp2p peer");
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK.mark(1); metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK.mark(1);
} }
Err(err) => { Err(err) => {
info!(%address, error = ?err, "Failed to dial peer"); info!(%address, error = ?err, "Failed to dial peer");
self.libp2p_event_handler self.libp2p_event_handler
.send_to_sync(SyncMessage::DialFailed { peer_id, err }); .send_to_sync(SyncMessage::DailFailed { peer_id, err });
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL.mark(1); metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL.mark(1);
} }
}; };
} }

View File

@ -139,8 +139,8 @@ impl ClientBuilder {
let store = require!("network", self, store).clone(); let store = require!("network", self, store).clone();
let file_location_cache = require!("network", self, file_location_cache).clone(); let file_location_cache = require!("network", self, file_location_cache).clone();
// only dial to peers that shard config matched // only dail to peers that shard config matched
config.peer_manager.filters.dial_peer_filter = Some(Arc::new(move |peer_id| { config.peer_manager.filters.dail_peer_filter = Some(Arc::new(move |peer_id| {
match file_location_cache.get_peer_config(peer_id) { match file_location_cache.get_peer_config(peer_id) {
Some(v) => store.get_shard_config().intersect(&v), Some(v) => store.get_shard_config().intersect(&v),
None => true, None => true,

View File

@ -194,7 +194,7 @@ impl SyncPeers {
ctx.report_peer( ctx.report_peer(
*peer_id, *peer_id,
PeerAction::LowToleranceError, PeerAction::LowToleranceError,
"Dial timeout", "Dail timeout",
); );
} }

View File

@ -232,7 +232,7 @@ impl SerialSyncController {
/// Dial to peers in `Found` state, so that `Connecting` or `Connected` peers cover /// Dial to peers in `Found` state, so that `Connecting` or `Connected` peers cover
/// data in all shards. /// data in all shards.
fn try_connect(&mut self) { fn try_connect(&mut self) {
let mut num_peers_dialed = 0; let mut num_peers_dailed = 0;
// select a random peer // select a random peer
while !self while !self
@ -256,10 +256,10 @@ impl SerialSyncController {
self.peers self.peers
.update_state(&peer_id, PeerState::Found, PeerState::Connecting); .update_state(&peer_id, PeerState::Found, PeerState::Connecting);
num_peers_dialed += 1; num_peers_dailed += 1;
} }
info!(%self.tx_seq, %num_peers_dialed, "Connecting peers"); info!(%self.tx_seq, %num_peers_dailed, "Connecting peers");
self.state = SyncState::ConnectingPeers { self.state = SyncState::ConnectingPeers {
origin: self.since, origin: self.since,
@ -358,14 +358,14 @@ impl SerialSyncController {
.update_state_force(&peer_id, PeerState::Connected); .update_state_force(&peer_id, PeerState::Connected);
} }
pub fn on_dial_failed(&mut self, peer_id: PeerId, err: &DialError) { pub fn on_dail_failed(&mut self, peer_id: PeerId, err: &DialError) {
match err { match err {
DialError::ConnectionLimit(_) => { DialError::ConnectionLimit(_) => {
if let Some(true) = if let Some(true) =
self.peers self.peers
.update_state(&peer_id, PeerState::Connecting, PeerState::Found) .update_state(&peer_id, PeerState::Connecting, PeerState::Found)
{ {
info!(%self.tx_seq, %peer_id, "Failed to dial peer due to outgoing connection limitation"); info!(%self.tx_seq, %peer_id, "Failed to dail peer due to outgoing connection limitation");
self.state = SyncState::AwaitingOutgoingConnection { self.state = SyncState::AwaitingOutgoingConnection {
since: Instant::now().into(), since: Instant::now().into(),
}; };
@ -377,7 +377,7 @@ impl SerialSyncController {
PeerState::Connecting, PeerState::Connecting,
PeerState::Disconnected, PeerState::Disconnected,
) { ) {
info!(%self.tx_seq, %peer_id, %err, "Failed to dial peer"); info!(%self.tx_seq, %peer_id, %err, "Failed to dail peer");
self.state = SyncState::Idle; self.state = SyncState::Idle;
} }
} }

View File

@ -33,7 +33,7 @@ pub type SyncReceiver = channel::Receiver<SyncMessage, SyncRequest, SyncResponse
#[derive(Debug)] #[derive(Debug)]
pub enum SyncMessage { pub enum SyncMessage {
DialFailed { DailFailed {
peer_id: PeerId, peer_id: PeerId,
err: DialError, err: DialError,
}, },
@ -227,8 +227,8 @@ impl SyncService {
trace!("Sync received message {:?}", msg); trace!("Sync received message {:?}", msg);
match msg { match msg {
SyncMessage::DialFailed { peer_id, err } => { SyncMessage::DailFailed { peer_id, err } => {
self.on_dial_failed(peer_id, err); self.on_dail_failed(peer_id, err);
} }
SyncMessage::PeerConnected { peer_id } => { SyncMessage::PeerConnected { peer_id } => {
self.on_peer_connected(peer_id); self.on_peer_connected(peer_id);
@ -369,11 +369,11 @@ impl SyncService {
} }
} }
fn on_dial_failed(&mut self, peer_id: PeerId, err: DialError) { fn on_dail_failed(&mut self, peer_id: PeerId, err: DialError) {
info!(%peer_id, ?err, "Dial to peer failed"); info!(%peer_id, ?err, "Dail to peer failed");
for controller in self.controllers.values_mut() { for controller in self.controllers.values_mut() {
controller.on_dial_failed(peer_id, &err); controller.on_dail_failed(peer_id, &err);
controller.transition(); controller.transition();
} }
} }

View File

@ -245,7 +245,7 @@ neighbors_only = true
# Maximum number of continous failures to terminate a file sync. # Maximum number of continous failures to terminate a file sync.
# max_request_failures = 5 # max_request_failures = 5
# Timeout to dial peers. # Timeout to dail peers.
# peer_connect_timeout = "15s" # peer_connect_timeout = "15s"
# Timeout to disconnect peers. # Timeout to disconnect peers.

View File

@ -257,7 +257,7 @@ neighbors_only = true
# Maximum number of continous failures to terminate a file sync. # Maximum number of continous failures to terminate a file sync.
# max_request_failures = 5 # max_request_failures = 5
# Timeout to dial peers. # Timeout to dail peers.
# peer_connect_timeout = "15s" # peer_connect_timeout = "15s"
# Timeout to disconnect peers. # Timeout to disconnect peers.

View File

@ -259,7 +259,7 @@ neighbors_only = true
# Maximum number of continous failures to terminate a file sync. # Maximum number of continous failures to terminate a file sync.
# max_request_failures = 5 # max_request_failures = 5
# Timeout to dial peers. # Timeout to dail peers.
# peer_connect_timeout = "15s" # peer_connect_timeout = "15s"
# Timeout to disconnect peers. # Timeout to disconnect peers.