From 702680f3a4de047d016533d1d16dacbf04bf4401 Mon Sep 17 00:00:00 2001 From: Joel Liu <83172559+csdtowards@users.noreply.github.com> Date: Mon, 9 Sep 2024 14:51:08 +0800 Subject: [PATCH] reset parent block hash when reset progress (#188) * reset parent block hash when reset progress --- .../src/sync_manager/log_entry_fetcher.rs | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs index c66c616..8ccf435 100644 --- a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs +++ b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs @@ -309,9 +309,12 @@ impl LogEntryFetcher { check_watch_process( &mut watch_progress_rx, &mut progress, + &mut parent_block_hash, &mut progress_reset_history, watch_loop_wait_time_ms, - ); + &block_hash_cache, + ) + .await; match Self::watch_loop( provider.as_ref(), @@ -555,11 +558,13 @@ impl LogEntryFetcher { } } -fn check_watch_process( +async fn check_watch_process( watch_progress_rx: &mut UnboundedReceiver, progress: &mut u64, + parent_block_hash: &mut H256, progress_reset_history: &mut BTreeMap, watch_loop_wait_time_ms: u64, + block_hash_cache: &Arc>>>, ) { let mut min_received_progress = None; while let Ok(v) = watch_progress_rx.try_recv() { @@ -570,6 +575,7 @@ fn check_watch_process( }; } + let mut reset = false; if let Some(v) = min_received_progress { if *progress <= v { error!( @@ -595,16 +601,36 @@ fn check_watch_process( *progress = v; *last_update = now; *counter += 1; + reset = true; } } None => { info!("reset to progress from {} to {}", *progress, v); *progress = v; progress_reset_history.insert(v, (now, 1usize)); + reset = true; } } } + if reset { + *parent_block_hash = loop { + if let Some(block) = block_hash_cache.read().await.get(&(*progress - 1)) { + if let Some(v) = block { + break v.block_hash; + } else { + debug!( + "block_hash_cache wait for SyncedBlock processed for {}", + *progress - 1 + ); + tokio::time::sleep(Duration::from_secs(RETRY_WAIT_MS)).await; + } + } else { + panic!("parent block {} expect exist", *progress - 1); + } + }; + } + progress_reset_history.retain(|k, _| k + 1000 >= *progress); }