mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-12-24 07:15:17 +00:00
Compare commits
6 Commits
d3480784fb
...
18bcd8f591
Author | SHA1 | Date | |
---|---|---|---|
|
18bcd8f591 | ||
|
686e7a42e8 | ||
|
3bb242954b | ||
|
c548cb388c | ||
|
5b3946c363 | ||
|
43ba0b3079 |
@ -97,7 +97,7 @@ pub struct Discovery {
|
|||||||
///
|
///
|
||||||
/// This is behind a Reference counter to allow for futures to be spawned and polled with a
|
/// This is behind a Reference counter to allow for futures to be spawned and polled with a
|
||||||
/// static lifetime.
|
/// static lifetime.
|
||||||
discv5: Discv5,
|
pub discv5: Discv5,
|
||||||
|
|
||||||
/// A collection of network constants that can be read from other threads.
|
/// A collection of network constants that can be read from other threads.
|
||||||
network_globals: Arc<NetworkGlobals>,
|
network_globals: Arc<NetworkGlobals>,
|
||||||
|
@ -365,14 +365,6 @@ impl Libp2pEventHandler {
|
|||||||
self.on_find_chunks(msg).await
|
self.on_find_chunks(msg).await
|
||||||
}
|
}
|
||||||
PubsubMessage::AnnounceFile(msgs) => {
|
PubsubMessage::AnnounceFile(msgs) => {
|
||||||
let maybe_peer = self.network_globals.peers.read().peer_info(&source).cloned();
|
|
||||||
let ip = maybe_peer.clone().map(|info| info.seen_ip_addresses().collect::<Vec<IpAddr>>());
|
|
||||||
let agent = maybe_peer.clone().map(|info| info.client().agent_string.clone());
|
|
||||||
let connection = maybe_peer.map(|info| info.connection_status().clone());
|
|
||||||
|
|
||||||
debug!(batch = %msgs.len(), %source, ?ip, ?agent, ?connection, "Received AnnounceFile pubsub message");
|
|
||||||
|
|
||||||
|
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE.mark(1);
|
||||||
|
|
||||||
for msg in msgs {
|
for msg in msgs {
|
||||||
|
@ -13,7 +13,9 @@ use network::{
|
|||||||
};
|
};
|
||||||
use pruner::PrunerMessage;
|
use pruner::PrunerMessage;
|
||||||
use shared_types::timestamp_now;
|
use shared_types::timestamp_now;
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
use storage::log_store::Store as LogStore;
|
use storage::log_store::Store as LogStore;
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
use sync::{SyncMessage, SyncSender};
|
use sync::{SyncMessage, SyncSender};
|
||||||
@ -102,6 +104,7 @@ impl RouterService {
|
|||||||
async fn main(mut self, mut shutdown_sender: Sender<ShutdownReason>) {
|
async fn main(mut self, mut shutdown_sender: Sender<ShutdownReason>) {
|
||||||
let mut heartbeat_service = interval(self.config.heartbeat_interval);
|
let mut heartbeat_service = interval(self.config.heartbeat_interval);
|
||||||
let mut heartbeat_batcher = interval(self.config.batcher_timeout);
|
let mut heartbeat_batcher = interval(self.config.batcher_timeout);
|
||||||
|
let mut heartbeat_dump_peers = interval(Duration::from_secs(5));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@ -118,10 +121,50 @@ impl RouterService {
|
|||||||
|
|
||||||
// heartbeat for expire file batcher
|
// heartbeat for expire file batcher
|
||||||
_ = heartbeat_batcher.tick() => self.libp2p_event_handler.expire_batcher().await,
|
_ = heartbeat_batcher.tick() => self.libp2p_event_handler.expire_batcher().await,
|
||||||
|
|
||||||
|
_ = heartbeat_dump_peers.tick() => self.on_dump_peers().await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn on_dump_peers(&mut self) {
|
||||||
|
let discovered_nodes = self
|
||||||
|
.libp2p
|
||||||
|
.swarm
|
||||||
|
.behaviour_mut()
|
||||||
|
.discovery_mut()
|
||||||
|
.discv5
|
||||||
|
.table_entries();
|
||||||
|
let connected_nodes = discovered_nodes
|
||||||
|
.iter()
|
||||||
|
.filter(|n| n.2.is_connected())
|
||||||
|
.count();
|
||||||
|
let disconnected_nodes = discovered_nodes.len() - connected_nodes;
|
||||||
|
let incoming_nodes = discovered_nodes
|
||||||
|
.iter()
|
||||||
|
.filter(|n| n.2.is_incoming())
|
||||||
|
.count();
|
||||||
|
let outgoing_nodes = discovered_nodes.len() - incoming_nodes;
|
||||||
|
debug!(%connected_nodes, %disconnected_nodes, %incoming_nodes, %outgoing_nodes, "qbit - network statistics for discv5");
|
||||||
|
|
||||||
|
let gossip = self.libp2p.swarm.behaviour().gs();
|
||||||
|
let all_peers = gossip.all_peers().count();
|
||||||
|
let mut peers_by_topic = HashMap::new();
|
||||||
|
for topic in gossip.topics() {
|
||||||
|
let topic_str = topic.as_str().to_string();
|
||||||
|
let peers = gossip.mesh_peers(topic).count();
|
||||||
|
peers_by_topic.insert(topic_str, peers);
|
||||||
|
}
|
||||||
|
debug!(%all_peers, ?peers_by_topic, "qbit - network statistics for gossip");
|
||||||
|
|
||||||
|
let peer_db = self.network_globals.peers.read();
|
||||||
|
let total_peers = peer_db.peers().count();
|
||||||
|
let connected_peers = peer_db.connected_peers().count();
|
||||||
|
let disconnected_peers = peer_db.disconnected_peers().count();
|
||||||
|
let banned_peers = peer_db.banned_peers().count();
|
||||||
|
debug!(%total_peers, %connected_peers, %disconnected_peers, %banned_peers, "qbit - network statistics for peerdb");
|
||||||
|
}
|
||||||
|
|
||||||
async fn try_recv<T>(maybe_recv: &mut Option<mpsc::UnboundedReceiver<T>>) -> Option<T> {
|
async fn try_recv<T>(maybe_recv: &mut Option<mpsc::UnboundedReceiver<T>>) -> Option<T> {
|
||||||
match maybe_recv {
|
match maybe_recv {
|
||||||
None => None,
|
None => None,
|
||||||
@ -192,6 +235,24 @@ impl RouterService {
|
|||||||
message,
|
message,
|
||||||
..
|
..
|
||||||
} => {
|
} => {
|
||||||
|
if matches!(message, PubsubMessage::AnnounceFile(..)) {
|
||||||
|
let source_version = self
|
||||||
|
.network_globals
|
||||||
|
.peers
|
||||||
|
.read()
|
||||||
|
.peer_info(&source)
|
||||||
|
.map(|i| i.client().version.clone());
|
||||||
|
|
||||||
|
let prop_source_version = self
|
||||||
|
.network_globals
|
||||||
|
.peers
|
||||||
|
.read()
|
||||||
|
.peer_info(&propagation_source)
|
||||||
|
.map(|i| i.client().version.clone());
|
||||||
|
|
||||||
|
debug!(%source, ?source_version, %propagation_source, ?prop_source_version, "Received AnnounceFile pubsub message");
|
||||||
|
}
|
||||||
|
|
||||||
let result = self
|
let result = self
|
||||||
.libp2p_event_handler
|
.libp2p_event_handler
|
||||||
.on_pubsub_message(propagation_source, source, &id, message)
|
.on_pubsub_message(propagation_source, source, &id, message)
|
||||||
|
Loading…
Reference in New Issue
Block a user