From aad2af252a51f333216ecd95e2083daca373ba04 Mon Sep 17 00:00:00 2001 From: Joel Date: Fri, 13 Sep 2024 19:05:25 +0800 Subject: [PATCH] get block hash from rpc if not found in local cache --- .../src/sync_manager/log_entry_fetcher.rs | 60 +++++++++++++------ node/log_entry_sync/src/sync_manager/mod.rs | 24 +++++--- 2 files changed, 58 insertions(+), 26 deletions(-) diff --git a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs index b585c19..93239e3 100644 --- a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs +++ b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs @@ -145,6 +145,15 @@ impl LogEntryFetcher { } }; + let log_latest_block_number = match store.get_log_latest_block_number() { + Ok(Some(b)) => b, + Ok(None) => 0, + Err(e) => { + error!("get log latest block number error: e={:?}", e); + 0 + } + }; + if let Some(processed_block_number) = processed_block_number { let finalized_block_number = match provider.get_block(BlockNumber::Finalized).await { @@ -168,25 +177,24 @@ impl LogEntryFetcher { }; if let Some(finalized_block_number) = finalized_block_number { - if processed_block_number >= finalized_block_number { - let mut pending_keys = vec![]; - for (key, _) in block_hash_cache.read().await.iter() { - if *key < finalized_block_number { - pending_keys.push(*key); - } else { - break; - } + let safe_block_number = std::cmp::min( + std::cmp::min(log_latest_block_number, finalized_block_number), + processed_block_number, + ); + let mut pending_keys = vec![]; + for (key, _) in block_hash_cache.read().await.iter() { + if *key < safe_block_number { + pending_keys.push(*key); + } else { + break; } + } - for key in pending_keys.into_iter() { - if let Err(e) = store.delete_block_hash_by_number(key) { - error!( - "remove block tx for number {} error: e={:?}", - key, e - ); - } else { - block_hash_cache.write().await.remove(&key); - } + for key in pending_keys.into_iter() { + if let Err(e) = store.delete_block_hash_by_number(key) { + error!("remove block tx for number {} error: e={:?}", key, e); + } else { + block_hash_cache.write().await.remove(&key); } } } @@ -313,6 +321,7 @@ impl LogEntryFetcher { &mut progress_reset_history, watch_loop_wait_time_ms, &block_hash_cache, + provider.as_ref(), ) .await; @@ -580,6 +589,7 @@ async fn check_watch_process( progress_reset_history: &mut BTreeMap, watch_loop_wait_time_ms: u64, block_hash_cache: &Arc>>>, + provider: &Provider>, ) { let mut min_received_progress = None; while let Ok(v) = watch_progress_rx.try_recv() { @@ -641,7 +651,21 @@ async fn check_watch_process( tokio::time::sleep(Duration::from_secs(RETRY_WAIT_MS)).await; } } else { - panic!("parent block {} expect exist", *progress - 1); + warn!( + "get block hash for block {} from RPC, assume there is no org", + *progress - 1 + ); + match provider.get_block(*progress - 1).await { + Ok(Some(v)) => { + v.hash.expect("parent block hash expect exist"); + } + Ok(None) => { + panic!("parent block {} expect exist", *progress - 1); + } + Err(e) => { + panic!("parent block {} expect exist, error {}", *progress - 1, e); + } + } } }; } diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs index 418afea..7412779 100644 --- a/node/log_entry_sync/src/sync_manager/mod.rs +++ b/node/log_entry_sync/src/sync_manager/mod.rs @@ -264,6 +264,18 @@ impl LogSyncManager { warn!("catch_up_end send fails, possibly auto_sync is not enabled"); } + log_sync_manager + .log_fetcher + .start_remove_finalized_block_task( + &executor_clone, + log_sync_manager.store.clone(), + log_sync_manager.block_hash_cache.clone(), + log_sync_manager.config.default_finalized_block_count, + log_sync_manager + .config + .remove_finalized_block_interval_minutes, + ); + let (watch_progress_tx, watch_progress_rx) = tokio::sync::mpsc::unbounded_channel(); let watch_rx = log_sync_manager.log_fetcher.start_watch( @@ -509,14 +521,6 @@ impl LogSyncManager { ); self.handle_data(recover_rx, &None).await?; } - - self.log_fetcher.start_remove_finalized_block_task( - &executor_clone, - self.store.clone(), - self.block_hash_cache.clone(), - self.config.default_finalized_block_count, - self.config.remove_finalized_block_interval_minutes, - ); Ok(()) } } @@ -541,6 +545,10 @@ async fn get_start_block_number_with_hash( .get(&block_number) { return Ok((block_number, val.block_hash)); + } else { + warn!("get block hash for block {} from RPC", block_number); + let block_hash = log_sync_manager.get_block(block_number.into()).await?.1; + return Ok((block_number, block_hash)); } }