From 40104de8918b5e0696d7cdc1de1eae30634bdc08 Mon Sep 17 00:00:00 2001 From: peilun-conflux <48905552+peilun-conflux@users.noreply.github.com> Date: Tue, 19 Nov 2024 11:59:50 +0800 Subject: [PATCH] Return the first finalized tx by data root if possible. (#278) * Only use tx seq for tx status. * Return the first finalized tx by data root if possible. This index is used for upload/download segments and file status check. In all the cases, if there is a finalized transaction, we should use it. --- node/rpc/src/zgs/impl.rs | 7 +------ node/storage/src/log_store/log_manager.rs | 20 +++++++++---------- node/storage/src/log_store/mod.rs | 8 ++++++-- node/storage/src/log_store/tx_store.rs | 8 -------- node/sync/src/auto_sync/batcher.rs | 7 +------ .../src/auto_sync/historical_tx_writer.rs | 7 +------ 6 files changed, 19 insertions(+), 38 deletions(-) diff --git a/node/rpc/src/zgs/impl.rs b/node/rpc/src/zgs/impl.rs index 9746cc0..8f399f8 100644 --- a/node/rpc/src/zgs/impl.rs +++ b/node/rpc/src/zgs/impl.rs @@ -246,12 +246,7 @@ impl RpcServerImpl { } async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult { - let (finalized, pruned) = match self - .ctx - .log_store - .get_store() - .get_tx_status(TxSeqOrRoot::TxSeq(tx.seq))? - { + let (finalized, pruned) = match self.ctx.log_store.get_store().get_tx_status(tx.seq)? { Some(TxStatus::Finalized) => (true, false), Some(TxStatus::Pruned) => (false, true), None => (false, false), diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 8a9d161..1c1e27a 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -21,7 +21,6 @@ use rayon::prelude::ParallelSlice; use shared_types::{ bytes_to_chunks, compute_padded_chunk_size, compute_segment_size, Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction, - TxSeqOrRoot, }; use std::cmp::Ordering; @@ -538,7 +537,15 @@ impl LogStoreRead for LogManager { } fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result> { - self.tx_store.get_first_tx_seq_by_data_root(data_root) + let seq_list = self.tx_store.get_tx_seq_list_by_data_root(data_root)?; + for tx_seq in &seq_list { + if self.tx_store.check_tx_completed(*tx_seq)? { + // Return the first finalized tx if possible. + return Ok(Some(*tx_seq)); + } + } + // No tx is finalized, return the first one. + Ok(seq_list.first().cloned()) } fn get_chunk_with_proof_by_tx_and_index( @@ -582,14 +589,7 @@ impl LogStoreRead for LogManager { })) } - fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result> { - let tx_seq = match tx_seq_or_data_root { - TxSeqOrRoot::TxSeq(v) => v, - TxSeqOrRoot::Root(root) => { - try_option!(self.tx_store.get_first_tx_seq_by_data_root(&root)?) - } - }; - + fn get_tx_status(&self, tx_seq: u64) -> Result> { self.tx_store.get_tx_status(tx_seq) } diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index b143a4c..fc2f82f 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -4,7 +4,7 @@ use ethereum_types::H256; use flow_store::PadPair; use shared_types::{ Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, - Transaction, TxSeqOrRoot, + Transaction, }; use zgs_spec::{BYTES_PER_SEAL, SEALS_PER_LOAD}; @@ -31,8 +31,12 @@ pub trait LogStoreRead: LogStoreChunkRead { fn get_tx_by_seq_number(&self, seq: u64) -> Result>; /// Get a transaction by the data root of its data. + /// If all txs are not finalized, return the first one. + /// Otherwise, return the first finalized tx. fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result>; + /// If all txs are not finalized, return the first one. + /// Otherwise, return the first finalized tx. fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result> { match self.get_tx_seq_by_data_root(data_root)? { Some(seq) => self.get_tx_by_seq_number(seq), @@ -58,7 +62,7 @@ pub trait LogStoreRead: LogStoreChunkRead { fn check_tx_pruned(&self, tx_seq: u64) -> Result; - fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result>; + fn get_tx_status(&self, tx_seq: u64) -> Result>; fn next_tx_seq(&self) -> u64; diff --git a/node/storage/src/log_store/tx_store.rs b/node/storage/src/log_store/tx_store.rs index 280c279..93fd701 100644 --- a/node/storage/src/log_store/tx_store.rs +++ b/node/storage/src/log_store/tx_store.rs @@ -181,14 +181,6 @@ impl TransactionStore { Ok(Vec::::from_ssz_bytes(&value).map_err(Error::from)?) } - pub fn get_first_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result> { - let value = try_option!(self - .kvdb - .get(COL_TX_DATA_ROOT_INDEX, data_root.as_bytes())?); - let seq_list = Vec::::from_ssz_bytes(&value).map_err(Error::from)?; - Ok(seq_list.first().cloned()) - } - #[instrument(skip(self))] pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> { Ok(self.kvdb.put( diff --git a/node/sync/src/auto_sync/batcher.rs b/node/sync/src/auto_sync/batcher.rs index 8c35002..2d1cf73 100644 --- a/node/sync/src/auto_sync/batcher.rs +++ b/node/sync/src/auto_sync/batcher.rs @@ -1,7 +1,6 @@ use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender}; use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; -use shared_types::TxSeqOrRoot; use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration}; use storage_async::Store; use tokio::sync::RwLock; @@ -86,11 +85,7 @@ impl Batcher { async fn poll_tx(&self, tx_seq: u64) -> Result> { // file already finalized or even pruned - if let Some(tx_status) = self - .store - .get_store() - .get_tx_status(TxSeqOrRoot::TxSeq(tx_seq))? - { + if let Some(tx_status) = self.store.get_store().get_tx_status(tx_seq)? { let num_terminated: usize = self.terminate_file_sync(tx_seq, false).await; if num_terminated > 0 { info!(%tx_seq, %num_terminated, ?tx_status, "Terminate file sync due to file already completed in db"); diff --git a/node/sync/src/auto_sync/historical_tx_writer.rs b/node/sync/src/auto_sync/historical_tx_writer.rs index d905712..ef15373 100644 --- a/node/sync/src/auto_sync/historical_tx_writer.rs +++ b/node/sync/src/auto_sync/historical_tx_writer.rs @@ -87,12 +87,7 @@ impl HistoricalTxWriter { } // write tx in sync store if not finalized or pruned - if self - .store - .get_store() - .get_tx_status(shared_types::TxSeqOrRoot::TxSeq(next_tx_seq))? - .is_none() - { + if self.store.get_store().get_tx_status(next_tx_seq)?.is_none() { self.sync_store.insert(next_tx_seq, Queue::Ready).await?; }