Add zgs rpc to check file finality (#196)

* Add tx seq or root enum

* Add zgs rpc to check file finality

* trailing whitespace
This commit is contained in:
Bo QIU 2024-09-13 07:40:43 +08:00 committed by GitHub
parent 5c81abb79f
commit 7f14451141
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 119 additions and 2 deletions

1
Cargo.lock generated
View File

@ -7017,6 +7017,7 @@ dependencies = [
"merkle_light",
"merkle_tree",
"serde",
"serde_json",
"tiny-keccak",
"tracing",
"typenum",

View File

@ -1,7 +1,7 @@
use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
use jsonrpsee::core::RpcResult;
use jsonrpsee::proc_macros::rpc;
use shared_types::{DataRoot, FlowProof};
use shared_types::{DataRoot, FlowProof, TxSeqOrRoot};
use storage::config::ShardConfig;
#[rpc(server, client, namespace = "zgs")]
@ -30,6 +30,9 @@ pub trait Rpc {
index: usize,
) -> RpcResult<Option<SegmentWithProof>>;
#[method(name = "checkFileFinalized")]
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>>;
#[method(name = "getFileInfo")]
async fn get_file_info(&self, data_root: DataRoot) -> RpcResult<Option<FileInfo>>;

View File

@ -5,7 +5,7 @@ use crate::Context;
use chunk_pool::{FileID, SegmentInfo};
use jsonrpsee::core::async_trait;
use jsonrpsee::core::RpcResult;
use shared_types::{DataRoot, FlowProof, Transaction, CHUNK_SIZE};
use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
use std::fmt::{Debug, Formatter, Result};
use storage::config::ShardConfig;
use storage::try_option;
@ -137,6 +137,31 @@ impl RpcServer for RpcServerImpl {
}))
}
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>> {
debug!(?tx_seq_or_root, "zgs_checkFileFinalized");
let seq = match tx_seq_or_root {
TxSeqOrRoot::TxSeq(v) => v,
TxSeqOrRoot::Root(v) => {
try_option!(self.ctx.log_store.get_tx_seq_by_data_root(&v).await?)
}
};
if self.ctx.log_store.check_tx_completed(seq).await? {
Ok(Some(true))
} else if self
.ctx
.log_store
.get_tx_by_seq_number(seq)
.await?
.is_some()
{
Ok(Some(false))
} else {
Ok(None)
}
}
async fn get_file_info(&self, data_root: DataRoot) -> RpcResult<Option<FileInfo>> {
debug!(%data_root, "zgs_getFileInfo");

View File

@ -18,3 +18,6 @@ tracing = "0.1.35"
typenum = "1.15.0"
serde = { version = "1.0.137", features = ["derive"] }
chrono = "0.4.19"
[dev-dependencies]
serde_json = "1.0.82"

View File

@ -9,11 +9,13 @@ use merkle_light::merkle::MerkleTree;
use merkle_light::proof::Proof as RawFileProof;
use merkle_light::{hash::Algorithm, merkle::next_pow2};
use merkle_tree::RawLeafSha3Algorithm;
use serde::de::Visitor;
use serde::{Deserialize, Serialize};
use ssz::Encode;
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
use std::fmt;
use std::hash::Hasher;
use std::str::FromStr;
use tiny_keccak::{Hasher as KeccakHasher, Keccak};
use tracing::debug;
@ -391,3 +393,86 @@ pub struct ProtocolVersion {
pub minor: u8,
pub build: u8,
}
#[derive(Debug)]
pub enum TxSeqOrRoot {
TxSeq(u64),
Root(DataRoot),
}
impl Serialize for TxSeqOrRoot {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self {
TxSeqOrRoot::TxSeq(seq) => seq.serialize(serializer),
TxSeqOrRoot::Root(root) => root.serialize(serializer),
}
}
}
impl<'a> Deserialize<'a> for TxSeqOrRoot {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'a>,
{
deserializer.deserialize_any(TxSeqOrRootVisitor)
}
}
struct TxSeqOrRootVisitor;
impl<'a> Visitor<'a> for TxSeqOrRootVisitor {
type Value = TxSeqOrRoot;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "an u64 integer or a hex64 value")
}
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(TxSeqOrRoot::TxSeq(v))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
let root: H256 = H256::from_str(v).map_err(E::custom)?;
Ok(TxSeqOrRoot::Root(root))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_tx_seq_or_root_serde() {
// serialize tx seq
let tx_seq = TxSeqOrRoot::TxSeq(666);
assert_eq!(serde_json::to_string(&tx_seq).unwrap(), "666".to_string());
// serialize root
let hash_str = "0xa906f46f8b9f15908dbee7adc5492ff30779c3abe114ccdb7079ecdcb72eb855";
let hash_quoted = format!("\"{}\"", hash_str);
let hash = H256::from_str(hash_str).unwrap();
let root = TxSeqOrRoot::Root(hash);
assert_eq!(serde_json::to_string(&root).unwrap(), hash_quoted);
// deserialize tx seq
assert!(matches!(
serde_json::from_str::<TxSeqOrRoot>("777").unwrap(),
TxSeqOrRoot::TxSeq(777)
));
// deserialize root
assert!(matches!(
serde_json::from_str::<TxSeqOrRoot>(hash_quoted.as_str()).unwrap(),
TxSeqOrRoot::Root(v) if v == hash,
));
}
}