mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-12 16:15:17 +00:00
Compare commits
2 Commits
7a9a3b2ab0
...
b4eedb9ec8
Author | SHA1 | Date | |
---|---|---|---|
|
b4eedb9ec8 | ||
|
ff4d7dabef |
@ -63,14 +63,14 @@ impl Default for Config {
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
||||||
pub struct Filters {
|
pub struct Filters {
|
||||||
/// Decide whether to dail to specified peer.
|
/// Decide whether to dial to specified peer.
|
||||||
pub dail_peer_filter: Option<Arc<dyn Fn(&PeerId) -> bool + Sync + Send + 'static>>,
|
pub dial_peer_filter: Option<Arc<dyn Fn(&PeerId) -> bool + Sync + Send + 'static>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Filters {
|
impl Default for Filters {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Filters {
|
Filters {
|
||||||
dail_peer_filter: None,
|
dial_peer_filter: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -78,7 +78,7 @@ impl Default for Filters {
|
|||||||
impl Debug for Filters {
|
impl Debug for Filters {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
f.debug_struct("Filters")
|
f.debug_struct("Filters")
|
||||||
.field("dail_peer_filter", &self.dail_peer_filter.is_some())
|
.field("dial_peer_filter", &self.dial_peer_filter.is_some())
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -281,8 +281,8 @@ impl PeerManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(dail_peer_filter) = self.filters.dail_peer_filter.clone() {
|
if let Some(dial_peer_filter) = self.filters.dial_peer_filter.clone() {
|
||||||
to_dial_peers.retain(|peer_id| dail_peer_filter(peer_id));
|
to_dial_peers.retain(|peer_id| dial_peer_filter(peer_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Queue another discovery if we need to
|
// Queue another discovery if we need to
|
||||||
|
@ -35,7 +35,7 @@ pub struct PeerDBConfig {
|
|||||||
pub allowed_negative_gossipsub_factor: f32,
|
pub allowed_negative_gossipsub_factor: f32,
|
||||||
/// The time we allow peers to be in the dialing state in our PeerDb before we revert them to a disconnected state.
|
/// The time we allow peers to be in the dialing state in our PeerDb before we revert them to a disconnected state.
|
||||||
#[serde(deserialize_with = "deserialize_duration")]
|
#[serde(deserialize_with = "deserialize_duration")]
|
||||||
pub dail_timeout: Duration,
|
pub dial_timeout: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for PeerDBConfig {
|
impl Default for PeerDBConfig {
|
||||||
@ -45,7 +45,7 @@ impl Default for PeerDBConfig {
|
|||||||
max_banned_peers: 1000,
|
max_banned_peers: 1000,
|
||||||
banned_peers_per_ip_threshold: 5,
|
banned_peers_per_ip_threshold: 5,
|
||||||
allowed_negative_gossipsub_factor: 0.1,
|
allowed_negative_gossipsub_factor: 0.1,
|
||||||
dail_timeout: Duration::from_secs(15),
|
dial_timeout: Duration::from_secs(15),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -339,7 +339,7 @@ impl PeerDB {
|
|||||||
.iter()
|
.iter()
|
||||||
.filter_map(|(peer_id, info)| {
|
.filter_map(|(peer_id, info)| {
|
||||||
if let PeerConnectionStatus::Dialing { since } = info.connection_status() {
|
if let PeerConnectionStatus::Dialing { since } = info.connection_status() {
|
||||||
if (*since) + self.config.dail_timeout < std::time::Instant::now() {
|
if (*since) + self.config.dial_timeout < std::time::Instant::now() {
|
||||||
return Some(*peer_id);
|
return Some(*peer_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,10 +11,10 @@ lazy_static::lazy_static! {
|
|||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "all");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "all");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "already");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "already");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "ok");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "ok");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "fail");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "fail");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
|
||||||
|
|
||||||
|
@ -311,23 +311,23 @@ impl RouterService {
|
|||||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER.mark(1);
|
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER.mark(1);
|
||||||
}
|
}
|
||||||
NetworkMessage::DialPeer { address, peer_id } => {
|
NetworkMessage::DialPeer { address, peer_id } => {
|
||||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER.mark(1);
|
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER.mark(1);
|
||||||
|
|
||||||
if self.libp2p.swarm.is_connected(&peer_id) {
|
if self.libp2p.swarm.is_connected(&peer_id) {
|
||||||
self.libp2p_event_handler
|
self.libp2p_event_handler
|
||||||
.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
||||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY.mark(1);
|
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY.mark(1);
|
||||||
} else {
|
} else {
|
||||||
match Swarm::dial(&mut self.libp2p.swarm, address.clone()) {
|
match Swarm::dial(&mut self.libp2p.swarm, address.clone()) {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
debug!(%address, "Dialing libp2p peer");
|
debug!(%address, "Dialing libp2p peer");
|
||||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK.mark(1);
|
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK.mark(1);
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
info!(%address, error = ?err, "Failed to dial peer");
|
info!(%address, error = ?err, "Failed to dial peer");
|
||||||
self.libp2p_event_handler
|
self.libp2p_event_handler
|
||||||
.send_to_sync(SyncMessage::DailFailed { peer_id, err });
|
.send_to_sync(SyncMessage::DialFailed { peer_id, err });
|
||||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL.mark(1);
|
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL.mark(1);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -139,8 +139,8 @@ impl ClientBuilder {
|
|||||||
let store = require!("network", self, store).clone();
|
let store = require!("network", self, store).clone();
|
||||||
let file_location_cache = require!("network", self, file_location_cache).clone();
|
let file_location_cache = require!("network", self, file_location_cache).clone();
|
||||||
|
|
||||||
// only dail to peers that shard config matched
|
// only dial to peers that shard config matched
|
||||||
config.peer_manager.filters.dail_peer_filter = Some(Arc::new(move |peer_id| {
|
config.peer_manager.filters.dial_peer_filter = Some(Arc::new(move |peer_id| {
|
||||||
match file_location_cache.get_peer_config(peer_id) {
|
match file_location_cache.get_peer_config(peer_id) {
|
||||||
Some(v) => store.get_shard_config().intersect(&v),
|
Some(v) => store.get_shard_config().intersect(&v),
|
||||||
None => true,
|
None => true,
|
||||||
|
@ -194,7 +194,7 @@ impl SyncPeers {
|
|||||||
ctx.report_peer(
|
ctx.report_peer(
|
||||||
*peer_id,
|
*peer_id,
|
||||||
PeerAction::LowToleranceError,
|
PeerAction::LowToleranceError,
|
||||||
"Dail timeout",
|
"Dial timeout",
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,7 +232,7 @@ impl SerialSyncController {
|
|||||||
/// Dial to peers in `Found` state, so that `Connecting` or `Connected` peers cover
|
/// Dial to peers in `Found` state, so that `Connecting` or `Connected` peers cover
|
||||||
/// data in all shards.
|
/// data in all shards.
|
||||||
fn try_connect(&mut self) {
|
fn try_connect(&mut self) {
|
||||||
let mut num_peers_dailed = 0;
|
let mut num_peers_dialed = 0;
|
||||||
|
|
||||||
// select a random peer
|
// select a random peer
|
||||||
while !self
|
while !self
|
||||||
@ -256,10 +256,10 @@ impl SerialSyncController {
|
|||||||
self.peers
|
self.peers
|
||||||
.update_state(&peer_id, PeerState::Found, PeerState::Connecting);
|
.update_state(&peer_id, PeerState::Found, PeerState::Connecting);
|
||||||
|
|
||||||
num_peers_dailed += 1;
|
num_peers_dialed += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(%self.tx_seq, %num_peers_dailed, "Connecting peers");
|
info!(%self.tx_seq, %num_peers_dialed, "Connecting peers");
|
||||||
|
|
||||||
self.state = SyncState::ConnectingPeers {
|
self.state = SyncState::ConnectingPeers {
|
||||||
origin: self.since,
|
origin: self.since,
|
||||||
@ -358,14 +358,14 @@ impl SerialSyncController {
|
|||||||
.update_state_force(&peer_id, PeerState::Connected);
|
.update_state_force(&peer_id, PeerState::Connected);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn on_dail_failed(&mut self, peer_id: PeerId, err: &DialError) {
|
pub fn on_dial_failed(&mut self, peer_id: PeerId, err: &DialError) {
|
||||||
match err {
|
match err {
|
||||||
DialError::ConnectionLimit(_) => {
|
DialError::ConnectionLimit(_) => {
|
||||||
if let Some(true) =
|
if let Some(true) =
|
||||||
self.peers
|
self.peers
|
||||||
.update_state(&peer_id, PeerState::Connecting, PeerState::Found)
|
.update_state(&peer_id, PeerState::Connecting, PeerState::Found)
|
||||||
{
|
{
|
||||||
info!(%self.tx_seq, %peer_id, "Failed to dail peer due to outgoing connection limitation");
|
info!(%self.tx_seq, %peer_id, "Failed to dial peer due to outgoing connection limitation");
|
||||||
self.state = SyncState::AwaitingOutgoingConnection {
|
self.state = SyncState::AwaitingOutgoingConnection {
|
||||||
since: Instant::now().into(),
|
since: Instant::now().into(),
|
||||||
};
|
};
|
||||||
@ -377,7 +377,7 @@ impl SerialSyncController {
|
|||||||
PeerState::Connecting,
|
PeerState::Connecting,
|
||||||
PeerState::Disconnected,
|
PeerState::Disconnected,
|
||||||
) {
|
) {
|
||||||
info!(%self.tx_seq, %peer_id, %err, "Failed to dail peer");
|
info!(%self.tx_seq, %peer_id, %err, "Failed to dial peer");
|
||||||
self.state = SyncState::Idle;
|
self.state = SyncState::Idle;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,7 @@ pub type SyncReceiver = channel::Receiver<SyncMessage, SyncRequest, SyncResponse
|
|||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum SyncMessage {
|
pub enum SyncMessage {
|
||||||
DailFailed {
|
DialFailed {
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
err: DialError,
|
err: DialError,
|
||||||
},
|
},
|
||||||
@ -227,8 +227,8 @@ impl SyncService {
|
|||||||
trace!("Sync received message {:?}", msg);
|
trace!("Sync received message {:?}", msg);
|
||||||
|
|
||||||
match msg {
|
match msg {
|
||||||
SyncMessage::DailFailed { peer_id, err } => {
|
SyncMessage::DialFailed { peer_id, err } => {
|
||||||
self.on_dail_failed(peer_id, err);
|
self.on_dial_failed(peer_id, err);
|
||||||
}
|
}
|
||||||
SyncMessage::PeerConnected { peer_id } => {
|
SyncMessage::PeerConnected { peer_id } => {
|
||||||
self.on_peer_connected(peer_id);
|
self.on_peer_connected(peer_id);
|
||||||
@ -369,11 +369,11 @@ impl SyncService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_dail_failed(&mut self, peer_id: PeerId, err: DialError) {
|
fn on_dial_failed(&mut self, peer_id: PeerId, err: DialError) {
|
||||||
info!(%peer_id, ?err, "Dail to peer failed");
|
info!(%peer_id, ?err, "Dial to peer failed");
|
||||||
|
|
||||||
for controller in self.controllers.values_mut() {
|
for controller in self.controllers.values_mut() {
|
||||||
controller.on_dail_failed(peer_id, &err);
|
controller.on_dial_failed(peer_id, &err);
|
||||||
controller.transition();
|
controller.transition();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -245,7 +245,7 @@ neighbors_only = true
|
|||||||
# Maximum number of continous failures to terminate a file sync.
|
# Maximum number of continous failures to terminate a file sync.
|
||||||
# max_request_failures = 5
|
# max_request_failures = 5
|
||||||
|
|
||||||
# Timeout to dail peers.
|
# Timeout to dial peers.
|
||||||
# peer_connect_timeout = "15s"
|
# peer_connect_timeout = "15s"
|
||||||
|
|
||||||
# Timeout to disconnect peers.
|
# Timeout to disconnect peers.
|
||||||
|
@ -257,7 +257,7 @@ neighbors_only = true
|
|||||||
# Maximum number of continous failures to terminate a file sync.
|
# Maximum number of continous failures to terminate a file sync.
|
||||||
# max_request_failures = 5
|
# max_request_failures = 5
|
||||||
|
|
||||||
# Timeout to dail peers.
|
# Timeout to dial peers.
|
||||||
# peer_connect_timeout = "15s"
|
# peer_connect_timeout = "15s"
|
||||||
|
|
||||||
# Timeout to disconnect peers.
|
# Timeout to disconnect peers.
|
||||||
|
@ -259,7 +259,7 @@ neighbors_only = true
|
|||||||
# Maximum number of continous failures to terminate a file sync.
|
# Maximum number of continous failures to terminate a file sync.
|
||||||
# max_request_failures = 5
|
# max_request_failures = 5
|
||||||
|
|
||||||
# Timeout to dail peers.
|
# Timeout to dial peers.
|
||||||
# peer_connect_timeout = "15s"
|
# peer_connect_timeout = "15s"
|
||||||
|
|
||||||
# Timeout to disconnect peers.
|
# Timeout to disconnect peers.
|
||||||
|
Loading…
Reference in New Issue
Block a user