get block hash from rpc if not found in local cache

This commit is contained in:
Joel 2024-09-13 19:05:25 +08:00
parent b4c30e2036
commit aad2af252a
2 changed files with 58 additions and 26 deletions

View File

@ -145,6 +145,15 @@ impl LogEntryFetcher {
}
};
let log_latest_block_number = match store.get_log_latest_block_number() {
Ok(Some(b)) => b,
Ok(None) => 0,
Err(e) => {
error!("get log latest block number error: e={:?}", e);
0
}
};
if let Some(processed_block_number) = processed_block_number {
let finalized_block_number =
match provider.get_block(BlockNumber::Finalized).await {
@ -168,10 +177,13 @@ impl LogEntryFetcher {
};
if let Some(finalized_block_number) = finalized_block_number {
if processed_block_number >= finalized_block_number {
let safe_block_number = std::cmp::min(
std::cmp::min(log_latest_block_number, finalized_block_number),
processed_block_number,
);
let mut pending_keys = vec![];
for (key, _) in block_hash_cache.read().await.iter() {
if *key < finalized_block_number {
if *key < safe_block_number {
pending_keys.push(*key);
} else {
break;
@ -180,17 +192,13 @@ impl LogEntryFetcher {
for key in pending_keys.into_iter() {
if let Err(e) = store.delete_block_hash_by_number(key) {
error!(
"remove block tx for number {} error: e={:?}",
key, e
);
error!("remove block tx for number {} error: e={:?}", key, e);
} else {
block_hash_cache.write().await.remove(&key);
}
}
}
}
}
tokio::time::sleep(Duration::from_secs(
60 * remove_finalized_block_interval_minutes,
@ -313,6 +321,7 @@ impl LogEntryFetcher {
&mut progress_reset_history,
watch_loop_wait_time_ms,
&block_hash_cache,
provider.as_ref(),
)
.await;
@ -580,6 +589,7 @@ async fn check_watch_process(
progress_reset_history: &mut BTreeMap<u64, (Instant, usize)>,
watch_loop_wait_time_ms: u64,
block_hash_cache: &Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
provider: &Provider<RetryClient<Http>>,
) {
let mut min_received_progress = None;
while let Ok(v) = watch_progress_rx.try_recv() {
@ -641,8 +651,22 @@ async fn check_watch_process(
tokio::time::sleep(Duration::from_secs(RETRY_WAIT_MS)).await;
}
} else {
warn!(
"get block hash for block {} from RPC, assume there is no org",
*progress - 1
);
match provider.get_block(*progress - 1).await {
Ok(Some(v)) => {
v.hash.expect("parent block hash expect exist");
}
Ok(None) => {
panic!("parent block {} expect exist", *progress - 1);
}
Err(e) => {
panic!("parent block {} expect exist, error {}", *progress - 1, e);
}
}
}
};
}

View File

@ -264,6 +264,18 @@ impl LogSyncManager {
warn!("catch_up_end send fails, possibly auto_sync is not enabled");
}
log_sync_manager
.log_fetcher
.start_remove_finalized_block_task(
&executor_clone,
log_sync_manager.store.clone(),
log_sync_manager.block_hash_cache.clone(),
log_sync_manager.config.default_finalized_block_count,
log_sync_manager
.config
.remove_finalized_block_interval_minutes,
);
let (watch_progress_tx, watch_progress_rx) =
tokio::sync::mpsc::unbounded_channel();
let watch_rx = log_sync_manager.log_fetcher.start_watch(
@ -509,14 +521,6 @@ impl LogSyncManager {
);
self.handle_data(recover_rx, &None).await?;
}
self.log_fetcher.start_remove_finalized_block_task(
&executor_clone,
self.store.clone(),
self.block_hash_cache.clone(),
self.config.default_finalized_block_count,
self.config.remove_finalized_block_interval_minutes,
);
Ok(())
}
}
@ -541,6 +545,10 @@ async fn get_start_block_number_with_hash(
.get(&block_number)
{
return Ok((block_number, val.block_hash));
} else {
warn!("get block hash for block {} from RPC", block_number);
let block_hash = log_sync_manager.get_block(block_number.into()).await?.1;
return Ok((block_number, block_hash));
}
}