handle NewFile message in router

This commit is contained in:
boqiu 2024-10-23 18:09:52 +08:00
parent d12bf66129
commit 364640adc8
3 changed files with 70 additions and 4 deletions

View File

@ -5,7 +5,7 @@ use std::{ops::Neg, sync::Arc};
use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use network::multiaddr::Protocol;
use network::types::{AnnounceShardConfig, SignedAnnounceShardConfig};
use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig};
use network::{
rpc::StatusMessage,
types::{
@ -29,6 +29,7 @@ use crate::peer_manager::PeerManager;
use crate::Config;
lazy_static::lazy_static! {
pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
@ -316,8 +317,10 @@ impl Libp2pEventHandler {
match message {
PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
// TODO qbit: handle the NewFile pubsub message
PubsubMessage::NewFile(_) => todo!(),
PubsubMessage::NewFile(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1);
self.on_new_file(source, msg).await
}
PubsubMessage::FindFile(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1);
self.on_find_file(msg).await
@ -350,6 +353,59 @@ impl Libp2pEventHandler {
}
}
async fn on_new_file(&self, from: PeerId, msg: NewFile) -> MessageAcceptance {
// verify timestamp
let d = duration_since(
msg.timestamp,
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY.clone(),
);
if d < TOLERABLE_DRIFT.neg() || d > *NEW_FILE_TIMEOUT {
debug!(?d, ?msg, "Invalid timestamp, ignoring NewFile message");
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT.mark(1);
self.send_to_network(NetworkMessage::ReportPeer {
peer_id: from,
action: PeerAction::LowToleranceError,
source: ReportSource::Gossipsub,
msg: "Received out of date NewFile message",
});
return MessageAcceptance::Ignore;
}
// verify announced shard config
let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) {
Ok(v) => v,
Err(_) => return MessageAcceptance::Reject,
};
// ignore if already exists
match self.store.check_tx_completed(msg.tx_id.seq).await {
Ok(true) => return MessageAcceptance::Ignore,
Ok(false) => {}
Err(err) => {
warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx completed");
return MessageAcceptance::Ignore;
}
}
// ignore if already pruned
match self.store.check_tx_pruned(msg.tx_id.seq).await {
Ok(true) => return MessageAcceptance::Ignore,
Ok(false) => {}
Err(err) => {
warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx pruned");
return MessageAcceptance::Ignore;
}
}
// notify sync layer if shard config matches
let my_shard_config = self.store.get_store().get_shard_config();
if my_shard_config.intersect(&announced_shard_config) {
self.send_to_sync(SyncMessage::NewFile { from, msg });
}
MessageAcceptance::Ignore
}
async fn construct_announced_ip(&self) -> Option<Multiaddr> {
// public address configured
if let Some(ip) = self.config.public_address {

View File

@ -44,6 +44,11 @@ lazy_static::lazy_static! {
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_error", "qps");
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024);
// libp2p_event_handler: new file
pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "qps");
pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_new_file", "latency", 1024);
pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "timeout");
// libp2p_event_handler: find & announce file
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "qps");
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_find_file", "latency", 1024);

View File

@ -8,7 +8,7 @@ use anyhow::{anyhow, bail, Result};
use file_location_cache::FileLocationCache;
use libp2p::swarm::DialError;
use log_entry_sync::LogSyncEvent;
use network::types::{AnnounceChunks, FindFile};
use network::types::{AnnounceChunks, FindFile, NewFile};
use network::PubsubMessage;
use network::{
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
@ -70,6 +70,10 @@ pub enum SyncMessage {
AnnounceChunksGossip {
msg: AnnounceChunks,
},
NewFile {
from: PeerId,
msg: NewFile,
},
}
#[derive(Debug)]
@ -265,6 +269,7 @@ impl SyncService {
SyncMessage::AnnounceShardConfig { .. } => {
// FIXME: Check if controllers need to be reset?
}
SyncMessage::NewFile { from, msg } => todo!(),
}
}