Add new P2P protocol NewFile

This commit is contained in:
boqiu 2024-10-23 16:39:59 +08:00
parent 8f17a7ad72
commit e06936f381
6 changed files with 43 additions and 2 deletions

View File

@ -20,6 +20,8 @@ pub struct GossipCache {
topic_msgs: HashMap<GossipTopic, HashMap<Vec<u8>, Key>>, topic_msgs: HashMap<GossipTopic, HashMap<Vec<u8>, Key>>,
/// Timeout for Example messages. /// Timeout for Example messages.
example: Option<Duration>, example: Option<Duration>,
/// Timeout for NewFile messages.
new_file: Option<Duration>,
/// Timeout for FindFile messages. /// Timeout for FindFile messages.
find_file: Option<Duration>, find_file: Option<Duration>,
/// Timeout for FindChunks messages. /// Timeout for FindChunks messages.
@ -37,6 +39,8 @@ pub struct GossipCacheBuilder {
default_timeout: Option<Duration>, default_timeout: Option<Duration>,
/// Timeout for Example messages. /// Timeout for Example messages.
example: Option<Duration>, example: Option<Duration>,
/// Timeout for NewFile messges.
new_file: Option<Duration>,
/// Timeout for blocks FindFile messages. /// Timeout for blocks FindFile messages.
find_file: Option<Duration>, find_file: Option<Duration>,
/// Timeout for blocks FindChunks messages. /// Timeout for blocks FindChunks messages.
@ -64,6 +68,12 @@ impl GossipCacheBuilder {
self self
} }
/// Timeout for NewFile messages.
pub fn new_file_timeout(mut self, timeout: Duration) -> Self {
self.new_file = Some(timeout);
self
}
/// Timeout for FindFile messages. /// Timeout for FindFile messages.
pub fn find_file_timeout(mut self, timeout: Duration) -> Self { pub fn find_file_timeout(mut self, timeout: Duration) -> Self {
self.find_file = Some(timeout); self.find_file = Some(timeout);
@ -98,6 +108,7 @@ impl GossipCacheBuilder {
let GossipCacheBuilder { let GossipCacheBuilder {
default_timeout, default_timeout,
example, example,
new_file,
find_file, find_file,
find_chunks, find_chunks,
announce_file, announce_file,
@ -109,6 +120,7 @@ impl GossipCacheBuilder {
expirations: DelayQueue::default(), expirations: DelayQueue::default(),
topic_msgs: HashMap::default(), topic_msgs: HashMap::default(),
example: example.or(default_timeout), example: example.or(default_timeout),
new_file: new_file.or(default_timeout),
find_file: find_file.or(default_timeout), find_file: find_file.or(default_timeout),
find_chunks: find_chunks.or(default_timeout), find_chunks: find_chunks.or(default_timeout),
announce_file: announce_file.or(default_timeout), announce_file: announce_file.or(default_timeout),
@ -129,6 +141,7 @@ impl GossipCache {
pub fn insert(&mut self, topic: GossipTopic, data: Vec<u8>) { pub fn insert(&mut self, topic: GossipTopic, data: Vec<u8>) {
let expire_timeout = match topic.kind() { let expire_timeout = match topic.kind() {
GossipKind::Example => self.example, GossipKind::Example => self.example,
GossipKind::NewFile => self.new_file,
GossipKind::FindFile => self.find_file, GossipKind::FindFile => self.find_file,
GossipKind::FindChunks => self.find_chunks, GossipKind::FindChunks => self.find_chunks,
GossipKind::AnnounceFile => self.announce_file, GossipKind::AnnounceFile => self.announce_file,

View File

@ -232,6 +232,9 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
let topic: Topic = GossipTopic::new(kind, GossipEncoding::default()).into(); let topic: Topic = GossipTopic::new(kind, GossipEncoding::default()).into();
topic.hash() topic.hash()
}; };
params
.topics
.insert(get_hash(GossipKind::NewFile), TopicScoreParams::default());
params params
.topics .topics
.insert(get_hash(GossipKind::FindFile), TopicScoreParams::default()); .insert(get_hash(GossipKind::FindFile), TopicScoreParams::default());

View File

@ -7,7 +7,7 @@ pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
pub use globals::NetworkGlobals; pub use globals::NetworkGlobals;
pub use pubsub::{ pub use pubsub::{
AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature, AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature, NewFile,
PubsubMessage, SignedAnnounceChunks, SignedAnnounceFile, SignedAnnounceShardConfig, PubsubMessage, SignedAnnounceChunks, SignedAnnounceFile, SignedAnnounceShardConfig,
SignedMessage, SnappyTransform, SignedMessage, SnappyTransform,
}; };

View File

@ -114,6 +114,14 @@ impl ssz::Decode for WrappedPeerId {
} }
} }
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct NewFile {
pub tx_id: TxID,
pub num_shard: usize,
pub shard_id: usize,
pub timestamp: u32,
}
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct FindFile { pub struct FindFile {
pub tx_id: TxID, pub tx_id: TxID,
@ -205,6 +213,7 @@ type SignedAnnounceFiles = Vec<SignedAnnounceFile>;
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum PubsubMessage { pub enum PubsubMessage {
ExampleMessage(u64), ExampleMessage(u64),
NewFile(NewFile),
FindFile(FindFile), FindFile(FindFile),
FindChunks(FindChunks), FindChunks(FindChunks),
AnnounceFile(Vec<SignedAnnounceFile>), AnnounceFile(Vec<SignedAnnounceFile>),
@ -283,6 +292,7 @@ impl PubsubMessage {
pub fn kind(&self) -> GossipKind { pub fn kind(&self) -> GossipKind {
match self { match self {
PubsubMessage::ExampleMessage(_) => GossipKind::Example, PubsubMessage::ExampleMessage(_) => GossipKind::Example,
PubsubMessage::NewFile(_) => GossipKind::NewFile,
PubsubMessage::FindFile(_) => GossipKind::FindFile, PubsubMessage::FindFile(_) => GossipKind::FindFile,
PubsubMessage::FindChunks(_) => GossipKind::FindChunks, PubsubMessage::FindChunks(_) => GossipKind::FindChunks,
PubsubMessage::AnnounceFile(_) => GossipKind::AnnounceFile, PubsubMessage::AnnounceFile(_) => GossipKind::AnnounceFile,
@ -309,6 +319,9 @@ impl PubsubMessage {
GossipKind::Example => Ok(PubsubMessage::ExampleMessage( GossipKind::Example => Ok(PubsubMessage::ExampleMessage(
u64::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, u64::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
)), )),
GossipKind::NewFile => Ok(PubsubMessage::NewFile(
NewFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
)),
GossipKind::FindFile => Ok(PubsubMessage::FindFile( GossipKind::FindFile => Ok(PubsubMessage::FindFile(
FindFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, FindFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
)), )),
@ -341,6 +354,7 @@ impl PubsubMessage {
// messages for us. // messages for us.
match &self { match &self {
PubsubMessage::ExampleMessage(data) => data.as_ssz_bytes(), PubsubMessage::ExampleMessage(data) => data.as_ssz_bytes(),
PubsubMessage::NewFile(data) => data.as_ssz_bytes(),
PubsubMessage::FindFile(data) => data.as_ssz_bytes(), PubsubMessage::FindFile(data) => data.as_ssz_bytes(),
PubsubMessage::FindChunks(data) => data.as_ssz_bytes(), PubsubMessage::FindChunks(data) => data.as_ssz_bytes(),
PubsubMessage::AnnounceFile(data) => data.as_ssz_bytes(), PubsubMessage::AnnounceFile(data) => data.as_ssz_bytes(),
@ -356,6 +370,9 @@ impl std::fmt::Display for PubsubMessage {
PubsubMessage::ExampleMessage(msg) => { PubsubMessage::ExampleMessage(msg) => {
write!(f, "Example message: {}", msg) write!(f, "Example message: {}", msg)
} }
PubsubMessage::NewFile(msg) => {
write!(f, "NewFile message: {:?}", msg)
}
PubsubMessage::FindFile(msg) => { PubsubMessage::FindFile(msg) => {
write!(f, "FindFile message: {:?}", msg) write!(f, "FindFile message: {:?}", msg)
} }

View File

@ -8,13 +8,15 @@ use strum::AsRefStr;
pub const TOPIC_PREFIX: &str = "eth2"; pub const TOPIC_PREFIX: &str = "eth2";
pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy"; pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy";
pub const EXAMPLE_TOPIC: &str = "example"; pub const EXAMPLE_TOPIC: &str = "example";
pub const NEW_FILE_TOPIC: &str = "new_file";
pub const FIND_FILE_TOPIC: &str = "find_file"; pub const FIND_FILE_TOPIC: &str = "find_file";
pub const FIND_CHUNKS_TOPIC: &str = "find_chunks"; pub const FIND_CHUNKS_TOPIC: &str = "find_chunks";
pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file"; pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file";
pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks"; pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks";
pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config"; pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config";
pub const CORE_TOPICS: [GossipKind; 4] = [ pub const CORE_TOPICS: [GossipKind; 5] = [
GossipKind::NewFile,
GossipKind::FindFile, GossipKind::FindFile,
GossipKind::FindChunks, GossipKind::FindChunks,
GossipKind::AnnounceFile, GossipKind::AnnounceFile,
@ -37,6 +39,7 @@ pub struct GossipTopic {
#[strum(serialize_all = "snake_case")] #[strum(serialize_all = "snake_case")]
pub enum GossipKind { pub enum GossipKind {
Example, Example,
NewFile,
FindFile, FindFile,
FindChunks, FindChunks,
AnnounceFile, AnnounceFile,
@ -77,6 +80,7 @@ impl GossipTopic {
let kind = match topic_parts[2] { let kind = match topic_parts[2] {
EXAMPLE_TOPIC => GossipKind::Example, EXAMPLE_TOPIC => GossipKind::Example,
NEW_FILE_TOPIC => GossipKind::NewFile,
FIND_FILE_TOPIC => GossipKind::FindFile, FIND_FILE_TOPIC => GossipKind::FindFile,
FIND_CHUNKS_TOPIC => GossipKind::FindChunks, FIND_CHUNKS_TOPIC => GossipKind::FindChunks,
ANNOUNCE_FILE_TOPIC => GossipKind::AnnounceFile, ANNOUNCE_FILE_TOPIC => GossipKind::AnnounceFile,
@ -106,6 +110,7 @@ impl From<GossipTopic> for String {
let kind = match topic.kind { let kind = match topic.kind {
GossipKind::Example => EXAMPLE_TOPIC, GossipKind::Example => EXAMPLE_TOPIC,
GossipKind::NewFile => NEW_FILE_TOPIC,
GossipKind::FindFile => FIND_FILE_TOPIC, GossipKind::FindFile => FIND_FILE_TOPIC,
GossipKind::FindChunks => FIND_CHUNKS_TOPIC, GossipKind::FindChunks => FIND_CHUNKS_TOPIC,
GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC, GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC,
@ -125,6 +130,7 @@ impl std::fmt::Display for GossipTopic {
let kind = match self.kind { let kind = match self.kind {
GossipKind::Example => EXAMPLE_TOPIC, GossipKind::Example => EXAMPLE_TOPIC,
GossipKind::NewFile => NEW_FILE_TOPIC,
GossipKind::FindFile => FIND_FILE_TOPIC, GossipKind::FindFile => FIND_FILE_TOPIC,
GossipKind::FindChunks => FIND_CHUNKS_TOPIC, GossipKind::FindChunks => FIND_CHUNKS_TOPIC,
GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC, GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC,

View File

@ -316,6 +316,8 @@ impl Libp2pEventHandler {
match message { match message {
PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
// TODO qbit: handle the NewFile pubsub message
PubsubMessage::NewFile(_) => todo!(),
PubsubMessage::FindFile(msg) => { PubsubMessage::FindFile(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1); metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1);
self.on_find_file(msg).await self.on_find_file(msg).await