This commit is contained in:
Joel 2024-09-03 17:17:22 +08:00
parent 7b84b4fcd0
commit e53743bfbe
5 changed files with 127 additions and 52 deletions

View File

@ -252,7 +252,10 @@ impl LogEntryFetcher {
}) { }) {
Ok(event) => { Ok(event) => {
if let Err(e) = recover_tx if let Err(e) = recover_tx
.send(submission_event_to_transaction(event)) .send(submission_event_to_transaction(
event,
log.block_number.expect("block number exist").as_u64(),
))
.and_then(|_| match sync_progress { .and_then(|_| match sync_progress {
Some(b) => recover_tx.send(b), Some(b) => recover_tx.send(b),
None => Ok(()), None => Ok(()),
@ -501,8 +504,11 @@ impl LogEntryFetcher {
first_submission_index = Some(submit_filter.submission_index.as_u64()); first_submission_index = Some(submit_filter.submission_index.as_u64());
} }
log_events.push(submission_event_to_transaction(submit_filter)); log_events
.push(submission_event_to_transaction(submit_filter, block_number));
} }
info!("synced {} events", log_events.len());
} }
let new_progress = if block.hash.is_some() && block.number.is_some() { let new_progress = if block.hash.is_some() && block.number.is_some() {
@ -514,21 +520,26 @@ impl LogEntryFetcher {
} else { } else {
None None
}; };
if !log_events.is_empty() {
info!("synced {} events", log_events.len());
for log in log_events.into_iter() { for log in log_events.into_iter() {
if let Err(e) = watch_tx.send(log) { if let Err(e) = watch_tx.send(log) {
warn!("send LogFetchProgress::Transaction failed: {:?}", e); warn!("send LogFetchProgress::Transaction failed: {:?}", e);
return Ok(progress); return Ok(progress);
} }
} }
if let Some(p) = &new_progress { if let Some(p) = &new_progress {
if let Err(e) = watch_tx.send(LogFetchProgress::SyncedBlock(*p)) { if let Err(e) = watch_tx.send(LogFetchProgress::SyncedBlock(*p)) {
warn!("send LogFetchProgress::SyncedBlock failed: {:?}", e); warn!("send LogFetchProgress::SyncedBlock failed: {:?}", e);
return Ok(progress); return Ok(progress);
} else { } else {
block_hash_cache.write().await.insert(p.0, None); let mut cache = block_hash_cache.write().await;
match cache.get(&p.0) {
Some(Some(v))
if v.block_hash == p.1
&& v.first_submission_index == p.2.unwrap() => {}
_ => {
cache.insert(p.0, None);
}
} }
} }
} }
@ -650,12 +661,13 @@ async fn revert_one_block(
#[derive(Debug)] #[derive(Debug)]
pub enum LogFetchProgress { pub enum LogFetchProgress {
SyncedBlock((u64, H256, Option<Option<u64>>)), SyncedBlock((u64, H256, Option<Option<u64>>)),
Transaction(Transaction), Transaction((Transaction, u64)),
Reverted(u64), Reverted(u64),
} }
fn submission_event_to_transaction(e: SubmitFilter) -> LogFetchProgress { fn submission_event_to_transaction(e: SubmitFilter, block_number: u64) -> LogFetchProgress {
LogFetchProgress::Transaction(Transaction { LogFetchProgress::Transaction((
Transaction {
stream_ids: vec![], stream_ids: vec![],
data: vec![], data: vec![],
data_merkle_root: nodes_to_root(&e.submission.nodes), data_merkle_root: nodes_to_root(&e.submission.nodes),
@ -669,7 +681,9 @@ fn submission_event_to_transaction(e: SubmitFilter) -> LogFetchProgress {
start_entry_index: e.start_pos.as_u64(), start_entry_index: e.start_pos.as_u64(),
size: e.submission.length.as_u64(), size: e.submission.length.as_u64(),
seq: e.submission_index.as_u64(), seq: e.submission_index.as_u64(),
}) },
block_number,
))
} }
fn nodes_to_root(node_list: &[SubmissionNode]) -> DataRoot { fn nodes_to_root(node_list: &[SubmissionNode]) -> DataRoot {

View File

@ -103,16 +103,7 @@ impl LogSyncManager {
}; };
let (mut start_block_number, mut start_block_hash) = let (mut start_block_number, mut start_block_hash) =
match log_sync_manager.store.get_sync_progress()? { get_start_block_number_with_hash(&log_sync_manager).await?;
// No previous progress, so just use config.
None => {
let block_number = log_sync_manager.config.start_block_number;
let block_hash =
log_sync_manager.get_block(block_number.into()).await?.1;
(block_number, block_hash)
}
Some((block_number, block_hash)) => (block_number, block_hash),
};
let (mut finalized_block_number, mut finalized_block_hash) = let (mut finalized_block_number, mut finalized_block_hash) =
match log_sync_manager.get_block(BlockNumber::Finalized).await { match log_sync_manager.get_block(BlockNumber::Finalized).await {
@ -306,6 +297,13 @@ impl LogSyncManager {
mut rx: UnboundedReceiver<LogFetchProgress>, mut rx: UnboundedReceiver<LogFetchProgress>,
watch_progress_tx: &Option<UnboundedSender<u64>>, watch_progress_tx: &Option<UnboundedSender<u64>>,
) -> Result<()> { ) -> Result<()> {
let mut log_latest_block_number =
if let Some(block_number) = self.store.get_log_latest_block_number()? {
block_number
} else {
0
};
while let Some(data) = rx.recv().await { while let Some(data) = rx.recv().await {
trace!("handle_data: data={:?}", data); trace!("handle_data: data={:?}", data);
match data { match data {
@ -345,17 +343,21 @@ impl LogSyncManager {
} }
} }
} }
LogFetchProgress::Transaction(tx) => { LogFetchProgress::Transaction((tx, block_number)) => {
let mut stop = false; let mut stop = false;
match self.put_tx(tx.clone()).await { match self.put_tx(tx.clone()).await {
Some(false) => stop = true, Some(false) => stop = true,
Some(true) => {} Some(true) => {
if let Err(e) = self.store.put_log_latest_block_number(block_number) {
warn!("failed to put log latest block number, error={:?}", e);
}
log_latest_block_number = block_number;
}
_ => { _ => {
stop = true; stop = true;
if let Some(progress_tx) = watch_progress_tx { if let Some(progress_tx) = watch_progress_tx {
if let Some((block_number, _)) = self.store.get_sync_progress()? { if let Err(e) = progress_tx.send(log_latest_block_number) {
if let Err(e) = progress_tx.send(block_number) {
error!("failed to send watch progress, error={:?}", e); error!("failed to send watch progress, error={:?}", e);
} else { } else {
continue; continue;
@ -363,7 +365,6 @@ impl LogSyncManager {
} }
} }
} }
}
if stop { if stop {
// Unexpected error. // Unexpected error.
@ -468,6 +469,33 @@ impl LogSyncManager {
} }
} }
async fn get_start_block_number_with_hash(
log_sync_manager: &LogSyncManager,
) -> Result<(u64, H256), anyhow::Error> {
if let Some(block_number) = log_sync_manager.store.get_log_latest_block_number()? {
if let Some(Some(val)) = log_sync_manager
.block_hash_cache
.read()
.await
.get(&block_number)
{
return Ok((block_number, val.block_hash));
}
}
let (start_block_number, start_block_hash) = match log_sync_manager.store.get_sync_progress()? {
// No previous progress, so just use config.
None => {
let block_number = log_sync_manager.config.start_block_number;
let block_hash = log_sync_manager.get_block(block_number.into()).await?.1;
(block_number, block_hash)
}
Some((block_number, block_hash)) => (block_number, block_hash),
};
Ok((start_block_number, start_block_hash))
}
async fn run_and_log<R, E>( async fn run_and_log<R, E>(
mut on_error: impl FnMut(), mut on_error: impl FnMut(),
f: impl Future<Output = std::result::Result<R, E>> + Send, f: impl Future<Output = std::result::Result<R, E>> + Send,

View File

@ -345,6 +345,10 @@ impl LogStoreWrite for LogManager {
self.tx_store.put_progress(progress) self.tx_store.put_progress(progress)
} }
fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> {
self.tx_store.put_log_latest_block_number(block_number)
}
/// Return the reverted Transactions in order. /// Return the reverted Transactions in order.
/// `tx_seq == u64::MAX` is a special case for reverting all transactions. /// `tx_seq == u64::MAX` is a special case for reverting all transactions.
fn revert_to(&self, tx_seq: u64) -> Result<Vec<Transaction>> { fn revert_to(&self, tx_seq: u64) -> Result<Vec<Transaction>> {
@ -534,6 +538,10 @@ impl LogStoreRead for LogManager {
self.tx_store.get_progress() self.tx_store.get_progress()
} }
fn get_log_latest_block_number(&self) -> Result<Option<u64>> {
self.tx_store.get_log_latest_block_number()
}
fn get_block_hash_by_number(&self, block_number: u64) -> Result<Option<(H256, Option<u64>)>> { fn get_block_hash_by_number(&self, block_number: u64) -> Result<Option<(H256, Option<u64>)>> {
self.tx_store.get_block_hash_by_number(block_number) self.tx_store.get_block_hash_by_number(block_number)
} }

View File

@ -59,6 +59,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>; fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
fn get_log_latest_block_number(&self) -> Result<Option<u64>>;
fn get_block_hash_by_number(&self, block_number: u64) -> Result<Option<(H256, Option<u64>)>>; fn get_block_hash_by_number(&self, block_number: u64) -> Result<Option<(H256, Option<u64>)>>;
fn get_block_hashes(&self) -> Result<Vec<(u64, BlockHashAndSubmissionIndex)>>; fn get_block_hashes(&self) -> Result<Vec<(u64, BlockHashAndSubmissionIndex)>>;
@ -126,6 +128,9 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
/// Store the progress of synced block number and its hash. /// Store the progress of synced block number and its hash.
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>; fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>;
/// Store the latest block number which has log
fn put_log_latest_block_number(&self, block_number: u64) -> Result<()>;
/// Revert the log state to a given tx seq. /// Revert the log state to a given tx seq.
/// This is needed when transactions are reverted because of chain reorg. /// This is needed when transactions are reverted because of chain reorg.
/// ///

View File

@ -19,6 +19,7 @@ use tracing::{error, instrument};
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress"; const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
const NEXT_TX_KEY: &str = "next_tx_seq"; const NEXT_TX_KEY: &str = "next_tx_seq";
const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key";
const TX_STATUS_FINALIZED: u8 = 0; const TX_STATUS_FINALIZED: u8 = 0;
const TX_STATUS_PRUNED: u8 = 1; const TX_STATUS_PRUNED: u8 = 1;
@ -214,6 +215,25 @@ impl TransactionStore {
)) ))
} }
#[instrument(skip(self))]
pub fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> {
Ok(self.kvdb.put(
COL_MISC,
LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes(),
&block_number.as_ssz_bytes(),
)?)
}
#[instrument(skip(self))]
pub fn get_log_latest_block_number(&self) -> Result<Option<u64>> {
Ok(Some(
<u64>::from_ssz_bytes(&try_option!(self
.kvdb
.get(COL_MISC, LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes())?))
.map_err(Error::from)?,
))
}
pub fn get_block_hash_by_number( pub fn get_block_hash_by_number(
&self, &self,
block_number: u64, block_number: u64,