mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-23 13:36:08 +00:00
Return the first finalized tx by data root if possible. (#278)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
* Only use tx seq for tx status. * Return the first finalized tx by data root if possible. This index is used for upload/download segments and file status check. In all the cases, if there is a finalized transaction, we should use it.
This commit is contained in:
parent
0da3c374db
commit
40104de891
@ -246,12 +246,7 @@ impl RpcServerImpl {
|
||||
}
|
||||
|
||||
async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult<FileInfo> {
|
||||
let (finalized, pruned) = match self
|
||||
.ctx
|
||||
.log_store
|
||||
.get_store()
|
||||
.get_tx_status(TxSeqOrRoot::TxSeq(tx.seq))?
|
||||
{
|
||||
let (finalized, pruned) = match self.ctx.log_store.get_store().get_tx_status(tx.seq)? {
|
||||
Some(TxStatus::Finalized) => (true, false),
|
||||
Some(TxStatus::Pruned) => (false, true),
|
||||
None => (false, false),
|
||||
|
@ -21,7 +21,6 @@ use rayon::prelude::ParallelSlice;
|
||||
use shared_types::{
|
||||
bytes_to_chunks, compute_padded_chunk_size, compute_segment_size, Chunk, ChunkArray,
|
||||
ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
|
||||
TxSeqOrRoot,
|
||||
};
|
||||
use std::cmp::Ordering;
|
||||
|
||||
@ -538,7 +537,15 @@ impl LogStoreRead for LogManager {
|
||||
}
|
||||
|
||||
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result<Option<u64>> {
|
||||
self.tx_store.get_first_tx_seq_by_data_root(data_root)
|
||||
let seq_list = self.tx_store.get_tx_seq_list_by_data_root(data_root)?;
|
||||
for tx_seq in &seq_list {
|
||||
if self.tx_store.check_tx_completed(*tx_seq)? {
|
||||
// Return the first finalized tx if possible.
|
||||
return Ok(Some(*tx_seq));
|
||||
}
|
||||
}
|
||||
// No tx is finalized, return the first one.
|
||||
Ok(seq_list.first().cloned())
|
||||
}
|
||||
|
||||
fn get_chunk_with_proof_by_tx_and_index(
|
||||
@ -582,14 +589,7 @@ impl LogStoreRead for LogManager {
|
||||
}))
|
||||
}
|
||||
|
||||
fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>> {
|
||||
let tx_seq = match tx_seq_or_data_root {
|
||||
TxSeqOrRoot::TxSeq(v) => v,
|
||||
TxSeqOrRoot::Root(root) => {
|
||||
try_option!(self.tx_store.get_first_tx_seq_by_data_root(&root)?)
|
||||
}
|
||||
};
|
||||
|
||||
fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> {
|
||||
self.tx_store.get_tx_status(tx_seq)
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,7 @@ use ethereum_types::H256;
|
||||
use flow_store::PadPair;
|
||||
use shared_types::{
|
||||
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
|
||||
Transaction, TxSeqOrRoot,
|
||||
Transaction,
|
||||
};
|
||||
use zgs_spec::{BYTES_PER_SEAL, SEALS_PER_LOAD};
|
||||
|
||||
@ -31,8 +31,12 @@ pub trait LogStoreRead: LogStoreChunkRead {
|
||||
fn get_tx_by_seq_number(&self, seq: u64) -> Result<Option<Transaction>>;
|
||||
|
||||
/// Get a transaction by the data root of its data.
|
||||
/// If all txs are not finalized, return the first one.
|
||||
/// Otherwise, return the first finalized tx.
|
||||
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>>;
|
||||
|
||||
/// If all txs are not finalized, return the first one.
|
||||
/// Otherwise, return the first finalized tx.
|
||||
fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result<Option<Transaction>> {
|
||||
match self.get_tx_seq_by_data_root(data_root)? {
|
||||
Some(seq) => self.get_tx_by_seq_number(seq),
|
||||
@ -58,7 +62,7 @@ pub trait LogStoreRead: LogStoreChunkRead {
|
||||
|
||||
fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
|
||||
|
||||
fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>>;
|
||||
fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>>;
|
||||
|
||||
fn next_tx_seq(&self) -> u64;
|
||||
|
||||
|
@ -181,14 +181,6 @@ impl TransactionStore {
|
||||
Ok(Vec::<u64>::from_ssz_bytes(&value).map_err(Error::from)?)
|
||||
}
|
||||
|
||||
pub fn get_first_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>> {
|
||||
let value = try_option!(self
|
||||
.kvdb
|
||||
.get(COL_TX_DATA_ROOT_INDEX, data_root.as_bytes())?);
|
||||
let seq_list = Vec::<u64>::from_ssz_bytes(&value).map_err(Error::from)?;
|
||||
Ok(seq_list.first().cloned())
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
|
||||
Ok(self.kvdb.put(
|
||||
|
@ -1,7 +1,6 @@
|
||||
use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender};
|
||||
use anyhow::{bail, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use shared_types::TxSeqOrRoot;
|
||||
use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration};
|
||||
use storage_async::Store;
|
||||
use tokio::sync::RwLock;
|
||||
@ -86,11 +85,7 @@ impl Batcher {
|
||||
|
||||
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
|
||||
// file already finalized or even pruned
|
||||
if let Some(tx_status) = self
|
||||
.store
|
||||
.get_store()
|
||||
.get_tx_status(TxSeqOrRoot::TxSeq(tx_seq))?
|
||||
{
|
||||
if let Some(tx_status) = self.store.get_store().get_tx_status(tx_seq)? {
|
||||
let num_terminated: usize = self.terminate_file_sync(tx_seq, false).await;
|
||||
if num_terminated > 0 {
|
||||
info!(%tx_seq, %num_terminated, ?tx_status, "Terminate file sync due to file already completed in db");
|
||||
|
@ -87,12 +87,7 @@ impl HistoricalTxWriter {
|
||||
}
|
||||
|
||||
// write tx in sync store if not finalized or pruned
|
||||
if self
|
||||
.store
|
||||
.get_store()
|
||||
.get_tx_status(shared_types::TxSeqOrRoot::TxSeq(next_tx_seq))?
|
||||
.is_none()
|
||||
{
|
||||
if self.store.get_store().get_tx_status(next_tx_seq)?.is_none() {
|
||||
self.sync_store.insert(next_tx_seq, Queue::Ready).await?;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user