query getLogs from batch blocks (#37)

* query getLogs from batch blocks

* resolve comments
This commit is contained in:
Joel Liu 2024-04-07 11:27:18 +08:00 committed by GitHub
parent 72e3e0a019
commit b8a59e9222
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 154 additions and 79 deletions

View File

@ -53,4 +53,4 @@ jobs:
- name: Run tests
run: |
cd tests
python test_all.py --max-workers 3
python test_all.py

View File

@ -7,7 +7,7 @@ use contract_interface::{SubmissionNode, SubmitFilter, ZgsFlow};
use ethers::abi::RawLog;
use ethers::prelude::{BlockNumber, EthLogDecode, Http, Middleware, Provider};
use ethers::providers::{HttpRateLimitRetryPolicy, RetryClient, RetryClientBuilder};
use ethers::types::H256;
use ethers::types::{Block, Log, H256};
use futures::StreamExt;
use jsonrpsee::tracing::{debug, error, info, warn};
use shared_types::{DataRoot, Transaction};
@ -335,7 +335,7 @@ impl LogEntryFetcher {
#[allow(clippy::too_many_arguments)]
async fn watch_loop(
provider: &Provider<RetryClient<Http>>,
block_number: u64,
from_block_number: u64,
parent_block_hash: H256,
watch_tx: &UnboundedSender<LogFetchProgress>,
confirmation_delay: u64,
@ -344,22 +344,31 @@ impl LogEntryFetcher {
) -> Result<Option<(u64, H256, Option<Option<u64>>)>> {
let latest_block_number = provider.get_block_number().await?.as_u64();
debug!(
"block number {}, latest block number {}, confirmation_delay {}",
block_number, latest_block_number, confirmation_delay
"from block number {}, latest block number {}, confirmation delay {}",
from_block_number, latest_block_number, confirmation_delay
);
if block_number > latest_block_number.saturating_sub(confirmation_delay) {
let to_block_number = latest_block_number.saturating_sub(confirmation_delay);
if from_block_number > to_block_number {
return Ok(None);
}
let block = provider
.get_block_with_txs(block_number)
.get_block_with_txs(from_block_number)
.await?
.ok_or_else(|| anyhow!("None for block {}", block_number))?;
if block_number > 0 && block.parent_hash != parent_block_hash {
.ok_or_else(|| anyhow!("None for block {}", from_block_number))?;
if Some(from_block_number.into()) != block.number {
bail!(
"block number mismatch, expected {}, actual {:?}",
from_block_number,
block.number
);
}
if from_block_number > 0 && block.parent_hash != parent_block_hash {
// reorg happened
let (parent_block_number, block_hash) = revert_one_block(
parent_block_hash,
block_number.saturating_sub(1),
from_block_number.saturating_sub(1),
watch_tx,
block_hash_cache,
)
@ -367,67 +376,118 @@ impl LogEntryFetcher {
return Ok(Some((parent_block_number, block_hash, None)));
}
let mut blocks: HashMap<u64, Block<ethers::types::Transaction>> = Default::default();
let mut parent_block_hash = block.hash;
blocks.insert(from_block_number, block);
for block_number in from_block_number + 1..to_block_number + 1 {
let block = provider
.get_block_with_txs(block_number)
.await?
.ok_or_else(|| anyhow!("None for block {}", block_number))?;
if Some(block_number.into()) != block.number {
bail!(
"block number mismatch, expected {}, actual {:?}",
block_number,
block.number
);
}
if Some(block.parent_hash) != parent_block_hash {
bail!(
"parent block hash mismatch, expected {:?}, actual {}",
parent_block_hash,
block.parent_hash
);
}
parent_block_hash = block.hash;
blocks.insert(block_number, block);
}
let filter = contract
.submit_filter()
.from_block(from_block_number)
.to_block(to_block_number)
.address(contract.address().into())
.filter;
let mut block_logs: BTreeMap<u64, Vec<Log>> = BTreeMap::new();
for log in provider.get_logs(&filter).await? {
let block_number = log
.block_number
.ok_or_else(|| anyhow!("block number missing"))?
.as_u64();
block_logs.entry(block_number).or_default().push(log);
}
let mut progress = None;
for block_number in from_block_number..to_block_number + 1 {
if let Some(block) = blocks.remove(&block_number) {
let txs_hm = block
.transactions
.iter()
.map(|tx| (tx.transaction_index, tx))
.collect::<HashMap<_, _>>();
let filter = contract
.submit_filter()
.from_block(block_number)
.to_block(block_number)
.address(contract.address().into())
.filter;
let mut logs = vec![];
let mut log_events = vec![];
let mut first_submission_index = None;
for log in provider.get_logs(&filter).await? {
if let Some(logs) = block_logs.remove(&block_number) {
for log in logs.into_iter() {
if log.block_hash != block.hash {
bail!(
warn!(
"log block hash mismatch, log block hash {:?}, block hash {:?}",
log.block_hash,
block.hash
log.block_hash, block.hash
);
return Ok(progress);
}
if log.block_number != block.number {
bail!(
warn!(
"log block num mismatch, log block number {:?}, block number {:?}",
log.block_number,
block.number
log.block_number, block.number
);
return Ok(progress);
}
let tx = txs_hm[&log.transaction_index];
if log.transaction_hash != Some(tx.hash) {
bail!(
warn!(
"log tx hash mismatch, log transaction {:?}, block transaction {:?}",
log.transaction_hash,
tx.hash
);
return Ok(progress);
}
if log.transaction_index != tx.transaction_index {
bail!(
warn!(
"log tx index mismatch, log tx index {:?}, block transaction index {:?}",
log.transaction_index,
tx.transaction_index
);
return Ok(progress);
}
let tx = SubmitFilter::decode_log(&RawLog {
let submit_filter = match SubmitFilter::decode_log(&RawLog {
topics: log.topics,
data: log.data.to_vec(),
})?;
}) {
Ok(v) => v,
Err(e) => {
return {
warn!("decode log failed: {:?}", e);
Ok(progress)
}
}
};
if first_submission_index.is_none()
|| first_submission_index > Some(tx.submission_index.as_u64())
|| first_submission_index
> Some(submit_filter.submission_index.as_u64())
{
first_submission_index = Some(tx.submission_index.as_u64());
first_submission_index = Some(submit_filter.submission_index.as_u64());
}
logs.push(submission_event_to_transaction(tx));
log_events.push(submission_event_to_transaction(submit_filter));
}
}
let progress = if block.hash.is_some() && block.number.is_some() {
let new_progress = if block.hash.is_some() && block.number.is_some() {
Some((
block.number.unwrap().as_u64(),
block.hash.unwrap(),
@ -436,12 +496,20 @@ impl LogEntryFetcher {
} else {
None
};
if let Some(p) = &progress {
watch_tx.send(LogFetchProgress::SyncedBlock(*p))?;
if let Some(p) = &new_progress {
if let Err(e) = watch_tx.send(LogFetchProgress::SyncedBlock(*p)) {
warn!("send LogFetchProgress failed: {:?}", e);
return Ok(progress);
}
}
for log in log_events.into_iter() {
if let Err(e) = watch_tx.send(log) {
warn!("send log failed: {:?}", e);
return Ok(progress);
}
}
progress = new_progress;
}
for log in logs.into_iter() {
watch_tx.send(log)?;
}
Ok(progress)

View File

@ -189,6 +189,16 @@ impl LogSyncManager {
let mut submission_idx = None;
let parent_block_hash = if start_block_number >= finalized_block_number {
if start_block_number > 0 {
if let Some(b) = log_sync_manager
.block_hash_cache
.read()
.await
.get(&start_block_number)
{
// special case avoid reorg
submission_idx = b.first_submission_index;
}
let parent_block_number = start_block_number.saturating_sub(1);
match log_sync_manager
.block_hash_cache
@ -196,10 +206,7 @@ impl LogSyncManager {
.await
.get(&parent_block_number)
{
Some(b) => {
submission_idx = b.first_submission_index; // special case avoid reorg
b.block_hash
}
Some(b) => b.block_hash,
_ => log_sync_manager
.log_fetcher
.provider()

View File

@ -36,7 +36,7 @@ build_config! {
(default_finalized_block_count, (u64), 100)
(remove_finalized_block_interval_minutes, (u64), 30)
(watch_loop_wait_time_ms, (u64), 50)
(watch_loop_wait_time_ms, (u64), 500)
// rpc
(rpc_enabled, (bool), true)