reset parent block hash when reset progress (#188)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled

* reset parent block hash when reset progress
This commit is contained in:
Joel Liu 2024-09-09 14:51:08 +08:00 committed by GitHub
parent 678d233f69
commit 702680f3a4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -309,9 +309,12 @@ impl LogEntryFetcher {
check_watch_process( check_watch_process(
&mut watch_progress_rx, &mut watch_progress_rx,
&mut progress, &mut progress,
&mut parent_block_hash,
&mut progress_reset_history, &mut progress_reset_history,
watch_loop_wait_time_ms, watch_loop_wait_time_ms,
); &block_hash_cache,
)
.await;
match Self::watch_loop( match Self::watch_loop(
provider.as_ref(), provider.as_ref(),
@ -555,11 +558,13 @@ impl LogEntryFetcher {
} }
} }
fn check_watch_process( async fn check_watch_process(
watch_progress_rx: &mut UnboundedReceiver<u64>, watch_progress_rx: &mut UnboundedReceiver<u64>,
progress: &mut u64, progress: &mut u64,
parent_block_hash: &mut H256,
progress_reset_history: &mut BTreeMap<u64, (Instant, usize)>, progress_reset_history: &mut BTreeMap<u64, (Instant, usize)>,
watch_loop_wait_time_ms: u64, watch_loop_wait_time_ms: u64,
block_hash_cache: &Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
) { ) {
let mut min_received_progress = None; let mut min_received_progress = None;
while let Ok(v) = watch_progress_rx.try_recv() { while let Ok(v) = watch_progress_rx.try_recv() {
@ -570,6 +575,7 @@ fn check_watch_process(
}; };
} }
let mut reset = false;
if let Some(v) = min_received_progress { if let Some(v) = min_received_progress {
if *progress <= v { if *progress <= v {
error!( error!(
@ -595,16 +601,36 @@ fn check_watch_process(
*progress = v; *progress = v;
*last_update = now; *last_update = now;
*counter += 1; *counter += 1;
reset = true;
} }
} }
None => { None => {
info!("reset to progress from {} to {}", *progress, v); info!("reset to progress from {} to {}", *progress, v);
*progress = v; *progress = v;
progress_reset_history.insert(v, (now, 1usize)); progress_reset_history.insert(v, (now, 1usize));
reset = true;
} }
} }
} }
if reset {
*parent_block_hash = loop {
if let Some(block) = block_hash_cache.read().await.get(&(*progress - 1)) {
if let Some(v) = block {
break v.block_hash;
} else {
debug!(
"block_hash_cache wait for SyncedBlock processed for {}",
*progress - 1
);
tokio::time::sleep(Duration::from_secs(RETRY_WAIT_MS)).await;
}
} else {
panic!("parent block {} expect exist", *progress - 1);
}
};
}
progress_reset_history.retain(|k, _| k + 1000 >= *progress); progress_reset_history.retain(|k, _| k + 1000 >= *progress);
} }