mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-12 16:15:17 +00:00
Enhance P2P network protocol to support batch messages for performance concern (#173)
* Add p2p protocol version in network identity * Cache annouce file pubsub messages to publish in batch * fix file location cache * opt sync metrics * opt file location cache default configs * publish files announcements in batch * enhance announce file pubsub msg metrics * opt metrics * fix ci * fix clippy * fix batcher * minor fix * opt batcher: publish all if expired
This commit is contained in:
parent
a79f7bbf12
commit
2fd9712d59
@ -161,9 +161,7 @@ impl FileCache {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Insert the specified `announcement` into cache.
|
/// Insert the specified `announcement` into cache.
|
||||||
fn insert(&mut self, announcement: SignedAnnounceFile) {
|
fn insert(&mut self, tx_id: TxID, announcement: SignedAnnounceFile) {
|
||||||
let tx_id = announcement.tx_id;
|
|
||||||
|
|
||||||
let item = self.files.entry(tx_id).or_insert_with(|| {
|
let item = self.files.entry(tx_id).or_insert_with(|| {
|
||||||
AnnouncementCache::new(
|
AnnouncementCache::new(
|
||||||
self.config.max_entries_per_file,
|
self.config.max_entries_per_file,
|
||||||
@ -290,8 +288,12 @@ impl FileLocationCache {
|
|||||||
shard_id: announcement.shard_id,
|
shard_id: announcement.shard_id,
|
||||||
num_shard: announcement.num_shard,
|
num_shard: announcement.num_shard,
|
||||||
};
|
};
|
||||||
self.cache.lock().insert(announcement);
|
|
||||||
self.insert_peer_config(peer_id, shard_config);
|
self.insert_peer_config(peer_id, shard_config);
|
||||||
|
|
||||||
|
let mut cache = self.cache.lock();
|
||||||
|
for tx_id in announcement.tx_ids.iter() {
|
||||||
|
cache.insert(*tx_id, announcement.clone());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_one(&self, tx_id: TxID) -> Option<SignedAnnounceFile> {
|
pub fn get_one(&self, tx_id: TxID) -> Option<SignedAnnounceFile> {
|
||||||
@ -534,7 +536,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn assert_file(file: &SignedAnnounceFile, tx_id: TxID, peer_id: PeerId, timestamp: u32) {
|
fn assert_file(file: &SignedAnnounceFile, tx_id: TxID, peer_id: PeerId, timestamp: u32) {
|
||||||
assert_eq!(file.tx_id, tx_id);
|
assert_eq!(file.tx_ids[0], tx_id);
|
||||||
assert_eq!(PeerId::from(file.peer_id.clone()), peer_id);
|
assert_eq!(PeerId::from(file.peer_id.clone()), peer_id);
|
||||||
assert_eq!(file.timestamp, timestamp);
|
assert_eq!(file.timestamp, timestamp);
|
||||||
}
|
}
|
||||||
@ -551,11 +553,11 @@ mod tests {
|
|||||||
let tx1 = TxID::random_hash(1);
|
let tx1 = TxID::random_hash(1);
|
||||||
let tx2 = TxID::random_hash(2);
|
let tx2 = TxID::random_hash(2);
|
||||||
|
|
||||||
cache.insert(create_file_2(tx1, peer1, now - 1));
|
cache.insert(tx1, create_file_2(tx1, peer1, now - 1));
|
||||||
assert_eq!(cache.total_announcements, 1);
|
assert_eq!(cache.total_announcements, 1);
|
||||||
cache.insert(create_file_2(tx2, peer1, now - 2));
|
cache.insert(tx2, create_file_2(tx2, peer1, now - 2));
|
||||||
assert_eq!(cache.total_announcements, 2);
|
assert_eq!(cache.total_announcements, 2);
|
||||||
cache.insert(create_file_2(tx1, peer2, now - 3));
|
cache.insert(tx1, create_file_2(tx1, peer2, now - 3));
|
||||||
assert_eq!(cache.total_announcements, 3);
|
assert_eq!(cache.total_announcements, 3);
|
||||||
|
|
||||||
assert_file(&cache.pop().unwrap(), tx1, peer2, now - 3);
|
assert_file(&cache.pop().unwrap(), tx1, peer2, now - 3);
|
||||||
@ -573,18 +575,18 @@ mod tests {
|
|||||||
let now = timestamp_now();
|
let now = timestamp_now();
|
||||||
|
|
||||||
let tx1 = TxID::random_hash(1);
|
let tx1 = TxID::random_hash(1);
|
||||||
cache.insert(create_file_2(tx1, PeerId::random(), now - 7));
|
cache.insert(tx1, create_file_2(tx1, PeerId::random(), now - 7));
|
||||||
cache.insert(create_file_2(tx1, PeerId::random(), now - 8));
|
cache.insert(tx1, create_file_2(tx1, PeerId::random(), now - 8));
|
||||||
cache.insert(create_file_2(tx1, PeerId::random(), now - 9));
|
cache.insert(tx1, create_file_2(tx1, PeerId::random(), now - 9));
|
||||||
assert_eq!(cache.total_announcements, 3);
|
assert_eq!(cache.total_announcements, 3);
|
||||||
|
|
||||||
// insert more files and cause to max entries limited
|
// insert more files and cause to max entries limited
|
||||||
let tx2 = TxID::random_hash(2);
|
let tx2 = TxID::random_hash(2);
|
||||||
cache.insert(create_file_2(tx2, PeerId::random(), now - 1));
|
cache.insert(tx2, create_file_2(tx2, PeerId::random(), now - 1));
|
||||||
assert_all_files(cache.all(tx1).unwrap_or_default(), vec![now - 8, now - 7]);
|
assert_all_files(cache.all(tx1).unwrap_or_default(), vec![now - 8, now - 7]);
|
||||||
cache.insert(create_file_2(tx2, PeerId::random(), now - 2));
|
cache.insert(tx2, create_file_2(tx2, PeerId::random(), now - 2));
|
||||||
assert_all_files(cache.all(tx1).unwrap_or_default(), vec![now - 7]);
|
assert_all_files(cache.all(tx1).unwrap_or_default(), vec![now - 7]);
|
||||||
cache.insert(create_file_2(tx2, PeerId::random(), now - 3));
|
cache.insert(tx2, create_file_2(tx2, PeerId::random(), now - 3));
|
||||||
assert_all_files(cache.all(tx1).unwrap_or_default(), vec![]);
|
assert_all_files(cache.all(tx1).unwrap_or_default(), vec![]);
|
||||||
|
|
||||||
assert_all_files(
|
assert_all_files(
|
||||||
|
@ -16,7 +16,7 @@ pub struct Config {
|
|||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Config {
|
Config {
|
||||||
max_entries_total: 4096,
|
max_entries_total: 256000,
|
||||||
max_entries_per_file: 4,
|
max_entries_per_file: 4,
|
||||||
entry_expiration_time_secs: 3600,
|
entry_expiration_time_secs: 3600,
|
||||||
}
|
}
|
||||||
|
@ -35,7 +35,7 @@ impl AnnounceFileBuilder {
|
|||||||
let timestamp = self.timestamp.unwrap_or_else(timestamp_now);
|
let timestamp = self.timestamp.unwrap_or_else(timestamp_now);
|
||||||
|
|
||||||
let msg = AnnounceFile {
|
let msg = AnnounceFile {
|
||||||
tx_id,
|
tx_ids: vec![tx_id],
|
||||||
num_shard: 1,
|
num_shard: 1,
|
||||||
shard_id: 0,
|
shard_id: 0,
|
||||||
peer_id: peer_id.into(),
|
peer_id: peer_id.into(),
|
||||||
|
@ -95,6 +95,8 @@ pub use peer_manager::{
|
|||||||
};
|
};
|
||||||
pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
|
pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
|
||||||
|
|
||||||
|
pub const PROTOCOL_VERSION: [u8; 3] = [0, 1, 0];
|
||||||
|
|
||||||
/// Application level requests sent to the network.
|
/// Application level requests sent to the network.
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
pub enum RequestId {
|
pub enum RequestId {
|
||||||
|
@ -130,7 +130,7 @@ pub struct FindChunks {
|
|||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
|
||||||
pub struct AnnounceFile {
|
pub struct AnnounceFile {
|
||||||
pub tx_id: TxID,
|
pub tx_ids: Vec<TxID>,
|
||||||
pub num_shard: usize,
|
pub num_shard: usize,
|
||||||
pub shard_id: usize,
|
pub shard_id: usize,
|
||||||
pub peer_id: WrappedPeerId,
|
pub peer_id: WrappedPeerId,
|
||||||
@ -200,12 +200,14 @@ pub type SignedAnnounceFile = SignedMessage<AnnounceFile>;
|
|||||||
pub type SignedAnnounceShardConfig = SignedMessage<AnnounceShardConfig>;
|
pub type SignedAnnounceShardConfig = SignedMessage<AnnounceShardConfig>;
|
||||||
pub type SignedAnnounceChunks = SignedMessage<AnnounceChunks>;
|
pub type SignedAnnounceChunks = SignedMessage<AnnounceChunks>;
|
||||||
|
|
||||||
|
type SignedAnnounceFiles = Vec<SignedAnnounceFile>;
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub enum PubsubMessage {
|
pub enum PubsubMessage {
|
||||||
ExampleMessage(u64),
|
ExampleMessage(u64),
|
||||||
FindFile(FindFile),
|
FindFile(FindFile),
|
||||||
FindChunks(FindChunks),
|
FindChunks(FindChunks),
|
||||||
AnnounceFile(SignedAnnounceFile),
|
AnnounceFile(Vec<SignedAnnounceFile>),
|
||||||
AnnounceShardConfig(SignedAnnounceShardConfig),
|
AnnounceShardConfig(SignedAnnounceShardConfig),
|
||||||
AnnounceChunks(SignedAnnounceChunks),
|
AnnounceChunks(SignedAnnounceChunks),
|
||||||
}
|
}
|
||||||
@ -314,7 +316,8 @@ impl PubsubMessage {
|
|||||||
FindChunks::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
|
FindChunks::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
|
||||||
)),
|
)),
|
||||||
GossipKind::AnnounceFile => Ok(PubsubMessage::AnnounceFile(
|
GossipKind::AnnounceFile => Ok(PubsubMessage::AnnounceFile(
|
||||||
SignedAnnounceFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
|
SignedAnnounceFiles::from_ssz_bytes(data)
|
||||||
|
.map_err(|e| format!("{:?}", e))?,
|
||||||
)),
|
)),
|
||||||
GossipKind::AnnounceChunks => Ok(PubsubMessage::AnnounceChunks(
|
GossipKind::AnnounceChunks => Ok(PubsubMessage::AnnounceChunks(
|
||||||
SignedAnnounceChunks::from_ssz_bytes(data)
|
SignedAnnounceChunks::from_ssz_bytes(data)
|
||||||
|
117
node/router/src/batcher.rs
Normal file
117
node/router/src/batcher.rs
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
use std::{
|
||||||
|
collections::VecDeque,
|
||||||
|
sync::Arc,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
|
use ::metrics::{Histogram, Sample};
|
||||||
|
|
||||||
|
/// `Batcher` is used to handle data in batch, when `capacity` or `timeout` matches.
|
||||||
|
pub(crate) struct Batcher<T> {
|
||||||
|
items: VecDeque<T>,
|
||||||
|
earliest_time: Option<Instant>,
|
||||||
|
capacity: usize,
|
||||||
|
timeout: Duration,
|
||||||
|
metrics_batch_size: Arc<dyn Histogram>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Batcher<T> {
|
||||||
|
pub fn new(capacity: usize, timeout: Duration, name: &str) -> Self {
|
||||||
|
Self {
|
||||||
|
items: VecDeque::with_capacity(capacity),
|
||||||
|
earliest_time: None,
|
||||||
|
capacity,
|
||||||
|
timeout,
|
||||||
|
metrics_batch_size: Sample::ExpDecay(0.015).register_with_group(
|
||||||
|
"router_batcher_size",
|
||||||
|
name,
|
||||||
|
1024,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_all(&mut self) -> Option<Vec<T>> {
|
||||||
|
let size = self.items.len();
|
||||||
|
if size == 0 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.metrics_batch_size.update(size as u64);
|
||||||
|
self.earliest_time = None;
|
||||||
|
|
||||||
|
Some(Vec::from_iter(self.items.split_off(0).into_iter().rev()))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add(&mut self, value: T) -> Option<Vec<T>> {
|
||||||
|
self.add_with_time(value, Instant::now())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_with_time(&mut self, value: T, now: Instant) -> Option<Vec<T>> {
|
||||||
|
// push at front so as to use `split_off` to remove expired items
|
||||||
|
self.items.push_front(value);
|
||||||
|
if self.earliest_time.is_none() {
|
||||||
|
self.earliest_time = Some(now);
|
||||||
|
}
|
||||||
|
|
||||||
|
// cache if not full
|
||||||
|
let size = self.items.len();
|
||||||
|
if size < self.capacity {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// cache is full
|
||||||
|
self.remove_all()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn expire(&mut self) -> Option<Vec<T>> {
|
||||||
|
self.expire_with_time(Instant::now())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn expire_with_time(&mut self, now: Instant) -> Option<Vec<T>> {
|
||||||
|
if now.duration_since(self.earliest_time?) < self.timeout {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
self.remove_all()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use super::Batcher;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_add() {
|
||||||
|
let mut batcher: Batcher<usize> = Batcher::new(3, Duration::from_secs(10), "test");
|
||||||
|
|
||||||
|
assert_eq!(batcher.add(1), None);
|
||||||
|
assert_eq!(batcher.add(2), None);
|
||||||
|
assert_eq!(batcher.add(3), Some(vec![1, 2, 3]));
|
||||||
|
assert_eq!(batcher.items.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_expire() {
|
||||||
|
let mut batcher: Batcher<usize> = Batcher::new(5, Duration::from_secs(10), "test");
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
// enqueue: 1, 2, 3, 4
|
||||||
|
assert_eq!(batcher.add_with_time(1, now + Duration::from_secs(1)), None);
|
||||||
|
assert_eq!(batcher.add_with_time(2, now + Duration::from_secs(2)), None);
|
||||||
|
assert_eq!(batcher.add_with_time(3, now + Duration::from_secs(4)), None);
|
||||||
|
assert_eq!(batcher.add_with_time(4, now + Duration::from_secs(5)), None);
|
||||||
|
|
||||||
|
// expire None
|
||||||
|
assert_eq!(batcher.expire_with_time(now + Duration::from_secs(6)), None);
|
||||||
|
|
||||||
|
// expire all
|
||||||
|
assert_eq!(
|
||||||
|
batcher.expire_with_time(now + Duration::from_secs(13)),
|
||||||
|
Some(vec![1, 2, 3, 4])
|
||||||
|
);
|
||||||
|
assert_eq!(batcher.items.len(), 0);
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,7 @@
|
|||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate tracing;
|
extern crate tracing;
|
||||||
|
|
||||||
|
mod batcher;
|
||||||
mod libp2p_event_handler;
|
mod libp2p_event_handler;
|
||||||
mod metrics;
|
mod metrics;
|
||||||
mod peer_manager;
|
mod peer_manager;
|
||||||
@ -25,6 +26,15 @@ pub struct Config {
|
|||||||
pub libp2p_nodes: Vec<Multiaddr>,
|
pub libp2p_nodes: Vec<Multiaddr>,
|
||||||
pub private_ip_enabled: bool,
|
pub private_ip_enabled: bool,
|
||||||
pub check_announced_ip: bool,
|
pub check_announced_ip: bool,
|
||||||
|
|
||||||
|
// batcher
|
||||||
|
/// Timeout to publish messages in batch
|
||||||
|
#[serde(deserialize_with = "deserialize_duration")]
|
||||||
|
pub batcher_timeout: Duration,
|
||||||
|
/// Number of files in an announcement
|
||||||
|
pub batcher_file_capacity: usize,
|
||||||
|
/// Number of announcements in a pubsub message
|
||||||
|
pub batcher_announcement_capacity: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
@ -37,6 +47,10 @@ impl Default for Config {
|
|||||||
libp2p_nodes: vec![],
|
libp2p_nodes: vec![],
|
||||||
private_ip_enabled: false,
|
private_ip_enabled: false,
|
||||||
check_announced_ip: false,
|
check_announced_ip: false,
|
||||||
|
|
||||||
|
batcher_timeout: Duration::from_secs(1),
|
||||||
|
batcher_file_capacity: 1,
|
||||||
|
batcher_announcement_capacity: 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@ use sync::{SyncMessage, SyncSender};
|
|||||||
use tokio::sync::mpsc::UnboundedSender;
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
use tokio::sync::{mpsc, RwLock};
|
use tokio::sync::{mpsc, RwLock};
|
||||||
|
|
||||||
|
use crate::batcher::Batcher;
|
||||||
use crate::metrics;
|
use crate::metrics;
|
||||||
use crate::peer_manager::PeerManager;
|
use crate::peer_manager::PeerManager;
|
||||||
use crate::Config;
|
use crate::Config;
|
||||||
@ -94,6 +95,10 @@ pub struct Libp2pEventHandler {
|
|||||||
file_location_cache: Arc<FileLocationCache>,
|
file_location_cache: Arc<FileLocationCache>,
|
||||||
/// All connected peers.
|
/// All connected peers.
|
||||||
peers: Arc<RwLock<PeerManager>>,
|
peers: Arc<RwLock<PeerManager>>,
|
||||||
|
/// Files to announce in batch
|
||||||
|
file_batcher: RwLock<Batcher<TxID>>,
|
||||||
|
/// Announcements to publish in batch
|
||||||
|
announcement_batcher: RwLock<Batcher<SignedAnnounceFile>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Libp2pEventHandler {
|
impl Libp2pEventHandler {
|
||||||
@ -109,6 +114,18 @@ impl Libp2pEventHandler {
|
|||||||
file_location_cache: Arc<FileLocationCache>,
|
file_location_cache: Arc<FileLocationCache>,
|
||||||
peers: Arc<RwLock<PeerManager>>,
|
peers: Arc<RwLock<PeerManager>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
let file_batcher = RwLock::new(Batcher::new(
|
||||||
|
config.batcher_file_capacity,
|
||||||
|
config.batcher_timeout,
|
||||||
|
"file",
|
||||||
|
));
|
||||||
|
|
||||||
|
let announcement_batcher = RwLock::new(Batcher::new(
|
||||||
|
config.batcher_announcement_capacity,
|
||||||
|
config.batcher_timeout,
|
||||||
|
"announcement",
|
||||||
|
));
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
config,
|
config,
|
||||||
network_globals,
|
network_globals,
|
||||||
@ -119,6 +136,8 @@ impl Libp2pEventHandler {
|
|||||||
store,
|
store,
|
||||||
file_location_cache,
|
file_location_cache,
|
||||||
peers,
|
peers,
|
||||||
|
file_batcher,
|
||||||
|
announcement_batcher,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -190,7 +209,7 @@ impl Libp2pEventHandler {
|
|||||||
match request {
|
match request {
|
||||||
Request::Status(status) => {
|
Request::Status(status) => {
|
||||||
self.on_status_request(peer_id, request_id, status);
|
self.on_status_request(peer_id, request_id, status);
|
||||||
metrics::LIBP2P_HANDLE_REQUEST_STATUS.mark(1);
|
metrics::LIBP2P_HANDLE_STATUS_REQUEST.mark(1);
|
||||||
}
|
}
|
||||||
Request::GetChunks(request) => {
|
Request::GetChunks(request) => {
|
||||||
self.send_to_sync(SyncMessage::RequestChunks {
|
self.send_to_sync(SyncMessage::RequestChunks {
|
||||||
@ -198,7 +217,7 @@ impl Libp2pEventHandler {
|
|||||||
request_id,
|
request_id,
|
||||||
request,
|
request,
|
||||||
});
|
});
|
||||||
metrics::LIBP2P_HANDLE_REQUEST_GET_CHUNKS.mark(1);
|
metrics::LIBP2P_HANDLE_GET_CHUNKS_REQUEST.mark(1);
|
||||||
}
|
}
|
||||||
Request::DataByHash(_) => {
|
Request::DataByHash(_) => {
|
||||||
// ignore
|
// ignore
|
||||||
@ -241,8 +260,8 @@ impl Libp2pEventHandler {
|
|||||||
debug!(%peer_id, ?status_message, "Received Status response");
|
debug!(%peer_id, ?status_message, "Received Status response");
|
||||||
match request_id {
|
match request_id {
|
||||||
RequestId::Router(since) => {
|
RequestId::Router(since) => {
|
||||||
metrics::LIBP2P_HANDLE_RESPONSE_STATUS.mark(1);
|
metrics::LIBP2P_HANDLE_STATUS_RESPONSE.mark(1);
|
||||||
metrics::LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY.update_since(since);
|
metrics::LIBP2P_HANDLE_STATUS_RESPONSE_LATENCY.update_since(since);
|
||||||
}
|
}
|
||||||
_ => unreachable!("All status response belong to router"),
|
_ => unreachable!("All status response belong to router"),
|
||||||
}
|
}
|
||||||
@ -251,8 +270,8 @@ impl Libp2pEventHandler {
|
|||||||
Response::Chunks(response) => {
|
Response::Chunks(response) => {
|
||||||
let request_id = match request_id {
|
let request_id = match request_id {
|
||||||
RequestId::Sync(since, sync_id) => {
|
RequestId::Sync(since, sync_id) => {
|
||||||
metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS.mark(1);
|
metrics::LIBP2P_HANDLE_GET_CHUNKS_RESPONSE.mark(1);
|
||||||
metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY.update_since(since);
|
metrics::LIBP2P_HANDLE_GET_CHUNKS_RESPONSE_LATENCY.update_since(since);
|
||||||
sync_id
|
sync_id
|
||||||
}
|
}
|
||||||
_ => unreachable!("All Chunks responses belong to sync"),
|
_ => unreachable!("All Chunks responses belong to sync"),
|
||||||
@ -305,9 +324,18 @@ impl Libp2pEventHandler {
|
|||||||
metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1);
|
||||||
self.on_find_chunks(msg).await
|
self.on_find_chunks(msg).await
|
||||||
}
|
}
|
||||||
PubsubMessage::AnnounceFile(msg) => {
|
PubsubMessage::AnnounceFile(msgs) => {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE.mark(1);
|
||||||
self.on_announce_file(propagation_source, msg)
|
|
||||||
|
for msg in msgs {
|
||||||
|
match self.on_announce_file(propagation_source, msg) {
|
||||||
|
MessageAcceptance::Reject => return MessageAcceptance::Reject,
|
||||||
|
MessageAcceptance::Ignore => return MessageAcceptance::Ignore,
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MessageAcceptance::Accept
|
||||||
}
|
}
|
||||||
PubsubMessage::AnnounceChunks(msg) => {
|
PubsubMessage::AnnounceChunks(msg) => {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS.mark(1);
|
||||||
@ -382,7 +410,14 @@ impl Libp2pEventHandler {
|
|||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn construct_announce_file_message(&self, tx_id: TxID) -> Option<PubsubMessage> {
|
pub async fn construct_announce_file_message(
|
||||||
|
&self,
|
||||||
|
tx_ids: Vec<TxID>,
|
||||||
|
) -> Option<SignedAnnounceFile> {
|
||||||
|
if tx_ids.is_empty() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
let peer_id = *self.network_globals.peer_id.read();
|
let peer_id = *self.network_globals.peer_id.read();
|
||||||
|
|
||||||
let addr = self.get_listen_addr_or_add().await?;
|
let addr = self.get_listen_addr_or_add().await?;
|
||||||
@ -391,7 +426,7 @@ impl Libp2pEventHandler {
|
|||||||
let shard_config = self.store.get_store().flow().get_shard_config();
|
let shard_config = self.store.get_store().flow().get_shard_config();
|
||||||
|
|
||||||
let msg = AnnounceFile {
|
let msg = AnnounceFile {
|
||||||
tx_id,
|
tx_ids,
|
||||||
num_shard: shard_config.num_shard,
|
num_shard: shard_config.num_shard,
|
||||||
shard_id: shard_config.shard_id,
|
shard_id: shard_config.shard_id,
|
||||||
peer_id: peer_id.into(),
|
peer_id: peer_id.into(),
|
||||||
@ -402,14 +437,14 @@ impl Libp2pEventHandler {
|
|||||||
let mut signed = match SignedMessage::sign_message(msg, &self.local_keypair) {
|
let mut signed = match SignedMessage::sign_message(msg, &self.local_keypair) {
|
||||||
Ok(signed) => signed,
|
Ok(signed) => signed,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(%tx_id.seq, %e, "Failed to sign AnnounceFile message");
|
error!(%e, "Failed to sign AnnounceFile message");
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
signed.resend_timestamp = timestamp;
|
signed.resend_timestamp = timestamp;
|
||||||
|
|
||||||
Some(PubsubMessage::AnnounceFile(signed))
|
Some(signed)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn construct_announce_shard_config_message(
|
pub async fn construct_announce_shard_config_message(
|
||||||
@ -447,10 +482,11 @@ impl Libp2pEventHandler {
|
|||||||
// verify timestamp
|
// verify timestamp
|
||||||
let d = duration_since(
|
let d = duration_since(
|
||||||
timestamp,
|
timestamp,
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_FIND_FILE.clone(),
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(),
|
||||||
);
|
);
|
||||||
if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
|
if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
|
||||||
debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message");
|
debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message");
|
||||||
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1);
|
||||||
return MessageAcceptance::Ignore;
|
return MessageAcceptance::Ignore;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -460,14 +496,10 @@ impl Libp2pEventHandler {
|
|||||||
if tx.id() == tx_id {
|
if tx.id() == tx_id {
|
||||||
trace!(?tx_id, "Found file locally, responding to FindFile query");
|
trace!(?tx_id, "Found file locally, responding to FindFile query");
|
||||||
|
|
||||||
return match self.construct_announce_file_message(tx_id).await {
|
if self.publish_file(tx_id).await.is_some() {
|
||||||
Some(msg) => {
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE.mark(1);
|
||||||
self.publish(msg);
|
return MessageAcceptance::Ignore;
|
||||||
MessageAcceptance::Ignore
|
}
|
||||||
}
|
|
||||||
// propagate FindFile query to other nodes
|
|
||||||
None => MessageAcceptance::Accept,
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -477,12 +509,15 @@ impl Libp2pEventHandler {
|
|||||||
trace!(?tx_id, "Found file in cache, responding to FindFile query");
|
trace!(?tx_id, "Found file in cache, responding to FindFile query");
|
||||||
|
|
||||||
msg.resend_timestamp = timestamp_now();
|
msg.resend_timestamp = timestamp_now();
|
||||||
self.publish(PubsubMessage::AnnounceFile(msg));
|
self.publish_announcement(msg).await;
|
||||||
|
|
||||||
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_CACHE.mark(1);
|
||||||
|
|
||||||
return MessageAcceptance::Ignore;
|
return MessageAcceptance::Ignore;
|
||||||
}
|
}
|
||||||
|
|
||||||
// propagate FindFile query to other nodes
|
// propagate FindFile query to other nodes
|
||||||
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_FORWARD.mark(1);
|
||||||
MessageAcceptance::Accept
|
MessageAcceptance::Accept
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -528,7 +563,7 @@ impl Libp2pEventHandler {
|
|||||||
// verify timestamp
|
// verify timestamp
|
||||||
let d = duration_since(
|
let d = duration_since(
|
||||||
msg.timestamp,
|
msg.timestamp,
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_FIND_CHUNKS.clone(),
|
metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS_LATENCY.clone(),
|
||||||
);
|
);
|
||||||
if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
|
if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
|
||||||
debug!(%msg.timestamp, ?d, "Invalid timestamp, ignoring FindChunks message");
|
debug!(%msg.timestamp, ?d, "Invalid timestamp, ignoring FindChunks message");
|
||||||
@ -624,6 +659,9 @@ impl Libp2pEventHandler {
|
|||||||
propagation_source: PeerId,
|
propagation_source: PeerId,
|
||||||
msg: SignedAnnounceFile,
|
msg: SignedAnnounceFile,
|
||||||
) -> MessageAcceptance {
|
) -> MessageAcceptance {
|
||||||
|
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_ANNOUNCEMENTS.mark(1);
|
||||||
|
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_FILES.mark(msg.tx_ids.len());
|
||||||
|
|
||||||
// verify message signature
|
// verify message signature
|
||||||
if !verify_signature(&msg, &msg.peer_id, propagation_source) {
|
if !verify_signature(&msg, &msg.peer_id, propagation_source) {
|
||||||
return MessageAcceptance::Reject;
|
return MessageAcceptance::Reject;
|
||||||
@ -646,19 +684,22 @@ impl Libp2pEventHandler {
|
|||||||
// propagate gossip to peers
|
// propagate gossip to peers
|
||||||
let d = duration_since(
|
let d = duration_since(
|
||||||
msg.resend_timestamp,
|
msg.resend_timestamp,
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_FILE.clone(),
|
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_LATENCY.clone(),
|
||||||
);
|
);
|
||||||
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_FILE_TIMEOUT {
|
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_FILE_TIMEOUT {
|
||||||
debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceFile message");
|
debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceFile message");
|
||||||
|
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_TIMEOUT.mark(1);
|
||||||
return MessageAcceptance::Ignore;
|
return MessageAcceptance::Ignore;
|
||||||
}
|
}
|
||||||
|
|
||||||
// notify sync layer
|
// notify sync layer
|
||||||
self.send_to_sync(SyncMessage::AnnounceFileGossip {
|
for tx_id in msg.tx_ids.iter() {
|
||||||
tx_id: msg.tx_id,
|
self.send_to_sync(SyncMessage::AnnounceFileGossip {
|
||||||
peer_id: msg.peer_id.clone().into(),
|
tx_id: *tx_id,
|
||||||
addr,
|
peer_id: msg.peer_id.clone().into(),
|
||||||
});
|
addr: addr.clone(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// insert message to cache
|
// insert message to cache
|
||||||
self.file_location_cache.insert(msg);
|
self.file_location_cache.insert(msg);
|
||||||
@ -693,7 +734,7 @@ impl Libp2pEventHandler {
|
|||||||
// propagate gossip to peers
|
// propagate gossip to peers
|
||||||
let d = duration_since(
|
let d = duration_since(
|
||||||
msg.resend_timestamp,
|
msg.resend_timestamp,
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_SHARD.clone(),
|
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD_LATENCY.clone(),
|
||||||
);
|
);
|
||||||
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_SHARD_CONFIG_TIMEOUT {
|
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_SHARD_CONFIG_TIMEOUT {
|
||||||
debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceShardConfig message");
|
debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceShardConfig message");
|
||||||
@ -745,7 +786,7 @@ impl Libp2pEventHandler {
|
|||||||
// propagate gossip to peers
|
// propagate gossip to peers
|
||||||
let d = duration_since(
|
let d = duration_since(
|
||||||
msg.resend_timestamp,
|
msg.resend_timestamp,
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_CHUNKS.clone(),
|
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS_LATENCY.clone(),
|
||||||
);
|
);
|
||||||
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_FILE_TIMEOUT {
|
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_FILE_TIMEOUT {
|
||||||
debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceChunks message");
|
debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceChunks message");
|
||||||
@ -774,6 +815,39 @@ impl Libp2pEventHandler {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn publish_file(&self, tx_id: TxID) -> Option<bool> {
|
||||||
|
match self.file_batcher.write().await.add(tx_id) {
|
||||||
|
Some(batch) => {
|
||||||
|
let announcement = self.construct_announce_file_message(batch).await?;
|
||||||
|
Some(self.publish_announcement(announcement).await)
|
||||||
|
}
|
||||||
|
None => Some(false),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn publish_announcement(&self, announcement: SignedAnnounceFile) -> bool {
|
||||||
|
match self.announcement_batcher.write().await.add(announcement) {
|
||||||
|
Some(batch) => {
|
||||||
|
self.publish(PubsubMessage::AnnounceFile(batch));
|
||||||
|
true
|
||||||
|
}
|
||||||
|
None => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Publish expired file announcements.
|
||||||
|
pub async fn expire_batcher(&self) {
|
||||||
|
if let Some(batch) = self.file_batcher.write().await.expire() {
|
||||||
|
if let Some(announcement) = self.construct_announce_file_message(batch).await {
|
||||||
|
self.publish_announcement(announcement).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(batch) = self.announcement_batcher.write().await.expire() {
|
||||||
|
self.publish(PubsubMessage::AnnounceFile(batch));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -895,7 +969,7 @@ mod tests {
|
|||||||
Ok(NetworkMessage::Publish { messages }) => {
|
Ok(NetworkMessage::Publish { messages }) => {
|
||||||
assert_eq!(messages.len(), 1);
|
assert_eq!(messages.len(), 1);
|
||||||
assert!(
|
assert!(
|
||||||
matches!(&messages[0], PubsubMessage::AnnounceFile(file) if file.tx_id == expected_tx_id)
|
matches!(&messages[0], PubsubMessage::AnnounceFile(files) if files[0].tx_ids[0] == expected_tx_id)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Ok(_) => panic!("Unexpected network message type received"),
|
Ok(_) => panic!("Unexpected network message type received"),
|
||||||
@ -1185,18 +1259,13 @@ mod tests {
|
|||||||
let tx_id = TxID::random_hash(412);
|
let tx_id = TxID::random_hash(412);
|
||||||
|
|
||||||
// change signed message
|
// change signed message
|
||||||
let message = match handler
|
let mut file = handler
|
||||||
.construct_announce_file_message(tx_id)
|
.construct_announce_file_message(vec![tx_id])
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap();
|
||||||
{
|
let malicious_addr: Multiaddr = "/ip4/127.0.0.38/tcp/30000".parse().unwrap();
|
||||||
PubsubMessage::AnnounceFile(mut file) => {
|
file.inner.at = malicious_addr.into();
|
||||||
let malicious_addr: Multiaddr = "/ip4/127.0.0.38/tcp/30000".parse().unwrap();
|
let message = PubsubMessage::AnnounceFile(vec![file]);
|
||||||
file.inner.at = malicious_addr.into();
|
|
||||||
PubsubMessage::AnnounceFile(file)
|
|
||||||
}
|
|
||||||
_ => panic!("Unexpected pubsub message type"),
|
|
||||||
};
|
|
||||||
|
|
||||||
// failed to verify signature
|
// failed to verify signature
|
||||||
let result = handler.on_pubsub_message(alice, bob, &id, message).await;
|
let result = handler.on_pubsub_message(alice, bob, &id, message).await;
|
||||||
@ -1212,7 +1281,11 @@ mod tests {
|
|||||||
let (alice, bob) = (PeerId::random(), PeerId::random());
|
let (alice, bob) = (PeerId::random(), PeerId::random());
|
||||||
let id = MessageId::new(b"dummy message");
|
let id = MessageId::new(b"dummy message");
|
||||||
let tx = TxID::random_hash(412);
|
let tx = TxID::random_hash(412);
|
||||||
let message = handler.construct_announce_file_message(tx).await.unwrap();
|
let message = handler
|
||||||
|
.construct_announce_file_message(vec![tx])
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let message = PubsubMessage::AnnounceFile(vec![message]);
|
||||||
|
|
||||||
// succeeded to handle
|
// succeeded to handle
|
||||||
let result = handler.on_pubsub_message(alice, bob, &id, message).await;
|
let result = handler.on_pubsub_message(alice, bob, &id, message).await;
|
||||||
|
@ -23,32 +23,55 @@ lazy_static::lazy_static! {
|
|||||||
pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_fail", 1024);
|
pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_fail", 1024);
|
||||||
|
|
||||||
// libp2p_event_handler
|
// libp2p_event_handler
|
||||||
pub static ref LIBP2P_SEND_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_send_status");
|
|
||||||
|
|
||||||
|
// libp2p_event_handler: peer connection
|
||||||
pub static ref LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "outgoing");
|
pub static ref LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "outgoing");
|
||||||
pub static ref LIBP2P_HANDLE_PEER_CONNECTED_INCOMING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "incoming");
|
pub static ref LIBP2P_HANDLE_PEER_CONNECTED_INCOMING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "incoming");
|
||||||
pub static ref LIBP2P_HANDLE_PEER_DISCONNECTED: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_disconnected");
|
pub static ref LIBP2P_HANDLE_PEER_DISCONNECTED: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_disconnected");
|
||||||
pub static ref LIBP2P_HANDLE_REQUEST_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_status");
|
|
||||||
pub static ref LIBP2P_HANDLE_REQUEST_GET_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_get_chunks");
|
// libp2p_event_handler: status
|
||||||
pub static ref LIBP2P_HANDLE_RESPONSE_STATUS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_status", "qps");
|
pub static ref LIBP2P_SEND_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_send_status");
|
||||||
pub static ref LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_status", "latency", 1024);
|
pub static ref LIBP2P_HANDLE_STATUS_REQUEST: Arc<dyn Meter> = register_meter("router_libp2p_handle_status_request");
|
||||||
pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_get_chunks", "qps");
|
pub static ref LIBP2P_HANDLE_STATUS_RESPONSE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_status_response", "qps");
|
||||||
pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_get_chunks", "latency", 1024);
|
pub static ref LIBP2P_HANDLE_STATUS_RESPONSE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_status_response", "latency", 1024);
|
||||||
|
|
||||||
|
// libp2p_event_handler: get chunks
|
||||||
|
pub static ref LIBP2P_HANDLE_GET_CHUNKS_REQUEST: Arc<dyn Meter> = register_meter("router_libp2p_handle_get_chunks_request");
|
||||||
|
pub static ref LIBP2P_HANDLE_GET_CHUNKS_RESPONSE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_get_chunks_response", "qps");
|
||||||
|
pub static ref LIBP2P_HANDLE_GET_CHUNKS_RESPONSE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_get_chunks_response", "latency", 1024);
|
||||||
|
|
||||||
|
// libp2p_event_handler: rpc errors
|
||||||
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_error", "qps");
|
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_error", "qps");
|
||||||
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024);
|
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024);
|
||||||
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_file");
|
|
||||||
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_chunks");
|
|
||||||
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_announce_file");
|
|
||||||
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_announce_chunks");
|
|
||||||
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_announce_shard");
|
|
||||||
|
|
||||||
pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_FIND_FILE: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_find_file", 1024);
|
// libp2p_event_handler: find & announce file
|
||||||
pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_FIND_CHUNKS: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_find_chunks", 1024);
|
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "qps");
|
||||||
pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_FILE: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_announce_file", 1024);
|
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_find_file", "latency", 1024);
|
||||||
pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_CHUNKS: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_announce_chunks", 1024);
|
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "timeout");
|
||||||
pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_SHARD: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_announce_shard", 1024);
|
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "store");
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_CACHE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "cache");
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_FORWARD: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "forward");
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_announce_file", "qps");
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_announce_file", "latency", 1024);
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_TIMEOUT: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_announce_file", "timeout");
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_ANNOUNCEMENTS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_announce_file", "announcements");
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_FILES: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_announce_file", "files");
|
||||||
|
|
||||||
|
// libp2p_event_handler: find & announce chunks
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_find_chunks", "qps");
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_find_chunks", "latency", 1024);
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_announce_chunks", "qps");
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_announce_chunks", "latency", 1024);
|
||||||
|
|
||||||
|
// libp2p_event_handler: announce shard config
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_announce_shard", "qps");
|
||||||
|
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_announce_shard", "latency", 1024);
|
||||||
|
|
||||||
|
// libp2p_event_handler: verify IP address
|
||||||
pub static ref LIBP2P_VERIFY_ANNOUNCED_IP: Arc<dyn Meter> = register_meter("router_libp2p_verify_announced_ip");
|
pub static ref LIBP2P_VERIFY_ANNOUNCED_IP: Arc<dyn Meter> = register_meter("router_libp2p_verify_announced_ip");
|
||||||
pub static ref LIBP2P_VERIFY_ANNOUNCED_IP_UNSEEN: Arc<dyn Meter> = register_meter("router_libp2p_verify_announced_ip_unseen");
|
pub static ref LIBP2P_VERIFY_ANNOUNCED_IP_UNSEEN: Arc<dyn Meter> = register_meter("router_libp2p_verify_announced_ip_unseen");
|
||||||
pub static ref LIBP2P_VERIFY_ANNOUNCED_IP_MISMATCH: Arc<dyn Meter> = register_meter("router_libp2p_verify_announced_ip_mismatch");
|
pub static ref LIBP2P_VERIFY_ANNOUNCED_IP_MISMATCH: Arc<dyn Meter> = register_meter("router_libp2p_verify_announced_ip_mismatch");
|
||||||
|
|
||||||
|
// batcher
|
||||||
|
pub static ref BATCHER_ANNOUNCE_FILE_SIZE: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_batcher_announce_file_size", 1024);
|
||||||
}
|
}
|
||||||
|
@ -95,7 +95,8 @@ impl RouterService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn main(mut self, mut shutdown_sender: Sender<ShutdownReason>) {
|
async fn main(mut self, mut shutdown_sender: Sender<ShutdownReason>) {
|
||||||
let mut heartbeat = interval(self.config.heartbeat_interval);
|
let mut heartbeat_service = interval(self.config.heartbeat_interval);
|
||||||
|
let mut heartbeat_batcher = interval(self.config.batcher_timeout);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@ -107,8 +108,11 @@ impl RouterService {
|
|||||||
|
|
||||||
Some(msg) = Self::try_recv(&mut self.pruner_recv) => self.on_pruner_msg(msg).await,
|
Some(msg) = Self::try_recv(&mut self.pruner_recv) => self.on_pruner_msg(msg).await,
|
||||||
|
|
||||||
// heartbeat
|
// heartbeat for service
|
||||||
_ = heartbeat.tick() => self.on_heartbeat().await,
|
_ = heartbeat_service.tick() => self.on_heartbeat().await,
|
||||||
|
|
||||||
|
// heartbeat for expire file batcher
|
||||||
|
_ = heartbeat_batcher.tick() => self.libp2p_event_handler.expire_batcher().await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -324,12 +328,12 @@ impl RouterService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
NetworkMessage::AnnounceLocalFile { tx_id } => {
|
NetworkMessage::AnnounceLocalFile { tx_id } => {
|
||||||
if let Some(msg) = self
|
if self
|
||||||
.libp2p_event_handler
|
.libp2p_event_handler
|
||||||
.construct_announce_file_message(tx_id)
|
.publish_file(tx_id)
|
||||||
.await
|
.await
|
||||||
|
.is_some()
|
||||||
{
|
{
|
||||||
self.libp2p_event_handler.publish(msg);
|
|
||||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1);
|
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -376,4 +376,16 @@ pub struct NetworkIdentity {
|
|||||||
|
|
||||||
/// The address of the deployed Flow contract on the blockchain.
|
/// The address of the deployed Flow contract on the blockchain.
|
||||||
pub flow_address: Address,
|
pub flow_address: Address,
|
||||||
|
|
||||||
|
/// P2P network protocol version.
|
||||||
|
pub p2p_protocol_version: ProtocolVersion,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(
|
||||||
|
DeriveEncode, DeriveDecode, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize,
|
||||||
|
)]
|
||||||
|
pub struct ProtocolVersion {
|
||||||
|
pub major: u8,
|
||||||
|
pub minor: u8,
|
||||||
|
pub build: u8,
|
||||||
}
|
}
|
||||||
|
@ -8,7 +8,7 @@ use miner::MinerConfig;
|
|||||||
use network::NetworkConfig;
|
use network::NetworkConfig;
|
||||||
use pruner::PrunerConfig;
|
use pruner::PrunerConfig;
|
||||||
use rpc::RPCConfig;
|
use rpc::RPCConfig;
|
||||||
use shared_types::NetworkIdentity;
|
use shared_types::{NetworkIdentity, ProtocolVersion};
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use storage::config::ShardConfig;
|
use storage::config::ShardConfig;
|
||||||
@ -41,6 +41,11 @@ impl ZgsConfig {
|
|||||||
network_config.network_id = NetworkIdentity {
|
network_config.network_id = NetworkIdentity {
|
||||||
chain_id,
|
chain_id,
|
||||||
flow_address,
|
flow_address,
|
||||||
|
p2p_protocol_version: ProtocolVersion {
|
||||||
|
major: network::PROTOCOL_VERSION[0],
|
||||||
|
minor: network::PROTOCOL_VERSION[1],
|
||||||
|
build: network::PROTOCOL_VERSION[2],
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
if !self.network_disable_discovery {
|
if !self.network_disable_discovery {
|
||||||
|
@ -218,6 +218,21 @@ reward_contract_address = "0x0496D0817BD8519e0de4894Dc379D35c35275609"
|
|||||||
#
|
#
|
||||||
# prune_batch_wait_time_ms = 1000
|
# prune_batch_wait_time_ms = 1000
|
||||||
|
|
||||||
|
#######################################################################
|
||||||
|
### Router Config Options ###
|
||||||
|
#######################################################################
|
||||||
|
|
||||||
|
[router]
|
||||||
|
|
||||||
|
# Timeout to publish file announcements in batch.
|
||||||
|
# batcher_timeout = "1s"
|
||||||
|
|
||||||
|
# Number of files in an announcement to publish in batch.
|
||||||
|
batcher_file_capacity = 10
|
||||||
|
|
||||||
|
# Number of announcements in a pubsub message to publish in batch.
|
||||||
|
batcher_announcement_capacity = 100
|
||||||
|
|
||||||
#######################################################################
|
#######################################################################
|
||||||
### File Sync Config Options ###
|
### File Sync Config Options ###
|
||||||
#######################################################################
|
#######################################################################
|
||||||
@ -272,7 +287,7 @@ auto_sync_enabled = true
|
|||||||
# When the cache is full, the storage position information with oldest timestamp will be replaced.
|
# When the cache is full, the storage position information with oldest timestamp will be replaced.
|
||||||
|
|
||||||
# Global cache capacity.
|
# Global cache capacity.
|
||||||
# max_entries_total = 4096
|
# max_entries_total = 256000
|
||||||
|
|
||||||
# Location information capacity for each file.
|
# Location information capacity for each file.
|
||||||
# max_entries_per_file = 4
|
# max_entries_per_file = 4
|
||||||
|
@ -230,6 +230,21 @@ reward_contract_address = "0x51998C4d486F406a788B766d93510980ae1f9360"
|
|||||||
#
|
#
|
||||||
# prune_batch_wait_time_ms = 1000
|
# prune_batch_wait_time_ms = 1000
|
||||||
|
|
||||||
|
#######################################################################
|
||||||
|
### Router Config Options ###
|
||||||
|
#######################################################################
|
||||||
|
|
||||||
|
[router]
|
||||||
|
|
||||||
|
# Timeout to publish file announcements in batch.
|
||||||
|
# batcher_timeout = "1s"
|
||||||
|
|
||||||
|
# Number of files in an announcement to publish in batch.
|
||||||
|
batcher_file_capacity = 10
|
||||||
|
|
||||||
|
# Number of announcements in a pubsub message to publish in batch.
|
||||||
|
batcher_announcement_capacity = 100
|
||||||
|
|
||||||
#######################################################################
|
#######################################################################
|
||||||
### File Sync Config Options ###
|
### File Sync Config Options ###
|
||||||
#######################################################################
|
#######################################################################
|
||||||
@ -284,7 +299,7 @@ auto_sync_enabled = true
|
|||||||
# When the cache is full, the storage position information with oldest timestamp will be replaced.
|
# When the cache is full, the storage position information with oldest timestamp will be replaced.
|
||||||
|
|
||||||
# Global cache capacity.
|
# Global cache capacity.
|
||||||
# max_entries_total = 4096
|
# max_entries_total = 256000
|
||||||
|
|
||||||
# Location information capacity for each file.
|
# Location information capacity for each file.
|
||||||
# max_entries_per_file = 4
|
# max_entries_per_file = 4
|
||||||
|
@ -232,6 +232,21 @@
|
|||||||
#
|
#
|
||||||
# prune_batch_wait_time_ms = 1000
|
# prune_batch_wait_time_ms = 1000
|
||||||
|
|
||||||
|
#######################################################################
|
||||||
|
### Router Config Options ###
|
||||||
|
#######################################################################
|
||||||
|
|
||||||
|
# [router]
|
||||||
|
|
||||||
|
# Timeout to publish file announcements in batch.
|
||||||
|
# batcher_timeout = "1s"
|
||||||
|
|
||||||
|
# Number of files in an announcement to publish in batch.
|
||||||
|
# batcher_file_capacity = 1
|
||||||
|
|
||||||
|
# Number of announcements in a pubsub message to publish in batch.
|
||||||
|
# batcher_announcement_capacity = 1
|
||||||
|
|
||||||
#######################################################################
|
#######################################################################
|
||||||
### File Sync Config Options ###
|
### File Sync Config Options ###
|
||||||
#######################################################################
|
#######################################################################
|
||||||
@ -286,7 +301,7 @@
|
|||||||
# When the cache is full, the storage position information with oldest timestamp will be replaced.
|
# When the cache is full, the storage position information with oldest timestamp will be replaced.
|
||||||
|
|
||||||
# Global cache capacity.
|
# Global cache capacity.
|
||||||
# max_entries_total = 4096
|
# max_entries_total = 256000
|
||||||
|
|
||||||
# Location information capacity for each file.
|
# Location information capacity for each file.
|
||||||
# max_entries_per_file = 4
|
# max_entries_per_file = 4
|
||||||
|
Loading…
Reference in New Issue
Block a user