mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-11-20 15:05:19 +00:00
Handle cases where the sequence is not continuous during the catch-up process
This commit is contained in:
parent
59d24b073d
commit
b4c30e2036
@ -384,6 +384,10 @@ impl LogEntryFetcher {
|
||||
);
|
||||
}
|
||||
|
||||
if block.logs_bloom.is_none() {
|
||||
bail!("block {:?} logs bloom is none", block.number);
|
||||
}
|
||||
|
||||
if from_block_number > 0 && block.parent_hash != parent_block_hash {
|
||||
// reorg happened
|
||||
let (parent_block_number, block_hash) = revert_one_block(
|
||||
@ -412,13 +416,22 @@ impl LogEntryFetcher {
|
||||
block.number
|
||||
);
|
||||
}
|
||||
if Some(block.parent_hash) != parent_block_hash {
|
||||
if parent_block_hash.is_none() || Some(block.parent_hash) != parent_block_hash {
|
||||
bail!(
|
||||
"parent block hash mismatch, expected {:?}, actual {}",
|
||||
parent_block_hash,
|
||||
block.parent_hash
|
||||
);
|
||||
}
|
||||
|
||||
if block_number == to_block_number && block.hash.is_none() {
|
||||
bail!("block {:?} hash is none", block.number);
|
||||
}
|
||||
|
||||
if block.logs_bloom.is_none() {
|
||||
bail!("block {:?} logs bloom is none", block.number);
|
||||
}
|
||||
|
||||
parent_block_hash = block.hash;
|
||||
blocks.insert(block_number, block);
|
||||
}
|
||||
@ -470,7 +483,7 @@ impl LogEntryFetcher {
|
||||
}
|
||||
|
||||
let tx = txs_hm[&log.transaction_index];
|
||||
if log.transaction_hash != Some(tx.hash) {
|
||||
if log.transaction_hash.is_none() || log.transaction_hash != Some(tx.hash) {
|
||||
warn!(
|
||||
"log tx hash mismatch, log transaction {:?}, block transaction {:?}",
|
||||
log.transaction_hash,
|
||||
@ -478,7 +491,9 @@ impl LogEntryFetcher {
|
||||
);
|
||||
return Ok(progress);
|
||||
}
|
||||
if log.transaction_index != tx.transaction_index {
|
||||
if log.transaction_index.is_none()
|
||||
|| log.transaction_index != tx.transaction_index
|
||||
{
|
||||
warn!(
|
||||
"log tx index mismatch, log tx index {:?}, block transaction index {:?}",
|
||||
log.transaction_index,
|
||||
|
@ -14,6 +14,7 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
|
||||
use task_executor::{ShutdownReason, TaskExecutor};
|
||||
use thiserror::Error;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
||||
use tokio::sync::{oneshot, RwLock};
|
||||
@ -25,6 +26,17 @@ const RETRY_WAIT_MS: u64 = 500;
|
||||
const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
|
||||
const CATCH_UP_END_GAP: u64 = 10;
|
||||
|
||||
/// Errors while handle data
|
||||
#[derive(Error, Debug)]
|
||||
pub enum HandleDataError {
|
||||
/// Sequence Error
|
||||
#[error("transaction seq is great than expected, expect block number {0}")]
|
||||
SeqError(u64),
|
||||
/// Other Errors
|
||||
#[error("{0}")]
|
||||
CommonError(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum LogSyncEvent {
|
||||
/// Chain reorg detected without any operation yet.
|
||||
@ -189,13 +201,51 @@ impl LogSyncManager {
|
||||
} else {
|
||||
// Keep catching-up data until we are close to the latest height.
|
||||
loop {
|
||||
log_sync_manager
|
||||
// wait tx receipt is ready
|
||||
if let Ok(Some(block)) = log_sync_manager
|
||||
.log_fetcher
|
||||
.provider()
|
||||
.get_block_with_txs(finalized_block_number)
|
||||
.await
|
||||
{
|
||||
if let Some(tx) = block.transactions.first() {
|
||||
loop {
|
||||
match log_sync_manager
|
||||
.log_fetcher
|
||||
.provider()
|
||||
.get_transaction_receipt(tx.hash)
|
||||
.await
|
||||
{
|
||||
Ok(Some(_)) => break,
|
||||
_ => {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while let Err(e) = log_sync_manager
|
||||
.catch_up_data(
|
||||
executor_clone.clone(),
|
||||
start_block_number,
|
||||
finalized_block_number,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
HandleDataError::SeqError(block_number) => {
|
||||
warn!("seq error occurred, retry from {}", block_number);
|
||||
start_block_number = block_number;
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
_ => {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
start_block_number = finalized_block_number.saturating_add(1);
|
||||
|
||||
let new_finalized_block =
|
||||
@ -296,7 +346,7 @@ impl LogSyncManager {
|
||||
&mut self,
|
||||
mut rx: UnboundedReceiver<LogFetchProgress>,
|
||||
watch_progress_tx: &Option<UnboundedSender<u64>>,
|
||||
) -> Result<()> {
|
||||
) -> Result<(), HandleDataError> {
|
||||
let mut log_latest_block_number =
|
||||
if let Some(block_number) = self.store.get_log_latest_block_number()? {
|
||||
block_number
|
||||
@ -362,13 +412,15 @@ impl LogSyncManager {
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
return Err(HandleDataError::SeqError(log_latest_block_number));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if stop {
|
||||
// Unexpected error.
|
||||
bail!("log sync write error");
|
||||
return Err(anyhow!("log sync write error").into());
|
||||
}
|
||||
if let Err(e) = self.event_send.send(LogSyncEvent::TxSynced { tx }) {
|
||||
// TODO: Do we need to wait until all receivers are initialized?
|
||||
@ -447,7 +499,7 @@ impl LogSyncManager {
|
||||
executor_clone: TaskExecutor,
|
||||
start_block_number: u64,
|
||||
finalized_block_number: u64,
|
||||
) -> Result<()> {
|
||||
) -> Result<(), HandleDataError> {
|
||||
if start_block_number < finalized_block_number {
|
||||
let recover_rx = self.log_fetcher.start_recover(
|
||||
start_block_number,
|
||||
|
Loading…
Reference in New Issue
Block a user