Wait for SyncedBlock to be processed before reverting a block (#45)

* wait for SyncedBlock to be processed

* remove retry counter

* use a default parent block hash for the missing case
Joel Liu 2024-04-11 18:13:52 +08:00 committed by GitHub
parent bd4ebee2da
commit 6c1b0b35ec
2 changed files with 44 additions and 24 deletions
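
In outline, the commit changes the cache's value type from BlockHashAndSubmissionIndex to Option<BlockHashAndSubmissionIndex>: a None is inserted as a placeholder when a SyncedBlock progress message is sent downstream, and revert_one_block now polls the cache until that placeholder is upgraded instead of erroring out immediately. The following is a minimal standalone sketch of that wait loop, not the repo's actual code: the struct fields, the wait_for_processed name, and the 100 ms poll interval are illustrative stand-ins ([u8; 32] replaces H256).

use std::{collections::BTreeMap, sync::Arc, time::Duration};

use tokio::sync::RwLock;

// Stand-in for the repo's BlockHashAndSubmissionIndex.
#[derive(Clone, Debug)]
struct BlockHashAndSubmissionIndex {
    block_hash: [u8; 32],
    first_submission_index: Option<u64>,
}

type Cache = Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>;

// Poll until the `None` placeholder left at SyncedBlock-send time is replaced
// with real data; fail only if the block is entirely absent from the cache.
async fn wait_for_processed(
    cache: &Cache,
    block_number: u64,
) -> Result<BlockHashAndSubmissionIndex, String> {
    loop {
        // Hold the read lock only for the lookup so the writer can progress.
        match cache.read().await.get(&block_number) {
            Some(Some(v)) => return Ok(v.clone()),
            Some(None) => {} // synced but not yet processed: keep waiting
            None => return Err(format!("no cache entry for block {}", block_number)),
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

#[tokio::main]
async fn main() {
    let cache: Cache = Arc::new(RwLock::new(BTreeMap::new()));
    cache.write().await.insert(42, None); // placeholder written at send time

    // Simulate the manager finishing the block and upgrading the placeholder.
    let writer = cache.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(300)).await;
        writer.write().await.insert(
            42,
            Some(BlockHashAndSubmissionIndex {
                block_hash: [0u8; 32],
                first_submission_index: Some(7),
            }),
        );
    });

    println!("{:?}", wait_for_processed(&cache, 42).await);
}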

@@ -61,7 +61,7 @@ impl LogEntryFetcher {
         block_number: u64,
         block_hash: H256,
         executor: &TaskExecutor,
-        block_hash_cache: Arc<RwLock<BTreeMap<u64, BlockHashAndSubmissionIndex>>>,
+        block_hash_cache: Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
     ) -> UnboundedReceiver<LogFetchProgress> {
         let (reorg_tx, reorg_rx) = tokio::sync::mpsc::unbounded_channel();
         let provider = self.provider.clone();
@@ -93,6 +93,7 @@ impl LogEntryFetcher {
                         block_number,
                         &reorg_tx,
                         &block_hash_cache,
+                        provider.as_ref(),
                     )
                     .await
                     {
@@ -122,7 +123,7 @@ impl LogEntryFetcher {
         &self,
         executor: &TaskExecutor,
         store: Arc<RwLock<dyn Store>>,
-        block_hash_cache: Arc<RwLock<BTreeMap<u64, BlockHashAndSubmissionIndex>>>,
+        block_hash_cache: Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
         default_finalized_block_count: u64,
         remove_finalized_block_interval_minutes: u64,
     ) {
@@ -167,7 +168,7 @@ impl LogEntryFetcher {
                 if processed_block_number >= finalized_block_number {
                     let mut pending_keys = vec![];
                     for (key, _) in block_hash_cache.read().await.iter() {
-                        if *key <= finalized_block_number {
+                        if *key < finalized_block_number {
                             pending_keys.push(*key);
                         } else {
                             break;
@@ -284,7 +285,7 @@ impl LogEntryFetcher {
         start_block_number: u64,
         parent_block_hash: H256,
         executor: &TaskExecutor,
-        block_hash_cache: Arc<RwLock<BTreeMap<u64, BlockHashAndSubmissionIndex>>>,
+        block_hash_cache: Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
         watch_loop_wait_time_ms: u64,
     ) -> UnboundedReceiver<LogFetchProgress> {
         let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel();
@@ -340,7 +341,7 @@ impl LogEntryFetcher {
        watch_tx: &UnboundedSender<LogFetchProgress>,
        confirmation_delay: u64,
        contract: &ZgsFlow<Provider<RetryClient<Http>>>,
-       block_hash_cache: &Arc<RwLock<BTreeMap<u64, BlockHashAndSubmissionIndex>>>,
+       block_hash_cache: &Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
     ) -> Result<Option<(u64, H256, Option<Option<u64>>)>> {
         let latest_block_number = provider.get_block_number().await?.as_u64();
         debug!(
@@ -371,6 +372,7 @@ impl LogEntryFetcher {
                     from_block_number.saturating_sub(1),
                     watch_tx,
                     block_hash_cache,
+                    provider,
                 )
                 .await?;
                 return Ok(Some((parent_block_number, block_hash, None)));
@@ -500,6 +502,8 @@ impl LogEntryFetcher {
             if let Err(e) = watch_tx.send(LogFetchProgress::SyncedBlock(*p)) {
                 warn!("send LogFetchProgress failed: {:?}", e);
                 return Ok(progress);
+            } else {
+                block_hash_cache.write().await.insert(p.0, None);
             }
         }
         for log in log_events.into_iter() {
@@ -524,15 +528,25 @@ async fn revert_one_block(
     block_hash: H256,
     block_number: u64,
     watch_tx: &UnboundedSender<LogFetchProgress>,
-    block_hash_cache: &Arc<RwLock<BTreeMap<u64, BlockHashAndSubmissionIndex>>>,
+    block_hash_cache: &Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
+    provider: &Provider<RetryClient<Http>>,
 ) -> Result<(u64, H256), anyhow::Error> {
     debug!("revert block {}, block hash {:?}", block_number, block_hash);
-    let block = block_hash_cache
-        .read()
-        .await
-        .get(&block_number)
-        .ok_or_else(|| anyhow!("None for block {}", block_number))?
-        .clone();
+    let block = loop {
+        if let Some(block) = block_hash_cache.read().await.get(&block_number) {
+            if let Some(v) = block {
+                break v.clone();
+            } else {
+                debug!(
+                    "block_hash_cache wait for SyncedBlock processed for {}",
+                    block_number
+                );
+                tokio::time::sleep(Duration::from_secs(RETRY_WAIT_MS)).await;
+            }
+        } else {
+            return Err(anyhow!("None for block {}", block_number));
+        }
+    };
 
     assert!(block_hash == block.block_hash);
     if let Some(reverted) = block.first_submission_index {
@@ -540,13 +554,18 @@ async fn revert_one_block(
     }
 
     let parent_block_number = block_number.saturating_sub(1);
-    let parent_block_hash = block_hash_cache
-        .read()
-        .await
-        .get(&parent_block_number)
-        .ok_or_else(|| anyhow!("None for block {}", parent_block_number))?
-        .clone()
-        .block_hash;
+    let parent_block_hash = match block_hash_cache.read().await.get(&parent_block_number) {
+        Some(v) => v.clone().as_ref().unwrap().block_hash,
+        _ => {
+            debug!("assume parent block {} is not reorged", parent_block_number);
+            provider
+                .get_block(parent_block_number)
+                .await?
+                .ok_or_else(|| anyhow!("None for block {}", parent_block_number))?
+                .hash
+                .ok_or_else(|| anyhow!("None block hash for block {}", parent_block_number))?
+        }
+    };
 
     let synced_block =
         LogFetchProgress::SyncedBlock((parent_block_number, parent_block_hash, None));
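
The hunk above also introduces the "default parent block hash" fallback from the commit message: when the parent block has been pruned from the cache, revert_one_block now assumes it was not reorged and takes its canonical hash from the node. Below is a sketch of that lookup using the same ethers Provider API the hunk calls; Entry and parent_hash_or_chain are hypothetical names, and unlike the commit (which unwraps a cached Some entry and would panic on a None placeholder), this sketch routes the placeholder down the provider path as well.

use std::collections::BTreeMap;

use anyhow::anyhow;
use ethers::providers::{Http, Middleware, Provider, RetryClient};
use ethers::types::H256;

// Minimal stand-in for the cached entry; only the hash matters here.
struct Entry {
    block_hash: H256,
}

// Prefer the locally cached parent hash; if the parent is missing (e.g. it
// was pruned as finalized), assume it was not reorged and ask the node for
// its canonical hash instead of failing the revert.
async fn parent_hash_or_chain(
    cache: &BTreeMap<u64, Option<Entry>>,
    provider: &Provider<RetryClient<Http>>,
    parent_block_number: u64,
) -> anyhow::Result<H256> {
    if let Some(Some(entry)) = cache.get(&parent_block_number) {
        return Ok(entry.block_hash);
    }
    provider
        .get_block(parent_block_number)
        .await?
        .ok_or_else(|| anyhow!("no block {}", parent_block_number))?
        .hash
        .ok_or_else(|| anyhow!("no hash for block {}", parent_block_number))
}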

@@ -41,7 +41,7 @@ pub struct LogSyncManager {
     /// To broadcast events to handle in advance.
     event_send: broadcast::Sender<LogSyncEvent>,
 
-    block_hash_cache: Arc<RwLock<BTreeMap<u64, BlockHashAndSubmissionIndex>>>,
+    block_hash_cache: Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
 }
 
 impl LogSyncManager {
@@ -85,6 +85,7 @@ impl LogSyncManager {
                 .await
                 .get_block_hashes()?
                 .into_iter()
+                .map(|(x, y)| (x, Some(y)))
                 .collect::<BTreeMap<_, _>>(),
         ));
         let mut log_sync_manager = Self {
@@ -196,7 +197,7 @@ impl LogSyncManager {
                     .get(&start_block_number)
                 {
                     // special case avoid reorg
-                    submission_idx = b.first_submission_index;
+                    submission_idx = b.as_ref().unwrap().first_submission_index;
                 }
 
                 let parent_block_number = start_block_number.saturating_sub(1);
@@ -206,7 +207,7 @@ impl LogSyncManager {
                     .await
                     .get(&parent_block_number)
                 {
-                    Some(b) => b.block_hash,
+                    Some(b) => b.as_ref().unwrap().block_hash,
                     _ => log_sync_manager
                         .log_fetcher
                         .provider()
@@ -345,10 +346,10 @@ impl LogSyncManager {
         if first_submission_index.is_some() {
             self.block_hash_cache.write().await.insert(
                 block_number,
-                BlockHashAndSubmissionIndex {
+                Some(BlockHashAndSubmissionIndex {
                     block_hash,
                     first_submission_index: first_submission_index.unwrap(),
-                },
+                }),
             );
         }
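
Putting the two writes together: the fetcher records a None placeholder at SyncedBlock-send time (first file), and the manager upgrades it to Some(BlockHashAndSubmissionIndex { .. }) once the block is processed (hunk above). The following is a small self-contained demo of that two-phase protocol, with Entry standing in for the real struct and [u8; 32] replacing H256; names and values are illustrative only.

use std::collections::BTreeMap;

// Stand-in for BlockHashAndSubmissionIndex.
#[derive(Debug)]
struct Entry {
    block_hash: [u8; 32],
    first_submission_index: u64,
}

fn main() {
    let mut cache: BTreeMap<u64, Option<Entry>> = BTreeMap::new();

    // Phase 1 (fetcher): SyncedBlock sent downstream, placeholder recorded.
    cache.insert(100, None);

    // Phase 2 (manager): block processed; upgrade the placeholder. As in the
    // hunk above, the write happens only when a first submission index exists.
    let first_submission_index = Some(3u64);
    if let Some(idx) = first_submission_index {
        cache.insert(
            100,
            Some(Entry {
                block_hash: [0u8; 32],
                first_submission_index: idx,
            }),
        );
    }

    // Readers can now tell "unknown block" from "known but still pending".
    match cache.get(&100) {
        Some(Some(e)) => println!("processed: {:?}", e),
        Some(None) => println!("synced, awaiting processing"),
        None => println!("not tracked"),
    }
}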