Handle cases where sequence is not continuous during catch-up (#199)

* Handle cases where the sequence is not continuous during the catch-up process

* get block hash from rpc if not found in local cache
This commit is contained in:
Joel Liu 2024-09-14 09:05:11 +08:00 committed by GitHub
parent a9f5169c15
commit f878a4849c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 133 additions and 34 deletions

View File

@ -145,6 +145,15 @@ impl LogEntryFetcher {
} }
}; };
let log_latest_block_number = match store.get_log_latest_block_number() {
Ok(Some(b)) => b,
Ok(None) => 0,
Err(e) => {
error!("get log latest block number error: e={:?}", e);
0
}
};
if let Some(processed_block_number) = processed_block_number { if let Some(processed_block_number) = processed_block_number {
let finalized_block_number = let finalized_block_number =
match provider.get_block(BlockNumber::Finalized).await { match provider.get_block(BlockNumber::Finalized).await {
@ -168,10 +177,13 @@ impl LogEntryFetcher {
}; };
if let Some(finalized_block_number) = finalized_block_number { if let Some(finalized_block_number) = finalized_block_number {
if processed_block_number >= finalized_block_number { let safe_block_number = std::cmp::min(
std::cmp::min(log_latest_block_number, finalized_block_number),
processed_block_number,
);
let mut pending_keys = vec![]; let mut pending_keys = vec![];
for (key, _) in block_hash_cache.read().await.iter() { for (key, _) in block_hash_cache.read().await.iter() {
if *key < finalized_block_number { if *key < safe_block_number {
pending_keys.push(*key); pending_keys.push(*key);
} else { } else {
break; break;
@ -180,17 +192,13 @@ impl LogEntryFetcher {
for key in pending_keys.into_iter() { for key in pending_keys.into_iter() {
if let Err(e) = store.delete_block_hash_by_number(key) { if let Err(e) = store.delete_block_hash_by_number(key) {
error!( error!("remove block tx for number {} error: e={:?}", key, e);
"remove block tx for number {} error: e={:?}",
key, e
);
} else { } else {
block_hash_cache.write().await.remove(&key); block_hash_cache.write().await.remove(&key);
} }
} }
} }
} }
}
tokio::time::sleep(Duration::from_secs( tokio::time::sleep(Duration::from_secs(
60 * remove_finalized_block_interval_minutes, 60 * remove_finalized_block_interval_minutes,
@ -313,6 +321,7 @@ impl LogEntryFetcher {
&mut progress_reset_history, &mut progress_reset_history,
watch_loop_wait_time_ms, watch_loop_wait_time_ms,
&block_hash_cache, &block_hash_cache,
provider.as_ref(),
) )
.await; .await;
@ -384,6 +393,10 @@ impl LogEntryFetcher {
); );
} }
if block.logs_bloom.is_none() {
bail!("block {:?} logs bloom is none", block.number);
}
if from_block_number > 0 && block.parent_hash != parent_block_hash { if from_block_number > 0 && block.parent_hash != parent_block_hash {
// reorg happened // reorg happened
let (parent_block_number, block_hash) = revert_one_block( let (parent_block_number, block_hash) = revert_one_block(
@ -412,13 +425,22 @@ impl LogEntryFetcher {
block.number block.number
); );
} }
if Some(block.parent_hash) != parent_block_hash { if parent_block_hash.is_none() || Some(block.parent_hash) != parent_block_hash {
bail!( bail!(
"parent block hash mismatch, expected {:?}, actual {}", "parent block hash mismatch, expected {:?}, actual {}",
parent_block_hash, parent_block_hash,
block.parent_hash block.parent_hash
); );
} }
if block_number == to_block_number && block.hash.is_none() {
bail!("block {:?} hash is none", block.number);
}
if block.logs_bloom.is_none() {
bail!("block {:?} logs bloom is none", block.number);
}
parent_block_hash = block.hash; parent_block_hash = block.hash;
blocks.insert(block_number, block); blocks.insert(block_number, block);
} }
@ -470,7 +492,7 @@ impl LogEntryFetcher {
} }
let tx = txs_hm[&log.transaction_index]; let tx = txs_hm[&log.transaction_index];
if log.transaction_hash != Some(tx.hash) { if log.transaction_hash.is_none() || log.transaction_hash != Some(tx.hash) {
warn!( warn!(
"log tx hash mismatch, log transaction {:?}, block transaction {:?}", "log tx hash mismatch, log transaction {:?}, block transaction {:?}",
log.transaction_hash, log.transaction_hash,
@ -478,7 +500,9 @@ impl LogEntryFetcher {
); );
return Ok(progress); return Ok(progress);
} }
if log.transaction_index != tx.transaction_index { if log.transaction_index.is_none()
|| log.transaction_index != tx.transaction_index
{
warn!( warn!(
"log tx index mismatch, log tx index {:?}, block transaction index {:?}", "log tx index mismatch, log tx index {:?}, block transaction index {:?}",
log.transaction_index, log.transaction_index,
@ -565,6 +589,7 @@ async fn check_watch_process(
progress_reset_history: &mut BTreeMap<u64, (Instant, usize)>, progress_reset_history: &mut BTreeMap<u64, (Instant, usize)>,
watch_loop_wait_time_ms: u64, watch_loop_wait_time_ms: u64,
block_hash_cache: &Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>, block_hash_cache: &Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
provider: &Provider<RetryClient<Http>>,
) { ) {
let mut min_received_progress = None; let mut min_received_progress = None;
while let Ok(v) = watch_progress_rx.try_recv() { while let Ok(v) = watch_progress_rx.try_recv() {
@ -626,8 +651,22 @@ async fn check_watch_process(
tokio::time::sleep(Duration::from_secs(RETRY_WAIT_MS)).await; tokio::time::sleep(Duration::from_secs(RETRY_WAIT_MS)).await;
} }
} else { } else {
warn!(
"get block hash for block {} from RPC, assume there is no org",
*progress - 1
);
match provider.get_block(*progress - 1).await {
Ok(Some(v)) => {
v.hash.expect("parent block hash expect exist");
}
Ok(None) => {
panic!("parent block {} expect exist", *progress - 1); panic!("parent block {} expect exist", *progress - 1);
} }
Err(e) => {
panic!("parent block {} expect exist, error {}", *progress - 1, e);
}
}
}
}; };
} }

View File

@ -14,6 +14,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store}; use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
use task_executor::{ShutdownReason, TaskExecutor}; use task_executor::{ShutdownReason, TaskExecutor};
use thiserror::Error;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{oneshot, RwLock}; use tokio::sync::{oneshot, RwLock};
@ -25,6 +26,17 @@ const RETRY_WAIT_MS: u64 = 500;
const BROADCAST_CHANNEL_CAPACITY: usize = 25000; const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
const CATCH_UP_END_GAP: u64 = 10; const CATCH_UP_END_GAP: u64 = 10;
/// Errors while handle data
#[derive(Error, Debug)]
pub enum HandleDataError {
/// Sequence Error
#[error("transaction seq is great than expected, expect block number {0}")]
SeqError(u64),
/// Other Errors
#[error("{0}")]
CommonError(#[from] anyhow::Error),
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum LogSyncEvent { pub enum LogSyncEvent {
/// Chain reorg detected without any operation yet. /// Chain reorg detected without any operation yet.
@ -189,13 +201,51 @@ impl LogSyncManager {
} else { } else {
// Keep catching-up data until we are close to the latest height. // Keep catching-up data until we are close to the latest height.
loop { loop {
log_sync_manager // wait tx receipt is ready
if let Ok(Some(block)) = log_sync_manager
.log_fetcher
.provider()
.get_block_with_txs(finalized_block_number)
.await
{
if let Some(tx) = block.transactions.first() {
loop {
match log_sync_manager
.log_fetcher
.provider()
.get_transaction_receipt(tx.hash)
.await
{
Ok(Some(_)) => break,
_ => {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
}
}
}
while let Err(e) = log_sync_manager
.catch_up_data( .catch_up_data(
executor_clone.clone(), executor_clone.clone(),
start_block_number, start_block_number,
finalized_block_number, finalized_block_number,
) )
.await?; .await
{
match e {
HandleDataError::SeqError(block_number) => {
warn!("seq error occurred, retry from {}", block_number);
start_block_number = block_number;
tokio::time::sleep(Duration::from_secs(1)).await;
}
_ => {
return Err(e.into());
}
}
}
start_block_number = finalized_block_number.saturating_add(1); start_block_number = finalized_block_number.saturating_add(1);
let new_finalized_block = let new_finalized_block =
@ -214,6 +264,18 @@ impl LogSyncManager {
warn!("catch_up_end send fails, possibly auto_sync is not enabled"); warn!("catch_up_end send fails, possibly auto_sync is not enabled");
} }
log_sync_manager
.log_fetcher
.start_remove_finalized_block_task(
&executor_clone,
log_sync_manager.store.clone(),
log_sync_manager.block_hash_cache.clone(),
log_sync_manager.config.default_finalized_block_count,
log_sync_manager
.config
.remove_finalized_block_interval_minutes,
);
let (watch_progress_tx, watch_progress_rx) = let (watch_progress_tx, watch_progress_rx) =
tokio::sync::mpsc::unbounded_channel(); tokio::sync::mpsc::unbounded_channel();
let watch_rx = log_sync_manager.log_fetcher.start_watch( let watch_rx = log_sync_manager.log_fetcher.start_watch(
@ -296,7 +358,7 @@ impl LogSyncManager {
&mut self, &mut self,
mut rx: UnboundedReceiver<LogFetchProgress>, mut rx: UnboundedReceiver<LogFetchProgress>,
watch_progress_tx: &Option<UnboundedSender<u64>>, watch_progress_tx: &Option<UnboundedSender<u64>>,
) -> Result<()> { ) -> Result<(), HandleDataError> {
let mut log_latest_block_number = let mut log_latest_block_number =
if let Some(block_number) = self.store.get_log_latest_block_number()? { if let Some(block_number) = self.store.get_log_latest_block_number()? {
block_number block_number
@ -362,13 +424,15 @@ impl LogSyncManager {
} else { } else {
continue; continue;
} }
} else {
return Err(HandleDataError::SeqError(log_latest_block_number));
} }
} }
} }
if stop { if stop {
// Unexpected error. // Unexpected error.
bail!("log sync write error"); return Err(anyhow!("log sync write error").into());
} }
if let Err(e) = self.event_send.send(LogSyncEvent::TxSynced { tx }) { if let Err(e) = self.event_send.send(LogSyncEvent::TxSynced { tx }) {
// TODO: Do we need to wait until all receivers are initialized? // TODO: Do we need to wait until all receivers are initialized?
@ -447,7 +511,7 @@ impl LogSyncManager {
executor_clone: TaskExecutor, executor_clone: TaskExecutor,
start_block_number: u64, start_block_number: u64,
finalized_block_number: u64, finalized_block_number: u64,
) -> Result<()> { ) -> Result<(), HandleDataError> {
if start_block_number < finalized_block_number { if start_block_number < finalized_block_number {
let recover_rx = self.log_fetcher.start_recover( let recover_rx = self.log_fetcher.start_recover(
start_block_number, start_block_number,
@ -457,14 +521,6 @@ impl LogSyncManager {
); );
self.handle_data(recover_rx, &None).await?; self.handle_data(recover_rx, &None).await?;
} }
self.log_fetcher.start_remove_finalized_block_task(
&executor_clone,
self.store.clone(),
self.block_hash_cache.clone(),
self.config.default_finalized_block_count,
self.config.remove_finalized_block_interval_minutes,
);
Ok(()) Ok(())
} }
} }
@ -489,6 +545,10 @@ async fn get_start_block_number_with_hash(
.get(&block_number) .get(&block_number)
{ {
return Ok((block_number, val.block_hash)); return Ok((block_number, val.block_hash));
} else {
warn!("get block hash for block {} from RPC", block_number);
let block_hash = log_sync_manager.get_block(block_number.into()).await?.1;
return Ok((block_number, block_hash));
} }
} }