Compare commits

...

3 Commits

Author SHA1 Message Date
peilun-conflux
ae08773df5
Merge 07704fedc9 into 3fd800275a 2024-11-06 18:25:31 +08:00
Bo QIU
3fd800275a
Improve rate limit for UDP discovery (#261)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
2024-11-06 18:25:08 +08:00
Peilun Li
07704fedc9 fix: wait for async storage to complete before receiving the result. 2024-10-30 18:51:45 +08:00
2 changed files with 17 additions and 13 deletions

View File

@ -157,8 +157,8 @@ impl Default for Config {
let filter_rate_limiter = Some( let filter_rate_limiter = Some(
discv5::RateLimiterBuilder::new() discv5::RateLimiterBuilder::new()
.total_n_every(300, Duration::from_secs(1)) // Allow bursts, average 300 per second .total_n_every(300, Duration::from_secs(1)) // Allow bursts, average 300 per second
.ip_n_every(300, Duration::from_secs(1)) // Allow bursts, average 300 per second .ip_n_every(9, Duration::from_secs(1)) // Allow bursts, average 9 per second
.node_n_every(300, Duration::from_secs(1)) // Allow bursts, average 300 per second .node_n_every(8, Duration::from_secs(1)) // Allow bursts, average 8 per second
.build() .build()
.expect("The total rate limit has been specified"), .expect("The total rate limit has been specified"),
); );

View File

@ -1,7 +1,7 @@
#[macro_use] #[macro_use]
extern crate tracing; extern crate tracing;
use anyhow::bail; use anyhow::{anyhow, bail};
use shared_types::{ use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction, Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
}; };
@ -136,17 +136,21 @@ impl Store {
let store = self.store.clone(); let store = self.store.clone();
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.executor.spawn_blocking( self.executor
move || { .spawn_blocking_handle(
// FIXME(zz): Not all functions need `write`. Refactor store usage. move || {
let res = f(&*store); // FIXME(zz): Not all functions need `write`. Refactor store usage.
let res = f(&*store);
if tx.send(res).is_err() { if tx.send(res).is_err() {
error!("Unable to complete async storage operation: the receiver dropped"); error!("Unable to complete async storage operation: the receiver dropped");
} }
}, },
WORKER_TASK_NAME, WORKER_TASK_NAME,
); )
.ok_or(anyhow!("Unable to spawn async storage work"))?
.await
.map_err(|e| anyhow!("join error: e={:?}", e))?;
rx.await rx.await
.unwrap_or_else(|_| bail!(error::Error::Custom("Receiver error".to_string()))) .unwrap_or_else(|_| bail!(error::Error::Custom("Receiver error".to_string())))