diff --git a/node/network/src/discovery/mod.rs b/node/network/src/discovery/mod.rs index 554b64e..9c737f4 100644 --- a/node/network/src/discovery/mod.rs +++ b/node/network/src/discovery/mod.rs @@ -97,7 +97,7 @@ pub struct Discovery { /// /// This is behind a Reference counter to allow for futures to be spawned and polled with a /// static lifetime. - discv5: Discv5, + pub discv5: Discv5, /// A collection of network constants that can be read from other threads. network_globals: Arc, diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 3540c7e..f4412c4 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -365,14 +365,22 @@ impl Libp2pEventHandler { self.on_find_chunks(msg).await } PubsubMessage::AnnounceFile(msgs) => { - let maybe_peer = self.network_globals.peers.read().peer_info(&source).cloned(); - let ip = maybe_peer.clone().map(|info| info.seen_ip_addresses().collect::>()); - let agent = maybe_peer.clone().map(|info| info.client().agent_string.clone()); + let maybe_peer = self + .network_globals + .peers + .read() + .peer_info(&source) + .cloned(); + let ip = maybe_peer + .clone() + .map(|info| info.seen_ip_addresses().collect::>()); + let agent = maybe_peer + .clone() + .map(|info| info.client().agent_string.clone()); let connection = maybe_peer.map(|info| info.connection_status().clone()); debug!(batch = %msgs.len(), %source, ?ip, ?agent, ?connection, "Received AnnounceFile pubsub message"); - metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE.mark(1); for msg in msgs { diff --git a/node/router/src/service.rs b/node/router/src/service.rs index 8b46a41..54007d2 100644 --- a/node/router/src/service.rs +++ b/node/router/src/service.rs @@ -13,7 +13,9 @@ use network::{ }; use pruner::PrunerMessage; use shared_types::timestamp_now; +use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use storage::log_store::Store as LogStore; use storage_async::Store; use sync::{SyncMessage, SyncSender}; @@ -102,6 +104,7 @@ impl RouterService { async fn main(mut self, mut shutdown_sender: Sender) { let mut heartbeat_service = interval(self.config.heartbeat_interval); let mut heartbeat_batcher = interval(self.config.batcher_timeout); + let mut heartbeat_dump_peers = interval(Duration::from_secs(5)); loop { tokio::select! { @@ -118,10 +121,45 @@ impl RouterService { // heartbeat for expire file batcher _ = heartbeat_batcher.tick() => self.libp2p_event_handler.expire_batcher().await, + + _ = heartbeat_dump_peers.tick() => self.on_dump_peers().await, } } } + async fn on_dump_peers(&mut self) { + let discovered_nodes = self + .libp2p + .swarm + .behaviour_mut() + .discovery_mut() + .discv5 + .table_entries(); + let connected_nodes = discovered_nodes + .iter() + .filter(|n| n.2.is_connected()) + .count(); + let disconnected_nodes = discovered_nodes.len() - connected_nodes; + debug!(%connected_nodes, %disconnected_nodes, "qbit - network statistics for discv5"); + + let gossip = self.libp2p.swarm.behaviour().gs(); + let all_peers = gossip.all_peers().count(); + let mut peers_by_topic = HashMap::new(); + for topic in gossip.topics() { + let topic_str = topic.as_str().to_string(); + let peers = gossip.mesh_peers(topic).count(); + peers_by_topic.insert(topic_str, peers); + } + debug!(%all_peers, ?peers_by_topic, "qbit - network statistics for gossip"); + + let peer_db = self.network_globals.peers.read(); + let total_peers = peer_db.peers().count(); + let connected_peers = peer_db.connected_peers().count(); + let disconnected_peers = peer_db.disconnected_peers().count(); + let banned_peers = peer_db.banned_peers().count(); + debug!(%total_peers, %connected_peers, %disconnected_peers, %banned_peers, "qbit - network statistics for peerdb"); + } + async fn try_recv(maybe_recv: &mut Option>) -> Option { match maybe_recv { None => None,