mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-23 13:36:08 +00:00
RPC return file pruned info (#266)
* RPC return file pruned info * return tx status in atomic manner * fix clippy
This commit is contained in:
parent
16e70bde68
commit
d93f453d50
@ -53,6 +53,8 @@ pub struct FileInfo {
|
|||||||
pub finalized: bool,
|
pub finalized: bool,
|
||||||
pub is_cached: bool,
|
pub is_cached: bool,
|
||||||
pub uploaded_seg_num: usize,
|
pub uploaded_seg_num: usize,
|
||||||
|
/// Whether file is pruned, in which case `finalized` will be `false`.
|
||||||
|
pub pruned: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
@ -8,6 +8,7 @@ use jsonrpsee::core::RpcResult;
|
|||||||
use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
|
use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
|
||||||
use std::fmt::{Debug, Formatter, Result};
|
use std::fmt::{Debug, Formatter, Result};
|
||||||
use storage::config::ShardConfig;
|
use storage::config::ShardConfig;
|
||||||
|
use storage::log_store::tx_store::TxStatus;
|
||||||
use storage::{try_option, H256};
|
use storage::{try_option, H256};
|
||||||
|
|
||||||
pub struct RpcServerImpl {
|
pub struct RpcServerImpl {
|
||||||
@ -245,7 +246,17 @@ impl RpcServerImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult<FileInfo> {
|
async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult<FileInfo> {
|
||||||
let finalized = self.ctx.log_store.check_tx_completed(tx.seq).await?;
|
let (finalized, pruned) = match self
|
||||||
|
.ctx
|
||||||
|
.log_store
|
||||||
|
.get_store()
|
||||||
|
.get_tx_status(TxSeqOrRoot::TxSeq(tx.seq))?
|
||||||
|
{
|
||||||
|
Some(TxStatus::Finalized) => (true, false),
|
||||||
|
Some(TxStatus::Pruned) => (false, true),
|
||||||
|
None => (false, false),
|
||||||
|
};
|
||||||
|
|
||||||
let (uploaded_seg_num, is_cached) = match self
|
let (uploaded_seg_num, is_cached) = match self
|
||||||
.ctx
|
.ctx
|
||||||
.chunk_pool
|
.chunk_pool
|
||||||
@ -254,7 +265,7 @@ impl RpcServerImpl {
|
|||||||
{
|
{
|
||||||
Some(v) => v,
|
Some(v) => v,
|
||||||
_ => (
|
_ => (
|
||||||
if finalized {
|
if finalized || pruned {
|
||||||
let chunks_per_segment = self.ctx.config.chunks_per_segment;
|
let chunks_per_segment = self.ctx.config.chunks_per_segment;
|
||||||
let (num_segments, _) = SegmentWithProof::split_file_into_segments(
|
let (num_segments, _) = SegmentWithProof::split_file_into_segments(
|
||||||
tx.size as usize,
|
tx.size as usize,
|
||||||
@ -273,6 +284,7 @@ impl RpcServerImpl {
|
|||||||
finalized,
|
finalized,
|
||||||
is_cached,
|
is_cached,
|
||||||
uploaded_seg_num,
|
uploaded_seg_num,
|
||||||
|
pruned,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@ use crate::config::ShardConfig;
|
|||||||
use crate::log_store::flow_store::{
|
use crate::log_store::flow_store::{
|
||||||
batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair,
|
batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair,
|
||||||
};
|
};
|
||||||
use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore};
|
use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore, TxStatus};
|
||||||
use crate::log_store::{
|
use crate::log_store::{
|
||||||
FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead,
|
FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead,
|
||||||
LogStoreWrite, MineLoadChunk, SealAnswer, SealTask,
|
LogStoreWrite, MineLoadChunk, SealAnswer, SealTask,
|
||||||
@ -21,6 +21,7 @@ use rayon::prelude::ParallelSlice;
|
|||||||
use shared_types::{
|
use shared_types::{
|
||||||
bytes_to_chunks, compute_padded_chunk_size, compute_segment_size, Chunk, ChunkArray,
|
bytes_to_chunks, compute_padded_chunk_size, compute_segment_size, Chunk, ChunkArray,
|
||||||
ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
|
ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
|
||||||
|
TxSeqOrRoot,
|
||||||
};
|
};
|
||||||
use std::cmp::Ordering;
|
use std::cmp::Ordering;
|
||||||
|
|
||||||
@ -572,6 +573,17 @@ impl LogStoreRead for LogManager {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>> {
|
||||||
|
let tx_seq = match tx_seq_or_data_root {
|
||||||
|
TxSeqOrRoot::TxSeq(v) => v,
|
||||||
|
TxSeqOrRoot::Root(root) => {
|
||||||
|
try_option!(self.tx_store.get_first_tx_seq_by_data_root(&root)?)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
self.tx_store.get_tx_status(tx_seq)
|
||||||
|
}
|
||||||
|
|
||||||
fn check_tx_completed(&self, tx_seq: u64) -> crate::error::Result<bool> {
|
fn check_tx_completed(&self, tx_seq: u64) -> crate::error::Result<bool> {
|
||||||
self.tx_store.check_tx_completed(tx_seq)
|
self.tx_store.check_tx_completed(tx_seq)
|
||||||
}
|
}
|
||||||
|
@ -4,13 +4,13 @@ use ethereum_types::H256;
|
|||||||
use flow_store::PadPair;
|
use flow_store::PadPair;
|
||||||
use shared_types::{
|
use shared_types::{
|
||||||
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
|
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
|
||||||
Transaction,
|
Transaction, TxSeqOrRoot,
|
||||||
};
|
};
|
||||||
use zgs_spec::{BYTES_PER_SEAL, SEALS_PER_LOAD};
|
use zgs_spec::{BYTES_PER_SEAL, SEALS_PER_LOAD};
|
||||||
|
|
||||||
use crate::error::Result;
|
use crate::error::Result;
|
||||||
|
|
||||||
use self::tx_store::BlockHashAndSubmissionIndex;
|
use self::tx_store::{BlockHashAndSubmissionIndex, TxStatus};
|
||||||
|
|
||||||
pub mod config;
|
pub mod config;
|
||||||
mod flow_store;
|
mod flow_store;
|
||||||
@ -57,6 +57,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
|
|||||||
|
|
||||||
fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
|
fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
|
||||||
|
|
||||||
|
fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>>;
|
||||||
|
|
||||||
fn next_tx_seq(&self) -> u64;
|
fn next_tx_seq(&self) -> u64;
|
||||||
|
|
||||||
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
|
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
|
||||||
|
@ -21,8 +21,31 @@ const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
|
|||||||
const NEXT_TX_KEY: &str = "next_tx_seq";
|
const NEXT_TX_KEY: &str = "next_tx_seq";
|
||||||
const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key";
|
const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key";
|
||||||
|
|
||||||
const TX_STATUS_FINALIZED: u8 = 0;
|
pub enum TxStatus {
|
||||||
const TX_STATUS_PRUNED: u8 = 1;
|
Finalized,
|
||||||
|
Pruned,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<TxStatus> for u8 {
|
||||||
|
fn from(value: TxStatus) -> Self {
|
||||||
|
match value {
|
||||||
|
TxStatus::Finalized => 0,
|
||||||
|
TxStatus::Pruned => 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<u8> for TxStatus {
|
||||||
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
|
fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
|
||||||
|
match value {
|
||||||
|
0 => Ok(TxStatus::Finalized),
|
||||||
|
1 => Ok(TxStatus::Pruned),
|
||||||
|
_ => Err(anyhow!("invalid value for tx status {}", value)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct BlockHashAndSubmissionIndex {
|
pub struct BlockHashAndSubmissionIndex {
|
||||||
@ -163,24 +186,35 @@ impl TransactionStore {
|
|||||||
Ok(self.kvdb.put(
|
Ok(self.kvdb.put(
|
||||||
COL_TX_COMPLETED,
|
COL_TX_COMPLETED,
|
||||||
&tx_seq.to_be_bytes(),
|
&tx_seq.to_be_bytes(),
|
||||||
&[TX_STATUS_FINALIZED],
|
&[TxStatus::Finalized.into()],
|
||||||
)?)
|
)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(skip(self))]
|
#[instrument(skip(self))]
|
||||||
pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
|
pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
|
||||||
Ok(self
|
Ok(self.kvdb.put(
|
||||||
.kvdb
|
COL_TX_COMPLETED,
|
||||||
.put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TX_STATUS_PRUNED])?)
|
&tx_seq.to_be_bytes(),
|
||||||
|
&[TxStatus::Pruned.into()],
|
||||||
|
)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> {
|
||||||
|
let value = try_option!(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?);
|
||||||
|
match value.first() {
|
||||||
|
Some(v) => Ok(Some(TxStatus::try_from(*v)?)),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
|
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
|
||||||
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
|
let status = self.get_tx_status(tx_seq)?;
|
||||||
== Some(vec![TX_STATUS_FINALIZED]))
|
Ok(matches!(status, Some(TxStatus::Finalized)))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
|
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
|
||||||
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? == Some(vec![TX_STATUS_PRUNED]))
|
let status = self.get_tx_status(tx_seq)?;
|
||||||
|
Ok(matches!(status, Some(TxStatus::Pruned)))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn next_tx_seq(&self) -> u64 {
|
pub fn next_tx_seq(&self) -> u64 {
|
||||||
|
Loading…
Reference in New Issue
Block a user