reset parent block hash when reset progress

This commit is contained in:
Joel 2024-09-09 12:25:24 +08:00
parent 678d233f69
commit 24c0ca54fd

View File

@ -309,9 +309,11 @@ impl LogEntryFetcher {
check_watch_process(
&mut watch_progress_rx,
&mut progress,
&mut parent_block_hash,
&mut progress_reset_history,
watch_loop_wait_time_ms,
);
&block_hash_cache,
).await;
match Self::watch_loop(
provider.as_ref(),
@ -555,11 +557,13 @@ impl LogEntryFetcher {
}
}
fn check_watch_process(
async fn check_watch_process(
watch_progress_rx: &mut UnboundedReceiver<u64>,
progress: &mut u64,
parent_block_hash: &mut H256,
progress_reset_history: &mut BTreeMap<u64, (Instant, usize)>,
watch_loop_wait_time_ms: u64,
block_hash_cache: &Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
) {
let mut min_received_progress = None;
while let Ok(v) = watch_progress_rx.try_recv() {
@ -570,6 +574,7 @@ fn check_watch_process(
};
}
let mut reset = false;
if let Some(v) = min_received_progress {
if *progress <= v {
error!(
@ -595,16 +600,36 @@ fn check_watch_process(
*progress = v;
*last_update = now;
*counter += 1;
reset = true;
}
}
None => {
info!("reset to progress from {} to {}", *progress, v);
*progress = v;
progress_reset_history.insert(v, (now, 1usize));
reset = true;
}
}
}
if reset {
*parent_block_hash = loop {
if let Some(block) = block_hash_cache.read().await.get(&(*progress - 1)) {
if let Some(v) = block {
break v.block_hash;
} else {
debug!(
"block_hash_cache wait for SyncedBlock processed for {}",
*progress - 1
);
tokio::time::sleep(Duration::from_secs(RETRY_WAIT_MS)).await;
}
} else {
panic!("parent block {} expect exist", *progress - 1);
}
};
}
progress_reset_history.retain(|k, _| k + 1000 >= *progress);
}