Compare commits

..

1 Commits

Author SHA1 Message Date
0g-peterzhb
31827d2791
Merge 9a11e969e6 into 4bb2fac98f 2024-12-05 13:05:01 +07:00
8 changed files with 84 additions and 69 deletions

View File

@ -7,7 +7,8 @@ pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
pub use globals::NetworkGlobals; pub use globals::NetworkGlobals;
pub use pubsub::{ pub use pubsub::{
AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, NewFile, PubsubMessage, AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature, NewFile,
SignedAnnounceChunks, SignedAnnounceFile, SignedMessage, SnappyTransform, TimedMessage, PubsubMessage, SignedAnnounceChunks, SignedAnnounceFile, SignedAnnounceShardConfig,
SignedMessage, SnappyTransform,
}; };
pub use topics::{GossipEncoding, GossipKind, GossipTopic}; pub use topics::{GossipEncoding, GossipKind, GossipTopic};

View File

@ -6,7 +6,7 @@ use libp2p::{
gossipsub::{DataTransform, GossipsubMessage, RawGossipsubMessage}, gossipsub::{DataTransform, GossipsubMessage, RawGossipsubMessage},
Multiaddr, PeerId, Multiaddr, PeerId,
}; };
use shared_types::{timestamp_now, ShardConfig, TxID}; use shared_types::TxID;
use snap::raw::{decompress_len, Decoder, Encoder}; use snap::raw::{decompress_len, Decoder, Encoder};
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode}; use ssz_derive::{Decode, Encode};
@ -162,28 +162,14 @@ pub struct AnnounceChunks {
} }
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
pub struct TimedMessage<T: Encode + Decode> { pub struct AnnounceShardConfig {
pub inner: T, pub num_shard: usize,
pub shard_id: usize,
pub peer_id: WrappedPeerId,
pub at: WrappedMultiaddr,
pub timestamp: u32, pub timestamp: u32,
} }
impl<T: Encode + Decode> Deref for TimedMessage<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T: Encode + Decode> From<T> for TimedMessage<T> {
fn from(value: T) -> Self {
Self {
inner: value,
timestamp: timestamp_now(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)]
pub struct SignedMessage<T: Encode + Decode> { pub struct SignedMessage<T: Encode + Decode> {
pub inner: T, pub inner: T,
@ -224,6 +210,7 @@ impl<T: Encode + Decode> HasSignature for SignedMessage<T> {
} }
pub type SignedAnnounceFile = SignedMessage<AnnounceFile>; pub type SignedAnnounceFile = SignedMessage<AnnounceFile>;
pub type SignedAnnounceShardConfig = SignedMessage<AnnounceShardConfig>;
pub type SignedAnnounceChunks = SignedMessage<AnnounceChunks>; pub type SignedAnnounceChunks = SignedMessage<AnnounceChunks>;
type SignedAnnounceFiles = Vec<SignedAnnounceFile>; type SignedAnnounceFiles = Vec<SignedAnnounceFile>;
@ -235,7 +222,7 @@ pub enum PubsubMessage {
FindFile(FindFile), FindFile(FindFile),
FindChunks(FindChunks), FindChunks(FindChunks),
AnnounceFile(Vec<SignedAnnounceFile>), AnnounceFile(Vec<SignedAnnounceFile>),
AnnounceShardConfig(TimedMessage<ShardConfig>), AnnounceShardConfig(SignedAnnounceShardConfig),
AnnounceChunks(SignedAnnounceChunks), AnnounceChunks(SignedAnnounceChunks),
} }
@ -355,7 +342,7 @@ impl PubsubMessage {
.map_err(|e| format!("{:?}", e))?, .map_err(|e| format!("{:?}", e))?,
)), )),
GossipKind::AnnounceShardConfig => Ok(PubsubMessage::AnnounceShardConfig( GossipKind::AnnounceShardConfig => Ok(PubsubMessage::AnnounceShardConfig(
TimedMessage::<ShardConfig>::from_ssz_bytes(data) SignedAnnounceShardConfig::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?, .map_err(|e| format!("{:?}", e))?,
)), )),
} }

View File

@ -13,7 +13,7 @@ pub const FIND_FILE_TOPIC: &str = "find_file";
pub const FIND_CHUNKS_TOPIC: &str = "find_chunks"; pub const FIND_CHUNKS_TOPIC: &str = "find_chunks";
pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file"; pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file";
pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks"; pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks";
pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config_v2"; pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config";
/// A gossipsub topic which encapsulates the type of messages that should be sent and received over /// A gossipsub topic which encapsulates the type of messages that should be sent and received over
/// the pubsub protocol and the way the messages should be encoded. /// the pubsub protocol and the way the messages should be encoded.

View File

@ -6,7 +6,7 @@ use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache; use file_location_cache::FileLocationCache;
use network::multiaddr::Protocol; use network::multiaddr::Protocol;
use network::rpc::methods::FileAnnouncement; use network::rpc::methods::FileAnnouncement;
use network::types::{NewFile, TimedMessage}; use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig};
use network::{ use network::{
rpc::StatusMessage, rpc::StatusMessage,
types::{ types::{
@ -383,7 +383,7 @@ impl Libp2pEventHandler {
} }
PubsubMessage::AnnounceShardConfig(msg) => { PubsubMessage::AnnounceShardConfig(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD.mark(1); metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD.mark(1);
self.on_announce_shard_config(propagation_source, source, msg) self.on_announce_shard_config(propagation_source, msg)
} }
} }
} }
@ -553,6 +553,35 @@ impl Libp2pEventHandler {
Some(signed) Some(signed)
} }
pub async fn construct_announce_shard_config_message(
&self,
shard_config: ShardConfig,
) -> Option<PubsubMessage> {
let peer_id = *self.network_globals.peer_id.read();
let addr = self.construct_announced_ip().await?;
let timestamp = timestamp_now();
let msg = AnnounceShardConfig {
num_shard: shard_config.num_shard,
shard_id: shard_config.shard_id,
peer_id: peer_id.into(),
at: addr.into(),
timestamp,
};
let mut signed = match SignedMessage::sign_message(msg, &self.local_keypair) {
Ok(signed) => signed,
Err(e) => {
error!(%e, "Failed to sign AnnounceShardConfig message");
return None;
}
};
signed.resend_timestamp = timestamp;
Some(PubsubMessage::AnnounceShardConfig(signed))
}
async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance { async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance {
let FindFile { let FindFile {
tx_id, timestamp, .. tx_id, timestamp, ..
@ -843,34 +872,52 @@ impl Libp2pEventHandler {
fn on_announce_shard_config( fn on_announce_shard_config(
&self, &self,
propagation_source: PeerId, propagation_source: PeerId,
source: PeerId, msg: SignedAnnounceShardConfig,
msg: TimedMessage<shared_types::ShardConfig>,
) -> MessageAcceptance { ) -> MessageAcceptance {
// validate timestamp // verify message signature
if !verify_signature(&msg, &msg.peer_id, propagation_source) {
return MessageAcceptance::Reject;
}
// verify public ip address if required
let addr = msg.at.clone().into();
if !self.config.private_ip_enabled && !Self::contains_public_ip(&addr) {
return MessageAcceptance::Reject;
}
// verify announced ip address if required
if !self.config.private_ip_enabled
&& self.config.check_announced_ip
&& !self.verify_announced_address(&msg.peer_id, &addr)
{
return MessageAcceptance::Reject;
}
// propagate gossip to peers
let d = duration_since( let d = duration_since(
msg.timestamp, msg.resend_timestamp,
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD_LATENCY.clone(), metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD_LATENCY.clone(),
); );
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_SHARD_CONFIG_TIMEOUT { if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_SHARD_CONFIG_TIMEOUT {
debug!(?d, %propagation_source, %source, ?msg, "Invalid timestamp, ignoring AnnounceShardConfig message"); debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceShardConfig message");
return MessageAcceptance::Ignore; return MessageAcceptance::Ignore;
} }
let shard_config = match ShardConfig::try_from(msg.inner) { let shard_config = ShardConfig {
Ok(v) => v, shard_id: msg.shard_id,
Err(_) => return MessageAcceptance::Reject, num_shard: msg.num_shard,
}; };
// insert message to cache
self.file_location_cache
.insert_peer_config(source, shard_config);
// notify sync layer // notify sync layer
self.send_to_sync(SyncMessage::AnnounceShardConfig { self.send_to_sync(SyncMessage::AnnounceShardConfig {
shard_config, shard_config,
peer_id: source, peer_id: msg.peer_id.clone().into(),
addr,
}); });
// insert message to cache
self.file_location_cache
.insert_peer_config(msg.peer_id.clone().into(), shard_config);
MessageAcceptance::Accept MessageAcceptance::Accept
} }

View File

@ -384,10 +384,13 @@ impl RouterService {
PrunerMessage::ChangeShardConfig(shard_config) => { PrunerMessage::ChangeShardConfig(shard_config) => {
self.libp2p_event_handler self.libp2p_event_handler
.send_to_chunk_pool(ChunkPoolMessage::ChangeShardConfig(shard_config)); .send_to_chunk_pool(ChunkPoolMessage::ChangeShardConfig(shard_config));
if let Some(msg) = self
let shard_config = shared_types::ShardConfig::from(shard_config); .libp2p_event_handler
self.libp2p_event_handler .construct_announce_shard_config_message(shard_config)
.publish(PubsubMessage::AnnounceShardConfig(shard_config.into())); .await
{
self.libp2p_event_handler.publish(msg)
}
} }
} }
} }

View File

@ -403,13 +403,6 @@ pub enum TxSeqOrRoot {
Root(DataRoot), Root(DataRoot),
} }
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, DeriveEncode, DeriveDecode)]
#[serde(rename_all = "camelCase")]
pub struct ShardConfig {
pub num_shard: usize,
pub shard_id: usize,
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::str::FromStr; use std::str::FromStr;

View File

@ -60,23 +60,6 @@ impl TryFrom<Option<String>> for ShardConfig {
} }
} }
impl TryFrom<shared_types::ShardConfig> for ShardConfig {
type Error = String;
fn try_from(value: shared_types::ShardConfig) -> Result<Self, Self::Error> {
Self::new(value.shard_id, value.num_shard)
}
}
impl From<ShardConfig> for shared_types::ShardConfig {
fn from(value: ShardConfig) -> Self {
Self {
num_shard: value.num_shard,
shard_id: value.shard_id,
}
}
}
impl ShardConfig { impl ShardConfig {
pub fn new(id: usize, num: usize) -> Result<Self, String> { pub fn new(id: usize, num: usize) -> Result<Self, String> {
let config = ShardConfig { let config = ShardConfig {

View File

@ -65,6 +65,7 @@ pub enum SyncMessage {
AnnounceShardConfig { AnnounceShardConfig {
shard_config: ShardConfig, shard_config: ShardConfig,
peer_id: PeerId, peer_id: PeerId,
addr: Multiaddr,
}, },
AnnounceChunksGossip { AnnounceChunksGossip {
msg: AnnounceChunks, msg: AnnounceChunks,