Supports rate limit for pubsub message (#299)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled

This commit is contained in:
Bo QIU 2024-12-12 17:49:26 +08:00 committed by GitHub
parent 349e13e7fc
commit 910b5af1c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 131 additions and 23 deletions

View File

@ -33,7 +33,7 @@ mod handler;
pub mod methods;
mod outbound;
mod protocol;
mod rate_limiter;
pub mod rate_limiter;
/// Composite trait for a request id.
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {}

View File

@ -54,6 +54,22 @@ pub struct Quota {
max_tokens: u64,
}
impl Quota {
pub fn one_every(period: Duration) -> Self {
Self {
replenish_all_every: period,
max_tokens: 1,
}
}
pub fn n_every(n: u64, period: Duration) -> Self {
Self {
replenish_all_every: period,
max_tokens: n,
}
}
}
/// Manages rate limiting of requests per peer, with differentiated rates per protocol.
pub struct RPCRateLimiter {
/// Interval to prune peers for which their timer ran out.
@ -122,24 +138,12 @@ impl RPCRateLimiterBuilder {
/// Allow one token every `time_period` to be used for this `protocol`.
/// This produces a hard limit.
pub fn one_every(self, protocol: Protocol, time_period: Duration) -> Self {
self.set_quota(
protocol,
Quota {
replenish_all_every: time_period,
max_tokens: 1,
},
)
self.set_quota(protocol, Quota::one_every(time_period))
}
/// Allow `n` tokens to be use used every `time_period` for this `protocol`.
pub fn n_every(self, protocol: Protocol, n: u64, time_period: Duration) -> Self {
self.set_quota(
protocol,
Quota {
max_tokens: n,
replenish_all_every: time_period,
},
)
self.set_quota(protocol, Quota::n_every(n, time_period))
}
pub fn build(self) -> Result<RPCRateLimiter, &'static str> {

View File

@ -5,6 +5,7 @@ mod batcher;
mod libp2p_event_handler;
mod metrics;
mod peer_manager;
mod rate_limit;
mod service;
use duration_str::deserialize_duration;

View File

@ -174,7 +174,7 @@ impl Libp2pEventHandler {
}
}
fn send_to_network(&self, message: NetworkMessage) {
pub fn send_to_network(&self, message: NetworkMessage) {
self.network_send.send(message).unwrap_or_else(|err| {
warn!(%err, "Could not send message to the network service");
});

View File

@ -0,0 +1,68 @@
use std::{
collections::HashMap,
time::{Duration, Instant},
};
use network::{
rpc::rate_limiter::{Limiter, Quota, RateLimitedErr},
types::GossipKind,
PeerId, PubsubMessage,
};
pub struct PubsubRateLimiter {
init_time: Instant,
limiters: Limiter<PeerId>,
limiters_by_topic: HashMap<GossipKind, Limiter<PeerId>>,
}
impl PubsubRateLimiter {
pub fn new(n: u64, period: Duration) -> Result<Self, String> {
Ok(Self {
init_time: Instant::now(),
limiters: Limiter::from_quota(Quota::n_every(n, period))?,
limiters_by_topic: Default::default(),
})
}
pub fn limit_by_topic(
mut self,
kind: GossipKind,
n: u64,
period: Duration,
) -> Result<Self, String> {
let limiter = Limiter::from_quota(Quota::n_every(n, period))?;
self.limiters_by_topic.insert(kind, limiter);
Ok(self)
}
pub fn allows(
&mut self,
peer_id: &PeerId,
msg: &PubsubMessage,
) -> Result<(), (Option<GossipKind>, RateLimitedErr)> {
let time_since_start = self.init_time.elapsed();
if let Err(err) = self.limiters.allows(time_since_start, peer_id, 1) {
return Err((None, err));
}
let kind = msg.kind();
if let Some(limiter) = self.limiters_by_topic.get_mut(&kind) {
if let Err(err) = limiter.allows(time_since_start, peer_id, 1) {
return Err((Some(kind), err));
}
}
Ok(())
}
pub fn prune(&mut self) {
let time_since_start = self.init_time.elapsed();
self.limiters.prune(time_since_start);
for limiter in self.limiters_by_topic.values_mut() {
limiter.prune(time_since_start);
}
}
}

View File

@ -1,4 +1,5 @@
use crate::metrics;
use crate::rate_limit::PubsubRateLimiter;
use crate::Config;
use crate::{libp2p_event_handler::Libp2pEventHandler, peer_manager::PeerManager};
use chunk_pool::ChunkPoolMessage;
@ -6,14 +7,16 @@ use file_location_cache::FileLocationCache;
use futures::{channel::mpsc::Sender, prelude::*};
use miner::MinerMessage;
use network::rpc::GoodbyeReason;
use network::PeerId;
use network::types::GossipKind;
use network::{
BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, NetworkReceiver,
NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm,
};
use network::{MessageAcceptance, PeerAction, PeerId, ReportSource};
use pruner::PrunerMessage;
use shared_types::ShardedFile;
use std::sync::Arc;
use std::time::Duration;
use storage::log_store::Store as LogStore;
use storage_async::Store;
use sync::{SyncMessage, SyncSender};
@ -49,6 +52,8 @@ pub struct RouterService {
upnp_mappings: (Option<u16>, Option<u16>),
store: Arc<dyn LogStore>,
pubsub_rate_limiter: PubsubRateLimiter,
}
impl RouterService {
@ -67,9 +72,19 @@ impl RouterService {
file_location_cache: Arc<FileLocationCache>,
local_keypair: Keypair,
config: Config,
) {
) -> Result<(), String> {
let peers = Arc::new(RwLock::new(PeerManager::new(config.clone())));
let pubsub_rate_limiter = PubsubRateLimiter::new(100, Duration::from_secs(10))?
.limit_by_topic(GossipKind::Example, 10, Duration::from_secs(10))?
.limit_by_topic(GossipKind::NewFile, 50, Duration::from_secs(10))?
.limit_by_topic(GossipKind::AskFile, 50, Duration::from_secs(10))?
.limit_by_topic(GossipKind::FindFile, 10, Duration::from_secs(10))?
.limit_by_topic(GossipKind::AnnounceFile, 10, Duration::from_secs(10))?
.limit_by_topic(GossipKind::FindChunks, 10, Duration::from_secs(10))?
.limit_by_topic(GossipKind::AnnounceChunks, 10, Duration::from_secs(10))?
.limit_by_topic(GossipKind::AnnounceShardConfig, 50, Duration::from_secs(10))?;
// create the network service and spawn the task
let router = RouterService {
config: config.clone(),
@ -91,17 +106,21 @@ impl RouterService {
),
upnp_mappings: (None, None),
store,
pubsub_rate_limiter,
};
// spawn service
let shutdown_sender = executor.shutdown_sender();
executor.spawn(router.main(shutdown_sender), "router");
Ok(())
}
async fn main(mut self, mut shutdown_sender: Sender<ShutdownReason>) {
let mut heartbeat_service = interval(self.config.heartbeat_interval);
let mut heartbeat_batcher = interval(self.config.batcher_timeout);
let mut heartbeat_rate_limiter = interval(Duration::from_secs(30));
loop {
tokio::select! {
@ -118,6 +137,8 @@ impl RouterService {
// heartbeat for expire file batcher
_ = heartbeat_batcher.tick() => self.libp2p_event_handler.expire_batcher().await,
_ = heartbeat_rate_limiter.tick() => self.pubsub_rate_limiter.prune(),
}
}
}
@ -192,10 +213,24 @@ impl RouterService {
message,
..
} => {
let result = self
.libp2p_event_handler
.on_pubsub_message(propagation_source, source, &id, message)
.await;
let result = if let Err((rate_limit_kind, _)) = self
.pubsub_rate_limiter
.allows(&propagation_source, &message)
{
warn!(%propagation_source, kind=?message.kind(), ?rate_limit_kind, "Pubsub message rate limited");
self.libp2p_event_handler
.send_to_network(NetworkMessage::ReportPeer {
peer_id: propagation_source,
action: PeerAction::LowToleranceError,
source: ReportSource::Gossipsub,
msg: "Pubsub message rate limited",
});
MessageAcceptance::Reject
} else {
self.libp2p_event_handler
.on_pubsub_message(propagation_source, source, &id, message)
.await
};
self.libp2p
.swarm

View File

@ -265,7 +265,7 @@ impl ClientBuilder {
file_location_cache,
network.keypair.clone(),
router_config,
);
)?;
Ok(self)
}