mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-11-20 15:05:19 +00:00
Allow for retry attempts in the watch loop
This commit is contained in:
parent
e20be63026
commit
7b84b4fcd0
@ -17,9 +17,12 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::sync::{
|
||||
use tokio::{
|
||||
sync::{
|
||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
RwLock,
|
||||
},
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
pub struct LogEntryFetcher {
|
||||
@ -285,12 +288,14 @@ impl LogEntryFetcher {
|
||||
executor: &TaskExecutor,
|
||||
block_hash_cache: Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
|
||||
watch_loop_wait_time_ms: u64,
|
||||
mut watch_progress_rx: UnboundedReceiver<u64>,
|
||||
) -> UnboundedReceiver<LogFetchProgress> {
|
||||
let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
|
||||
let provider = self.provider.clone();
|
||||
let confirmation_delay = self.confirmation_delay;
|
||||
let log_page_size = self.log_page_size;
|
||||
let mut progress_reset_history = BTreeMap::new();
|
||||
executor.spawn(
|
||||
async move {
|
||||
debug!("start_watch starts, start={}", start_block_number);
|
||||
@ -298,6 +303,13 @@ impl LogEntryFetcher {
|
||||
let mut parent_block_hash = parent_block_hash;
|
||||
|
||||
loop {
|
||||
check_watch_process(
|
||||
&mut watch_progress_rx,
|
||||
&mut progress,
|
||||
&mut progress_reset_history,
|
||||
watch_loop_wait_time_ms,
|
||||
);
|
||||
|
||||
match Self::watch_loop(
|
||||
provider.as_ref(),
|
||||
progress,
|
||||
@ -502,6 +514,9 @@ impl LogEntryFetcher {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if !log_events.is_empty() {
|
||||
info!("synced {} events", log_events.len());
|
||||
for log in log_events.into_iter() {
|
||||
if let Err(e) = watch_tx.send(log) {
|
||||
warn!("send LogFetchProgress::Transaction failed: {:?}", e);
|
||||
@ -516,6 +531,7 @@ impl LogEntryFetcher {
|
||||
block_hash_cache.write().await.insert(p.0, None);
|
||||
}
|
||||
}
|
||||
}
|
||||
progress = new_progress;
|
||||
}
|
||||
}
|
||||
@ -528,6 +544,59 @@ impl LogEntryFetcher {
|
||||
}
|
||||
}
|
||||
|
||||
fn check_watch_process(
|
||||
watch_progress_rx: &mut UnboundedReceiver<u64>,
|
||||
progress: &mut u64,
|
||||
progress_reset_history: &mut BTreeMap<u64, (Instant, usize)>,
|
||||
watch_loop_wait_time_ms: u64,
|
||||
) {
|
||||
let mut min_received_progress = None;
|
||||
while let Ok(v) = watch_progress_rx.try_recv() {
|
||||
min_received_progress = match min_received_progress {
|
||||
Some(min) if min > v => Some(v),
|
||||
None => Some(v),
|
||||
_ => min_received_progress,
|
||||
};
|
||||
}
|
||||
|
||||
if let Some(v) = min_received_progress {
|
||||
if *progress <= v {
|
||||
error!(
|
||||
"received unexpected progress, current {}, received {}",
|
||||
*progress, v
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let now = Instant::now();
|
||||
match progress_reset_history.get_mut(&v) {
|
||||
Some((last_update, counter)) => {
|
||||
if *counter >= 3 {
|
||||
error!("maximum reset attempts have been reached.");
|
||||
watch_progress_rx.close();
|
||||
return;
|
||||
}
|
||||
|
||||
if now.duration_since(*last_update)
|
||||
>= Duration::from_millis(watch_loop_wait_time_ms * 30)
|
||||
{
|
||||
info!("reset to progress from {} to {}", *progress, v);
|
||||
*progress = v;
|
||||
*last_update = now;
|
||||
*counter += 1;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
info!("reset to progress from {} to {}", *progress, v);
|
||||
*progress = v;
|
||||
progress_reset_history.insert(v, (now, 1usize));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
progress_reset_history.retain(|k, _| k + 1000 >= *progress);
|
||||
}
|
||||
|
||||
async fn revert_one_block(
|
||||
block_hash: H256,
|
||||
block_number: u64,
|
||||
|
@ -15,7 +15,7 @@ use std::time::{Duration, Instant};
|
||||
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
|
||||
use task_executor::{ShutdownReason, TaskExecutor};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc::UnboundedReceiver;
|
||||
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
||||
use tokio::sync::{oneshot, RwLock};
|
||||
|
||||
const RETRY_WAIT_MS: u64 = 500;
|
||||
@ -146,7 +146,7 @@ impl LogSyncManager {
|
||||
&executor_clone,
|
||||
log_sync_manager.block_hash_cache.clone(),
|
||||
);
|
||||
log_sync_manager.handle_data(reorg_rx).await?;
|
||||
log_sync_manager.handle_data(reorg_rx, &None).await?;
|
||||
if let Some((block_number, block_hash)) =
|
||||
log_sync_manager.store.get_sync_progress()?
|
||||
{
|
||||
@ -223,15 +223,20 @@ impl LogSyncManager {
|
||||
warn!("catch_up_end send fails, possibly auto_sync is not enabled");
|
||||
}
|
||||
|
||||
let (watch_progress_tx, watch_progress_rx) =
|
||||
tokio::sync::mpsc::unbounded_channel();
|
||||
let watch_rx = log_sync_manager.log_fetcher.start_watch(
|
||||
start_block_number,
|
||||
parent_block_hash,
|
||||
&executor_clone,
|
||||
log_sync_manager.block_hash_cache.clone(),
|
||||
log_sync_manager.config.watch_loop_wait_time_ms,
|
||||
watch_progress_rx,
|
||||
);
|
||||
// Syncing `watch_rx` is supposed to block forever.
|
||||
log_sync_manager.handle_data(watch_rx).await?;
|
||||
log_sync_manager
|
||||
.handle_data(watch_rx, &Some(watch_progress_tx))
|
||||
.await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
},
|
||||
)
|
||||
@ -241,20 +246,20 @@ impl LogSyncManager {
|
||||
Ok((event_send_cloned, catch_up_end_receiver))
|
||||
}
|
||||
|
||||
async fn put_tx(&mut self, tx: Transaction) -> bool {
|
||||
async fn put_tx(&mut self, tx: Transaction) -> Option<bool> {
|
||||
// We call this after process chain reorg, so the sequence number should match.
|
||||
match tx.seq.cmp(&self.next_tx_seq) {
|
||||
std::cmp::Ordering::Less => true,
|
||||
std::cmp::Ordering::Less => Some(true),
|
||||
std::cmp::Ordering::Equal => {
|
||||
debug!("log entry sync get entry: {:?}", tx);
|
||||
self.put_tx_inner(tx).await
|
||||
Some(self.put_tx_inner(tx).await)
|
||||
}
|
||||
std::cmp::Ordering::Greater => {
|
||||
error!(
|
||||
"Unexpected transaction seq: next={} get={}",
|
||||
self.next_tx_seq, tx.seq
|
||||
);
|
||||
false
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -296,7 +301,11 @@ impl LogSyncManager {
|
||||
let _ = self.event_send.send(LogSyncEvent::Reverted { tx_seq });
|
||||
}
|
||||
|
||||
async fn handle_data(&mut self, mut rx: UnboundedReceiver<LogFetchProgress>) -> Result<()> {
|
||||
async fn handle_data(
|
||||
&mut self,
|
||||
mut rx: UnboundedReceiver<LogFetchProgress>,
|
||||
watch_progress_tx: &Option<UnboundedSender<u64>>,
|
||||
) -> Result<()> {
|
||||
while let Some(data) = rx.recv().await {
|
||||
trace!("handle_data: data={:?}", data);
|
||||
match data {
|
||||
@ -337,7 +346,26 @@ impl LogSyncManager {
|
||||
}
|
||||
}
|
||||
LogFetchProgress::Transaction(tx) => {
|
||||
if !self.put_tx(tx.clone()).await {
|
||||
let mut stop = false;
|
||||
match self.put_tx(tx.clone()).await {
|
||||
Some(false) => stop = true,
|
||||
Some(true) => {}
|
||||
_ => {
|
||||
stop = true;
|
||||
|
||||
if let Some(progress_tx) = watch_progress_tx {
|
||||
if let Some((block_number, _)) = self.store.get_sync_progress()? {
|
||||
if let Err(e) = progress_tx.send(block_number) {
|
||||
error!("failed to send watch progress, error={:?}", e);
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if stop {
|
||||
// Unexpected error.
|
||||
bail!("log sync write error");
|
||||
}
|
||||
@ -426,7 +454,7 @@ impl LogSyncManager {
|
||||
&executor_clone,
|
||||
Duration::from_millis(self.config.recover_query_delay),
|
||||
);
|
||||
self.handle_data(recover_rx).await?;
|
||||
self.handle_data(recover_rx, &None).await?;
|
||||
}
|
||||
|
||||
self.log_fetcher.start_remove_finalized_block_task(
|
||||
|
Loading…
Reference in New Issue
Block a user