From f878a4849c1d45a9bef0527ea752c2710502334e Mon Sep 17 00:00:00 2001
From: Joel Liu <83172559+csdtowards@users.noreply.github.com>
Date: Sat, 14 Sep 2024 09:05:11 +0800
Subject: [PATCH] Handle cases where sequence is not continuous during catch-up (#199)

* Handle cases where the sequence is not continuous during the catch-up process

* get block hash from rpc if not found in local cache
---
 .../src/sync_manager/log_entry_fetcher.rs    | 81 ++++++++++++-----
 node/log_entry_sync/src/sync_manager/mod.rs  | 86 ++++++++++++++++---
 2 files changed, 133 insertions(+), 34 deletions(-)

diff --git a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs
index 8ccf435..93239e3 100644
--- a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs
+++ b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs
@@ -145,6 +145,15 @@ impl LogEntryFetcher {
                 }
             };

+            let log_latest_block_number = match store.get_log_latest_block_number() {
+                Ok(Some(b)) => b,
+                Ok(None) => 0,
+                Err(e) => {
+                    error!("get log latest block number error: e={:?}", e);
+                    0
+                }
+            };
+
             if let Some(processed_block_number) = processed_block_number {
                 let finalized_block_number =
                     match provider.get_block(BlockNumber::Finalized).await {
@@ -168,25 +177,24 @@ impl LogEntryFetcher {
                     };

                 if let Some(finalized_block_number) = finalized_block_number {
-                    if processed_block_number >= finalized_block_number {
-                        let mut pending_keys = vec![];
-                        for (key, _) in block_hash_cache.read().await.iter() {
-                            if *key < finalized_block_number {
-                                pending_keys.push(*key);
-                            } else {
-                                break;
-                            }
+                    let safe_block_number = std::cmp::min(
+                        std::cmp::min(log_latest_block_number, finalized_block_number),
+                        processed_block_number,
+                    );
+                    let mut pending_keys = vec![];
+                    for (key, _) in block_hash_cache.read().await.iter() {
+                        if *key < safe_block_number {
+                            pending_keys.push(*key);
+                        } else {
+                            break;
                         }
+                    }

-                        for key in pending_keys.into_iter() {
-                            if let Err(e) = store.delete_block_hash_by_number(key) {
-                                error!(
-                                    "remove block tx for number {} error: e={:?}",
-                                    key, e
-                                );
-                            } else {
-                                block_hash_cache.write().await.remove(&key);
-                            }
+                    for key in pending_keys.into_iter() {
+                        if let Err(e) = store.delete_block_hash_by_number(key) {
+                            error!("remove block tx for number {} error: e={:?}", key, e);
+                        } else {
+                            block_hash_cache.write().await.remove(&key);
                         }
                     }
                 }
@@ -313,6 +321,7 @@ impl LogEntryFetcher {
                         &mut progress_reset_history,
                         watch_loop_wait_time_ms,
                         &block_hash_cache,
+                        provider.as_ref(),
                     )
                     .await;

@@ -384,6 +393,10 @@ impl LogEntryFetcher {
                 );
             }

+            if block.logs_bloom.is_none() {
+                bail!("block {:?} logs bloom is none", block.number);
+            }
+
             if from_block_number > 0 && block.parent_hash != parent_block_hash {
                 // reorg happened
                 let (parent_block_number, block_hash) = revert_one_block(
@@ -412,13 +425,22 @@ impl LogEntryFetcher {
                     block.number
                 );
             }
-            if Some(block.parent_hash) != parent_block_hash {
+            if parent_block_hash.is_none() || Some(block.parent_hash) != parent_block_hash {
                 bail!(
                     "parent block hash mismatch, expected {:?}, actual {}",
                     parent_block_hash,
                     block.parent_hash
                 );
             }
+
+            if block_number == to_block_number && block.hash.is_none() {
+                bail!("block {:?} hash is none", block.number);
+            }
+
+            if block.logs_bloom.is_none() {
+                bail!("block {:?} logs bloom is none", block.number);
+            }
+
             parent_block_hash = block.hash;
             blocks.insert(block_number, block);
         }
@@ -470,7 +492,7 @@ impl LogEntryFetcher {
                         }

                         let tx = txs_hm[&log.transaction_index];
-                        if log.transaction_hash != Some(tx.hash) {
+                        if log.transaction_hash.is_none() || log.transaction_hash != Some(tx.hash) {
                            warn!(
                                "log tx hash mismatch, log transaction {:?}, block transaction {:?}",
                                log.transaction_hash,
@@ -478,7 +500,9 @@ impl LogEntryFetcher {
                            );
                            return Ok(progress);
                        }
-                        if log.transaction_index != tx.transaction_index {
+                        if log.transaction_index.is_none()
+                            || log.transaction_index != tx.transaction_index
+                        {
                            warn!(
                                "log tx index mismatch, log tx index {:?}, block transaction index {:?}",
                                log.transaction_index,
@@ -565,6 +589,7 @@ async fn check_watch_process(
     progress_reset_history: &mut BTreeMap,
     watch_loop_wait_time_ms: u64,
     block_hash_cache: &Arc>>>,
+    provider: &Provider>,
 ) {
     let mut min_received_progress = None;
     while let Ok(v) = watch_progress_rx.try_recv() {
@@ -626,7 +651,21 @@ async fn check_watch_process(
                    tokio::time::sleep(Duration::from_secs(RETRY_WAIT_MS)).await;
                }
            } else {
-                panic!("parent block {} expect exist", *progress - 1);
+                warn!(
+                    "get block hash for block {} from RPC, assume there is no reorg",
+                    *progress - 1
+                );
+                match provider.get_block(*progress - 1).await {
+                    Ok(Some(v)) => {
+                        v.hash.expect("parent block hash expect exist");
+                    }
+                    Ok(None) => {
+                        panic!("parent block {} expect exist", *progress - 1);
+                    }
+                    Err(e) => {
+                        panic!("parent block {} expect exist, error {}", *progress - 1, e);
+                    }
+                }
            }
        };
    }
diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs
index 0055fb4..7412779 100644
--- a/node/log_entry_sync/src/sync_manager/mod.rs
+++ b/node/log_entry_sync/src/sync_manager/mod.rs
@@ -14,6 +14,7 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
 use task_executor::{ShutdownReason, TaskExecutor};
+use thiserror::Error;
 use tokio::sync::broadcast;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use tokio::sync::{oneshot, RwLock};
@@ -25,6 +26,17 @@ const RETRY_WAIT_MS: u64 = 500;
 const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
 const CATCH_UP_END_GAP: u64 = 10;

+/// Errors while handling data
+#[derive(Error, Debug)]
+pub enum HandleDataError {
+    /// Sequence Error
+    #[error("transaction seq is greater than expected, expected block number {0}")]
+    SeqError(u64),
+    /// Other Errors
+    #[error("{0}")]
+    CommonError(#[from] anyhow::Error),
+}
+
 #[derive(Clone, Debug)]
 pub enum LogSyncEvent {
     /// Chain reorg detected without any operation yet.
@@ -189,13 +201,51 @@ impl LogSyncManager {
            } else {
                // Keep catching-up data until we are close to the latest height.
                loop {
-                    log_sync_manager
+                    // wait until the tx receipt is ready
+                    if let Ok(Some(block)) = log_sync_manager
+                        .log_fetcher
+                        .provider()
+                        .get_block_with_txs(finalized_block_number)
+                        .await
+                    {
+                        if let Some(tx) = block.transactions.first() {
+                            loop {
+                                match log_sync_manager
+                                    .log_fetcher
+                                    .provider()
+                                    .get_transaction_receipt(tx.hash)
+                                    .await
+                                {
+                                    Ok(Some(_)) => break,
+                                    _ => {
+                                        tokio::time::sleep(Duration::from_secs(1)).await;
+                                        continue;
+                                    }
+                                }
+                            }
+                        }
+                    }
+
+                    while let Err(e) = log_sync_manager
                        .catch_up_data(
                            executor_clone.clone(),
                            start_block_number,
                            finalized_block_number,
                        )
-                        .await?;
+                        .await
+                    {
+                        match e {
+                            HandleDataError::SeqError(block_number) => {
+                                warn!("seq error occurred, retry from {}", block_number);
+                                start_block_number = block_number;
+                                tokio::time::sleep(Duration::from_secs(1)).await;
+                            }
+                            _ => {
+                                return Err(e.into());
+                            }
+                        }
+                    }
+
                    start_block_number = finalized_block_number.saturating_add(1);

                    let new_finalized_block =
@@ -214,6 +264,18 @@ impl LogSyncManager {
                warn!("catch_up_end send fails, possibly auto_sync is not enabled");
            }

+            log_sync_manager
+                .log_fetcher
+                .start_remove_finalized_block_task(
+                    &executor_clone,
+                    log_sync_manager.store.clone(),
+                    log_sync_manager.block_hash_cache.clone(),
+                    log_sync_manager.config.default_finalized_block_count,
+                    log_sync_manager
+                        .config
+                        .remove_finalized_block_interval_minutes,
+                );
+
            let (watch_progress_tx, watch_progress_rx) =
                tokio::sync::mpsc::unbounded_channel();
            let watch_rx = log_sync_manager.log_fetcher.start_watch(
@@ -296,7 +358,7 @@ impl LogSyncManager {
    async fn handle_data(
        &mut self,
        mut rx: UnboundedReceiver,
        watch_progress_tx: &Option>,
-    ) -> Result<()> {
+    ) -> Result<(), HandleDataError> {
        let mut log_latest_block_number = if let Some(block_number) = self.store.get_log_latest_block_number()? {
            block_number
@@ -362,13 +424,15 @@ impl LogSyncManager {
                        } else {
                            continue;
                        }
+                    } else {
+                        return Err(HandleDataError::SeqError(log_latest_block_number));
                    }
                }
            }

            if stop {
                // Unexpected error.
-                bail!("log sync write error");
+                return Err(anyhow!("log sync write error").into());
            }
            if let Err(e) = self.event_send.send(LogSyncEvent::TxSynced { tx }) {
                // TODO: Do we need to wait until all receivers are initialized?
@@ -447,7 +511,7 @@ impl LogSyncManager {
        executor_clone: TaskExecutor,
        start_block_number: u64,
        finalized_block_number: u64,
-    ) -> Result<()> {
+    ) -> Result<(), HandleDataError> {
        if start_block_number < finalized_block_number {
            let recover_rx = self.log_fetcher.start_recover(
                start_block_number,
@@ -457,14 +521,6 @@ impl LogSyncManager {
            );
            self.handle_data(recover_rx, &None).await?;
        }
-
-        self.log_fetcher.start_remove_finalized_block_task(
-            &executor_clone,
-            self.store.clone(),
-            self.block_hash_cache.clone(),
-            self.config.default_finalized_block_count,
-            self.config.remove_finalized_block_interval_minutes,
-        );
        Ok(())
    }
}
@@ -489,6 +545,10 @@ async fn get_start_block_number_with_hash(
        .get(&block_number)
    {
        return Ok((block_number, val.block_hash));
+    } else {
+        warn!("get block hash for block {} from RPC", block_number);
+        let block_hash = log_sync_manager.get_block(block_number.into()).await?.1;
+        return Ok((block_number, block_hash));
    }
}