mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-11-20 15:05:19 +00:00
Compare commits
5 Commits
b39e761fc6
...
0812d2e748
Author | SHA1 | Date | |
---|---|---|---|
|
0812d2e748 | ||
|
59d24b073d | ||
|
7f14451141 | ||
|
5c81abb79f | ||
|
3957f8b28d |
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -7017,6 +7017,7 @@ dependencies = [
|
||||
"merkle_light",
|
||||
"merkle_tree",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tiny-keccak",
|
||||
"tracing",
|
||||
"typenum",
|
||||
|
@ -210,7 +210,7 @@ impl PoraService {
|
||||
let timer = time::Instant::now();
|
||||
|
||||
if let Some(answer) = miner.batch_iteration(nonce, self.iter_batch).await {
|
||||
debug!("Hit Pora answer {:?}", answer);
|
||||
info!("Hit Pora answer {:?}", answer);
|
||||
if self.mine_answer_sender.send(answer).is_err() {
|
||||
warn!("Mine submitter channel closed");
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ use crate::types::{GossipEncoding, GossipKind, GossipTopic, SnappyTransform};
|
||||
use crate::{error, metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash};
|
||||
use futures::stream::StreamExt;
|
||||
use libp2p::gossipsub::error::PublishError;
|
||||
use libp2p::gossipsub::TopicScoreParams;
|
||||
use libp2p::{
|
||||
core::{
|
||||
connection::ConnectionId, identity::Keypair, multiaddr::Protocol as MProtocol, Multiaddr,
|
||||
@ -226,7 +227,30 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
|
||||
|
||||
// trace!(behaviour_log, "Using peer score params"; "params" => ?params);
|
||||
|
||||
let params = libp2p::gossipsub::PeerScoreParams::default();
|
||||
let mut params = libp2p::gossipsub::PeerScoreParams::default();
|
||||
let get_hash = |kind: GossipKind| -> TopicHash {
|
||||
let topic: Topic = GossipTopic::new(kind, GossipEncoding::default()).into();
|
||||
topic.hash()
|
||||
};
|
||||
params
|
||||
.topics
|
||||
.insert(get_hash(GossipKind::FindFile), TopicScoreParams::default());
|
||||
params.topics.insert(
|
||||
get_hash(GossipKind::FindChunks),
|
||||
TopicScoreParams::default(),
|
||||
);
|
||||
params.topics.insert(
|
||||
get_hash(GossipKind::AnnounceFile),
|
||||
TopicScoreParams::default(),
|
||||
);
|
||||
params.topics.insert(
|
||||
get_hash(GossipKind::AnnounceShardConfig),
|
||||
TopicScoreParams::default(),
|
||||
);
|
||||
params.topics.insert(
|
||||
get_hash(GossipKind::AnnounceChunks),
|
||||
TopicScoreParams::default(),
|
||||
);
|
||||
|
||||
// Set up a scoring update interval
|
||||
let update_gossipsub_scores = tokio::time::interval(params.decay_interval);
|
||||
|
@ -1,7 +1,7 @@
|
||||
use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
|
||||
use jsonrpsee::core::RpcResult;
|
||||
use jsonrpsee::proc_macros::rpc;
|
||||
use shared_types::{DataRoot, FlowProof};
|
||||
use shared_types::{DataRoot, FlowProof, TxSeqOrRoot};
|
||||
use storage::config::ShardConfig;
|
||||
|
||||
#[rpc(server, client, namespace = "zgs")]
|
||||
@ -30,6 +30,9 @@ pub trait Rpc {
|
||||
index: usize,
|
||||
) -> RpcResult<Option<SegmentWithProof>>;
|
||||
|
||||
#[method(name = "checkFileFinalized")]
|
||||
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>>;
|
||||
|
||||
#[method(name = "getFileInfo")]
|
||||
async fn get_file_info(&self, data_root: DataRoot) -> RpcResult<Option<FileInfo>>;
|
||||
|
||||
|
@ -5,7 +5,7 @@ use crate::Context;
|
||||
use chunk_pool::{FileID, SegmentInfo};
|
||||
use jsonrpsee::core::async_trait;
|
||||
use jsonrpsee::core::RpcResult;
|
||||
use shared_types::{DataRoot, FlowProof, Transaction, CHUNK_SIZE};
|
||||
use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
|
||||
use std::fmt::{Debug, Formatter, Result};
|
||||
use storage::config::ShardConfig;
|
||||
use storage::try_option;
|
||||
@ -137,6 +137,31 @@ impl RpcServer for RpcServerImpl {
|
||||
}))
|
||||
}
|
||||
|
||||
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>> {
|
||||
debug!(?tx_seq_or_root, "zgs_checkFileFinalized");
|
||||
|
||||
let seq = match tx_seq_or_root {
|
||||
TxSeqOrRoot::TxSeq(v) => v,
|
||||
TxSeqOrRoot::Root(v) => {
|
||||
try_option!(self.ctx.log_store.get_tx_seq_by_data_root(&v).await?)
|
||||
}
|
||||
};
|
||||
|
||||
if self.ctx.log_store.check_tx_completed(seq).await? {
|
||||
Ok(Some(true))
|
||||
} else if self
|
||||
.ctx
|
||||
.log_store
|
||||
.get_tx_by_seq_number(seq)
|
||||
.await?
|
||||
.is_some()
|
||||
{
|
||||
Ok(Some(false))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_file_info(&self, data_root: DataRoot) -> RpcResult<Option<FileInfo>> {
|
||||
debug!(%data_root, "zgs_getFileInfo");
|
||||
|
||||
|
@ -18,3 +18,6 @@ tracing = "0.1.35"
|
||||
typenum = "1.15.0"
|
||||
serde = { version = "1.0.137", features = ["derive"] }
|
||||
chrono = "0.4.19"
|
||||
|
||||
[dev-dependencies]
|
||||
serde_json = "1.0.82"
|
||||
|
@ -9,11 +9,13 @@ use merkle_light::merkle::MerkleTree;
|
||||
use merkle_light::proof::Proof as RawFileProof;
|
||||
use merkle_light::{hash::Algorithm, merkle::next_pow2};
|
||||
use merkle_tree::RawLeafSha3Algorithm;
|
||||
use serde::de::Visitor;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use ssz::Encode;
|
||||
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
|
||||
use std::fmt;
|
||||
use std::hash::Hasher;
|
||||
use std::str::FromStr;
|
||||
use tiny_keccak::{Hasher as KeccakHasher, Keccak};
|
||||
use tracing::debug;
|
||||
|
||||
@ -391,3 +393,86 @@ pub struct ProtocolVersion {
|
||||
pub minor: u8,
|
||||
pub build: u8,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum TxSeqOrRoot {
|
||||
TxSeq(u64),
|
||||
Root(DataRoot),
|
||||
}
|
||||
|
||||
impl Serialize for TxSeqOrRoot {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
match self {
|
||||
TxSeqOrRoot::TxSeq(seq) => seq.serialize(serializer),
|
||||
TxSeqOrRoot::Root(root) => root.serialize(serializer),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Deserialize<'a> for TxSeqOrRoot {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'a>,
|
||||
{
|
||||
deserializer.deserialize_any(TxSeqOrRootVisitor)
|
||||
}
|
||||
}
|
||||
|
||||
struct TxSeqOrRootVisitor;
|
||||
|
||||
impl<'a> Visitor<'a> for TxSeqOrRootVisitor {
|
||||
type Value = TxSeqOrRoot;
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(formatter, "an u64 integer or a hex64 value")
|
||||
}
|
||||
|
||||
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
Ok(TxSeqOrRoot::TxSeq(v))
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
let root: H256 = H256::from_str(v).map_err(E::custom)?;
|
||||
Ok(TxSeqOrRoot::Root(root))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_tx_seq_or_root_serde() {
|
||||
// serialize tx seq
|
||||
let tx_seq = TxSeqOrRoot::TxSeq(666);
|
||||
assert_eq!(serde_json::to_string(&tx_seq).unwrap(), "666".to_string());
|
||||
|
||||
// serialize root
|
||||
let hash_str = "0xa906f46f8b9f15908dbee7adc5492ff30779c3abe114ccdb7079ecdcb72eb855";
|
||||
let hash_quoted = format!("\"{}\"", hash_str);
|
||||
let hash = H256::from_str(hash_str).unwrap();
|
||||
let root = TxSeqOrRoot::Root(hash);
|
||||
assert_eq!(serde_json::to_string(&root).unwrap(), hash_quoted);
|
||||
|
||||
// deserialize tx seq
|
||||
assert!(matches!(
|
||||
serde_json::from_str::<TxSeqOrRoot>("777").unwrap(),
|
||||
TxSeqOrRoot::TxSeq(777)
|
||||
));
|
||||
|
||||
// deserialize root
|
||||
assert!(matches!(
|
||||
serde_json::from_str::<TxSeqOrRoot>(hash_quoted.as_str()).unwrap(),
|
||||
TxSeqOrRoot::Root(v) if v == hash,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
@ -989,16 +989,16 @@ impl LogManager {
|
||||
}
|
||||
|
||||
// FIXME(zz): Implement padding.
|
||||
pub fn padding(len: usize) -> Vec<Vec<u8>> {
|
||||
pub fn padding(len: usize) -> Box<dyn Iterator<Item = Vec<u8>>> {
|
||||
let remainder = len % PAD_MAX_SIZE;
|
||||
let n = len / PAD_MAX_SIZE;
|
||||
let mut pad_data = vec![Self::padding_raw(PAD_MAX_SIZE); n];
|
||||
let iter = (0..n).map(|_| Self::padding_raw(PAD_MAX_SIZE));
|
||||
if remainder == 0 {
|
||||
pad_data
|
||||
Box::new(iter)
|
||||
} else {
|
||||
// insert the remainder to the front, so the rest are processed with alignment.
|
||||
pad_data.insert(0, Self::padding_raw(remainder));
|
||||
pad_data
|
||||
let new_iter = vec![Self::padding_raw(remainder)].into_iter().chain(iter);
|
||||
Box::new(new_iter)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user