From 2fd9712d592716287031b678e09fd262e0c9fa84 Mon Sep 17 00:00:00 2001 From: Bo QIU <35757521+boqiu@users.noreply.github.com> Date: Thu, 29 Aug 2024 09:55:24 +0800 Subject: [PATCH] Enhance P2P network protocol to support batch messages for performance concern (#173) * Add p2p protocol version in network identity * Cache announce file pubsub messages to publish in batch * fix file location cache * opt sync metrics * opt file location cache default configs * publish files announcements in batch * enhance announce file pubsub msg metrics * opt metrics * fix ci * fix clippy * fix batcher * minor fix * opt batcher: publish all if expired --- .../src/file_location_cache.rs | 30 ++-- node/file_location_cache/src/lib.rs | 2 +- node/file_location_cache/src/test_util.rs | 2 +- node/network/src/lib.rs | 2 + node/network/src/types/pubsub.rs | 9 +- node/router/src/batcher.rs | 117 +++++++++++++ node/router/src/lib.rs | 14 ++ node/router/src/libp2p_event_handler.rs | 161 +++++++++++++----- node/router/src/metrics.rs | 57 +++++-- node/router/src/service.rs | 16 +- node/shared_types/src/lib.rs | 12 ++ node/src/config/convert.rs | 7 +- run/config-testnet-standard.toml | 17 +- run/config-testnet-turbo.toml | 17 +- run/config.toml | 17 +- 15 files changed, 390 insertions(+), 90 deletions(-) create mode 100644 node/router/src/batcher.rs diff --git a/node/file_location_cache/src/file_location_cache.rs b/node/file_location_cache/src/file_location_cache.rs index 6e278d6..b26b51f 100644 --- a/node/file_location_cache/src/file_location_cache.rs +++ b/node/file_location_cache/src/file_location_cache.rs @@ -161,9 +161,7 @@ impl FileCache { } /// Insert the specified `announcement` into cache. 
- fn insert(&mut self, announcement: SignedAnnounceFile) { - let tx_id = announcement.tx_id; - + fn insert(&mut self, tx_id: TxID, announcement: SignedAnnounceFile) { let item = self.files.entry(tx_id).or_insert_with(|| { AnnouncementCache::new( self.config.max_entries_per_file, @@ -290,8 +288,12 @@ impl FileLocationCache { shard_id: announcement.shard_id, num_shard: announcement.num_shard, }; - self.cache.lock().insert(announcement); self.insert_peer_config(peer_id, shard_config); + + let mut cache = self.cache.lock(); + for tx_id in announcement.tx_ids.iter() { + cache.insert(*tx_id, announcement.clone()); + } } pub fn get_one(&self, tx_id: TxID) -> Option { @@ -534,7 +536,7 @@ mod tests { } fn assert_file(file: &SignedAnnounceFile, tx_id: TxID, peer_id: PeerId, timestamp: u32) { - assert_eq!(file.tx_id, tx_id); + assert_eq!(file.tx_ids[0], tx_id); assert_eq!(PeerId::from(file.peer_id.clone()), peer_id); assert_eq!(file.timestamp, timestamp); } @@ -551,11 +553,11 @@ mod tests { let tx1 = TxID::random_hash(1); let tx2 = TxID::random_hash(2); - cache.insert(create_file_2(tx1, peer1, now - 1)); + cache.insert(tx1, create_file_2(tx1, peer1, now - 1)); assert_eq!(cache.total_announcements, 1); - cache.insert(create_file_2(tx2, peer1, now - 2)); + cache.insert(tx2, create_file_2(tx2, peer1, now - 2)); assert_eq!(cache.total_announcements, 2); - cache.insert(create_file_2(tx1, peer2, now - 3)); + cache.insert(tx1, create_file_2(tx1, peer2, now - 3)); assert_eq!(cache.total_announcements, 3); assert_file(&cache.pop().unwrap(), tx1, peer2, now - 3); @@ -573,18 +575,18 @@ mod tests { let now = timestamp_now(); let tx1 = TxID::random_hash(1); - cache.insert(create_file_2(tx1, PeerId::random(), now - 7)); - cache.insert(create_file_2(tx1, PeerId::random(), now - 8)); - cache.insert(create_file_2(tx1, PeerId::random(), now - 9)); + cache.insert(tx1, create_file_2(tx1, PeerId::random(), now - 7)); + cache.insert(tx1, create_file_2(tx1, PeerId::random(), now - 8)); + 
cache.insert(tx1, create_file_2(tx1, PeerId::random(), now - 9)); assert_eq!(cache.total_announcements, 3); // insert more files and cause to max entries limited let tx2 = TxID::random_hash(2); - cache.insert(create_file_2(tx2, PeerId::random(), now - 1)); + cache.insert(tx2, create_file_2(tx2, PeerId::random(), now - 1)); assert_all_files(cache.all(tx1).unwrap_or_default(), vec![now - 8, now - 7]); - cache.insert(create_file_2(tx2, PeerId::random(), now - 2)); + cache.insert(tx2, create_file_2(tx2, PeerId::random(), now - 2)); assert_all_files(cache.all(tx1).unwrap_or_default(), vec![now - 7]); - cache.insert(create_file_2(tx2, PeerId::random(), now - 3)); + cache.insert(tx2, create_file_2(tx2, PeerId::random(), now - 3)); assert_all_files(cache.all(tx1).unwrap_or_default(), vec![]); assert_all_files( diff --git a/node/file_location_cache/src/lib.rs b/node/file_location_cache/src/lib.rs index 1636868..ff12c5e 100644 --- a/node/file_location_cache/src/lib.rs +++ b/node/file_location_cache/src/lib.rs @@ -16,7 +16,7 @@ pub struct Config { impl Default for Config { fn default() -> Self { Config { - max_entries_total: 4096, + max_entries_total: 256000, max_entries_per_file: 4, entry_expiration_time_secs: 3600, } diff --git a/node/file_location_cache/src/test_util.rs b/node/file_location_cache/src/test_util.rs index 6456e37..b43f6d2 100644 --- a/node/file_location_cache/src/test_util.rs +++ b/node/file_location_cache/src/test_util.rs @@ -35,7 +35,7 @@ impl AnnounceFileBuilder { let timestamp = self.timestamp.unwrap_or_else(timestamp_now); let msg = AnnounceFile { - tx_id, + tx_ids: vec![tx_id], num_shard: 1, shard_id: 0, peer_id: peer_id.into(), diff --git a/node/network/src/lib.rs b/node/network/src/lib.rs index f30ca3c..5cc8550 100644 --- a/node/network/src/lib.rs +++ b/node/network/src/lib.rs @@ -95,6 +95,8 @@ pub use peer_manager::{ }; pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME}; +pub const PROTOCOL_VERSION: [u8; 3] = [0, 
1, 0]; + /// Application level requests sent to the network. #[derive(Debug, Clone, Copy)] pub enum RequestId { diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index 6e476a4..13f6a60 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -130,7 +130,7 @@ pub struct FindChunks { #[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] pub struct AnnounceFile { - pub tx_id: TxID, + pub tx_ids: Vec, pub num_shard: usize, pub shard_id: usize, pub peer_id: WrappedPeerId, @@ -200,12 +200,14 @@ pub type SignedAnnounceFile = SignedMessage; pub type SignedAnnounceShardConfig = SignedMessage; pub type SignedAnnounceChunks = SignedMessage; +type SignedAnnounceFiles = Vec; + #[derive(Debug, Clone, PartialEq, Eq)] pub enum PubsubMessage { ExampleMessage(u64), FindFile(FindFile), FindChunks(FindChunks), - AnnounceFile(SignedAnnounceFile), + AnnounceFile(Vec), AnnounceShardConfig(SignedAnnounceShardConfig), AnnounceChunks(SignedAnnounceChunks), } @@ -314,7 +316,8 @@ impl PubsubMessage { FindChunks::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, )), GossipKind::AnnounceFile => Ok(PubsubMessage::AnnounceFile( - SignedAnnounceFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, + SignedAnnounceFiles::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, )), GossipKind::AnnounceChunks => Ok(PubsubMessage::AnnounceChunks( SignedAnnounceChunks::from_ssz_bytes(data) diff --git a/node/router/src/batcher.rs b/node/router/src/batcher.rs new file mode 100644 index 0000000..fc7bf05 --- /dev/null +++ b/node/router/src/batcher.rs @@ -0,0 +1,117 @@ +use std::{ + collections::VecDeque, + sync::Arc, + time::{Duration, Instant}, +}; + +use ::metrics::{Histogram, Sample}; + +/// `Batcher` is used to handle data in batch, when `capacity` or `timeout` matches. 
+pub(crate) struct Batcher { + items: VecDeque, + earliest_time: Option, + capacity: usize, + timeout: Duration, + metrics_batch_size: Arc, +} + +impl Batcher { + pub fn new(capacity: usize, timeout: Duration, name: &str) -> Self { + Self { + items: VecDeque::with_capacity(capacity), + earliest_time: None, + capacity, + timeout, + metrics_batch_size: Sample::ExpDecay(0.015).register_with_group( + "router_batcher_size", + name, + 1024, + ), + } + } + + fn remove_all(&mut self) -> Option> { + let size = self.items.len(); + if size == 0 { + return None; + } + + self.metrics_batch_size.update(size as u64); + self.earliest_time = None; + + Some(Vec::from_iter(self.items.split_off(0).into_iter().rev())) + } + + pub fn add(&mut self, value: T) -> Option> { + self.add_with_time(value, Instant::now()) + } + + fn add_with_time(&mut self, value: T, now: Instant) -> Option> { + // push at front so as to use `split_off` to remove expired items + self.items.push_front(value); + if self.earliest_time.is_none() { + self.earliest_time = Some(now); + } + + // cache if not full + let size = self.items.len(); + if size < self.capacity { + return None; + } + + // cache is full + self.remove_all() + } + + pub fn expire(&mut self) -> Option> { + self.expire_with_time(Instant::now()) + } + + fn expire_with_time(&mut self, now: Instant) -> Option> { + if now.duration_since(self.earliest_time?) 
< self.timeout { + None + } else { + self.remove_all() + } + } +} + +#[cfg(test)] +mod tests { + use std::time::{Duration, Instant}; + + use super::Batcher; + + #[test] + fn test_add() { + let mut batcher: Batcher = Batcher::new(3, Duration::from_secs(10), "test"); + + assert_eq!(batcher.add(1), None); + assert_eq!(batcher.add(2), None); + assert_eq!(batcher.add(3), Some(vec![1, 2, 3])); + assert_eq!(batcher.items.len(), 0); + } + + #[test] + fn test_expire() { + let mut batcher: Batcher = Batcher::new(5, Duration::from_secs(10), "test"); + + let now = Instant::now(); + + // enqueue: 1, 2, 3, 4 + assert_eq!(batcher.add_with_time(1, now + Duration::from_secs(1)), None); + assert_eq!(batcher.add_with_time(2, now + Duration::from_secs(2)), None); + assert_eq!(batcher.add_with_time(3, now + Duration::from_secs(4)), None); + assert_eq!(batcher.add_with_time(4, now + Duration::from_secs(5)), None); + + // expire None + assert_eq!(batcher.expire_with_time(now + Duration::from_secs(6)), None); + + // expire all + assert_eq!( + batcher.expire_with_time(now + Duration::from_secs(13)), + Some(vec![1, 2, 3, 4]) + ); + assert_eq!(batcher.items.len(), 0); + } +} diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index 8685f5e..5cd21c7 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -1,6 +1,7 @@ #[macro_use] extern crate tracing; +mod batcher; mod libp2p_event_handler; mod metrics; mod peer_manager; @@ -25,6 +26,15 @@ pub struct Config { pub libp2p_nodes: Vec, pub private_ip_enabled: bool, pub check_announced_ip: bool, + + // batcher + /// Timeout to publish messages in batch + #[serde(deserialize_with = "deserialize_duration")] + pub batcher_timeout: Duration, + /// Number of files in an announcement + pub batcher_file_capacity: usize, + /// Number of announcements in a pubsub message + pub batcher_announcement_capacity: usize, } impl Default for Config { @@ -37,6 +47,10 @@ impl Default for Config { libp2p_nodes: vec![], private_ip_enabled: false, 
check_announced_ip: false, + + batcher_timeout: Duration::from_secs(1), + batcher_file_capacity: 1, + batcher_announcement_capacity: 1, } } } diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 1ca73c0..05619b4 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -23,6 +23,7 @@ use sync::{SyncMessage, SyncSender}; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::{mpsc, RwLock}; +use crate::batcher::Batcher; use crate::metrics; use crate::peer_manager::PeerManager; use crate::Config; @@ -94,6 +95,10 @@ pub struct Libp2pEventHandler { file_location_cache: Arc, /// All connected peers. peers: Arc>, + /// Files to announce in batch + file_batcher: RwLock>, + /// Announcements to publish in batch + announcement_batcher: RwLock>, } impl Libp2pEventHandler { @@ -109,6 +114,18 @@ impl Libp2pEventHandler { file_location_cache: Arc, peers: Arc>, ) -> Self { + let file_batcher = RwLock::new(Batcher::new( + config.batcher_file_capacity, + config.batcher_timeout, + "file", + )); + + let announcement_batcher = RwLock::new(Batcher::new( + config.batcher_announcement_capacity, + config.batcher_timeout, + "announcement", + )); + Self { config, network_globals, @@ -119,6 +136,8 @@ impl Libp2pEventHandler { store, file_location_cache, peers, + file_batcher, + announcement_batcher, } } @@ -190,7 +209,7 @@ impl Libp2pEventHandler { match request { Request::Status(status) => { self.on_status_request(peer_id, request_id, status); - metrics::LIBP2P_HANDLE_REQUEST_STATUS.mark(1); + metrics::LIBP2P_HANDLE_STATUS_REQUEST.mark(1); } Request::GetChunks(request) => { self.send_to_sync(SyncMessage::RequestChunks { @@ -198,7 +217,7 @@ impl Libp2pEventHandler { request_id, request, }); - metrics::LIBP2P_HANDLE_REQUEST_GET_CHUNKS.mark(1); + metrics::LIBP2P_HANDLE_GET_CHUNKS_REQUEST.mark(1); } Request::DataByHash(_) => { // ignore @@ -241,8 +260,8 @@ impl Libp2pEventHandler { debug!(%peer_id, 
?status_message, "Received Status response"); match request_id { RequestId::Router(since) => { - metrics::LIBP2P_HANDLE_RESPONSE_STATUS.mark(1); - metrics::LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY.update_since(since); + metrics::LIBP2P_HANDLE_STATUS_RESPONSE.mark(1); + metrics::LIBP2P_HANDLE_STATUS_RESPONSE_LATENCY.update_since(since); } _ => unreachable!("All status response belong to router"), } @@ -251,8 +270,8 @@ impl Libp2pEventHandler { Response::Chunks(response) => { let request_id = match request_id { RequestId::Sync(since, sync_id) => { - metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS.mark(1); - metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY.update_since(since); + metrics::LIBP2P_HANDLE_GET_CHUNKS_RESPONSE.mark(1); + metrics::LIBP2P_HANDLE_GET_CHUNKS_RESPONSE_LATENCY.update_since(since); sync_id } _ => unreachable!("All Chunks responses belong to sync"), @@ -305,9 +324,18 @@ impl Libp2pEventHandler { metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1); self.on_find_chunks(msg).await } - PubsubMessage::AnnounceFile(msg) => { + PubsubMessage::AnnounceFile(msgs) => { metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE.mark(1); - self.on_announce_file(propagation_source, msg) + + for msg in msgs { + match self.on_announce_file(propagation_source, msg) { + MessageAcceptance::Reject => return MessageAcceptance::Reject, + MessageAcceptance::Ignore => return MessageAcceptance::Ignore, + _ => {} + } + } + + MessageAcceptance::Accept } PubsubMessage::AnnounceChunks(msg) => { metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS.mark(1); @@ -382,7 +410,14 @@ impl Libp2pEventHandler { false } - pub async fn construct_announce_file_message(&self, tx_id: TxID) -> Option { + pub async fn construct_announce_file_message( + &self, + tx_ids: Vec, + ) -> Option { + if tx_ids.is_empty() { + return None; + } + let peer_id = *self.network_globals.peer_id.read(); let addr = self.get_listen_addr_or_add().await?; @@ -391,7 +426,7 @@ impl Libp2pEventHandler { let shard_config = 
self.store.get_store().flow().get_shard_config(); let msg = AnnounceFile { - tx_id, + tx_ids, num_shard: shard_config.num_shard, shard_id: shard_config.shard_id, peer_id: peer_id.into(), @@ -402,14 +437,14 @@ impl Libp2pEventHandler { let mut signed = match SignedMessage::sign_message(msg, &self.local_keypair) { Ok(signed) => signed, Err(e) => { - error!(%tx_id.seq, %e, "Failed to sign AnnounceFile message"); + error!(%e, "Failed to sign AnnounceFile message"); return None; } }; signed.resend_timestamp = timestamp; - Some(PubsubMessage::AnnounceFile(signed)) + Some(signed) } pub async fn construct_announce_shard_config_message( @@ -447,10 +482,11 @@ impl Libp2pEventHandler { // verify timestamp let d = duration_since( timestamp, - metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_FIND_FILE.clone(), + metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(), ); if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT { debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message"); + metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1); return MessageAcceptance::Ignore; } @@ -460,14 +496,10 @@ impl Libp2pEventHandler { if tx.id() == tx_id { trace!(?tx_id, "Found file locally, responding to FindFile query"); - return match self.construct_announce_file_message(tx_id).await { - Some(msg) => { - self.publish(msg); - MessageAcceptance::Ignore - } - // propagate FindFile query to other nodes - None => MessageAcceptance::Accept, - }; + if self.publish_file(tx_id).await.is_some() { + metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE.mark(1); + return MessageAcceptance::Ignore; + } } } } @@ -477,12 +509,15 @@ impl Libp2pEventHandler { trace!(?tx_id, "Found file in cache, responding to FindFile query"); msg.resend_timestamp = timestamp_now(); - self.publish(PubsubMessage::AnnounceFile(msg)); + self.publish_announcement(msg).await; + + metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_CACHE.mark(1); return MessageAcceptance::Ignore; } // propagate FindFile query to other nodes + 
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_FORWARD.mark(1); MessageAcceptance::Accept } @@ -528,7 +563,7 @@ impl Libp2pEventHandler { // verify timestamp let d = duration_since( msg.timestamp, - metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_FIND_CHUNKS.clone(), + metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS_LATENCY.clone(), ); if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT { debug!(%msg.timestamp, ?d, "Invalid timestamp, ignoring FindChunks message"); @@ -624,6 +659,9 @@ impl Libp2pEventHandler { propagation_source: PeerId, msg: SignedAnnounceFile, ) -> MessageAcceptance { + metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_ANNOUNCEMENTS.mark(1); + metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_FILES.mark(msg.tx_ids.len()); + // verify message signature if !verify_signature(&msg, &msg.peer_id, propagation_source) { return MessageAcceptance::Reject; @@ -646,19 +684,22 @@ impl Libp2pEventHandler { // propagate gossip to peers let d = duration_since( msg.resend_timestamp, - metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_FILE.clone(), + metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_LATENCY.clone(), ); if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_FILE_TIMEOUT { debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceFile message"); + metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_TIMEOUT.mark(1); return MessageAcceptance::Ignore; } // notify sync layer - self.send_to_sync(SyncMessage::AnnounceFileGossip { - tx_id: msg.tx_id, - peer_id: msg.peer_id.clone().into(), - addr, - }); + for tx_id in msg.tx_ids.iter() { + self.send_to_sync(SyncMessage::AnnounceFileGossip { + tx_id: *tx_id, + peer_id: msg.peer_id.clone().into(), + addr: addr.clone(), + }); + } // insert message to cache self.file_location_cache.insert(msg); @@ -693,7 +734,7 @@ impl Libp2pEventHandler { // propagate gossip to peers let d = duration_since( msg.resend_timestamp, - metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_SHARD.clone(), + metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD_LATENCY.clone(), ); if d 
< TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_SHARD_CONFIG_TIMEOUT { debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceShardConfig message"); @@ -745,7 +786,7 @@ impl Libp2pEventHandler { // propagate gossip to peers let d = duration_since( msg.resend_timestamp, - metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_CHUNKS.clone(), + metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS_LATENCY.clone(), ); if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_FILE_TIMEOUT { debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceChunks message"); @@ -774,6 +815,39 @@ impl Libp2pEventHandler { }) } } + + pub async fn publish_file(&self, tx_id: TxID) -> Option { + match self.file_batcher.write().await.add(tx_id) { + Some(batch) => { + let announcement = self.construct_announce_file_message(batch).await?; + Some(self.publish_announcement(announcement).await) + } + None => Some(false), + } + } + + async fn publish_announcement(&self, announcement: SignedAnnounceFile) -> bool { + match self.announcement_batcher.write().await.add(announcement) { + Some(batch) => { + self.publish(PubsubMessage::AnnounceFile(batch)); + true + } + None => false, + } + } + + /// Publish expired file announcements. 
+ pub async fn expire_batcher(&self) { + if let Some(batch) = self.file_batcher.write().await.expire() { + if let Some(announcement) = self.construct_announce_file_message(batch).await { + self.publish_announcement(announcement).await; + } + } + + if let Some(batch) = self.announcement_batcher.write().await.expire() { + self.publish(PubsubMessage::AnnounceFile(batch)); + } + } } #[cfg(test)] @@ -895,7 +969,7 @@ mod tests { Ok(NetworkMessage::Publish { messages }) => { assert_eq!(messages.len(), 1); assert!( - matches!(&messages[0], PubsubMessage::AnnounceFile(file) if file.tx_id == expected_tx_id) + matches!(&messages[0], PubsubMessage::AnnounceFile(files) if files[0].tx_ids[0] == expected_tx_id) ); } Ok(_) => panic!("Unexpected network message type received"), @@ -1185,18 +1259,13 @@ mod tests { let tx_id = TxID::random_hash(412); // change signed message - let message = match handler - .construct_announce_file_message(tx_id) + let mut file = handler + .construct_announce_file_message(vec![tx_id]) .await - .unwrap() - { - PubsubMessage::AnnounceFile(mut file) => { - let malicious_addr: Multiaddr = "/ip4/127.0.0.38/tcp/30000".parse().unwrap(); - file.inner.at = malicious_addr.into(); - PubsubMessage::AnnounceFile(file) - } - _ => panic!("Unexpected pubsub message type"), - }; + .unwrap(); + let malicious_addr: Multiaddr = "/ip4/127.0.0.38/tcp/30000".parse().unwrap(); + file.inner.at = malicious_addr.into(); + let message = PubsubMessage::AnnounceFile(vec![file]); // failed to verify signature let result = handler.on_pubsub_message(alice, bob, &id, message).await; @@ -1212,7 +1281,11 @@ mod tests { let (alice, bob) = (PeerId::random(), PeerId::random()); let id = MessageId::new(b"dummy message"); let tx = TxID::random_hash(412); - let message = handler.construct_announce_file_message(tx).await.unwrap(); + let message = handler + .construct_announce_file_message(vec![tx]) + .await + .unwrap(); + let message = PubsubMessage::AnnounceFile(vec![message]); // succeeded 
to handle let result = handler.on_pubsub_message(alice, bob, &id, message).await; diff --git a/node/router/src/metrics.rs b/node/router/src/metrics.rs index 282029b..03c3081 100644 --- a/node/router/src/metrics.rs +++ b/node/router/src/metrics.rs @@ -23,32 +23,55 @@ lazy_static::lazy_static! { pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL: Arc = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_fail", 1024); // libp2p_event_handler - pub static ref LIBP2P_SEND_STATUS: Arc = register_meter("router_libp2p_send_status"); + // libp2p_event_handler: peer connection pub static ref LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING: Arc = register_meter_with_group("router_libp2p_handle_peer_connected", "outgoing"); pub static ref LIBP2P_HANDLE_PEER_CONNECTED_INCOMING: Arc = register_meter_with_group("router_libp2p_handle_peer_connected", "incoming"); pub static ref LIBP2P_HANDLE_PEER_DISCONNECTED: Arc = register_meter("router_libp2p_handle_peer_disconnected"); - pub static ref LIBP2P_HANDLE_REQUEST_STATUS: Arc = register_meter("router_libp2p_handle_request_status"); - pub static ref LIBP2P_HANDLE_REQUEST_GET_CHUNKS: Arc = register_meter("router_libp2p_handle_request_get_chunks"); - pub static ref LIBP2P_HANDLE_RESPONSE_STATUS: Arc = register_meter_with_group("router_libp2p_handle_response_status", "qps"); - pub static ref LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_status", "latency", 1024); - pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS: Arc = register_meter_with_group("router_libp2p_handle_response_get_chunks", "qps"); - pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_get_chunks", "latency", 1024); + + // libp2p_event_handler: status + pub static ref LIBP2P_SEND_STATUS: Arc = register_meter("router_libp2p_send_status"); + pub static ref LIBP2P_HANDLE_STATUS_REQUEST: Arc = 
register_meter("router_libp2p_handle_status_request"); + pub static ref LIBP2P_HANDLE_STATUS_RESPONSE: Arc = register_meter_with_group("router_libp2p_handle_status_response", "qps"); + pub static ref LIBP2P_HANDLE_STATUS_RESPONSE_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_status_response", "latency", 1024); + + // libp2p_event_handler: get chunks + pub static ref LIBP2P_HANDLE_GET_CHUNKS_REQUEST: Arc = register_meter("router_libp2p_handle_get_chunks_request"); + pub static ref LIBP2P_HANDLE_GET_CHUNKS_RESPONSE: Arc = register_meter_with_group("router_libp2p_handle_get_chunks_response", "qps"); + pub static ref LIBP2P_HANDLE_GET_CHUNKS_RESPONSE_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_get_chunks_response", "latency", 1024); + + // libp2p_event_handler: rpc errors pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc = register_meter_with_group("router_libp2p_handle_response_error", "qps"); pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024); - pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc = register_meter("router_libp2p_handle_pubsub_find_file"); - pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS: Arc = register_meter("router_libp2p_handle_pubsub_find_chunks"); - pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE: Arc = register_meter("router_libp2p_handle_pubsub_announce_file"); - pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS: Arc = register_meter("router_libp2p_handle_pubsub_announce_chunks"); - pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD: Arc = register_meter("router_libp2p_handle_pubsub_announce_shard"); - pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_FIND_FILE: Arc = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_find_file", 1024); - pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_FIND_CHUNKS: Arc = 
Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_find_chunks", 1024); - pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_FILE: Arc = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_announce_file", 1024); - pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_CHUNKS: Arc = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_announce_chunks", 1024); - pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_SHARD: Arc = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_announce_shard", 1024); + // libp2p_event_handler: find & announce file + pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "qps"); + pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_find_file", "latency", 1024); + pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "timeout"); + pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "store"); + pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_CACHE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "cache"); + pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_FORWARD: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "forward"); + pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_announce_file", "qps"); + pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_announce_file", "latency", 1024); + pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_TIMEOUT: Arc = register_meter_with_group("router_libp2p_handle_pubsub_announce_file", "timeout"); + pub static ref 
LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_ANNOUNCEMENTS: Arc = register_meter_with_group("router_libp2p_handle_pubsub_announce_file", "announcements"); + pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_FILES: Arc = register_meter_with_group("router_libp2p_handle_pubsub_announce_file", "files"); + // libp2p_event_handler: find & announce chunks + pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_chunks", "qps"); + pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_find_chunks", "latency", 1024); + pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS: Arc = register_meter_with_group("router_libp2p_handle_pubsub_announce_chunks", "qps"); + pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_announce_chunks", "latency", 1024); + + // libp2p_event_handler: announce shard config + pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD: Arc = register_meter_with_group("router_libp2p_handle_pubsub_announce_shard", "qps"); + pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_announce_shard", "latency", 1024); + + // libp2p_event_handler: verify IP address pub static ref LIBP2P_VERIFY_ANNOUNCED_IP: Arc = register_meter("router_libp2p_verify_announced_ip"); pub static ref LIBP2P_VERIFY_ANNOUNCED_IP_UNSEEN: Arc = register_meter("router_libp2p_verify_announced_ip_unseen"); pub static ref LIBP2P_VERIFY_ANNOUNCED_IP_MISMATCH: Arc = register_meter("router_libp2p_verify_announced_ip_mismatch"); + + // batcher + pub static ref BATCHER_ANNOUNCE_FILE_SIZE: Arc = Sample::ExpDecay(0.015).register("router_batcher_announce_file_size", 1024); } diff --git a/node/router/src/service.rs b/node/router/src/service.rs index 11d04f9..938c2c3 100644 --- a/node/router/src/service.rs +++ 
b/node/router/src/service.rs @@ -95,7 +95,8 @@ impl RouterService { } async fn main(mut self, mut shutdown_sender: Sender) { - let mut heartbeat = interval(self.config.heartbeat_interval); + let mut heartbeat_service = interval(self.config.heartbeat_interval); + let mut heartbeat_batcher = interval(self.config.batcher_timeout); loop { tokio::select! { @@ -107,8 +108,11 @@ impl RouterService { Some(msg) = Self::try_recv(&mut self.pruner_recv) => self.on_pruner_msg(msg).await, - // heartbeat - _ = heartbeat.tick() => self.on_heartbeat().await, + // heartbeat for service + _ = heartbeat_service.tick() => self.on_heartbeat().await, + + // heartbeat for expire file batcher + _ = heartbeat_batcher.tick() => self.libp2p_event_handler.expire_batcher().await, } } } @@ -324,12 +328,12 @@ impl RouterService { } } NetworkMessage::AnnounceLocalFile { tx_id } => { - if let Some(msg) = self + if self .libp2p_event_handler - .construct_announce_file_message(tx_id) + .publish_file(tx_id) .await + .is_some() { - self.libp2p_event_handler.publish(msg); metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1); } } diff --git a/node/shared_types/src/lib.rs b/node/shared_types/src/lib.rs index 76e2903..11340c5 100644 --- a/node/shared_types/src/lib.rs +++ b/node/shared_types/src/lib.rs @@ -376,4 +376,16 @@ pub struct NetworkIdentity { /// The address of the deployed Flow contract on the blockchain. pub flow_address: Address, + + /// P2P network protocol version. 
+ pub p2p_protocol_version: ProtocolVersion, +} + +#[derive( + DeriveEncode, DeriveDecode, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, +)] +pub struct ProtocolVersion { + pub major: u8, + pub minor: u8, + pub build: u8, } diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index c701474..dc0113b 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -8,7 +8,7 @@ use miner::MinerConfig; use network::NetworkConfig; use pruner::PrunerConfig; use rpc::RPCConfig; -use shared_types::NetworkIdentity; +use shared_types::{NetworkIdentity, ProtocolVersion}; use std::net::IpAddr; use std::time::Duration; use storage::config::ShardConfig; @@ -41,6 +41,11 @@ impl ZgsConfig { network_config.network_id = NetworkIdentity { chain_id, flow_address, + p2p_protocol_version: ProtocolVersion { + major: network::PROTOCOL_VERSION[0], + minor: network::PROTOCOL_VERSION[1], + build: network::PROTOCOL_VERSION[2], + }, }; if !self.network_disable_discovery { diff --git a/run/config-testnet-standard.toml b/run/config-testnet-standard.toml index fe6b311..867ee04 100644 --- a/run/config-testnet-standard.toml +++ b/run/config-testnet-standard.toml @@ -218,6 +218,21 @@ reward_contract_address = "0x0496D0817BD8519e0de4894Dc379D35c35275609" # # prune_batch_wait_time_ms = 1000 +####################################################################### +### Router Config Options ### +####################################################################### + +[router] + +# Timeout to publish file announcements in batch. +# batcher_timeout = "1s" + +# Number of files in an announcement to publish in batch. +batcher_file_capacity = 10 + +# Number of announcements in a pubsub message to publish in batch. 
+batcher_announcement_capacity = 100 + ####################################################################### ### File Sync Config Options ### ####################################################################### @@ -272,7 +287,7 @@ auto_sync_enabled = true # When the cache is full, the storage position information with oldest timestamp will be replaced. # Global cache capacity. -# max_entries_total = 4096 +# max_entries_total = 256000 # Location information capacity for each file. # max_entries_per_file = 4 diff --git a/run/config-testnet-turbo.toml b/run/config-testnet-turbo.toml index 625621a..44f48a7 100644 --- a/run/config-testnet-turbo.toml +++ b/run/config-testnet-turbo.toml @@ -230,6 +230,21 @@ reward_contract_address = "0x51998C4d486F406a788B766d93510980ae1f9360" # # prune_batch_wait_time_ms = 1000 +####################################################################### +### Router Config Options ### +####################################################################### + +[router] + +# Timeout to publish file announcements in batch. +# batcher_timeout = "1s" + +# Number of files in an announcement to publish in batch. +batcher_file_capacity = 10 + +# Number of announcements in a pubsub message to publish in batch. +batcher_announcement_capacity = 100 + ####################################################################### ### File Sync Config Options ### ####################################################################### @@ -284,7 +299,7 @@ auto_sync_enabled = true # When the cache is full, the storage position information with oldest timestamp will be replaced. # Global cache capacity. -# max_entries_total = 4096 +# max_entries_total = 256000 # Location information capacity for each file. 
# max_entries_per_file = 4 diff --git a/run/config.toml b/run/config.toml index 76309c7..ff6a472 100644 --- a/run/config.toml +++ b/run/config.toml @@ -232,6 +232,21 @@ # # prune_batch_wait_time_ms = 1000 +####################################################################### +### Router Config Options ### +####################################################################### + +# [router] + +# Timeout to publish file announcements in batch. +# batcher_timeout = "1s" + +# Number of files in an announcement to publish in batch. +# batcher_file_capacity = 1 + +# Number of announcements in a pubsub message to publish in batch. +# batcher_announcement_capacity = 1 + ####################################################################### ### File Sync Config Options ### ####################################################################### @@ -286,7 +301,7 @@ # When the cache is full, the storage position information with oldest timestamp will be replaced. # Global cache capacity. -# max_entries_total = 4096 +# max_entries_total = 256000 # Location information capacity for each file. # max_entries_per_file = 4