mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-12-24 23:35:18 +00:00
Compare commits
2 Commits
7a9a3b2ab0
...
b4eedb9ec8
Author | SHA1 | Date | |
---|---|---|---|
|
b4eedb9ec8 | ||
|
ff4d7dabef |
@ -63,14 +63,14 @@ impl Default for Config {
|
||||
#[derive(Clone)]
|
||||
|
||||
pub struct Filters {
|
||||
/// Decide whether to dail to specified peer.
|
||||
pub dail_peer_filter: Option<Arc<dyn Fn(&PeerId) -> bool + Sync + Send + 'static>>,
|
||||
/// Decide whether to dial to specified peer.
|
||||
pub dial_peer_filter: Option<Arc<dyn Fn(&PeerId) -> bool + Sync + Send + 'static>>,
|
||||
}
|
||||
|
||||
impl Default for Filters {
|
||||
fn default() -> Self {
|
||||
Filters {
|
||||
dail_peer_filter: None,
|
||||
dial_peer_filter: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -78,7 +78,7 @@ impl Default for Filters {
|
||||
impl Debug for Filters {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Filters")
|
||||
.field("dail_peer_filter", &self.dail_peer_filter.is_some())
|
||||
.field("dial_peer_filter", &self.dial_peer_filter.is_some())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
@ -281,8 +281,8 @@ impl PeerManager {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(dail_peer_filter) = self.filters.dail_peer_filter.clone() {
|
||||
to_dial_peers.retain(|peer_id| dail_peer_filter(peer_id));
|
||||
if let Some(dial_peer_filter) = self.filters.dial_peer_filter.clone() {
|
||||
to_dial_peers.retain(|peer_id| dial_peer_filter(peer_id));
|
||||
}
|
||||
|
||||
// Queue another discovery if we need to
|
||||
|
@ -35,7 +35,7 @@ pub struct PeerDBConfig {
|
||||
pub allowed_negative_gossipsub_factor: f32,
|
||||
/// The time we allow peers to be in the dialing state in our PeerDb before we revert them to a disconnected state.
|
||||
#[serde(deserialize_with = "deserialize_duration")]
|
||||
pub dail_timeout: Duration,
|
||||
pub dial_timeout: Duration,
|
||||
}
|
||||
|
||||
impl Default for PeerDBConfig {
|
||||
@ -45,7 +45,7 @@ impl Default for PeerDBConfig {
|
||||
max_banned_peers: 1000,
|
||||
banned_peers_per_ip_threshold: 5,
|
||||
allowed_negative_gossipsub_factor: 0.1,
|
||||
dail_timeout: Duration::from_secs(15),
|
||||
dial_timeout: Duration::from_secs(15),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -339,7 +339,7 @@ impl PeerDB {
|
||||
.iter()
|
||||
.filter_map(|(peer_id, info)| {
|
||||
if let PeerConnectionStatus::Dialing { since } = info.connection_status() {
|
||||
if (*since) + self.config.dail_timeout < std::time::Instant::now() {
|
||||
if (*since) + self.config.dial_timeout < std::time::Instant::now() {
|
||||
return Some(*peer_id);
|
||||
}
|
||||
}
|
||||
|
@ -11,10 +11,10 @@ lazy_static::lazy_static! {
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "all");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "already");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "ok");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "fail");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "all");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "already");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "ok");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "fail");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
|
||||
|
||||
|
@ -311,23 +311,23 @@ impl RouterService {
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER.mark(1);
|
||||
}
|
||||
NetworkMessage::DialPeer { address, peer_id } => {
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER.mark(1);
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER.mark(1);
|
||||
|
||||
if self.libp2p.swarm.is_connected(&peer_id) {
|
||||
self.libp2p_event_handler
|
||||
.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY.mark(1);
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY.mark(1);
|
||||
} else {
|
||||
match Swarm::dial(&mut self.libp2p.swarm, address.clone()) {
|
||||
Ok(()) => {
|
||||
debug!(%address, "Dialing libp2p peer");
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK.mark(1);
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK.mark(1);
|
||||
}
|
||||
Err(err) => {
|
||||
info!(%address, error = ?err, "Failed to dial peer");
|
||||
self.libp2p_event_handler
|
||||
.send_to_sync(SyncMessage::DailFailed { peer_id, err });
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL.mark(1);
|
||||
.send_to_sync(SyncMessage::DialFailed { peer_id, err });
|
||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL.mark(1);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -139,8 +139,8 @@ impl ClientBuilder {
|
||||
let store = require!("network", self, store).clone();
|
||||
let file_location_cache = require!("network", self, file_location_cache).clone();
|
||||
|
||||
// only dail to peers that shard config matched
|
||||
config.peer_manager.filters.dail_peer_filter = Some(Arc::new(move |peer_id| {
|
||||
// only dial to peers that shard config matched
|
||||
config.peer_manager.filters.dial_peer_filter = Some(Arc::new(move |peer_id| {
|
||||
match file_location_cache.get_peer_config(peer_id) {
|
||||
Some(v) => store.get_shard_config().intersect(&v),
|
||||
None => true,
|
||||
|
@ -194,7 +194,7 @@ impl SyncPeers {
|
||||
ctx.report_peer(
|
||||
*peer_id,
|
||||
PeerAction::LowToleranceError,
|
||||
"Dail timeout",
|
||||
"Dial timeout",
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -232,7 +232,7 @@ impl SerialSyncController {
|
||||
/// Dial to peers in `Found` state, so that `Connecting` or `Connected` peers cover
|
||||
/// data in all shards.
|
||||
fn try_connect(&mut self) {
|
||||
let mut num_peers_dailed = 0;
|
||||
let mut num_peers_dialed = 0;
|
||||
|
||||
// select a random peer
|
||||
while !self
|
||||
@ -256,10 +256,10 @@ impl SerialSyncController {
|
||||
self.peers
|
||||
.update_state(&peer_id, PeerState::Found, PeerState::Connecting);
|
||||
|
||||
num_peers_dailed += 1;
|
||||
num_peers_dialed += 1;
|
||||
}
|
||||
|
||||
info!(%self.tx_seq, %num_peers_dailed, "Connecting peers");
|
||||
info!(%self.tx_seq, %num_peers_dialed, "Connecting peers");
|
||||
|
||||
self.state = SyncState::ConnectingPeers {
|
||||
origin: self.since,
|
||||
@ -358,14 +358,14 @@ impl SerialSyncController {
|
||||
.update_state_force(&peer_id, PeerState::Connected);
|
||||
}
|
||||
|
||||
pub fn on_dail_failed(&mut self, peer_id: PeerId, err: &DialError) {
|
||||
pub fn on_dial_failed(&mut self, peer_id: PeerId, err: &DialError) {
|
||||
match err {
|
||||
DialError::ConnectionLimit(_) => {
|
||||
if let Some(true) =
|
||||
self.peers
|
||||
.update_state(&peer_id, PeerState::Connecting, PeerState::Found)
|
||||
{
|
||||
info!(%self.tx_seq, %peer_id, "Failed to dail peer due to outgoing connection limitation");
|
||||
info!(%self.tx_seq, %peer_id, "Failed to dial peer due to outgoing connection limitation");
|
||||
self.state = SyncState::AwaitingOutgoingConnection {
|
||||
since: Instant::now().into(),
|
||||
};
|
||||
@ -377,7 +377,7 @@ impl SerialSyncController {
|
||||
PeerState::Connecting,
|
||||
PeerState::Disconnected,
|
||||
) {
|
||||
info!(%self.tx_seq, %peer_id, %err, "Failed to dail peer");
|
||||
info!(%self.tx_seq, %peer_id, %err, "Failed to dial peer");
|
||||
self.state = SyncState::Idle;
|
||||
}
|
||||
}
|
||||
|
@ -33,7 +33,7 @@ pub type SyncReceiver = channel::Receiver<SyncMessage, SyncRequest, SyncResponse
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SyncMessage {
|
||||
DailFailed {
|
||||
DialFailed {
|
||||
peer_id: PeerId,
|
||||
err: DialError,
|
||||
},
|
||||
@ -227,8 +227,8 @@ impl SyncService {
|
||||
trace!("Sync received message {:?}", msg);
|
||||
|
||||
match msg {
|
||||
SyncMessage::DailFailed { peer_id, err } => {
|
||||
self.on_dail_failed(peer_id, err);
|
||||
SyncMessage::DialFailed { peer_id, err } => {
|
||||
self.on_dial_failed(peer_id, err);
|
||||
}
|
||||
SyncMessage::PeerConnected { peer_id } => {
|
||||
self.on_peer_connected(peer_id);
|
||||
@ -369,11 +369,11 @@ impl SyncService {
|
||||
}
|
||||
}
|
||||
|
||||
fn on_dail_failed(&mut self, peer_id: PeerId, err: DialError) {
|
||||
info!(%peer_id, ?err, "Dail to peer failed");
|
||||
fn on_dial_failed(&mut self, peer_id: PeerId, err: DialError) {
|
||||
info!(%peer_id, ?err, "Dial to peer failed");
|
||||
|
||||
for controller in self.controllers.values_mut() {
|
||||
controller.on_dail_failed(peer_id, &err);
|
||||
controller.on_dial_failed(peer_id, &err);
|
||||
controller.transition();
|
||||
}
|
||||
}
|
||||
|
@ -245,7 +245,7 @@ neighbors_only = true
|
||||
# Maximum number of continous failures to terminate a file sync.
|
||||
# max_request_failures = 5
|
||||
|
||||
# Timeout to dail peers.
|
||||
# Timeout to dial peers.
|
||||
# peer_connect_timeout = "15s"
|
||||
|
||||
# Timeout to disconnect peers.
|
||||
|
@ -257,7 +257,7 @@ neighbors_only = true
|
||||
# Maximum number of continous failures to terminate a file sync.
|
||||
# max_request_failures = 5
|
||||
|
||||
# Timeout to dail peers.
|
||||
# Timeout to dial peers.
|
||||
# peer_connect_timeout = "15s"
|
||||
|
||||
# Timeout to disconnect peers.
|
||||
|
@ -259,7 +259,7 @@ neighbors_only = true
|
||||
# Maximum number of continous failures to terminate a file sync.
|
||||
# max_request_failures = 5
|
||||
|
||||
# Timeout to dail peers.
|
||||
# Timeout to dial peers.
|
||||
# peer_connect_timeout = "15s"
|
||||
|
||||
# Timeout to disconnect peers.
|
||||
|
Loading…
Reference in New Issue
Block a user