From e53743bfbe347b79d17dafaa0c86ec7ee0993fea Mon Sep 17 00:00:00 2001 From: Joel Date: Tue, 3 Sep 2024 17:17:22 +0800 Subject: [PATCH] fix test --- .../src/sync_manager/log_entry_fetcher.rs | 80 +++++++++++-------- node/log_entry_sync/src/sync_manager/mod.rs | 66 ++++++++++----- node/storage/src/log_store/log_manager.rs | 8 ++ node/storage/src/log_store/mod.rs | 5 ++ node/storage/src/log_store/tx_store.rs | 20 +++++ 5 files changed, 127 insertions(+), 52 deletions(-) diff --git a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs index ff68f63..c66c616 100644 --- a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs +++ b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs @@ -252,7 +252,10 @@ impl LogEntryFetcher { }) { Ok(event) => { if let Err(e) = recover_tx - .send(submission_event_to_transaction(event)) + .send(submission_event_to_transaction( + event, + log.block_number.expect("block number exist").as_u64(), + )) .and_then(|_| match sync_progress { Some(b) => recover_tx.send(b), None => Ok(()), @@ -501,8 +504,11 @@ impl LogEntryFetcher { first_submission_index = Some(submit_filter.submission_index.as_u64()); } - log_events.push(submission_event_to_transaction(submit_filter)); + log_events + .push(submission_event_to_transaction(submit_filter, block_number)); } + + info!("synced {} events", log_events.len()); } let new_progress = if block.hash.is_some() && block.number.is_some() { @@ -514,21 +520,26 @@ impl LogEntryFetcher { } else { None }; - - if !log_events.is_empty() { - info!("synced {} events", log_events.len()); - for log in log_events.into_iter() { - if let Err(e) = watch_tx.send(log) { - warn!("send LogFetchProgress::Transaction failed: {:?}", e); - return Ok(progress); - } + for log in log_events.into_iter() { + if let Err(e) = watch_tx.send(log) { + warn!("send LogFetchProgress::Transaction failed: {:?}", e); + return Ok(progress); } - if let Some(p) = &new_progress { - if let Err(e) = watch_tx.send(LogFetchProgress::SyncedBlock(*p)) { - warn!("send LogFetchProgress::SyncedBlock failed: {:?}", e); - return Ok(progress); - } else { - block_hash_cache.write().await.insert(p.0, None); + } + + if let Some(p) = &new_progress { + if let Err(e) = watch_tx.send(LogFetchProgress::SyncedBlock(*p)) { + warn!("send LogFetchProgress::SyncedBlock failed: {:?}", e); + return Ok(progress); + } else { + let mut cache = block_hash_cache.write().await; + match cache.get(&p.0) { + Some(Some(v)) + if v.block_hash == p.1 + && v.first_submission_index == p.2.unwrap() => {} + _ => { + cache.insert(p.0, None); + } } } } @@ -650,26 +661,29 @@ async fn revert_one_block( #[derive(Debug)] pub enum LogFetchProgress { SyncedBlock((u64, H256, Option>)), - Transaction(Transaction), + Transaction((Transaction, u64)), Reverted(u64), } -fn submission_event_to_transaction(e: SubmitFilter) -> LogFetchProgress { - LogFetchProgress::Transaction(Transaction { - stream_ids: vec![], - data: vec![], - data_merkle_root: nodes_to_root(&e.submission.nodes), - merkle_nodes: e - .submission - .nodes - .iter() - // the submission height is the height of the root node starting from height 0. - .map(|SubmissionNode { root, height }| (height.as_usize() + 1, root.into())) - .collect(), - start_entry_index: e.start_pos.as_u64(), - size: e.submission.length.as_u64(), - seq: e.submission_index.as_u64(), - }) +fn submission_event_to_transaction(e: SubmitFilter, block_number: u64) -> LogFetchProgress { + LogFetchProgress::Transaction(( + Transaction { + stream_ids: vec![], + data: vec![], + data_merkle_root: nodes_to_root(&e.submission.nodes), + merkle_nodes: e + .submission + .nodes + .iter() + // the submission height is the height of the root node starting from height 0. + .map(|SubmissionNode { root, height }| (height.as_usize() + 1, root.into())) + .collect(), + start_entry_index: e.start_pos.as_u64(), + size: e.submission.length.as_u64(), + seq: e.submission_index.as_u64(), + }, + block_number, + )) } fn nodes_to_root(node_list: &[SubmissionNode]) -> DataRoot { diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs index 08968af..48b59f8 100644 --- a/node/log_entry_sync/src/sync_manager/mod.rs +++ b/node/log_entry_sync/src/sync_manager/mod.rs @@ -103,16 +103,7 @@ impl LogSyncManager { }; let (mut start_block_number, mut start_block_hash) = - match log_sync_manager.store.get_sync_progress()? { - // No previous progress, so just use config. - None => { - let block_number = log_sync_manager.config.start_block_number; - let block_hash = - log_sync_manager.get_block(block_number.into()).await?.1; - (block_number, block_hash) - } - Some((block_number, block_hash)) => (block_number, block_hash), - }; + get_start_block_number_with_hash(&log_sync_manager).await?; let (mut finalized_block_number, mut finalized_block_hash) = match log_sync_manager.get_block(BlockNumber::Finalized).await { @@ -306,6 +297,13 @@ impl LogSyncManager { mut rx: UnboundedReceiver, watch_progress_tx: &Option>, ) -> Result<()> { + let mut log_latest_block_number = + if let Some(block_number) = self.store.get_log_latest_block_number()? { + block_number + } else { + 0 + }; + while let Some(data) = rx.recv().await { trace!("handle_data: data={:?}", data); match data { @@ -345,21 +343,24 @@ impl LogSyncManager { } } } - LogFetchProgress::Transaction(tx) => { + LogFetchProgress::Transaction((tx, block_number)) => { let mut stop = false; match self.put_tx(tx.clone()).await { Some(false) => stop = true, - Some(true) => {} + Some(true) => { + if let Err(e) = self.store.put_log_latest_block_number(block_number) { + warn!("failed to put log latest block number, error={:?}", e); + } + + log_latest_block_number = block_number; + } _ => { stop = true; - if let Some(progress_tx) = watch_progress_tx { - if let Some((block_number, _)) = self.store.get_sync_progress()? { - if let Err(e) = progress_tx.send(block_number) { - error!("failed to send watch progress, error={:?}", e); - } else { - continue; - } + if let Err(e) = progress_tx.send(log_latest_block_number) { + error!("failed to send watch progress, error={:?}", e); + } else { + continue; } } } @@ -468,6 +469,33 @@ impl LogSyncManager { } } +async fn get_start_block_number_with_hash( + log_sync_manager: &LogSyncManager, +) -> Result<(u64, H256), anyhow::Error> { + if let Some(block_number) = log_sync_manager.store.get_log_latest_block_number()? { + if let Some(Some(val)) = log_sync_manager + .block_hash_cache + .read() + .await + .get(&block_number) + { + return Ok((block_number, val.block_hash)); + } + } + + let (start_block_number, start_block_hash) = match log_sync_manager.store.get_sync_progress()? { + // No previous progress, so just use config. + None => { + let block_number = log_sync_manager.config.start_block_number; + let block_hash = log_sync_manager.get_block(block_number.into()).await?.1; + (block_number, block_hash) + } + Some((block_number, block_hash)) => (block_number, block_hash), + }; + + Ok((start_block_number, start_block_hash)) +} + async fn run_and_log( mut on_error: impl FnMut(), f: impl Future> + Send, diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index c0e4874..64ac9e0 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -345,6 +345,10 @@ impl LogStoreWrite for LogManager { self.tx_store.put_progress(progress) } + fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> { + self.tx_store.put_log_latest_block_number(block_number) + } + /// Return the reverted Transactions in order. /// `tx_seq == u64::MAX` is a special case for reverting all transactions. fn revert_to(&self, tx_seq: u64) -> Result> { @@ -534,6 +538,10 @@ impl LogStoreRead for LogManager { self.tx_store.get_progress() } + fn get_log_latest_block_number(&self) -> Result> { + self.tx_store.get_log_latest_block_number() + } + fn get_block_hash_by_number(&self, block_number: u64) -> Result)>> { self.tx_store.get_block_hash_by_number(block_number) } diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index a4ffb94..a2a6442 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -59,6 +59,8 @@ pub trait LogStoreRead: LogStoreChunkRead { fn get_sync_progress(&self) -> Result>; + fn get_log_latest_block_number(&self) -> Result>; + fn get_block_hash_by_number(&self, block_number: u64) -> Result)>>; fn get_block_hashes(&self) -> Result>; @@ -126,6 +128,9 @@ pub trait LogStoreWrite: LogStoreChunkWrite { /// Store the progress of synced block number and its hash. fn put_sync_progress(&self, progress: (u64, H256, Option>)) -> Result<()>; + /// Store the latest block number which has log + fn put_log_latest_block_number(&self, block_number: u64) -> Result<()>; + /// Revert the log state to a given tx seq. /// This is needed when transactions are reverted because of chain reorg. /// diff --git a/node/storage/src/log_store/tx_store.rs b/node/storage/src/log_store/tx_store.rs index b638726..48ee2ab 100644 --- a/node/storage/src/log_store/tx_store.rs +++ b/node/storage/src/log_store/tx_store.rs @@ -19,6 +19,7 @@ use tracing::{error, instrument}; const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress"; const NEXT_TX_KEY: &str = "next_tx_seq"; +const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key"; const TX_STATUS_FINALIZED: u8 = 0; const TX_STATUS_PRUNED: u8 = 1; @@ -214,6 +215,25 @@ impl TransactionStore { )) } + #[instrument(skip(self))] + pub fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> { + Ok(self.kvdb.put( + COL_MISC, + LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes(), + &block_number.as_ssz_bytes(), + )?) + } + + #[instrument(skip(self))] + pub fn get_log_latest_block_number(&self) -> Result> { + Ok(Some( + ::from_ssz_bytes(&try_option!(self + .kvdb + .get(COL_MISC, LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes())?)) + .map_err(Error::from)?, + )) + } + pub fn get_block_hash_by_number( &self, block_number: u64,