From 09b34fbf07f84f758724f3e79cd00e7b1ace0d7f Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 15:43:28 +0800 Subject: [PATCH] Add shard config in FindFile --- node/network/src/types/pubsub.rs | 2 ++ node/router/src/libp2p_event_handler.rs | 24 ++++++++++++++++++++++-- node/sync/src/controllers/serial.rs | 3 +++ node/sync/src/service.rs | 3 +++ 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index 700878b..717fe0c 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -125,6 +125,8 @@ pub struct NewFile { #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] pub struct FindFile { pub tx_id: TxID, + pub num_shard: usize, + pub shard_id: usize, pub timestamp: u32, } diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index bccfb2b..9dff174 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -544,7 +544,9 @@ impl Libp2pEventHandler { } async fn on_find_file(&self, msg: FindFile) -> MessageAcceptance { - let FindFile { tx_id, timestamp } = msg; + let FindFile { + tx_id, timestamp, .. + } = msg; // verify timestamp let d = duration_since( @@ -557,6 +559,19 @@ impl Libp2pEventHandler { return MessageAcceptance::Ignore; } + // verify announced shard config + let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) { + Ok(v) => v, + Err(_) => return MessageAcceptance::Reject, + }; + + // propagate FindFile query to other nodes if shard mismatch + let my_shard_config = self.store.get_store().get_shard_config(); + if !my_shard_config.intersect(&announced_shard_config) { + metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_FORWARD.mark(1); + return MessageAcceptance::Accept; + } + // check if we have it if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) { if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await { @@ -1261,7 +1276,12 @@ mod tests { ) -> MessageAcceptance { let (alice, bob) = (PeerId::random(), PeerId::random()); let id = MessageId::new(b"dummy message"); - let message = PubsubMessage::FindFile(FindFile { tx_id, timestamp }); + let message = PubsubMessage::FindFile(FindFile { + tx_id, + num_shard: 1, + shard_id: 0, + timestamp, + }); handler.on_pubsub_message(alice, bob, &id, message).await } diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 6b4f5e4..d4072bb 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -199,8 +199,11 @@ impl SerialSyncController { return (false, num_new_peers); } + let shard_config = self.store.get_store().get_shard_config(); self.ctx.publish(PubsubMessage::FindFile(FindFile { tx_id: self.tx_id, + num_shard: shard_config.num_shard, + shard_id: shard_config.shard_id, timestamp: timestamp_now(), })); diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index d7de9c0..9c164a3 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -585,8 +585,11 @@ impl SyncService { Some(tx) => tx, None => bail!("Transaction not found"), }; + let shard_config = self.store.get_store().get_shard_config(); self.ctx.publish(PubsubMessage::FindFile(FindFile { tx_id: tx.id(), + num_shard: shard_config.num_shard, + shard_id: shard_config.shard_id, timestamp: timestamp_now(), })); Ok(())