Add shard config in FindFile

This commit is contained in:
boqiu 2024-10-24 15:43:28 +08:00
parent ceb165d79b
commit 09b34fbf07
4 changed files with 30 additions and 2 deletions

View File

@ -125,6 +125,8 @@ pub struct NewFile {
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct FindFile { pub struct FindFile {
pub tx_id: TxID, pub tx_id: TxID,
pub num_shard: usize,
pub shard_id: usize,
pub timestamp: u32, pub timestamp: u32,
} }

View File

@ -544,7 +544,9 @@ impl Libp2pEventHandler {
} }
async fn on_find_file(&self, msg: FindFile) -> MessageAcceptance { async fn on_find_file(&self, msg: FindFile) -> MessageAcceptance {
let FindFile { tx_id, timestamp } = msg; let FindFile {
tx_id, timestamp, ..
} = msg;
// verify timestamp // verify timestamp
let d = duration_since( let d = duration_since(
@ -557,6 +559,19 @@ impl Libp2pEventHandler {
return MessageAcceptance::Ignore; return MessageAcceptance::Ignore;
} }
// verify announced shard config
let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) {
Ok(v) => v,
Err(_) => return MessageAcceptance::Reject,
};
// propagate FindFile query to other nodes if shard mismatch
let my_shard_config = self.store.get_store().get_shard_config();
if !my_shard_config.intersect(&announced_shard_config) {
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_FORWARD.mark(1);
return MessageAcceptance::Accept;
}
// check if we have it // check if we have it
if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) { if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) {
if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await { if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await {
@ -1261,7 +1276,12 @@ mod tests {
) -> MessageAcceptance { ) -> MessageAcceptance {
let (alice, bob) = (PeerId::random(), PeerId::random()); let (alice, bob) = (PeerId::random(), PeerId::random());
let id = MessageId::new(b"dummy message"); let id = MessageId::new(b"dummy message");
let message = PubsubMessage::FindFile(FindFile { tx_id, timestamp }); let message = PubsubMessage::FindFile(FindFile {
tx_id,
num_shard: 1,
shard_id: 0,
timestamp,
});
handler.on_pubsub_message(alice, bob, &id, message).await handler.on_pubsub_message(alice, bob, &id, message).await
} }

View File

@ -199,8 +199,11 @@ impl SerialSyncController {
return (false, num_new_peers); return (false, num_new_peers);
} }
let shard_config = self.store.get_store().get_shard_config();
self.ctx.publish(PubsubMessage::FindFile(FindFile { self.ctx.publish(PubsubMessage::FindFile(FindFile {
tx_id: self.tx_id, tx_id: self.tx_id,
num_shard: shard_config.num_shard,
shard_id: shard_config.shard_id,
timestamp: timestamp_now(), timestamp: timestamp_now(),
})); }));

View File

@ -585,8 +585,11 @@ impl SyncService {
Some(tx) => tx, Some(tx) => tx,
None => bail!("Transaction not found"), None => bail!("Transaction not found"),
}; };
let shard_config = self.store.get_store().get_shard_config();
self.ctx.publish(PubsubMessage::FindFile(FindFile { self.ctx.publish(PubsubMessage::FindFile(FindFile {
tx_id: tx.id(), tx_id: tx.id(),
num_shard: shard_config.num_shard,
shard_id: shard_config.shard_id,
timestamp: timestamp_now(), timestamp: timestamp_now(),
})); }));
Ok(()) Ok(())