Compare commits

...

3 Commits

Author           SHA1        Message                                                                Date
peilun-conflux   992deb0979  Merge 07704fedc9 into 0c493880ee                                       2024-11-09 13:37:33 +08:00
Joel Liu         0c493880ee  add timeout for rpc connections (#263)                                 2024-11-09 13:37:09 +08:00
Peilun Li        07704fedc9  fix: wait for async storage to complete before receiving the result.  2024-10-30 18:51:45 +08:00

Some checks failed on 0c493880ee: abi-consistent-check / build-and-compare, code-coverage / unittest-cov, rust / check, rust / test, rust / lints, and functional-test / test (push) were all cancelled.
8 changed files with 48 additions and 39 deletions

Cargo.lock (generated)

@@ -4652,12 +4652,14 @@ dependencies = [
  "jsonrpsee",
  "lazy_static",
  "metrics",
+ "reqwest",
  "serde_json",
  "shared_types",
  "storage",
  "task_executor",
  "thiserror",
  "tokio",
+ "url",
 ]

 [[package]]


@@ -24,3 +24,5 @@ futures-util = "0.3.28"
 thiserror = "1.0.44"
 lazy_static = "1.4.0"
 metrics = { workspace = true }
+reqwest = {version = "0.11", features = ["json"]}
+url = { version = "2.4", default-features = false }


@@ -1,3 +1,5 @@
+use std::time::Duration;
+
 use crate::ContractAddress;

 pub struct LogSyncConfig {
@@ -34,6 +36,9 @@ pub struct LogSyncConfig {
     pub watch_loop_wait_time_ms: u64,
     // force to sync log from start block number
     pub force_log_sync_from_start_block_number: bool,
+
+    // the timeout for blockchain rpc connection
+    pub blockchain_rpc_timeout: Duration,
 }

 #[derive(Clone)]
@@ -61,6 +66,7 @@ impl LogSyncConfig {
         remove_finalized_block_interval_minutes: u64,
         watch_loop_wait_time_ms: u64,
         force_log_sync_from_start_block_number: bool,
+        blockchain_rpc_timeout: Duration,
     ) -> Self {
         Self {
             rpc_endpoint_url,
@@ -77,6 +83,7 @@ impl LogSyncConfig {
             remove_finalized_block_interval_minutes,
             watch_loop_wait_time_ms,
             force_log_sync_from_start_block_number,
+            blockchain_rpc_timeout,
         }
     }
 }


@@ -1,6 +1,6 @@
 use crate::sync_manager::log_query::LogQuery;
 use crate::sync_manager::RETRY_WAIT_MS;
-use crate::ContractAddress;
+use crate::{ContractAddress, LogSyncConfig};
 use anyhow::{anyhow, bail, Result};
 use append_merkle::{Algorithm, Sha3Algorithm};
 use contract_interface::{SubmissionNode, SubmitFilter, ZgsFlow};
@@ -12,7 +12,6 @@ use futures::StreamExt;
 use jsonrpsee::tracing::{debug, error, info, warn};
 use shared_types::{DataRoot, Transaction};
 use std::collections::{BTreeMap, HashMap};
-use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
 use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
@@ -34,28 +33,29 @@ pub struct LogEntryFetcher {
 }

 impl LogEntryFetcher {
-    pub async fn new(
-        url: &str,
-        contract_address: ContractAddress,
-        log_page_size: u64,
-        confirmation_delay: u64,
-        rate_limit_retries: u32,
-        timeout_retries: u32,
-        initial_backoff: u64,
-    ) -> Result<Self> {
+    pub async fn new(config: &LogSyncConfig) -> Result<Self> {
         let provider = Arc::new(Provider::new(
             RetryClientBuilder::default()
-                .rate_limit_retries(rate_limit_retries)
-                .timeout_retries(timeout_retries)
-                .initial_backoff(Duration::from_millis(initial_backoff))
-                .build(Http::from_str(url)?, Box::new(HttpRateLimitRetryPolicy)),
+                .rate_limit_retries(config.rate_limit_retries)
+                .timeout_retries(config.timeout_retries)
+                .initial_backoff(Duration::from_millis(config.initial_backoff))
+                .build(
+                    Http::new_with_client(
+                        url::Url::parse(&config.rpc_endpoint_url)?,
+                        reqwest::Client::builder()
+                            .timeout(config.blockchain_rpc_timeout)
+                            .connect_timeout(config.blockchain_rpc_timeout)
+                            .build()?,
+                    ),
+                    Box::new(HttpRateLimitRetryPolicy),
+                ),
         ));

         // TODO: `error` types are removed from the ABI json file.
         Ok(Self {
-            contract_address,
+            contract_address: config.contract_address,
             provider,
-            log_page_size,
-            confirmation_delay,
+            log_page_size: config.log_page_size,
+            confirmation_delay: config.confirmation_block_count,
         })
     }
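The heart of the change is visible above: `Http::from_str` builds ethers' HTTP transport on reqwest's default client, which applies no timeout at all, so a stalled RPC endpoint could hang log sync indefinitely; `Http::new_with_client` lets the caller supply a pre-configured client instead. A minimal standalone sketch of the same pattern, assuming ethers 2.x and reqwest 0.11 (the retry and backoff numbers here are illustrative, not taken from this PR):

use std::time::Duration;

use anyhow::Result;
use ethers::providers::{
    Http, HttpRateLimitRetryPolicy, Provider, RetryClient, RetryClientBuilder,
};

fn provider_with_timeout(rpc_url: &str, timeout: Duration) -> Result<Provider<RetryClient<Http>>> {
    // Enforce both a total per-request timeout and a connect timeout, so
    // neither a slow handshake nor a hung response can block forever.
    let client = reqwest::Client::builder()
        .timeout(timeout)
        .connect_timeout(timeout)
        .build()?;

    // Hand the configured client to ethers instead of letting
    // Http::from_str construct a default (timeout-less) one.
    let transport = Http::new_with_client(url::Url::parse(rpc_url)?, client);

    // Keep the retry/backoff behavior on top of the timeout-aware transport.
    let retry_client = RetryClientBuilder::default()
        .rate_limit_retries(10)
        .timeout_retries(3)
        .initial_backoff(Duration::from_millis(500))
        .build(transport, Box::new(HttpRateLimitRetryPolicy));

    Ok(Provider::new(retry_client))
}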


@@ -86,16 +86,7 @@ impl LogSyncManager {
                 .expect("shutdown send error")
             },
             async move {
-                let log_fetcher = LogEntryFetcher::new(
-                    &config.rpc_endpoint_url,
-                    config.contract_address,
-                    config.log_page_size,
-                    config.confirmation_block_count,
-                    config.rate_limit_retries,
-                    config.timeout_retries,
-                    config.initial_backoff,
-                )
-                .await?;
+                let log_fetcher = LogEntryFetcher::new(&config).await?;
                 let data_cache = DataCache::new(config.cache_config.clone());

                 let block_hash_cache = Arc::new(RwLock::new(


@@ -146,6 +146,7 @@ impl ZgsConfig {
             self.remove_finalized_block_interval_minutes,
             self.watch_loop_wait_time_ms,
             self.force_log_sync_from_start_block_number,
+            Duration::from_secs(self.blockchain_rpc_timeout_secs),
         ))
     }


@@ -49,6 +49,8 @@ build_config! {
     (remove_finalized_block_interval_minutes, (u64), 30)
     (watch_loop_wait_time_ms, (u64), 500)
+    (blockchain_rpc_timeout_secs, (u64), 120)
+
     // chunk pool
     (chunk_pool_write_window_size, (usize), 4)
     (chunk_pool_max_cached_chunks_all, (usize), 4*1024*1024) // 1G
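Once wired through `build_config!` and `ZgsConfig`, the timeout should be tunable from the node's TOML configuration like any other entry in this block. A sketch, assuming the usual TOML config file consumed by the node (the 60-second value is an example, not part of this PR):

# Timeout for blockchain RPC connections, in seconds; defaults to 120.
blockchain_rpc_timeout_secs = 60

# Neighbouring log-sync knobs from the same block, shown with their defaults.
remove_finalized_block_interval_minutes = 30
watch_loop_wait_time_ms = 500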


@@ -1,7 +1,7 @@
 #[macro_use]
 extern crate tracing;

-use anyhow::bail;
+use anyhow::{anyhow, bail};
 use shared_types::{
     Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
 };
@@ -140,17 +140,21 @@ impl Store {
         let store = self.store.clone();
         let (tx, rx) = oneshot::channel();

-        self.executor.spawn_blocking(
-            move || {
-                // FIXME(zz): Not all functions need `write`. Refactor store usage.
-                let res = f(&*store);
+        self.executor
+            .spawn_blocking_handle(
+                move || {
+                    // FIXME(zz): Not all functions need `write`. Refactor store usage.
+                    let res = f(&*store);

-                if tx.send(res).is_err() {
-                    error!("Unable to complete async storage operation: the receiver dropped");
-                }
-            },
-            WORKER_TASK_NAME,
-        );
+                    if tx.send(res).is_err() {
+                        error!("Unable to complete async storage operation: the receiver dropped");
+                    }
+                },
+                WORKER_TASK_NAME,
+            )
+            .ok_or(anyhow!("Unable to spawn async storage work"))?
+            .await
+            .map_err(|e| anyhow!("join error: e={:?}", e))?;

         rx.await
             .unwrap_or_else(|_| bail!(error::Error::Custom("Receiver error".to_string())))
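Taken together with the `use anyhow::{anyhow, bail};` change above, the fix is to await the join handle of the blocking task itself before awaiting the oneshot result, so the storage operation has fully completed (and a panic or spawn failure surfaces as an error) before the caller proceeds. A tokio-only sketch of the same ordering; this repo goes through its own `task_executor`, whose `spawn_blocking_handle` returns an `Option` of a join handle, but plain `tokio::task::spawn_blocking` illustrates the pattern:

use anyhow::{anyhow, Result};
use tokio::sync::oneshot;

async fn run_blocking<T, F>(f: F) -> Result<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = oneshot::channel();

    let handle = tokio::task::spawn_blocking(move || {
        let res = f();
        if tx.send(res).is_err() {
            eprintln!("unable to complete async storage operation: the receiver dropped");
        }
    });

    // The point of the fix: wait for the blocking task to finish first,
    // rather than racing ahead as soon as the result lands in the channel.
    handle.await.map_err(|e| anyhow!("join error: e={:?}", e))?;

    // Only then read the result; by now the send must have happened.
    rx.await.map_err(|_| anyhow!("receiver error"))
}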