mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-12-25 07:45:17 +00:00
Compare commits
2 Commits
31827d2791
...
d78a2e3238
Author | SHA1 | Date | |
---|---|---|---|
|
d78a2e3238 | ||
|
c5ddcc1f17 |
@ -7,8 +7,7 @@ pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
|
|||||||
|
|
||||||
pub use globals::NetworkGlobals;
|
pub use globals::NetworkGlobals;
|
||||||
pub use pubsub::{
|
pub use pubsub::{
|
||||||
AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature, NewFile,
|
AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, NewFile, PubsubMessage,
|
||||||
PubsubMessage, SignedAnnounceChunks, SignedAnnounceFile, SignedAnnounceShardConfig,
|
SignedAnnounceChunks, SignedAnnounceFile, SignedMessage, SnappyTransform, TimedMessage,
|
||||||
SignedMessage, SnappyTransform,
|
|
||||||
};
|
};
|
||||||
pub use topics::{GossipEncoding, GossipKind, GossipTopic};
|
pub use topics::{GossipEncoding, GossipKind, GossipTopic};
|
||||||
|
@ -6,7 +6,7 @@ use libp2p::{
|
|||||||
gossipsub::{DataTransform, GossipsubMessage, RawGossipsubMessage},
|
gossipsub::{DataTransform, GossipsubMessage, RawGossipsubMessage},
|
||||||
Multiaddr, PeerId,
|
Multiaddr, PeerId,
|
||||||
};
|
};
|
||||||
use shared_types::TxID;
|
use shared_types::{timestamp_now, ShardConfig, TxID};
|
||||||
use snap::raw::{decompress_len, Decoder, Encoder};
|
use snap::raw::{decompress_len, Decoder, Encoder};
|
||||||
use ssz::{Decode, Encode};
|
use ssz::{Decode, Encode};
|
||||||
use ssz_derive::{Decode, Encode};
|
use ssz_derive::{Decode, Encode};
|
||||||
@ -162,14 +162,28 @@ pub struct AnnounceChunks {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
|
||||||
pub struct AnnounceShardConfig {
|
pub struct TimedMessage<T: Encode + Decode> {
|
||||||
pub num_shard: usize,
|
pub inner: T,
|
||||||
pub shard_id: usize,
|
|
||||||
pub peer_id: WrappedPeerId,
|
|
||||||
pub at: WrappedMultiaddr,
|
|
||||||
pub timestamp: u32,
|
pub timestamp: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T: Encode + Decode> Deref for TimedMessage<T> {
|
||||||
|
type Target = T;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.inner
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Encode + Decode> From<T> for TimedMessage<T> {
|
||||||
|
fn from(value: T) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: value,
|
||||||
|
timestamp: timestamp_now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
|
||||||
pub struct SignedMessage<T: Encode + Decode> {
|
pub struct SignedMessage<T: Encode + Decode> {
|
||||||
pub inner: T,
|
pub inner: T,
|
||||||
@ -210,7 +224,6 @@ impl<T: Encode + Decode> HasSignature for SignedMessage<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub type SignedAnnounceFile = SignedMessage<AnnounceFile>;
|
pub type SignedAnnounceFile = SignedMessage<AnnounceFile>;
|
||||||
pub type SignedAnnounceShardConfig = SignedMessage<AnnounceShardConfig>;
|
|
||||||
pub type SignedAnnounceChunks = SignedMessage<AnnounceChunks>;
|
pub type SignedAnnounceChunks = SignedMessage<AnnounceChunks>;
|
||||||
|
|
||||||
type SignedAnnounceFiles = Vec<SignedAnnounceFile>;
|
type SignedAnnounceFiles = Vec<SignedAnnounceFile>;
|
||||||
@ -222,7 +235,7 @@ pub enum PubsubMessage {
|
|||||||
FindFile(FindFile),
|
FindFile(FindFile),
|
||||||
FindChunks(FindChunks),
|
FindChunks(FindChunks),
|
||||||
AnnounceFile(Vec<SignedAnnounceFile>),
|
AnnounceFile(Vec<SignedAnnounceFile>),
|
||||||
AnnounceShardConfig(SignedAnnounceShardConfig),
|
AnnounceShardConfig(TimedMessage<ShardConfig>),
|
||||||
AnnounceChunks(SignedAnnounceChunks),
|
AnnounceChunks(SignedAnnounceChunks),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -342,7 +355,7 @@ impl PubsubMessage {
|
|||||||
.map_err(|e| format!("{:?}", e))?,
|
.map_err(|e| format!("{:?}", e))?,
|
||||||
)),
|
)),
|
||||||
GossipKind::AnnounceShardConfig => Ok(PubsubMessage::AnnounceShardConfig(
|
GossipKind::AnnounceShardConfig => Ok(PubsubMessage::AnnounceShardConfig(
|
||||||
SignedAnnounceShardConfig::from_ssz_bytes(data)
|
TimedMessage::<ShardConfig>::from_ssz_bytes(data)
|
||||||
.map_err(|e| format!("{:?}", e))?,
|
.map_err(|e| format!("{:?}", e))?,
|
||||||
)),
|
)),
|
||||||
}
|
}
|
||||||
|
@ -13,7 +13,7 @@ pub const FIND_FILE_TOPIC: &str = "find_file";
|
|||||||
pub const FIND_CHUNKS_TOPIC: &str = "find_chunks";
|
pub const FIND_CHUNKS_TOPIC: &str = "find_chunks";
|
||||||
pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file";
|
pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file";
|
||||||
pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks";
|
pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks";
|
||||||
pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config";
|
pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config_v2";
|
||||||
|
|
||||||
/// A gossipsub topic which encapsulates the type of messages that should be sent and received over
|
/// A gossipsub topic which encapsulates the type of messages that should be sent and received over
|
||||||
/// the pubsub protocol and the way the messages should be encoded.
|
/// the pubsub protocol and the way the messages should be encoded.
|
||||||
|
@ -6,7 +6,7 @@ use chunk_pool::ChunkPoolMessage;
|
|||||||
use file_location_cache::FileLocationCache;
|
use file_location_cache::FileLocationCache;
|
||||||
use network::multiaddr::Protocol;
|
use network::multiaddr::Protocol;
|
||||||
use network::rpc::methods::FileAnnouncement;
|
use network::rpc::methods::FileAnnouncement;
|
||||||
use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig};
|
use network::types::{NewFile, TimedMessage};
|
||||||
use network::{
|
use network::{
|
||||||
rpc::StatusMessage,
|
rpc::StatusMessage,
|
||||||
types::{
|
types::{
|
||||||
@ -383,7 +383,7 @@ impl Libp2pEventHandler {
|
|||||||
}
|
}
|
||||||
PubsubMessage::AnnounceShardConfig(msg) => {
|
PubsubMessage::AnnounceShardConfig(msg) => {
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD.mark(1);
|
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD.mark(1);
|
||||||
self.on_announce_shard_config(propagation_source, msg)
|
self.on_announce_shard_config(propagation_source, source, msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -553,35 +553,6 @@ impl Libp2pEventHandler {
|
|||||||
Some(signed)
|
Some(signed)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn construct_announce_shard_config_message(
|
|
||||||
&self,
|
|
||||||
shard_config: ShardConfig,
|
|
||||||
) -> Option<PubsubMessage> {
|
|
||||||
let peer_id = *self.network_globals.peer_id.read();
|
|
||||||
let addr = self.construct_announced_ip().await?;
|
|
||||||
let timestamp = timestamp_now();
|
|
||||||
|
|
||||||
let msg = AnnounceShardConfig {
|
|
||||||
num_shard: shard_config.num_shard,
|
|
||||||
shard_id: shard_config.shard_id,
|
|
||||||
peer_id: peer_id.into(),
|
|
||||||
at: addr.into(),
|
|
||||||
timestamp,
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut signed = match SignedMessage::sign_message(msg, &self.local_keypair) {
|
|
||||||
Ok(signed) => signed,
|
|
||||||
Err(e) => {
|
|
||||||
error!(%e, "Failed to sign AnnounceShardConfig message");
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
signed.resend_timestamp = timestamp;
|
|
||||||
|
|
||||||
Some(PubsubMessage::AnnounceShardConfig(signed))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance {
|
async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance {
|
||||||
let FindFile {
|
let FindFile {
|
||||||
tx_id, timestamp, ..
|
tx_id, timestamp, ..
|
||||||
@ -872,51 +843,33 @@ impl Libp2pEventHandler {
|
|||||||
fn on_announce_shard_config(
|
fn on_announce_shard_config(
|
||||||
&self,
|
&self,
|
||||||
propagation_source: PeerId,
|
propagation_source: PeerId,
|
||||||
msg: SignedAnnounceShardConfig,
|
source: PeerId,
|
||||||
|
msg: TimedMessage<shared_types::ShardConfig>,
|
||||||
) -> MessageAcceptance {
|
) -> MessageAcceptance {
|
||||||
// verify message signature
|
// validate timestamp
|
||||||
if !verify_signature(&msg, &msg.peer_id, propagation_source) {
|
|
||||||
return MessageAcceptance::Reject;
|
|
||||||
}
|
|
||||||
|
|
||||||
// verify public ip address if required
|
|
||||||
let addr = msg.at.clone().into();
|
|
||||||
if !self.config.private_ip_enabled && !Self::contains_public_ip(&addr) {
|
|
||||||
return MessageAcceptance::Reject;
|
|
||||||
}
|
|
||||||
|
|
||||||
// verify announced ip address if required
|
|
||||||
if !self.config.private_ip_enabled
|
|
||||||
&& self.config.check_announced_ip
|
|
||||||
&& !self.verify_announced_address(&msg.peer_id, &addr)
|
|
||||||
{
|
|
||||||
return MessageAcceptance::Reject;
|
|
||||||
}
|
|
||||||
|
|
||||||
// propagate gossip to peers
|
|
||||||
let d = duration_since(
|
let d = duration_since(
|
||||||
msg.resend_timestamp,
|
msg.timestamp,
|
||||||
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD_LATENCY.clone(),
|
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD_LATENCY.clone(),
|
||||||
);
|
);
|
||||||
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_SHARD_CONFIG_TIMEOUT {
|
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_SHARD_CONFIG_TIMEOUT {
|
||||||
debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceShardConfig message");
|
debug!(?d, %propagation_source, %source, ?msg, "Invalid timestamp, ignoring AnnounceShardConfig message");
|
||||||
return MessageAcceptance::Ignore;
|
return MessageAcceptance::Ignore;
|
||||||
}
|
}
|
||||||
|
|
||||||
let shard_config = ShardConfig {
|
let shard_config = match ShardConfig::try_from(msg.inner) {
|
||||||
shard_id: msg.shard_id,
|
Ok(v) => v,
|
||||||
num_shard: msg.num_shard,
|
Err(_) => return MessageAcceptance::Reject,
|
||||||
};
|
};
|
||||||
// notify sync layer
|
|
||||||
self.send_to_sync(SyncMessage::AnnounceShardConfig {
|
|
||||||
shard_config,
|
|
||||||
peer_id: msg.peer_id.clone().into(),
|
|
||||||
addr,
|
|
||||||
});
|
|
||||||
|
|
||||||
// insert message to cache
|
// insert message to cache
|
||||||
self.file_location_cache
|
self.file_location_cache
|
||||||
.insert_peer_config(msg.peer_id.clone().into(), shard_config);
|
.insert_peer_config(source, shard_config);
|
||||||
|
|
||||||
|
// notify sync layer
|
||||||
|
self.send_to_sync(SyncMessage::AnnounceShardConfig {
|
||||||
|
shard_config,
|
||||||
|
peer_id: source,
|
||||||
|
});
|
||||||
|
|
||||||
MessageAcceptance::Accept
|
MessageAcceptance::Accept
|
||||||
}
|
}
|
||||||
|
@ -384,13 +384,10 @@ impl RouterService {
|
|||||||
PrunerMessage::ChangeShardConfig(shard_config) => {
|
PrunerMessage::ChangeShardConfig(shard_config) => {
|
||||||
self.libp2p_event_handler
|
self.libp2p_event_handler
|
||||||
.send_to_chunk_pool(ChunkPoolMessage::ChangeShardConfig(shard_config));
|
.send_to_chunk_pool(ChunkPoolMessage::ChangeShardConfig(shard_config));
|
||||||
if let Some(msg) = self
|
|
||||||
.libp2p_event_handler
|
let shard_config = shared_types::ShardConfig::from(shard_config);
|
||||||
.construct_announce_shard_config_message(shard_config)
|
self.libp2p_event_handler
|
||||||
.await
|
.publish(PubsubMessage::AnnounceShardConfig(shard_config.into()));
|
||||||
{
|
|
||||||
self.libp2p_event_handler.publish(msg)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -403,6 +403,13 @@ pub enum TxSeqOrRoot {
|
|||||||
Root(DataRoot),
|
Root(DataRoot),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, DeriveEncode, DeriveDecode)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct ShardConfig {
|
||||||
|
pub num_shard: usize,
|
||||||
|
pub shard_id: usize,
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
@ -60,6 +60,23 @@ impl TryFrom<Option<String>> for ShardConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl TryFrom<shared_types::ShardConfig> for ShardConfig {
|
||||||
|
type Error = String;
|
||||||
|
|
||||||
|
fn try_from(value: shared_types::ShardConfig) -> Result<Self, Self::Error> {
|
||||||
|
Self::new(value.shard_id, value.num_shard)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<ShardConfig> for shared_types::ShardConfig {
|
||||||
|
fn from(value: ShardConfig) -> Self {
|
||||||
|
Self {
|
||||||
|
num_shard: value.num_shard,
|
||||||
|
shard_id: value.shard_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl ShardConfig {
|
impl ShardConfig {
|
||||||
pub fn new(id: usize, num: usize) -> Result<Self, String> {
|
pub fn new(id: usize, num: usize) -> Result<Self, String> {
|
||||||
let config = ShardConfig {
|
let config = ShardConfig {
|
||||||
|
@ -65,7 +65,6 @@ pub enum SyncMessage {
|
|||||||
AnnounceShardConfig {
|
AnnounceShardConfig {
|
||||||
shard_config: ShardConfig,
|
shard_config: ShardConfig,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
addr: Multiaddr,
|
|
||||||
},
|
},
|
||||||
AnnounceChunksGossip {
|
AnnounceChunksGossip {
|
||||||
msg: AnnounceChunks,
|
msg: AnnounceChunks,
|
||||||
|
Loading…
Reference in New Issue
Block a user