Allow for retry attempts in the watch loop (#179)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run

* Allow for retry attempts in the watch loop
This commit is contained in:
Joel Liu 2024-09-06 15:08:13 +08:00 committed by GitHub
parent b6972b97af
commit 041f5f12b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 216 additions and 44 deletions

View File

@ -17,9 +17,12 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store}; use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio::sync::{ use tokio::{
mpsc::{UnboundedReceiver, UnboundedSender}, sync::{
RwLock, mpsc::{UnboundedReceiver, UnboundedSender},
RwLock,
},
time::Instant,
}; };
pub struct LogEntryFetcher { pub struct LogEntryFetcher {
@ -249,7 +252,10 @@ impl LogEntryFetcher {
}) { }) {
Ok(event) => { Ok(event) => {
if let Err(e) = recover_tx if let Err(e) = recover_tx
.send(submission_event_to_transaction(event)) .send(submission_event_to_transaction(
event,
log.block_number.expect("block number exist").as_u64(),
))
.and_then(|_| match sync_progress { .and_then(|_| match sync_progress {
Some(b) => recover_tx.send(b), Some(b) => recover_tx.send(b),
None => Ok(()), None => Ok(()),
@ -285,12 +291,14 @@ impl LogEntryFetcher {
executor: &TaskExecutor, executor: &TaskExecutor,
block_hash_cache: Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>, block_hash_cache: Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
watch_loop_wait_time_ms: u64, watch_loop_wait_time_ms: u64,
mut watch_progress_rx: UnboundedReceiver<u64>,
) -> UnboundedReceiver<LogFetchProgress> { ) -> UnboundedReceiver<LogFetchProgress> {
let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel(); let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel();
let contract = ZgsFlow::new(self.contract_address, self.provider.clone()); let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
let provider = self.provider.clone(); let provider = self.provider.clone();
let confirmation_delay = self.confirmation_delay; let confirmation_delay = self.confirmation_delay;
let log_page_size = self.log_page_size; let log_page_size = self.log_page_size;
let mut progress_reset_history = BTreeMap::new();
executor.spawn( executor.spawn(
async move { async move {
debug!("start_watch starts, start={}", start_block_number); debug!("start_watch starts, start={}", start_block_number);
@ -298,6 +306,13 @@ impl LogEntryFetcher {
let mut parent_block_hash = parent_block_hash; let mut parent_block_hash = parent_block_hash;
loop { loop {
check_watch_process(
&mut watch_progress_rx,
&mut progress,
&mut progress_reset_history,
watch_loop_wait_time_ms,
);
match Self::watch_loop( match Self::watch_loop(
provider.as_ref(), provider.as_ref(),
progress, progress,
@ -489,8 +504,11 @@ impl LogEntryFetcher {
first_submission_index = Some(submit_filter.submission_index.as_u64()); first_submission_index = Some(submit_filter.submission_index.as_u64());
} }
log_events.push(submission_event_to_transaction(submit_filter)); log_events
.push(submission_event_to_transaction(submit_filter, block_number));
} }
info!("synced {} events", log_events.len());
} }
let new_progress = if block.hash.is_some() && block.number.is_some() { let new_progress = if block.hash.is_some() && block.number.is_some() {
@ -508,12 +526,21 @@ impl LogEntryFetcher {
return Ok(progress); return Ok(progress);
} }
} }
if let Some(p) = &new_progress { if let Some(p) = &new_progress {
if let Err(e) = watch_tx.send(LogFetchProgress::SyncedBlock(*p)) { if let Err(e) = watch_tx.send(LogFetchProgress::SyncedBlock(*p)) {
warn!("send LogFetchProgress::SyncedBlock failed: {:?}", e); warn!("send LogFetchProgress::SyncedBlock failed: {:?}", e);
return Ok(progress); return Ok(progress);
} else { } else {
block_hash_cache.write().await.insert(p.0, None); let mut cache = block_hash_cache.write().await;
match cache.get(&p.0) {
Some(Some(v))
if v.block_hash == p.1
&& v.first_submission_index == p.2.unwrap() => {}
_ => {
cache.insert(p.0, None);
}
}
} }
} }
progress = new_progress; progress = new_progress;
@ -528,6 +555,59 @@ impl LogEntryFetcher {
} }
} }
fn check_watch_process(
watch_progress_rx: &mut UnboundedReceiver<u64>,
progress: &mut u64,
progress_reset_history: &mut BTreeMap<u64, (Instant, usize)>,
watch_loop_wait_time_ms: u64,
) {
let mut min_received_progress = None;
while let Ok(v) = watch_progress_rx.try_recv() {
min_received_progress = match min_received_progress {
Some(min) if min > v => Some(v),
None => Some(v),
_ => min_received_progress,
};
}
if let Some(v) = min_received_progress {
if *progress <= v {
error!(
"received unexpected progress, current {}, received {}",
*progress, v
);
return;
}
let now = Instant::now();
match progress_reset_history.get_mut(&v) {
Some((last_update, counter)) => {
if *counter >= 3 {
error!("maximum reset attempts have been reached.");
watch_progress_rx.close();
return;
}
if now.duration_since(*last_update)
>= Duration::from_millis(watch_loop_wait_time_ms * 30)
{
info!("reset to progress from {} to {}", *progress, v);
*progress = v;
*last_update = now;
*counter += 1;
}
}
None => {
info!("reset to progress from {} to {}", *progress, v);
*progress = v;
progress_reset_history.insert(v, (now, 1usize));
}
}
}
progress_reset_history.retain(|k, _| k + 1000 >= *progress);
}
async fn revert_one_block( async fn revert_one_block(
block_hash: H256, block_hash: H256,
block_number: u64, block_number: u64,
@ -581,26 +661,29 @@ async fn revert_one_block(
#[derive(Debug)] #[derive(Debug)]
pub enum LogFetchProgress { pub enum LogFetchProgress {
SyncedBlock((u64, H256, Option<Option<u64>>)), SyncedBlock((u64, H256, Option<Option<u64>>)),
Transaction(Transaction), Transaction((Transaction, u64)),
Reverted(u64), Reverted(u64),
} }
fn submission_event_to_transaction(e: SubmitFilter) -> LogFetchProgress { fn submission_event_to_transaction(e: SubmitFilter, block_number: u64) -> LogFetchProgress {
LogFetchProgress::Transaction(Transaction { LogFetchProgress::Transaction((
stream_ids: vec![], Transaction {
data: vec![], stream_ids: vec![],
data_merkle_root: nodes_to_root(&e.submission.nodes), data: vec![],
merkle_nodes: e data_merkle_root: nodes_to_root(&e.submission.nodes),
.submission merkle_nodes: e
.nodes .submission
.iter() .nodes
// the submission height is the height of the root node starting from height 0. .iter()
.map(|SubmissionNode { root, height }| (height.as_usize() + 1, root.into())) // the submission height is the height of the root node starting from height 0.
.collect(), .map(|SubmissionNode { root, height }| (height.as_usize() + 1, root.into()))
start_entry_index: e.start_pos.as_u64(), .collect(),
size: e.submission.length.as_u64(), start_entry_index: e.start_pos.as_u64(),
seq: e.submission_index.as_u64(), size: e.submission.length.as_u64(),
}) seq: e.submission_index.as_u64(),
},
block_number,
))
} }
fn nodes_to_root(node_list: &[SubmissionNode]) -> DataRoot { fn nodes_to_root(node_list: &[SubmissionNode]) -> DataRoot {

View File

@ -15,7 +15,7 @@ use std::time::{Duration, Instant};
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store}; use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
use task_executor::{ShutdownReason, TaskExecutor}; use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{oneshot, RwLock}; use tokio::sync::{oneshot, RwLock};
const RETRY_WAIT_MS: u64 = 500; const RETRY_WAIT_MS: u64 = 500;
@ -103,16 +103,7 @@ impl LogSyncManager {
}; };
let (mut start_block_number, mut start_block_hash) = let (mut start_block_number, mut start_block_hash) =
match log_sync_manager.store.get_sync_progress()? { get_start_block_number_with_hash(&log_sync_manager).await?;
// No previous progress, so just use config.
None => {
let block_number = log_sync_manager.config.start_block_number;
let block_hash =
log_sync_manager.get_block(block_number.into()).await?.1;
(block_number, block_hash)
}
Some((block_number, block_hash)) => (block_number, block_hash),
};
let (mut finalized_block_number, mut finalized_block_hash) = let (mut finalized_block_number, mut finalized_block_hash) =
match log_sync_manager.get_block(BlockNumber::Finalized).await { match log_sync_manager.get_block(BlockNumber::Finalized).await {
@ -146,7 +137,7 @@ impl LogSyncManager {
&executor_clone, &executor_clone,
log_sync_manager.block_hash_cache.clone(), log_sync_manager.block_hash_cache.clone(),
); );
log_sync_manager.handle_data(reorg_rx).await?; log_sync_manager.handle_data(reorg_rx, &None).await?;
if let Some((block_number, block_hash)) = if let Some((block_number, block_hash)) =
log_sync_manager.store.get_sync_progress()? log_sync_manager.store.get_sync_progress()?
{ {
@ -223,15 +214,20 @@ impl LogSyncManager {
warn!("catch_up_end send fails, possibly auto_sync is not enabled"); warn!("catch_up_end send fails, possibly auto_sync is not enabled");
} }
let (watch_progress_tx, watch_progress_rx) =
tokio::sync::mpsc::unbounded_channel();
let watch_rx = log_sync_manager.log_fetcher.start_watch( let watch_rx = log_sync_manager.log_fetcher.start_watch(
start_block_number, start_block_number,
parent_block_hash, parent_block_hash,
&executor_clone, &executor_clone,
log_sync_manager.block_hash_cache.clone(), log_sync_manager.block_hash_cache.clone(),
log_sync_manager.config.watch_loop_wait_time_ms, log_sync_manager.config.watch_loop_wait_time_ms,
watch_progress_rx,
); );
// Syncing `watch_rx` is supposed to block forever. // Syncing `watch_rx` is supposed to block forever.
log_sync_manager.handle_data(watch_rx).await?; log_sync_manager
.handle_data(watch_rx, &Some(watch_progress_tx))
.await?;
Ok::<(), anyhow::Error>(()) Ok::<(), anyhow::Error>(())
}, },
) )
@ -241,20 +237,20 @@ impl LogSyncManager {
Ok((event_send_cloned, catch_up_end_receiver)) Ok((event_send_cloned, catch_up_end_receiver))
} }
async fn put_tx(&mut self, tx: Transaction) -> bool { async fn put_tx(&mut self, tx: Transaction) -> Option<bool> {
// We call this after process chain reorg, so the sequence number should match. // We call this after process chain reorg, so the sequence number should match.
match tx.seq.cmp(&self.next_tx_seq) { match tx.seq.cmp(&self.next_tx_seq) {
std::cmp::Ordering::Less => true, std::cmp::Ordering::Less => Some(true),
std::cmp::Ordering::Equal => { std::cmp::Ordering::Equal => {
debug!("log entry sync get entry: {:?}", tx); debug!("log entry sync get entry: {:?}", tx);
self.put_tx_inner(tx).await Some(self.put_tx_inner(tx).await)
} }
std::cmp::Ordering::Greater => { std::cmp::Ordering::Greater => {
error!( error!(
"Unexpected transaction seq: next={} get={}", "Unexpected transaction seq: next={} get={}",
self.next_tx_seq, tx.seq self.next_tx_seq, tx.seq
); );
false None
} }
} }
} }
@ -296,7 +292,18 @@ impl LogSyncManager {
let _ = self.event_send.send(LogSyncEvent::Reverted { tx_seq }); let _ = self.event_send.send(LogSyncEvent::Reverted { tx_seq });
} }
async fn handle_data(&mut self, mut rx: UnboundedReceiver<LogFetchProgress>) -> Result<()> { async fn handle_data(
&mut self,
mut rx: UnboundedReceiver<LogFetchProgress>,
watch_progress_tx: &Option<UnboundedSender<u64>>,
) -> Result<()> {
let mut log_latest_block_number =
if let Some(block_number) = self.store.get_log_latest_block_number()? {
block_number
} else {
0
};
while let Some(data) = rx.recv().await { while let Some(data) = rx.recv().await {
trace!("handle_data: data={:?}", data); trace!("handle_data: data={:?}", data);
match data { match data {
@ -336,8 +343,30 @@ impl LogSyncManager {
} }
} }
} }
LogFetchProgress::Transaction(tx) => { LogFetchProgress::Transaction((tx, block_number)) => {
if !self.put_tx(tx.clone()).await { let mut stop = false;
match self.put_tx(tx.clone()).await {
Some(false) => stop = true,
Some(true) => {
if let Err(e) = self.store.put_log_latest_block_number(block_number) {
warn!("failed to put log latest block number, error={:?}", e);
}
log_latest_block_number = block_number;
}
_ => {
stop = true;
if let Some(progress_tx) = watch_progress_tx {
if let Err(e) = progress_tx.send(log_latest_block_number) {
error!("failed to send watch progress, error={:?}", e);
} else {
continue;
}
}
}
}
if stop {
// Unexpected error. // Unexpected error.
bail!("log sync write error"); bail!("log sync write error");
} }
@ -426,7 +455,7 @@ impl LogSyncManager {
&executor_clone, &executor_clone,
Duration::from_millis(self.config.recover_query_delay), Duration::from_millis(self.config.recover_query_delay),
); );
self.handle_data(recover_rx).await?; self.handle_data(recover_rx, &None).await?;
} }
self.log_fetcher.start_remove_finalized_block_task( self.log_fetcher.start_remove_finalized_block_task(
@ -440,6 +469,33 @@ impl LogSyncManager {
} }
} }
async fn get_start_block_number_with_hash(
log_sync_manager: &LogSyncManager,
) -> Result<(u64, H256), anyhow::Error> {
if let Some(block_number) = log_sync_manager.store.get_log_latest_block_number()? {
if let Some(Some(val)) = log_sync_manager
.block_hash_cache
.read()
.await
.get(&block_number)
{
return Ok((block_number, val.block_hash));
}
}
let (start_block_number, start_block_hash) = match log_sync_manager.store.get_sync_progress()? {
// No previous progress, so just use config.
None => {
let block_number = log_sync_manager.config.start_block_number;
let block_hash = log_sync_manager.get_block(block_number.into()).await?.1;
(block_number, block_hash)
}
Some((block_number, block_hash)) => (block_number, block_hash),
};
Ok((start_block_number, start_block_hash))
}
async fn run_and_log<R, E>( async fn run_and_log<R, E>(
mut on_error: impl FnMut(), mut on_error: impl FnMut(),
f: impl Future<Output = std::result::Result<R, E>> + Send, f: impl Future<Output = std::result::Result<R, E>> + Send,

View File

@ -345,6 +345,10 @@ impl LogStoreWrite for LogManager {
self.tx_store.put_progress(progress) self.tx_store.put_progress(progress)
} }
fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> {
self.tx_store.put_log_latest_block_number(block_number)
}
/// Return the reverted Transactions in order. /// Return the reverted Transactions in order.
/// `tx_seq == u64::MAX` is a special case for reverting all transactions. /// `tx_seq == u64::MAX` is a special case for reverting all transactions.
fn revert_to(&self, tx_seq: u64) -> Result<Vec<Transaction>> { fn revert_to(&self, tx_seq: u64) -> Result<Vec<Transaction>> {
@ -534,6 +538,10 @@ impl LogStoreRead for LogManager {
self.tx_store.get_progress() self.tx_store.get_progress()
} }
fn get_log_latest_block_number(&self) -> Result<Option<u64>> {
self.tx_store.get_log_latest_block_number()
}
fn get_block_hash_by_number(&self, block_number: u64) -> Result<Option<(H256, Option<u64>)>> { fn get_block_hash_by_number(&self, block_number: u64) -> Result<Option<(H256, Option<u64>)>> {
self.tx_store.get_block_hash_by_number(block_number) self.tx_store.get_block_hash_by_number(block_number)
} }

View File

@ -59,6 +59,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>; fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
fn get_log_latest_block_number(&self) -> Result<Option<u64>>;
fn get_block_hash_by_number(&self, block_number: u64) -> Result<Option<(H256, Option<u64>)>>; fn get_block_hash_by_number(&self, block_number: u64) -> Result<Option<(H256, Option<u64>)>>;
fn get_block_hashes(&self) -> Result<Vec<(u64, BlockHashAndSubmissionIndex)>>; fn get_block_hashes(&self) -> Result<Vec<(u64, BlockHashAndSubmissionIndex)>>;
@ -126,6 +128,9 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
/// Store the progress of synced block number and its hash. /// Store the progress of synced block number and its hash.
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>; fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>;
/// Store the latest block number which has log
fn put_log_latest_block_number(&self, block_number: u64) -> Result<()>;
/// Revert the log state to a given tx seq. /// Revert the log state to a given tx seq.
/// This is needed when transactions are reverted because of chain reorg. /// This is needed when transactions are reverted because of chain reorg.
/// ///

View File

@ -19,6 +19,7 @@ use tracing::{error, instrument};
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress"; const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
const NEXT_TX_KEY: &str = "next_tx_seq"; const NEXT_TX_KEY: &str = "next_tx_seq";
const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key";
const TX_STATUS_FINALIZED: u8 = 0; const TX_STATUS_FINALIZED: u8 = 0;
const TX_STATUS_PRUNED: u8 = 1; const TX_STATUS_PRUNED: u8 = 1;
@ -214,6 +215,25 @@ impl TransactionStore {
)) ))
} }
#[instrument(skip(self))]
pub fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> {
Ok(self.kvdb.put(
COL_MISC,
LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes(),
&block_number.as_ssz_bytes(),
)?)
}
#[instrument(skip(self))]
pub fn get_log_latest_block_number(&self) -> Result<Option<u64>> {
Ok(Some(
<u64>::from_ssz_bytes(&try_option!(self
.kvdb
.get(COL_MISC, LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes())?))
.map_err(Error::from)?,
))
}
pub fn get_block_hash_by_number( pub fn get_block_hash_by_number(
&self, &self,
block_number: u64, block_number: u64,