Compare commits

...

6 Commits

Author SHA1 Message Date
boqiu
18bcd8f591 fmt 2024-12-04 18:17:07 +08:00
boqiu
686e7a42e8 aaa 2024-12-04 18:15:37 +08:00
boqiu
3bb242954b only sub NewFile gossip msg 2024-12-04 17:05:46 +08:00
boqiu
c548cb388c refactor log 2024-12-04 16:12:32 +08:00
boqiu
5b3946c363 add incoming stats for discv5 2024-12-04 15:56:21 +08:00
boqiu
43ba0b3079 dump network peers 2024-12-04 15:50:08 +08:00
3 changed files with 62 additions and 9 deletions

View File

@ -97,7 +97,7 @@ pub struct Discovery {
/// ///
/// This is behind a Reference counter to allow for futures to be spawned and polled with a /// This is behind a Reference counter to allow for futures to be spawned and polled with a
/// static lifetime. /// static lifetime.
discv5: Discv5, pub discv5: Discv5,
/// A collection of network constants that can be read from other threads. /// A collection of network constants that can be read from other threads.
network_globals: Arc<NetworkGlobals>, network_globals: Arc<NetworkGlobals>,

View File

@ -365,14 +365,6 @@ impl Libp2pEventHandler {
self.on_find_chunks(msg).await self.on_find_chunks(msg).await
} }
PubsubMessage::AnnounceFile(msgs) => { PubsubMessage::AnnounceFile(msgs) => {
let maybe_peer = self.network_globals.peers.read().peer_info(&source).cloned();
let ip = maybe_peer.clone().map(|info| info.seen_ip_addresses().collect::<Vec<IpAddr>>());
let agent = maybe_peer.clone().map(|info| info.client().agent_string.clone());
let connection = maybe_peer.map(|info| info.connection_status().clone());
debug!(batch = %msgs.len(), %source, ?ip, ?agent, ?connection, "Received AnnounceFile pubsub message");
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE.mark(1); metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE.mark(1);
for msg in msgs { for msg in msgs {

View File

@ -13,7 +13,9 @@ use network::{
}; };
use pruner::PrunerMessage; use pruner::PrunerMessage;
use shared_types::timestamp_now; use shared_types::timestamp_now;
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use storage::log_store::Store as LogStore; use storage::log_store::Store as LogStore;
use storage_async::Store; use storage_async::Store;
use sync::{SyncMessage, SyncSender}; use sync::{SyncMessage, SyncSender};
@ -102,6 +104,7 @@ impl RouterService {
async fn main(mut self, mut shutdown_sender: Sender<ShutdownReason>) { async fn main(mut self, mut shutdown_sender: Sender<ShutdownReason>) {
let mut heartbeat_service = interval(self.config.heartbeat_interval); let mut heartbeat_service = interval(self.config.heartbeat_interval);
let mut heartbeat_batcher = interval(self.config.batcher_timeout); let mut heartbeat_batcher = interval(self.config.batcher_timeout);
let mut heartbeat_dump_peers = interval(Duration::from_secs(5));
loop { loop {
tokio::select! { tokio::select! {
@ -118,10 +121,50 @@ impl RouterService {
// heartbeat for expire file batcher // heartbeat for expire file batcher
_ = heartbeat_batcher.tick() => self.libp2p_event_handler.expire_batcher().await, _ = heartbeat_batcher.tick() => self.libp2p_event_handler.expire_batcher().await,
_ = heartbeat_dump_peers.tick() => self.on_dump_peers().await,
} }
} }
} }
async fn on_dump_peers(&mut self) {
let discovered_nodes = self
.libp2p
.swarm
.behaviour_mut()
.discovery_mut()
.discv5
.table_entries();
let connected_nodes = discovered_nodes
.iter()
.filter(|n| n.2.is_connected())
.count();
let disconnected_nodes = discovered_nodes.len() - connected_nodes;
let incoming_nodes = discovered_nodes
.iter()
.filter(|n| n.2.is_incoming())
.count();
let outgoing_nodes = discovered_nodes.len() - incoming_nodes;
debug!(%connected_nodes, %disconnected_nodes, %incoming_nodes, %outgoing_nodes, "qbit - network statistics for discv5");
let gossip = self.libp2p.swarm.behaviour().gs();
let all_peers = gossip.all_peers().count();
let mut peers_by_topic = HashMap::new();
for topic in gossip.topics() {
let topic_str = topic.as_str().to_string();
let peers = gossip.mesh_peers(topic).count();
peers_by_topic.insert(topic_str, peers);
}
debug!(%all_peers, ?peers_by_topic, "qbit - network statistics for gossip");
let peer_db = self.network_globals.peers.read();
let total_peers = peer_db.peers().count();
let connected_peers = peer_db.connected_peers().count();
let disconnected_peers = peer_db.disconnected_peers().count();
let banned_peers = peer_db.banned_peers().count();
debug!(%total_peers, %connected_peers, %disconnected_peers, %banned_peers, "qbit - network statistics for peerdb");
}
async fn try_recv<T>(maybe_recv: &mut Option<mpsc::UnboundedReceiver<T>>) -> Option<T> { async fn try_recv<T>(maybe_recv: &mut Option<mpsc::UnboundedReceiver<T>>) -> Option<T> {
match maybe_recv { match maybe_recv {
None => None, None => None,
@ -192,6 +235,24 @@ impl RouterService {
message, message,
.. ..
} => { } => {
if matches!(message, PubsubMessage::AnnounceFile(..)) {
let source_version = self
.network_globals
.peers
.read()
.peer_info(&source)
.map(|i| i.client().version.clone());
let prop_source_version = self
.network_globals
.peers
.read()
.peer_info(&propagation_source)
.map(|i| i.client().version.clone());
debug!(%source, ?source_version, %propagation_source, ?prop_source_version, "Received AnnounceFile pubsub message");
}
let result = self let result = self
.libp2p_event_handler .libp2p_event_handler
.on_pubsub_message(propagation_source, source, &id, message) .on_pubsub_message(propagation_source, source, &id, message)