From 7f144511414aec4d3c8efa247baea88cb7059e5c Mon Sep 17 00:00:00 2001 From: Bo QIU <35757521+boqiu@users.noreply.github.com> Date: Fri, 13 Sep 2024 07:40:43 +0800 Subject: [PATCH] Add zgs rpc to check file finality (#196) * Add tx seq or root enum * Add zgs rpc to check file finality * trailing whitespace --- Cargo.lock | 1 + node/rpc/src/zgs/api.rs | 5 ++- node/rpc/src/zgs/impl.rs | 27 +++++++++++- node/shared_types/Cargo.toml | 3 ++ node/shared_types/src/lib.rs | 85 ++++++++++++++++++++++++++++++++++++ 5 files changed, 119 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 470227a..9dca6e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7017,6 +7017,7 @@ dependencies = [ "merkle_light", "merkle_tree", "serde", + "serde_json", "tiny-keccak", "tracing", "typenum", diff --git a/node/rpc/src/zgs/api.rs b/node/rpc/src/zgs/api.rs index 2ed6fa3..7a55db2 100644 --- a/node/rpc/src/zgs/api.rs +++ b/node/rpc/src/zgs/api.rs @@ -1,7 +1,7 @@ use crate::types::{FileInfo, Segment, SegmentWithProof, Status}; use jsonrpsee::core::RpcResult; use jsonrpsee::proc_macros::rpc; -use shared_types::{DataRoot, FlowProof}; +use shared_types::{DataRoot, FlowProof, TxSeqOrRoot}; use storage::config::ShardConfig; #[rpc(server, client, namespace = "zgs")] @@ -30,6 +30,9 @@ pub trait Rpc { index: usize, ) -> RpcResult>; + #[method(name = "checkFileFinalized")] + async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult>; + #[method(name = "getFileInfo")] async fn get_file_info(&self, data_root: DataRoot) -> RpcResult>; diff --git a/node/rpc/src/zgs/impl.rs b/node/rpc/src/zgs/impl.rs index a961cf4..7d5934f 100644 --- a/node/rpc/src/zgs/impl.rs +++ b/node/rpc/src/zgs/impl.rs @@ -5,7 +5,7 @@ use crate::Context; use chunk_pool::{FileID, SegmentInfo}; use jsonrpsee::core::async_trait; use jsonrpsee::core::RpcResult; -use shared_types::{DataRoot, FlowProof, Transaction, CHUNK_SIZE}; +use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE}; use std::fmt::{Debug, Formatter, Result}; use storage::config::ShardConfig; use storage::try_option; @@ -137,6 +137,31 @@ impl RpcServer for RpcServerImpl { })) } + async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult> { + debug!(?tx_seq_or_root, "zgs_checkFileFinalized"); + + let seq = match tx_seq_or_root { + TxSeqOrRoot::TxSeq(v) => v, + TxSeqOrRoot::Root(v) => { + try_option!(self.ctx.log_store.get_tx_seq_by_data_root(&v).await?) + } + }; + + if self.ctx.log_store.check_tx_completed(seq).await? { + Ok(Some(true)) + } else if self + .ctx + .log_store + .get_tx_by_seq_number(seq) + .await? + .is_some() + { + Ok(Some(false)) + } else { + Ok(None) + } + } + async fn get_file_info(&self, data_root: DataRoot) -> RpcResult> { debug!(%data_root, "zgs_getFileInfo"); diff --git a/node/shared_types/Cargo.toml b/node/shared_types/Cargo.toml index 9297a9e..86a4cad 100644 --- a/node/shared_types/Cargo.toml +++ b/node/shared_types/Cargo.toml @@ -18,3 +18,6 @@ tracing = "0.1.35" typenum = "1.15.0" serde = { version = "1.0.137", features = ["derive"] } chrono = "0.4.19" + +[dev-dependencies] +serde_json = "1.0.82" diff --git a/node/shared_types/src/lib.rs b/node/shared_types/src/lib.rs index 904cf67..e2f507c 100644 --- a/node/shared_types/src/lib.rs +++ b/node/shared_types/src/lib.rs @@ -9,11 +9,13 @@ use merkle_light::merkle::MerkleTree; use merkle_light::proof::Proof as RawFileProof; use merkle_light::{hash::Algorithm, merkle::next_pow2}; use merkle_tree::RawLeafSha3Algorithm; +use serde::de::Visitor; use serde::{Deserialize, Serialize}; use ssz::Encode; use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode}; use std::fmt; use std::hash::Hasher; +use std::str::FromStr; use tiny_keccak::{Hasher as KeccakHasher, Keccak}; use tracing::debug; @@ -391,3 +393,86 @@ pub struct ProtocolVersion { pub minor: u8, pub build: u8, } + +#[derive(Debug)] +pub enum TxSeqOrRoot { + TxSeq(u64), + Root(DataRoot), +} + +impl Serialize for TxSeqOrRoot { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + TxSeqOrRoot::TxSeq(seq) => seq.serialize(serializer), + TxSeqOrRoot::Root(root) => root.serialize(serializer), + } + } +} + +impl<'a> Deserialize<'a> for TxSeqOrRoot { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + deserializer.deserialize_any(TxSeqOrRootVisitor) + } +} + +struct TxSeqOrRootVisitor; + +impl<'a> Visitor<'a> for TxSeqOrRootVisitor { + type Value = TxSeqOrRoot; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(formatter, "an u64 integer or a hex64 value") + } + + fn visit_u64(self, v: u64) -> Result + where + E: serde::de::Error, + { + Ok(TxSeqOrRoot::TxSeq(v)) + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + let root: H256 = H256::from_str(v).map_err(E::custom)?; + Ok(TxSeqOrRoot::Root(root)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_tx_seq_or_root_serde() { + // serialize tx seq + let tx_seq = TxSeqOrRoot::TxSeq(666); + assert_eq!(serde_json::to_string(&tx_seq).unwrap(), "666".to_string()); + + // serialize root + let hash_str = "0xa906f46f8b9f15908dbee7adc5492ff30779c3abe114ccdb7079ecdcb72eb855"; + let hash_quoted = format!("\"{}\"", hash_str); + let hash = H256::from_str(hash_str).unwrap(); + let root = TxSeqOrRoot::Root(hash); + assert_eq!(serde_json::to_string(&root).unwrap(), hash_quoted); + + // deserialize tx seq + assert!(matches!( + serde_json::from_str::("777").unwrap(), + TxSeqOrRoot::TxSeq(777) + )); + + // deserialize root + assert!(matches!( + serde_json::from_str::(hash_quoted.as_str()).unwrap(), + TxSeqOrRoot::Root(v) if v == hash, + )); + } +}