Compare commits

...

5 Commits

Author          SHA1        Message                                            Date
peilun-conflux  0812d2e748  Merge 3957f8b28d into 59d24b073d                   2024-09-13 07:41:36 +08:00
Jason           59d24b073d  change the Pora result from debug to info (#192)   2024-09-13 07:41:23 +08:00
                            Co-authored-by: jason1 <jason1@cambricon.com>
Bo QIU          7f14451141  Add zgs rpc to check file finality (#196)          2024-09-13 07:40:43 +08:00
                            * Add tx seq or root enum
                            * Add zgs rpc to check file finality
                            * trailing whitespace
peilun-conflux  5c81abb79f  Use iterator to return padding data. (#197)        2024-09-13 07:38:47 +08:00
                            * Use iterator to return padding data.
                            * Fix order.
Peilun Li       3957f8b28d  Add TopicScoreParams for gossipsubs.               2024-09-09 17:58:41 +08:00
8 changed files with 150 additions and 9 deletions

Cargo.lock generated
View File

@@ -7017,6 +7017,7 @@ dependencies = [
"merkle_light",
"merkle_tree",
"serde",
"serde_json",
"tiny-keccak",
"tracing",
"typenum",

View File

@@ -210,7 +210,7 @@ impl PoraService {
let timer = time::Instant::now();
if let Some(answer) = miner.batch_iteration(nonce, self.iter_batch).await {
debug!("Hit Pora answer {:?}", answer);
info!("Hit Pora answer {:?}", answer);
if self.mine_answer_sender.send(answer).is_err() {
warn!("Mine submitter channel closed");
}
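
For context, a minimal sketch of why the level change matters, assuming the binary installs a logger from `tracing-subscriber` (an assumption; only `tracing` itself is visible in this diff): with a typical `info`-level filter the `debug!` call is dropped, while `info!` is emitted.

```rust
use tracing::{debug, info, Level};

fn main() {
    // Assumed setup: an INFO-level fmt subscriber from tracing-subscriber.
    tracing_subscriber::fmt().with_max_level(Level::INFO).init();

    debug!("Hit Pora answer {:?}", 42u64); // filtered out at INFO
    info!("Hit Pora answer {:?}", 42u64); // emitted at INFO
}
```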

View File

@@ -13,6 +13,7 @@ use crate::types::{GossipEncoding, GossipKind, GossipTopic, SnappyTransform};
use crate::{error, metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash};
use futures::stream::StreamExt;
use libp2p::gossipsub::error::PublishError;
use libp2p::gossipsub::TopicScoreParams;
use libp2p::{
core::{
connection::ConnectionId, identity::Keypair, multiaddr::Protocol as MProtocol, Multiaddr,
@@ -226,7 +227,30 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
// trace!(behaviour_log, "Using peer score params"; "params" => ?params);
- let params = libp2p::gossipsub::PeerScoreParams::default();
+ let mut params = libp2p::gossipsub::PeerScoreParams::default();
let get_hash = |kind: GossipKind| -> TopicHash {
let topic: Topic = GossipTopic::new(kind, GossipEncoding::default()).into();
topic.hash()
};
params
.topics
.insert(get_hash(GossipKind::FindFile), TopicScoreParams::default());
params.topics.insert(
get_hash(GossipKind::FindChunks),
TopicScoreParams::default(),
);
params.topics.insert(
get_hash(GossipKind::AnnounceFile),
TopicScoreParams::default(),
);
params.topics.insert(
get_hash(GossipKind::AnnounceShardConfig),
TopicScoreParams::default(),
);
params.topics.insert(
get_hash(GossipKind::AnnounceChunks),
TopicScoreParams::default(),
);
// Set up a scoring update interval
let update_gossipsub_scores = tokio::time::interval(params.decay_interval);
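
The hunk above registers default `TopicScoreParams` for every topic the node gossips on, so peer scoring covers them all. If per-topic tuning were wanted later, a single topic's parameters could be adjusted before insertion; the sketch below is illustrative only and assumes the `topic_weight` field of libp2p-gossipsub's public `TopicScoreParams` struct is available in the pinned libp2p version.

```rust
use libp2p::gossipsub::TopicScoreParams;

// Hypothetical tuning helper, not part of the diff: lower one topic's weight
// so it contributes less to a peer's overall score.
fn tuned_topic_params() -> TopicScoreParams {
    let mut params = TopicScoreParams::default();
    params.topic_weight = 0.5; // assumed field; check the pinned libp2p version
    params
}

fn main() {
    let _params = tuned_topic_params();
}
```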

View File

@@ -1,7 +1,7 @@
use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
use jsonrpsee::core::RpcResult;
use jsonrpsee::proc_macros::rpc;
- use shared_types::{DataRoot, FlowProof};
+ use shared_types::{DataRoot, FlowProof, TxSeqOrRoot};
use storage::config::ShardConfig;
#[rpc(server, client, namespace = "zgs")]
@@ -30,6 +30,9 @@ pub trait Rpc {
index: usize,
) -> RpcResult<Option<SegmentWithProof>>;
#[method(name = "checkFileFinalized")]
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>>;
#[method(name = "getFileInfo")]
async fn get_file_info(&self, data_root: DataRoot) -> RpcResult<Option<FileInfo>>;
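
On the wire, the new method is exposed as `zgs_checkFileFinalized` (the trait uses the `zgs` namespace). A hedged sketch, using `serde_json`, of what the two accepted parameter shapes might look like; the request ids are placeholders and the root is the same example value used in the serde test further down.

```rust
fn main() {
    // Either a bare tx sequence number or a hex data root can be passed as the
    // single positional parameter, matching the TxSeqOrRoot type added below.
    let by_seq = serde_json::json!({
        "jsonrpc": "2.0", "id": 1,
        "method": "zgs_checkFileFinalized",
        "params": [666]
    });
    let by_root = serde_json::json!({
        "jsonrpc": "2.0", "id": 2,
        "method": "zgs_checkFileFinalized",
        "params": ["0xa906f46f8b9f15908dbee7adc5492ff30779c3abe114ccdb7079ecdcb72eb855"]
    });
    println!("{by_seq}\n{by_root}");
}
```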

View File

@@ -5,7 +5,7 @@ use crate::Context;
use chunk_pool::{FileID, SegmentInfo};
use jsonrpsee::core::async_trait;
use jsonrpsee::core::RpcResult;
- use shared_types::{DataRoot, FlowProof, Transaction, CHUNK_SIZE};
+ use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
use std::fmt::{Debug, Formatter, Result};
use storage::config::ShardConfig;
use storage::try_option;
@@ -137,6 +137,31 @@ impl RpcServer for RpcServerImpl {
}))
}
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>> {
debug!(?tx_seq_or_root, "zgs_checkFileFinalized");
let seq = match tx_seq_or_root {
TxSeqOrRoot::TxSeq(v) => v,
TxSeqOrRoot::Root(v) => {
try_option!(self.ctx.log_store.get_tx_seq_by_data_root(&v).await?)
}
};
if self.ctx.log_store.check_tx_completed(seq).await? {
Ok(Some(true))
} else if self
.ctx
.log_store
.get_tx_by_seq_number(seq)
.await?
.is_some()
{
Ok(Some(false))
} else {
Ok(None)
}
}
async fn get_file_info(&self, data_root: DataRoot) -> RpcResult<Option<FileInfo>> {
debug!(%data_root, "zgs_getFileInfo");
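
The implementation returns a tri-state value: `Some(true)` when the file is finalized, `Some(false)` when the tx exists but the file is not yet finalized, and `None` when no tx is found for the given seq or root. A small self-contained sketch of how a caller might read it (the helper name is made up for illustration):

```rust
// Hypothetical helper, not part of the diff.
fn describe_finality(finality: Option<bool>) -> &'static str {
    match finality {
        Some(true) => "file finalized",
        Some(false) => "tx known, file not yet finalized",
        None => "tx not found",
    }
}

fn main() {
    assert_eq!(describe_finality(Some(true)), "file finalized");
    assert_eq!(describe_finality(None), "tx not found");
}
```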

View File

@@ -18,3 +18,6 @@ tracing = "0.1.35"
typenum = "1.15.0"
serde = { version = "1.0.137", features = ["derive"] }
chrono = "0.4.19"
[dev-dependencies]
serde_json = "1.0.82"

View File

@@ -9,11 +9,13 @@ use merkle_light::merkle::MerkleTree;
use merkle_light::proof::Proof as RawFileProof;
use merkle_light::{hash::Algorithm, merkle::next_pow2};
use merkle_tree::RawLeafSha3Algorithm;
use serde::de::Visitor;
use serde::{Deserialize, Serialize};
use ssz::Encode;
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
use std::fmt;
use std::hash::Hasher;
use std::str::FromStr;
use tiny_keccak::{Hasher as KeccakHasher, Keccak};
use tracing::debug;
@@ -391,3 +393,86 @@ pub struct ProtocolVersion {
pub minor: u8,
pub build: u8,
}
#[derive(Debug)]
pub enum TxSeqOrRoot {
TxSeq(u64),
Root(DataRoot),
}
impl Serialize for TxSeqOrRoot {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self {
TxSeqOrRoot::TxSeq(seq) => seq.serialize(serializer),
TxSeqOrRoot::Root(root) => root.serialize(serializer),
}
}
}
impl<'a> Deserialize<'a> for TxSeqOrRoot {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'a>,
{
deserializer.deserialize_any(TxSeqOrRootVisitor)
}
}
struct TxSeqOrRootVisitor;
impl<'a> Visitor<'a> for TxSeqOrRootVisitor {
type Value = TxSeqOrRoot;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "an u64 integer or a hex64 value")
}
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(TxSeqOrRoot::TxSeq(v))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
let root: H256 = H256::from_str(v).map_err(E::custom)?;
Ok(TxSeqOrRoot::Root(root))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_tx_seq_or_root_serde() {
// serialize tx seq
let tx_seq = TxSeqOrRoot::TxSeq(666);
assert_eq!(serde_json::to_string(&tx_seq).unwrap(), "666".to_string());
// serialize root
let hash_str = "0xa906f46f8b9f15908dbee7adc5492ff30779c3abe114ccdb7079ecdcb72eb855";
let hash_quoted = format!("\"{}\"", hash_str);
let hash = H256::from_str(hash_str).unwrap();
let root = TxSeqOrRoot::Root(hash);
assert_eq!(serde_json::to_string(&root).unwrap(), hash_quoted);
// deserialize tx seq
assert!(matches!(
serde_json::from_str::<TxSeqOrRoot>("777").unwrap(),
TxSeqOrRoot::TxSeq(777)
));
// deserialize root
assert!(matches!(
serde_json::from_str::<TxSeqOrRoot>(hash_quoted.as_str()).unwrap(),
TxSeqOrRoot::Root(v) if v == hash,
));
}
}

View File

@@ -989,16 +989,16 @@ impl LogManager {
}
// FIXME(zz): Implement padding.
- pub fn padding(len: usize) -> Vec<Vec<u8>> {
+ pub fn padding(len: usize) -> Box<dyn Iterator<Item = Vec<u8>>> {
let remainder = len % PAD_MAX_SIZE;
let n = len / PAD_MAX_SIZE;
- let mut pad_data = vec![Self::padding_raw(PAD_MAX_SIZE); n];
+ let iter = (0..n).map(|_| Self::padding_raw(PAD_MAX_SIZE));
if remainder == 0 {
- pad_data
+ Box::new(iter)
} else {
// insert the remainder to the front, so the rest are processed with alignment.
- pad_data.insert(0, Self::padding_raw(remainder));
- pad_data
+ let new_iter = vec![Self::padding_raw(remainder)].into_iter().chain(iter);
+ Box::new(new_iter)
}
}
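
The rewrite above swaps the eagerly built `Vec<Vec<u8>>` for a boxed iterator, so pad chunks are produced on demand instead of all being held in memory at once. A self-contained sketch of the same shape; the `PAD_MAX_SIZE` value and the zero-filled `padding_raw` below are stand-ins for the crate's own constant and helper.

```rust
const PAD_MAX_SIZE: usize = 1024 * 1024; // stand-in value for illustration

fn padding_raw(len: usize) -> Vec<u8> {
    vec![0u8; len] // stand-in for the crate's real pad-chunk helper
}

fn padding(len: usize) -> Box<dyn Iterator<Item = Vec<u8>>> {
    let remainder = len % PAD_MAX_SIZE;
    let n = len / PAD_MAX_SIZE;
    let iter = (0..n).map(|_| padding_raw(PAD_MAX_SIZE));
    if remainder == 0 {
        Box::new(iter)
    } else {
        // The remainder chunk goes first so the rest stay aligned.
        Box::new(std::iter::once(padding_raw(remainder)).chain(iter))
    }
}

fn main() {
    // Chunks are allocated one at a time as the iterator is consumed.
    let total: usize = padding(3 * PAD_MAX_SIZE + 7).map(|chunk| chunk.len()).sum();
    assert_eq!(total, 3 * PAD_MAX_SIZE + 7);
}
```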