From 71581496723c0ea97ce7dc0103d64654ff82ffbe Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Mon, 24 Mar 2025 12:14:53 +0800 Subject: [PATCH] add api of getting available file info by root --- node/rpc/src/zgs/api.rs | 6 ++++- node/rpc/src/zgs/impl.rs | 31 ++++++++++++++++++----- node/storage-async/src/lib.rs | 16 +++++++++--- node/storage/src/log_store/log_manager.rs | 18 +++++++++++-- node/storage/src/log_store/mod.rs | 18 +++++++++---- 5 files changed, 71 insertions(+), 18 deletions(-) diff --git a/node/rpc/src/zgs/api.rs b/node/rpc/src/zgs/api.rs index 32dcfb4..a922da5 100644 --- a/node/rpc/src/zgs/api.rs +++ b/node/rpc/src/zgs/api.rs @@ -63,7 +63,11 @@ pub trait Rpc { async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult>; #[method(name = "getFileInfo")] - async fn get_file_info(&self, data_root: DataRoot) -> RpcResult>; + async fn get_file_info( + &self, + data_root: DataRoot, + need_available: bool, + ) -> RpcResult>; #[method(name = "getFileInfoByTxSeq")] async fn get_file_info_by_tx_seq(&self, tx_seq: u64) -> RpcResult>; diff --git a/node/rpc/src/zgs/impl.rs b/node/rpc/src/zgs/impl.rs index 190aa53..f0462b2 100644 --- a/node/rpc/src/zgs/impl.rs +++ b/node/rpc/src/zgs/impl.rs @@ -95,7 +95,7 @@ impl RpcServer for RpcServerImpl { let tx_seq = try_option!( self.ctx .log_store - .get_tx_seq_by_data_root(&data_root) + .get_tx_seq_by_data_root(&data_root, true) .await? ); @@ -121,7 +121,12 @@ impl RpcServer for RpcServerImpl { ) -> RpcResult> { info!(%data_root, %index, "zgs_downloadSegmentWithProof"); - let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?); + let tx = try_option!( + self.ctx + .log_store + .get_tx_by_data_root(&data_root, true) + .await? + ); self.get_segment_with_proof_by_tx(tx, index).await } @@ -144,7 +149,12 @@ impl RpcServer for RpcServerImpl { let seq = match tx_seq_or_root { TxSeqOrRoot::TxSeq(v) => v, TxSeqOrRoot::Root(v) => { - try_option!(self.ctx.log_store.get_tx_seq_by_data_root(&v).await?) + try_option!( + self.ctx + .log_store + .get_tx_seq_by_data_root(&v, false) + .await? + ) } }; @@ -163,10 +173,19 @@ impl RpcServer for RpcServerImpl { } } - async fn get_file_info(&self, data_root: DataRoot) -> RpcResult> { + async fn get_file_info( + &self, + data_root: DataRoot, + need_available: bool, + ) -> RpcResult> { debug!(%data_root, "zgs_getFileInfo"); - let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?); + let tx = try_option!( + self.ctx + .log_store + .get_tx_by_data_root(&data_root, need_available) + .await? + ); Ok(Some(self.get_file_info_by_tx(tx).await?)) } @@ -288,7 +307,7 @@ impl RpcServerImpl { let maybe_tx = self .ctx .log_store - .get_tx_by_data_root(&segment.root) + .get_tx_by_data_root(&segment.root, false) .await?; self.put_segment_with_maybe_tx(segment, maybe_tx).await diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs index b564f42..57de0e2 100644 --- a/node/storage-async/src/lib.rs +++ b/node/storage-async/src/lib.rs @@ -59,15 +59,23 @@ impl Store { delegate!(fn get_proof_at_root(root: Option, index: u64, length: u64) -> Result); delegate!(fn get_context() -> Result<(DataRoot, u64)>); - pub async fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result> { + pub async fn get_tx_seq_by_data_root( + &self, + data_root: &DataRoot, + need_available: bool, + ) -> Result> { let root = *data_root; - self.spawn(move |store| store.get_tx_seq_by_data_root(&root)) + self.spawn(move |store| store.get_tx_seq_by_data_root(&root, need_available)) .await } - pub async fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result> { + pub async fn get_tx_by_data_root( + &self, + data_root: &DataRoot, + need_available: bool, + ) -> Result> { let root = *data_root; - self.spawn(move |store| store.get_tx_by_data_root(&root)) + self.spawn(move |store| store.get_tx_by_data_root(&root, need_available)) .await } diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 6a024e2..6e2f934 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -511,7 +511,7 @@ impl LogStoreChunkRead for LogManager { index_start: usize, index_end: usize, ) -> crate::error::Result> { - let tx_seq = try_option!(self.get_tx_seq_by_data_root(data_root)?); + let tx_seq = try_option!(self.get_tx_seq_by_data_root(data_root, true)?); self.get_chunks_by_tx_and_index_range(tx_seq, index_start, index_end) } @@ -536,13 +536,27 @@ impl LogStoreRead for LogManager { self.tx_store.get_tx_by_seq_number(seq) } - fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result> { + fn get_tx_seq_by_data_root( + &self, + data_root: &DataRoot, + need_available: bool, + ) -> crate::error::Result> { let seq_list = self.tx_store.get_tx_seq_list_by_data_root(data_root)?; + let mut available_seq = None; for tx_seq in &seq_list { if self.tx_store.check_tx_completed(*tx_seq)? { // Return the first finalized tx if possible. return Ok(Some(*tx_seq)); } + if need_available + && available_seq.is_none() + && !self.tx_store.check_tx_pruned(*tx_seq)? + { + available_seq = Some(*tx_seq); + } + } + if need_available { + return Ok(available_seq); } // No tx is finalized, return the first one. Ok(seq_list.first().cloned()) diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index fc2f82f..593f6b3 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -31,14 +31,22 @@ pub trait LogStoreRead: LogStoreChunkRead { fn get_tx_by_seq_number(&self, seq: u64) -> Result>; /// Get a transaction by the data root of its data. - /// If all txs are not finalized, return the first one. + /// If all txs are not finalized, return the first one if need available is false. /// Otherwise, return the first finalized tx. - fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result>; + fn get_tx_seq_by_data_root( + &self, + data_root: &DataRoot, + need_available: bool, + ) -> Result>; - /// If all txs are not finalized, return the first one. + /// If all txs are not finalized, return the first one if need available is false. /// Otherwise, return the first finalized tx. - fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result> { - match self.get_tx_seq_by_data_root(data_root)? { + fn get_tx_by_data_root( + &self, + data_root: &DataRoot, + need_available: bool, + ) -> Result> { + match self.get_tx_seq_by_data_root(data_root, need_available)? { Some(seq) => self.get_tx_by_seq_number(seq), None => Ok(None), }