mirror of https://github.com/0glabs/0g-storage-node.git
synced 2024-11-20 15:05:19 +00:00

Compare commits
4 Commits: 96376e0801 ... cce84da433
| Author | SHA1 | Date |
|---|---|---|
| | cce84da433 | |
| | a153955246 | |
| | f878a4849c | |
| | a9f5169c15 | |
```diff
@@ -145,6 +145,15 @@ impl LogEntryFetcher {
                     }
                 };
 
+                let log_latest_block_number = match store.get_log_latest_block_number() {
+                    Ok(Some(b)) => b,
+                    Ok(None) => 0,
+                    Err(e) => {
+                        error!("get log latest block number error: e={:?}", e);
+                        0
+                    }
+                };
+
                 if let Some(processed_block_number) = processed_block_number {
                     let finalized_block_number =
                         match provider.get_block(BlockNumber::Finalized).await {
@@ -168,25 +177,24 @@ impl LogEntryFetcher {
                         };
 
                     if let Some(finalized_block_number) = finalized_block_number {
-                        if processed_block_number >= finalized_block_number {
-                            let mut pending_keys = vec![];
-                            for (key, _) in block_hash_cache.read().await.iter() {
-                                if *key < finalized_block_number {
-                                    pending_keys.push(*key);
-                                } else {
-                                    break;
-                                }
+                        let safe_block_number = std::cmp::min(
+                            std::cmp::min(log_latest_block_number, finalized_block_number),
+                            processed_block_number,
+                        );
+                        let mut pending_keys = vec![];
+                        for (key, _) in block_hash_cache.read().await.iter() {
+                            if *key < safe_block_number {
+                                pending_keys.push(*key);
+                            } else {
+                                break;
                             }
+                        }
 
-                            for key in pending_keys.into_iter() {
-                                if let Err(e) = store.delete_block_hash_by_number(key) {
-                                    error!(
-                                        "remove block tx for number {} error: e={:?}",
-                                        key, e
-                                    );
-                                } else {
-                                    block_hash_cache.write().await.remove(&key);
-                                }
+                        for key in pending_keys.into_iter() {
+                            if let Err(e) = store.delete_block_hash_by_number(key) {
+                                error!("remove block tx for number {} error: e={:?}", key, e);
+                            } else {
+                                block_hash_cache.write().await.remove(&key);
                             }
                         }
                     }
```
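The pruning bound introduced in this hunk takes the minimum of three heights before evicting cached block hashes. A minimal standalone sketch of that bound (plain functions standing in for the fetcher's state, not the repository's API):

```rust
use std::cmp;

// Sketch of the bound computed above: an entry may be pruned only once it is
// below the log store's latest persisted block, the chain's finalized block,
// AND the fetcher's own processed block, so no hash that could still be
// needed for reorg detection is dropped.
fn safe_block_number(
    log_latest_block_number: u64,
    finalized_block_number: u64,
    processed_block_number: u64,
) -> u64 {
    cmp::min(
        cmp::min(log_latest_block_number, finalized_block_number),
        processed_block_number,
    )
}

fn main() {
    // A lagging log store caps pruning, even though the chain finalized further.
    assert_eq!(safe_block_number(90, 120, 110), 90);
    // Once everything has caught up, the finalized height is the binding constraint.
    assert_eq!(safe_block_number(130, 120, 125), 120);
}
```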
```diff
@@ -313,6 +321,7 @@ impl LogEntryFetcher {
                     &mut progress_reset_history,
                     watch_loop_wait_time_ms,
                     &block_hash_cache,
+                    provider.as_ref(),
                 )
                 .await;
 
@@ -384,6 +393,10 @@ impl LogEntryFetcher {
                     );
                 }
 
+                if block.logs_bloom.is_none() {
+                    bail!("block {:?} logs bloom is none", block.number);
+                }
+
                 if from_block_number > 0 && block.parent_hash != parent_block_hash {
                     // reorg happened
                     let (parent_block_number, block_hash) = revert_one_block(
```
```diff
@@ -412,13 +425,22 @@ impl LogEntryFetcher {
                         block.number
                     );
                 }
-                if Some(block.parent_hash) != parent_block_hash {
+                if parent_block_hash.is_none() || Some(block.parent_hash) != parent_block_hash {
                     bail!(
                         "parent block hash mismatch, expected {:?}, actual {}",
                         parent_block_hash,
                         block.parent_hash
                     );
                 }
 
+                if block_number == to_block_number && block.hash.is_none() {
+                    bail!("block {:?} hash is none", block.number);
+                }
+
+                if block.logs_bloom.is_none() {
+                    bail!("block {:?} logs bloom is none", block.number);
+                }
+
                 parent_block_hash = block.hash;
                 blocks.insert(block_number, block);
             }
```
```diff
@@ -470,7 +492,7 @@ impl LogEntryFetcher {
                 }
 
                 let tx = txs_hm[&log.transaction_index];
-                if log.transaction_hash != Some(tx.hash) {
+                if log.transaction_hash.is_none() || log.transaction_hash != Some(tx.hash) {
                     warn!(
                         "log tx hash mismatch, log transaction {:?}, block transaction {:?}",
                         log.transaction_hash,
@@ -478,7 +500,9 @@ impl LogEntryFetcher {
                     );
                     return Ok(progress);
                 }
-                if log.transaction_index != tx.transaction_index {
+                if log.transaction_index.is_none()
+                    || log.transaction_index != tx.transaction_index
+                {
                     warn!(
                         "log tx index mismatch, log tx index {:?}, block transaction index {:?}",
                         log.transaction_index,
```
```diff
@@ -565,6 +589,7 @@ async fn check_watch_process(
     progress_reset_history: &mut BTreeMap<u64, (Instant, usize)>,
     watch_loop_wait_time_ms: u64,
     block_hash_cache: &Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
+    provider: &Provider<RetryClient<Http>>,
 ) {
     let mut min_received_progress = None;
     while let Ok(v) = watch_progress_rx.try_recv() {
@@ -626,7 +651,21 @@ async fn check_watch_process(
                     tokio::time::sleep(Duration::from_secs(RETRY_WAIT_MS)).await;
                 }
             } else {
-                panic!("parent block {} expect exist", *progress - 1);
+                warn!(
+                    "get block hash for block {} from RPC, assume there is no org",
+                    *progress - 1
+                );
+                match provider.get_block(*progress - 1).await {
+                    Ok(Some(v)) => {
+                        v.hash.expect("parent block hash expect exist");
+                    }
+                    Ok(None) => {
+                        panic!("parent block {} expect exist", *progress - 1);
+                    }
+                    Err(e) => {
+                        panic!("parent block {} expect exist, error {}", *progress - 1, e);
+                    }
+                }
             }
         };
     }
```
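`check_watch_process` now falls back to the RPC provider instead of panicking outright when the parent block is missing from the local cache. A rough sketch of that fallback path, assuming the ethers crate the node already uses for `Provider<RetryClient<Http>>`:

```rust
use ethers::prelude::{Http, Middleware, Provider, RetryClient, H256};

// Sketch of the fallback: on a local cache miss, fetch the parent block from
// the RPC node and assume no reorg; only a block the RPC node cannot produce
// at all remains fatal, mirroring the panics kept in the diff above.
async fn parent_hash_from_rpc(
    provider: &Provider<RetryClient<Http>>,
    parent_number: u64,
) -> H256 {
    match provider.get_block(parent_number).await {
        Ok(Some(block)) => block.hash.expect("parent block hash expect exist"),
        Ok(None) => panic!("parent block {} expect exist", parent_number),
        Err(e) => panic!("parent block {} expect exist, error {}", parent_number, e),
    }
}
```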
```diff
@@ -14,6 +14,7 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
 use task_executor::{ShutdownReason, TaskExecutor};
+use thiserror::Error;
 use tokio::sync::broadcast;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use tokio::sync::{oneshot, RwLock};
@@ -25,6 +26,17 @@ const RETRY_WAIT_MS: u64 = 500;
 const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
 const CATCH_UP_END_GAP: u64 = 10;
 
+/// Errors while handle data
+#[derive(Error, Debug)]
+pub enum HandleDataError {
+    /// Sequence Error
+    #[error("transaction seq is great than expected, expect block number {0}")]
+    SeqError(u64),
+    /// Other Errors
+    #[error("{0}")]
+    CommonError(#[from] anyhow::Error),
+}
+
 #[derive(Clone, Debug)]
 pub enum LogSyncEvent {
     /// Chain reorg detected without any operation yet.
```
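The new `HandleDataError` uses `thiserror` so that a dedicated `SeqError` variant can carry the block number to retry from, while `#[from] anyhow::Error` keeps existing `?` call sites compiling unchanged. A minimal self-contained sketch of the pattern (`store_write` is a hypothetical stub):

```rust
use thiserror::Error;

#[derive(Error, Debug)]
pub enum HandleDataError {
    #[error("transaction seq is great than expected, expect block number {0}")]
    SeqError(u64),
    #[error("{0}")]
    CommonError(#[from] anyhow::Error),
}

// Hypothetical stub standing in for a store write that yields anyhow::Result.
fn store_write() -> anyhow::Result<()> {
    Err(anyhow::anyhow!("log sync write error"))
}

fn handle_data() -> Result<(), HandleDataError> {
    // `?` converts anyhow::Error into CommonError via the #[from] impl.
    store_write()?;
    Ok(())
}

fn main() {
    match handle_data() {
        // The caller can pattern-match the retryable case and treat the rest as fatal.
        Err(HandleDataError::SeqError(n)) => println!("retry catch-up from block {}", n),
        Err(e) => println!("fatal: {}", e),
        Ok(()) => {}
    }
}
```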
```diff
@@ -189,13 +201,51 @@ impl LogSyncManager {
                 } else {
                     // Keep catching-up data until we are close to the latest height.
                     loop {
-                        log_sync_manager
+                        // wait tx receipt is ready
+                        if let Ok(Some(block)) = log_sync_manager
+                            .log_fetcher
+                            .provider()
+                            .get_block_with_txs(finalized_block_number)
+                            .await
+                        {
+                            if let Some(tx) = block.transactions.first() {
+                                loop {
+                                    match log_sync_manager
+                                        .log_fetcher
+                                        .provider()
+                                        .get_transaction_receipt(tx.hash)
+                                        .await
+                                    {
+                                        Ok(Some(_)) => break,
+                                        _ => {
+                                            tokio::time::sleep(Duration::from_secs(1)).await;
+                                            continue;
+                                        }
+                                    }
+                                }
+                            }
+                        }
+
+                        while let Err(e) = log_sync_manager
                             .catch_up_data(
                                 executor_clone.clone(),
                                 start_block_number,
                                 finalized_block_number,
                             )
-                            .await?;
+                            .await
+                        {
+                            match e {
+                                HandleDataError::SeqError(block_number) => {
+                                    warn!("seq error occurred, retry from {}", block_number);
+                                    start_block_number = block_number;
+                                    tokio::time::sleep(Duration::from_secs(1)).await;
+                                }
+                                _ => {
+                                    return Err(e.into());
+                                }
+                            }
+                        }
+
                         start_block_number = finalized_block_number.saturating_add(1);
 
                         let new_finalized_block =
```
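Before each catch-up round, the loop now blocks until the RPC node can serve a receipt for a transaction in the finalized block, so the subsequent log queries do not race the node's receipt indexing. A small sketch of that wait, again assuming the ethers crate:

```rust
use ethers::prelude::{Http, Middleware, Provider, RetryClient, TxHash};
use std::time::Duration;

// Sketch of the receipt wait: poll get_transaction_receipt until the node
// returns one, backing off a second between attempts. Errors are treated the
// same as "not indexed yet", matching the catch-all arm in the diff above.
async fn wait_receipt_ready(provider: &Provider<RetryClient<Http>>, tx_hash: TxHash) {
    loop {
        match provider.get_transaction_receipt(tx_hash).await {
            Ok(Some(_receipt)) => return,
            _ => tokio::time::sleep(Duration::from_secs(1)).await,
        }
    }
}
```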
```diff
@@ -214,6 +264,18 @@ impl LogSyncManager {
                     warn!("catch_up_end send fails, possibly auto_sync is not enabled");
                 }
 
+                log_sync_manager
+                    .log_fetcher
+                    .start_remove_finalized_block_task(
+                        &executor_clone,
+                        log_sync_manager.store.clone(),
+                        log_sync_manager.block_hash_cache.clone(),
+                        log_sync_manager.config.default_finalized_block_count,
+                        log_sync_manager
+                            .config
+                            .remove_finalized_block_interval_minutes,
+                    );
+
                 let (watch_progress_tx, watch_progress_rx) =
                     tokio::sync::mpsc::unbounded_channel();
                 let watch_rx = log_sync_manager.log_fetcher.start_watch(
```
```diff
@@ -296,7 +358,7 @@ impl LogSyncManager {
         &mut self,
         mut rx: UnboundedReceiver<LogFetchProgress>,
         watch_progress_tx: &Option<UnboundedSender<u64>>,
-    ) -> Result<()> {
+    ) -> Result<(), HandleDataError> {
         let mut log_latest_block_number =
             if let Some(block_number) = self.store.get_log_latest_block_number()? {
                 block_number
@@ -362,13 +424,15 @@ impl LogSyncManager {
                         } else {
                             continue;
                         }
+                    } else {
+                        return Err(HandleDataError::SeqError(log_latest_block_number));
                     }
                 }
             }
 
             if stop {
                 // Unexpected error.
-                bail!("log sync write error");
+                return Err(anyhow!("log sync write error").into());
             }
             if let Err(e) = self.event_send.send(LogSyncEvent::TxSynced { tx }) {
                 // TODO: Do we need to wait until all receivers are initialized?
```
```diff
@@ -447,7 +511,7 @@ impl LogSyncManager {
         executor_clone: TaskExecutor,
         start_block_number: u64,
         finalized_block_number: u64,
-    ) -> Result<()> {
+    ) -> Result<(), HandleDataError> {
         if start_block_number < finalized_block_number {
             let recover_rx = self.log_fetcher.start_recover(
                 start_block_number,
@@ -457,14 +521,6 @@ impl LogSyncManager {
             );
             self.handle_data(recover_rx, &None).await?;
         }
-
-        self.log_fetcher.start_remove_finalized_block_task(
-            &executor_clone,
-            self.store.clone(),
-            self.block_hash_cache.clone(),
-            self.config.default_finalized_block_count,
-            self.config.remove_finalized_block_interval_minutes,
-        );
         Ok(())
     }
 }
```
```diff
@@ -489,6 +545,10 @@ async fn get_start_block_number_with_hash(
         .get(&block_number)
     {
         return Ok((block_number, val.block_hash));
+    } else {
+        warn!("get block hash for block {} from RPC", block_number);
+        let block_hash = log_sync_manager.get_block(block_number.into()).await?.1;
+        return Ok((block_number, block_hash));
     }
 }
 
```
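`get_start_block_number_with_hash` now degrades gracefully on a cache miss by fetching the hash from RPC instead of failing. A generic sketch of the lookup order (`u64` stands in for `H256`, and `fetch_from_rpc` is a hypothetical stand-in for the provider call):

```rust
use std::collections::BTreeMap;

// Sketch of the lookup order: in-memory cache first, RPC only on a miss.
async fn block_hash_or_rpc<F, Fut>(
    cache: &BTreeMap<u64, u64>,
    block_number: u64,
    fetch_from_rpc: F,
) -> anyhow::Result<u64>
where
    F: FnOnce(u64) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<u64>>,
{
    if let Some(hash) = cache.get(&block_number) {
        Ok(*hash)
    } else {
        // The real code logs a warning here before falling back to RPC.
        fetch_from_rpc(block_number).await
    }
}
```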
```diff
@@ -29,6 +29,7 @@ pub struct Status {
     pub connected_peers: usize,
     pub log_sync_height: u64,
     pub log_sync_block: H256,
+    pub next_tx_seq: u64,
     pub network_identity: NetworkIdentity,
 }
```
```diff
@@ -26,10 +26,13 @@ impl RpcServer for RpcServerImpl {
             .get_sync_progress()?
             .unwrap_or_default();
 
+        let next_tx_seq = self.ctx.log_store.get_store().next_tx_seq();
+
         Ok(Status {
             connected_peers: self.ctx.network_globals.connected_peers(),
             log_sync_height: sync_progress.0,
             log_sync_block: sync_progress.1,
+            next_tx_seq,
             network_identity: self.ctx.network_globals.network_id(),
         })
     }
```
```diff
@@ -1,10 +1,14 @@
 use std::sync::Arc;
 
-use metrics::{register_timer, Counter, CounterUsize, Histogram, Sample, Timer};
+use metrics::{
+    register_meter, register_timer, Counter, CounterUsize, Histogram, Meter, Sample, Timer,
+};
 
 lazy_static::lazy_static! {
     pub static ref SERIAL_SYNC_FILE_COMPLETED: Arc<dyn Timer> = register_timer("sync_controllers_serial_sync_file_completed");
     pub static ref SERIAL_SYNC_CHUNKS_COMPLETED: Arc<dyn Timer> = register_timer("sync_controllers_serial_sync_chunks_completed");
 
+    pub static ref SERIAL_SYNC_SEGMENT_BANDWIDTH: Arc<dyn Meter> = register_meter("sync_controllers_serial_sync_segment_bandwidth");
     pub static ref SERIAL_SYNC_SEGMENT_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_controllers_serial_sync_segment_latency", 1024);
     pub static ref SERIAL_SYNC_SEGMENT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_controllers_serial_sync_segment_timeout");
     pub static ref SERIAL_SYNC_UNEXPECTED_ERRORS: Arc<dyn Counter<usize>> = CounterUsize::register("sync_controllers_serial_sync_unexpected_errors");
```
```diff
@@ -11,6 +11,7 @@ use network::{
 };
 use rand::Rng;
 use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE};
+use ssz::Encode;
 use std::{sync::Arc, time::Instant};
 use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
 use storage_async::Store;
@@ -255,6 +256,17 @@ impl SerialSyncController {
 
     /// Randomly select a peer to sync the next segment.
     fn try_request_next(&mut self) {
+        // limits network bandwidth if configured
+        if self.config.max_bandwidth_bytes > 0 {
+            let m1 = metrics::SERIAL_SYNC_SEGMENT_BANDWIDTH.rate1() as u64;
+            if m1 > self.config.max_bandwidth_bytes {
+                self.state = SyncState::AwaitingDownload {
+                    since: (Instant::now() + self.config.bandwidth_wait_timeout).into(),
+                };
+                return;
+            }
+        }
+
         // request next chunk array
         let from_chunk = self.next_chunk;
         let to_chunk = std::cmp::min(from_chunk + PORA_CHUNK_SIZE as u64, self.goal.index_end);
```
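The new gate in `try_request_next` reads the one-minute moving rate from the bandwidth meter and, when the configured cap is exceeded, parks the controller in `AwaitingDownload` until the wait timeout elapses instead of requesting the next segment. A minimal sketch of the decision logic, with the metrics `Meter` replaced by a plain rate argument (names here are stand-ins, not the repository's API):

```rust
use std::time::{Duration, Instant};

struct Throttle {
    max_bandwidth_bytes: u64,
    bandwidth_wait_timeout: Duration,
}

enum Decision {
    Proceed,
    WaitUntil(Instant),
}

impl Throttle {
    fn check(&self, one_minute_rate_bytes: u64) -> Decision {
        // A cap of 0 means "no limitation", matching the config default.
        if self.max_bandwidth_bytes > 0 && one_minute_rate_bytes > self.max_bandwidth_bytes {
            // Over budget: wait out the configured timeout before retrying.
            Decision::WaitUntil(Instant::now() + self.bandwidth_wait_timeout)
        } else {
            Decision::Proceed
        }
    }
}

fn main() {
    let t = Throttle {
        max_bandwidth_bytes: 1024 * 1024, // 1 MiB/s
        bandwidth_wait_timeout: Duration::from_secs(5),
    };
    assert!(matches!(t.check(512 * 1024), Decision::Proceed));
    assert!(matches!(t.check(2 * 1024 * 1024), Decision::WaitUntil(_)));
}
```

Leaving `max_bandwidth_bytes` at 0 preserves the old unthrottled behavior, which is why both the `Default` impl and the sample configs below keep it at 0.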
```diff
@@ -407,6 +419,8 @@ impl SerialSyncController {
     }
 
     pub async fn on_response(&mut self, from_peer_id: PeerId, response: ChunkArrayWithProof) {
+        metrics::SERIAL_SYNC_SEGMENT_BANDWIDTH.mark(response.ssz_bytes_len());
+
         if self.handle_on_response_mismatch(from_peer_id) {
             return;
         }
```
```diff
@@ -43,6 +43,9 @@ pub struct Config {
     pub peer_wait_outgoing_connection_timeout: Duration,
     #[serde(deserialize_with = "deserialize_duration")]
     pub peer_next_chunks_request_wait_timeout: Duration,
+    pub max_bandwidth_bytes: u64,
+    #[serde(deserialize_with = "deserialize_duration")]
+    pub bandwidth_wait_timeout: Duration,
 
     // auto sync config
     #[serde(deserialize_with = "deserialize_duration")]
@@ -76,6 +79,8 @@ impl Default for Config {
             peer_chunks_download_timeout: Duration::from_secs(15),
             peer_wait_outgoing_connection_timeout: Duration::from_secs(10),
             peer_next_chunks_request_wait_timeout: Duration::from_secs(3),
+            max_bandwidth_bytes: 0,
+            bandwidth_wait_timeout: Duration::from_secs(5),
 
             // auto sync config
             auto_sync_idle_interval: Duration::from_secs(3),
```
```diff
@@ -263,6 +263,10 @@ auto_sync_enabled = true
 # Timeout to download data from remote peer.
 # peer_chunks_download_timeout = "15s"
 
+# Maximum network bandwidth (B/s) to sync files. Default value is 0,
+# which indicates no limitation.
+# max_bandwidth_bytes = 0
+
 # Maximum threads to sync files in sequence.
 # max_sequential_workers = 0
```

```diff
@@ -275,6 +275,10 @@ auto_sync_enabled = true
 # Timeout to download data from remote peer.
 # peer_chunks_download_timeout = "15s"
 
+# Maximum network bandwidth (B/s) to sync files. Default value is 0,
+# which indicates no limitation.
+# max_bandwidth_bytes = 0
+
 # Maximum threads to sync files in sequence.
 # max_sequential_workers = 0
```

```diff
@@ -277,6 +277,10 @@
 # Timeout to download data from remote peer.
 # peer_chunks_download_timeout = "15s"
 
+# Maximum network bandwidth (B/s) to sync files. Default value is 0,
+# which indicates no limitation.
+# max_bandwidth_bytes = 0
+
 # Maximum threads to sync files in sequence.
 # max_sequential_workers = 0
```