mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-12-27 00:35:18 +00:00
Compare commits
3 Commits
c956b28eb7
...
992deb0979
Author | SHA1 | Date | |
---|---|---|---|
|
992deb0979 | ||
|
0c493880ee | ||
|
07704fedc9 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -4652,12 +4652,14 @@ dependencies = [
|
|||||||
"jsonrpsee",
|
"jsonrpsee",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"metrics",
|
"metrics",
|
||||||
|
"reqwest",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"shared_types",
|
"shared_types",
|
||||||
"storage",
|
"storage",
|
||||||
"task_executor",
|
"task_executor",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -24,3 +24,5 @@ futures-util = "0.3.28"
|
|||||||
thiserror = "1.0.44"
|
thiserror = "1.0.44"
|
||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
metrics = { workspace = true }
|
metrics = { workspace = true }
|
||||||
|
reqwest = {version = "0.11", features = ["json"]}
|
||||||
|
url = { version = "2.4", default-features = false }
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use crate::ContractAddress;
|
use crate::ContractAddress;
|
||||||
|
|
||||||
pub struct LogSyncConfig {
|
pub struct LogSyncConfig {
|
||||||
@ -34,6 +36,9 @@ pub struct LogSyncConfig {
|
|||||||
pub watch_loop_wait_time_ms: u64,
|
pub watch_loop_wait_time_ms: u64,
|
||||||
// force to sync log from start block number
|
// force to sync log from start block number
|
||||||
pub force_log_sync_from_start_block_number: bool,
|
pub force_log_sync_from_start_block_number: bool,
|
||||||
|
|
||||||
|
// the timeout for blockchain rpc connection
|
||||||
|
pub blockchain_rpc_timeout: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@ -61,6 +66,7 @@ impl LogSyncConfig {
|
|||||||
remove_finalized_block_interval_minutes: u64,
|
remove_finalized_block_interval_minutes: u64,
|
||||||
watch_loop_wait_time_ms: u64,
|
watch_loop_wait_time_ms: u64,
|
||||||
force_log_sync_from_start_block_number: bool,
|
force_log_sync_from_start_block_number: bool,
|
||||||
|
blockchain_rpc_timeout: Duration,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
rpc_endpoint_url,
|
rpc_endpoint_url,
|
||||||
@ -77,6 +83,7 @@ impl LogSyncConfig {
|
|||||||
remove_finalized_block_interval_minutes,
|
remove_finalized_block_interval_minutes,
|
||||||
watch_loop_wait_time_ms,
|
watch_loop_wait_time_ms,
|
||||||
force_log_sync_from_start_block_number,
|
force_log_sync_from_start_block_number,
|
||||||
|
blockchain_rpc_timeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use crate::sync_manager::log_query::LogQuery;
|
use crate::sync_manager::log_query::LogQuery;
|
||||||
use crate::sync_manager::RETRY_WAIT_MS;
|
use crate::sync_manager::RETRY_WAIT_MS;
|
||||||
use crate::ContractAddress;
|
use crate::{ContractAddress, LogSyncConfig};
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
use append_merkle::{Algorithm, Sha3Algorithm};
|
use append_merkle::{Algorithm, Sha3Algorithm};
|
||||||
use contract_interface::{SubmissionNode, SubmitFilter, ZgsFlow};
|
use contract_interface::{SubmissionNode, SubmitFilter, ZgsFlow};
|
||||||
@ -12,7 +12,6 @@ use futures::StreamExt;
|
|||||||
use jsonrpsee::tracing::{debug, error, info, warn};
|
use jsonrpsee::tracing::{debug, error, info, warn};
|
||||||
use shared_types::{DataRoot, Transaction};
|
use shared_types::{DataRoot, Transaction};
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
use std::str::FromStr;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
|
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
|
||||||
@ -34,28 +33,29 @@ pub struct LogEntryFetcher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl LogEntryFetcher {
|
impl LogEntryFetcher {
|
||||||
pub async fn new(
|
pub async fn new(config: &LogSyncConfig) -> Result<Self> {
|
||||||
url: &str,
|
|
||||||
contract_address: ContractAddress,
|
|
||||||
log_page_size: u64,
|
|
||||||
confirmation_delay: u64,
|
|
||||||
rate_limit_retries: u32,
|
|
||||||
timeout_retries: u32,
|
|
||||||
initial_backoff: u64,
|
|
||||||
) -> Result<Self> {
|
|
||||||
let provider = Arc::new(Provider::new(
|
let provider = Arc::new(Provider::new(
|
||||||
RetryClientBuilder::default()
|
RetryClientBuilder::default()
|
||||||
.rate_limit_retries(rate_limit_retries)
|
.rate_limit_retries(config.rate_limit_retries)
|
||||||
.timeout_retries(timeout_retries)
|
.timeout_retries(config.timeout_retries)
|
||||||
.initial_backoff(Duration::from_millis(initial_backoff))
|
.initial_backoff(Duration::from_millis(config.initial_backoff))
|
||||||
.build(Http::from_str(url)?, Box::new(HttpRateLimitRetryPolicy)),
|
.build(
|
||||||
|
Http::new_with_client(
|
||||||
|
url::Url::parse(&config.rpc_endpoint_url)?,
|
||||||
|
reqwest::Client::builder()
|
||||||
|
.timeout(config.blockchain_rpc_timeout)
|
||||||
|
.connect_timeout(config.blockchain_rpc_timeout)
|
||||||
|
.build()?,
|
||||||
|
),
|
||||||
|
Box::new(HttpRateLimitRetryPolicy),
|
||||||
|
),
|
||||||
));
|
));
|
||||||
// TODO: `error` types are removed from the ABI json file.
|
// TODO: `error` types are removed from the ABI json file.
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
contract_address,
|
contract_address: config.contract_address,
|
||||||
provider,
|
provider,
|
||||||
log_page_size,
|
log_page_size: config.log_page_size,
|
||||||
confirmation_delay,
|
confirmation_delay: config.confirmation_block_count,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,16 +86,7 @@ impl LogSyncManager {
|
|||||||
.expect("shutdown send error")
|
.expect("shutdown send error")
|
||||||
},
|
},
|
||||||
async move {
|
async move {
|
||||||
let log_fetcher = LogEntryFetcher::new(
|
let log_fetcher = LogEntryFetcher::new(&config).await?;
|
||||||
&config.rpc_endpoint_url,
|
|
||||||
config.contract_address,
|
|
||||||
config.log_page_size,
|
|
||||||
config.confirmation_block_count,
|
|
||||||
config.rate_limit_retries,
|
|
||||||
config.timeout_retries,
|
|
||||||
config.initial_backoff,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
let data_cache = DataCache::new(config.cache_config.clone());
|
let data_cache = DataCache::new(config.cache_config.clone());
|
||||||
|
|
||||||
let block_hash_cache = Arc::new(RwLock::new(
|
let block_hash_cache = Arc::new(RwLock::new(
|
||||||
|
@ -146,6 +146,7 @@ impl ZgsConfig {
|
|||||||
self.remove_finalized_block_interval_minutes,
|
self.remove_finalized_block_interval_minutes,
|
||||||
self.watch_loop_wait_time_ms,
|
self.watch_loop_wait_time_ms,
|
||||||
self.force_log_sync_from_start_block_number,
|
self.force_log_sync_from_start_block_number,
|
||||||
|
Duration::from_secs(self.blockchain_rpc_timeout_secs),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,6 +49,8 @@ build_config! {
|
|||||||
(remove_finalized_block_interval_minutes, (u64), 30)
|
(remove_finalized_block_interval_minutes, (u64), 30)
|
||||||
(watch_loop_wait_time_ms, (u64), 500)
|
(watch_loop_wait_time_ms, (u64), 500)
|
||||||
|
|
||||||
|
(blockchain_rpc_timeout_secs, (u64), 120)
|
||||||
|
|
||||||
// chunk pool
|
// chunk pool
|
||||||
(chunk_pool_write_window_size, (usize), 4)
|
(chunk_pool_write_window_size, (usize), 4)
|
||||||
(chunk_pool_max_cached_chunks_all, (usize), 4*1024*1024) // 1G
|
(chunk_pool_max_cached_chunks_all, (usize), 4*1024*1024) // 1G
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate tracing;
|
extern crate tracing;
|
||||||
|
|
||||||
use anyhow::bail;
|
use anyhow::{anyhow, bail};
|
||||||
use shared_types::{
|
use shared_types::{
|
||||||
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
|
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
|
||||||
};
|
};
|
||||||
@ -140,17 +140,21 @@ impl Store {
|
|||||||
let store = self.store.clone();
|
let store = self.store.clone();
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
self.executor.spawn_blocking(
|
self.executor
|
||||||
move || {
|
.spawn_blocking_handle(
|
||||||
// FIXME(zz): Not all functions need `write`. Refactor store usage.
|
move || {
|
||||||
let res = f(&*store);
|
// FIXME(zz): Not all functions need `write`. Refactor store usage.
|
||||||
|
let res = f(&*store);
|
||||||
|
|
||||||
if tx.send(res).is_err() {
|
if tx.send(res).is_err() {
|
||||||
error!("Unable to complete async storage operation: the receiver dropped");
|
error!("Unable to complete async storage operation: the receiver dropped");
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
WORKER_TASK_NAME,
|
WORKER_TASK_NAME,
|
||||||
);
|
)
|
||||||
|
.ok_or(anyhow!("Unable to spawn async storage work"))?
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow!("join error: e={:?}", e))?;
|
||||||
|
|
||||||
rx.await
|
rx.await
|
||||||
.unwrap_or_else(|_| bail!(error::Error::Custom("Receiver error".to_string())))
|
.unwrap_or_else(|_| bail!(error::Error::Custom("Receiver error".to_string())))
|
||||||
|
Loading…
Reference in New Issue
Block a user