From 6b2420cac4a09e91aab7a7bddf891dbdd229f898 Mon Sep 17 00:00:00 2001 From: Bo QIU <35757521+boqiu@users.noreply.github.com> Date: Fri, 6 Dec 2024 14:54:01 +0800 Subject: [PATCH] Refactor find file (#294) * refactor find file gossip * refactor find chunks gossip * refactor announce file gossip * fix announce file issue * refactor find chunks gossip --- .../src/file_location_cache.rs | 7 +- node/file_location_cache/src/test_util.rs | 15 +- node/network/src/types/mod.rs | 2 +- node/network/src/types/pubsub.rs | 35 ++-- node/network/src/types/topics.rs | 8 +- node/router/src/libp2p_event_handler.rs | 161 ++++++------------ node/router/src/metrics.rs | 5 +- node/shared_types/src/lib.rs | 9 + node/sync/src/controllers/serial.rs | 30 ++-- node/sync/src/service.rs | 48 ++---- 10 files changed, 125 insertions(+), 195 deletions(-) diff --git a/node/file_location_cache/src/file_location_cache.rs b/node/file_location_cache/src/file_location_cache.rs index e5ee6e1..fc81f58 100644 --- a/node/file_location_cache/src/file_location_cache.rs +++ b/node/file_location_cache/src/file_location_cache.rs @@ -297,10 +297,9 @@ impl FileLocationCache { INSERT_BATCH.update(announcement.tx_ids.len() as u64); let peer_id = *announcement.peer_id; - // FIXME: Check validity. - let shard_config = ShardConfig { - shard_id: announcement.shard_id, - num_shard: announcement.num_shard, + let shard_config = match ShardConfig::try_from(announcement.shard_config) { + Ok(v) => v, + Err(_) => return, }; self.insert_peer_config(peer_id, shard_config); diff --git a/node/file_location_cache/src/test_util.rs b/node/file_location_cache/src/test_util.rs index b43f6d2..8e8da73 100644 --- a/node/file_location_cache/src/test_util.rs +++ b/node/file_location_cache/src/test_util.rs @@ -1,6 +1,6 @@ use network::{ libp2p::identity, - types::{AnnounceFile, SignedAnnounceFile, SignedMessage}, + types::{AnnounceFile, SignedAnnounceFile, SignedMessage, TimedMessage}, Multiaddr, PeerId, }; use shared_types::{timestamp_now, TxID}; @@ -34,12 +34,13 @@ impl AnnounceFileBuilder { let at: Multiaddr = "/ip4/127.0.0.1/tcp/10000".parse().unwrap(); let timestamp = self.timestamp.unwrap_or_else(timestamp_now); - let msg = AnnounceFile { - tx_ids: vec![tx_id], - num_shard: 1, - shard_id: 0, - peer_id: peer_id.into(), - at: at.into(), + let msg = TimedMessage { + inner: AnnounceFile { + tx_ids: vec![tx_id], + shard_config: Default::default(), + peer_id: peer_id.into(), + at: at.into(), + }, timestamp, }; diff --git a/node/network/src/types/mod.rs b/node/network/src/types/mod.rs index 87058fe..0f12720 100644 --- a/node/network/src/types/mod.rs +++ b/node/network/src/types/mod.rs @@ -8,6 +8,6 @@ pub type Enr = discv5::enr::Enr; pub use globals::NetworkGlobals; pub use pubsub::{ AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, PubsubMessage, - SignedAnnounceChunks, SignedAnnounceFile, SignedMessage, SnappyTransform, TimedMessage, + SignedAnnounceFile, SignedMessage, SnappyTransform, TimedMessage, }; pub use topics::{GossipEncoding, GossipKind, GossipTopic}; diff --git a/node/network/src/types/pubsub.rs b/node/network/src/types/pubsub.rs index 2dc2cc9..28ca741 100644 --- a/node/network/src/types/pubsub.rs +++ b/node/network/src/types/pubsub.rs @@ -117,11 +117,7 @@ impl ssz::Decode for WrappedPeerId { #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] pub struct FindFile { pub tx_id: TxID, - pub num_shard: usize, - pub shard_id: usize, - /// Indicates whether publish to neighboar nodes only. - pub neighbors_only: bool, - pub timestamp: u32, + pub maybe_shard_config: Option, } #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] @@ -129,17 +125,14 @@ pub struct FindChunks { pub tx_id: TxID, pub index_start: u64, // inclusive pub index_end: u64, // exclusive - pub timestamp: u32, } #[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] pub struct AnnounceFile { pub tx_ids: Vec, - pub num_shard: usize, - pub shard_id: usize, + pub shard_config: ShardConfig, pub peer_id: WrappedPeerId, pub at: WrappedMultiaddr, - pub timestamp: u32, } #[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] @@ -149,7 +142,6 @@ pub struct AnnounceChunks { pub index_end: u64, // exclusive pub peer_id: WrappedPeerId, pub at: WrappedMultiaddr, - pub timestamp: u32, } #[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode)] @@ -214,9 +206,7 @@ impl HasSignature for SignedMessage { } } -pub type SignedAnnounceFile = SignedMessage; -pub type SignedAnnounceChunks = SignedMessage; - +pub type SignedAnnounceFile = SignedMessage>; type SignedAnnounceFiles = Vec; #[derive(Debug, Clone, PartialEq, Eq)] @@ -226,11 +216,16 @@ pub enum PubsubMessage { NewFile(TimedMessage), /// Published to neighbors for file sync, and answered by `AnswerFile` RPC. AskFile(TimedMessage), - FindFile(FindFile), - FindChunks(FindChunks), + /// Published to network to find specified file. + FindFile(TimedMessage), + /// Published to network to find specified chunks. + FindChunks(TimedMessage), + /// Published to network to announce file. AnnounceFile(Vec), + /// Published to network to announce shard config. AnnounceShardConfig(TimedMessage), - AnnounceChunks(SignedAnnounceChunks), + /// Published to network to announce chunks. + AnnounceChunks(TimedMessage), } // Implements the `DataTransform` trait of gossipsub to employ snappy compression @@ -341,17 +336,19 @@ impl PubsubMessage { .map_err(|e| format!("{:?}", e))?, )), GossipKind::FindFile => Ok(PubsubMessage::FindFile( - FindFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, + TimedMessage::::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, )), GossipKind::FindChunks => Ok(PubsubMessage::FindChunks( - FindChunks::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?, + TimedMessage::::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, )), GossipKind::AnnounceFile => Ok(PubsubMessage::AnnounceFile( SignedAnnounceFiles::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?, )), GossipKind::AnnounceChunks => Ok(PubsubMessage::AnnounceChunks( - SignedAnnounceChunks::from_ssz_bytes(data) + TimedMessage::::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?, )), GossipKind::AnnounceShardConfig => Ok(PubsubMessage::AnnounceShardConfig( diff --git a/node/network/src/types/topics.rs b/node/network/src/types/topics.rs index 9552773..683b5ef 100644 --- a/node/network/src/types/topics.rs +++ b/node/network/src/types/topics.rs @@ -10,10 +10,10 @@ pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy"; pub const EXAMPLE_TOPIC: &str = "example"; pub const NEW_FILE_TOPIC: &str = "new_file_v2"; pub const ASK_FILE_TOPIC: &str = "ask_file"; -pub const FIND_FILE_TOPIC: &str = "find_file"; -pub const FIND_CHUNKS_TOPIC: &str = "find_chunks"; -pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file"; -pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks"; +pub const FIND_FILE_TOPIC: &str = "find_file_v2"; +pub const FIND_CHUNKS_TOPIC: &str = "find_chunks_v2"; +pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file_v2"; +pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks_v2"; pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config_v2"; /// A gossipsub topic which encapsulates the type of messages that should be sent and received over diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index feed888..f37f972 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -9,8 +9,8 @@ use network::types::TimedMessage; use network::{ rpc::StatusMessage, types::{ - AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, SignedAnnounceChunks, - SignedAnnounceFile, SignedMessage, + AnnounceChunks, AnnounceFile, FindChunks, FindFile, HasSignature, SignedAnnounceFile, + SignedMessage, }, Keypair, MessageAcceptance, MessageId, NetworkGlobals, NetworkMessage, PeerId, PeerRequestId, PublicKey, PubsubMessage, Request, RequestId, Response, @@ -375,10 +375,7 @@ impl Libp2pEventHandler { PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore, PubsubMessage::NewFile(msg) => self.on_new_file(propagation_source, msg).await, PubsubMessage::AskFile(msg) => self.on_ask_file(propagation_source, msg).await, - PubsubMessage::FindFile(msg) => { - metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1); - self.on_find_file(propagation_source, msg).await - } + PubsubMessage::FindFile(msg) => self.on_find_file(propagation_source, msg).await, PubsubMessage::FindChunks(msg) => self.on_find_chunks(propagation_source, msg).await, PubsubMessage::AnnounceFile(msgs) => { metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE.mark(1); @@ -573,12 +570,13 @@ impl Libp2pEventHandler { let timestamp = timestamp_now(); let shard_config = self.store.get_store().get_shard_config(); - let msg = AnnounceFile { - tx_ids, - num_shard: shard_config.num_shard, - shard_id: shard_config.shard_id, - peer_id: peer_id.into(), - at: addr.into(), + let msg = TimedMessage { + inner: AnnounceFile { + tx_ids, + shard_config: shard_config.into(), + peer_id: peer_id.into(), + at: addr.into(), + }, timestamp, }; @@ -595,80 +593,44 @@ impl Libp2pEventHandler { Some(signed) } - async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance { - let FindFile { - tx_id, timestamp, .. - } = msg; - + async fn on_find_file(&self, from: PeerId, msg: TimedMessage) -> MessageAcceptance { // verify timestamp - let d = duration_since( - timestamp, - metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(), - ); - let timeout = if msg.neighbors_only { - *PUBSUB_TIMEOUT_NEIGHBORS - } else { - *PUBSUB_TIMEOUT_NETWORK - }; - if d < TOLERABLE_DRIFT.neg() || d > timeout { - debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message"); - metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1); - if msg.neighbors_only { - self.send_to_network(NetworkMessage::ReportPeer { - peer_id: from, - action: PeerAction::LowToleranceError, - source: ReportSource::Gossipsub, - msg: "Received out of date FindFile message", - }); - } + if !metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.verify_timestamp( + from, + msg.timestamp, + *PUBSUB_TIMEOUT_NETWORK, + None, + ) { return MessageAcceptance::Ignore; } - // verify announced shard config - let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) { - Ok(v) => v, - Err(_) => return MessageAcceptance::Reject, - }; - - // handle on shard config mismatch - let my_shard_config = self.store.get_store().get_shard_config(); - if !my_shard_config.intersect(&announced_shard_config) { - return if msg.neighbors_only { - MessageAcceptance::Ignore - } else { - MessageAcceptance::Accept + // verify announced shard config if specified + if let Some(shard_config) = msg.maybe_shard_config { + let announced_shard_config = match ShardConfig::try_from(shard_config) { + Ok(v) => v, + Err(_) => return MessageAcceptance::Reject, }; + + // forward FIND_FILE to the network if shard config mismatch + let my_shard_config = self.store.get_store().get_shard_config(); + if !my_shard_config.intersect(&announced_shard_config) { + return MessageAcceptance::Accept; + } } // check if we have it + let tx_id = msg.tx_id; if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) { if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await { if tx.id() == tx_id { trace!(?tx_id, "Found file locally, responding to FindFile query"); - - if msg.neighbors_only { - // announce file via RPC to avoid flooding pubsub message - self.send_to_network(NetworkMessage::SendRequest { - peer_id: from, - request: Request::AnswerFile(ShardedFile { - tx_id, - shard_config: my_shard_config.into(), - }), - request_id: RequestId::Router(Instant::now()), - }); - } else if self.publish_file(tx_id).await.is_some() { - metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE.mark(1); - return MessageAcceptance::Ignore; - } + self.publish_file(tx_id).await; + metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE.mark(1); + return MessageAcceptance::Ignore; } } } - // do not forward to whole network if only find file from neighbor nodes - if msg.neighbors_only { - return MessageAcceptance::Ignore; - } - // try from cache if let Some(mut msg) = self.file_location_cache.get_one(tx_id) { trace!(?tx_id, "Found file in cache, responding to FindFile query"); @@ -694,7 +656,6 @@ impl Libp2pEventHandler { ) -> Option { let peer_id = *self.network_globals.peer_id.read(); let addr = self.construct_announced_ip().await?; - let timestamp = timestamp_now(); let msg = AnnounceChunks { tx_id, @@ -702,26 +663,15 @@ impl Libp2pEventHandler { index_end, peer_id: peer_id.into(), at: addr.into(), - timestamp, }; - let mut signed = match SignedMessage::sign_message(msg, &self.local_keypair) { - Ok(signed) => signed, - Err(e) => { - error!(%tx_id.seq, %e, "Failed to sign AnnounceChunks message"); - return None; - } - }; - - signed.resend_timestamp = timestamp; - - Some(PubsubMessage::AnnounceChunks(signed)) + Some(PubsubMessage::AnnounceChunks(msg.into())) } async fn on_find_chunks( &self, propagation_source: PeerId, - msg: FindChunks, + msg: TimedMessage, ) -> MessageAcceptance { // verify timestamp if !metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.verify_timestamp( @@ -851,7 +801,7 @@ impl Libp2pEventHandler { } // verify announced shard config - let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) { + let announced_shard_config = match ShardConfig::try_from(msg.shard_config) { Ok(v) => v, Err(_) => return MessageAcceptance::Reject, }; @@ -862,7 +812,7 @@ impl Libp2pEventHandler { metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_LATENCY.clone(), ); if d < TOLERABLE_DRIFT.neg() || d > *PUBSUB_TIMEOUT_NETWORK { - debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceFile message"); + debug!(?d, %propagation_source, "Invalid resend timestamp, ignoring AnnounceFile message"); metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_TIMEOUT.mark(1); return MessageAcceptance::Ignore; } @@ -922,7 +872,7 @@ impl Libp2pEventHandler { fn on_announce_chunks( &self, propagation_source: PeerId, - msg: SignedAnnounceChunks, + msg: TimedMessage, ) -> MessageAcceptance { // verify timestamp if !metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS.verify_timestamp( @@ -934,11 +884,6 @@ impl Libp2pEventHandler { return MessageAcceptance::Ignore; } - // verify message signature - if !verify_signature(&msg, &msg.peer_id, propagation_source) { - return MessageAcceptance::Reject; - } - // verify public ip address if required let addr = msg.at.clone().into(); if !self.config.private_ip_enabled && !Self::contains_public_ip(&addr) { @@ -1009,23 +954,17 @@ impl Libp2pEventHandler { true } - async fn publish_file(&self, tx_id: TxID) -> Option { - match self.file_batcher.write().await.add(tx_id) { - Some(batch) => { - let announcement = self.construct_announce_file_message(batch).await?; - Some(self.publish_announcement(announcement).await) + async fn publish_file(&self, tx_id: TxID) { + if let Some(batch) = self.file_batcher.write().await.add(tx_id) { + if let Some(announcement) = self.construct_announce_file_message(batch).await { + self.publish_announcement(announcement).await; } - None => Some(false), } } - async fn publish_announcement(&self, announcement: SignedAnnounceFile) -> bool { - match self.announcement_batcher.write().await.add(announcement) { - Some(batch) => { - self.publish(PubsubMessage::AnnounceFile(batch)); - true - } - None => false, + async fn publish_announcement(&self, announcement: SignedAnnounceFile) { + if let Some(batch) = self.announcement_batcher.write().await.add(announcement) { + self.publish(PubsubMessage::AnnounceFile(batch)); } } @@ -1378,11 +1317,11 @@ mod tests { ) -> MessageAcceptance { let (alice, bob) = (PeerId::random(), PeerId::random()); let id = MessageId::new(b"dummy message"); - let message = PubsubMessage::FindFile(FindFile { - tx_id, - num_shard: 1, - shard_id: 0, - neighbors_only: false, + let message = PubsubMessage::FindFile(TimedMessage { + inner: FindFile { + tx_id, + maybe_shard_config: None, + }, timestamp, }); handler.on_pubsub_message(alice, bob, &id, message).await @@ -1471,7 +1410,7 @@ mod tests { .await .unwrap(); let malicious_addr: Multiaddr = "/ip4/127.0.0.38/tcp/30000".parse().unwrap(); - file.inner.at = malicious_addr.into(); + file.inner.inner.at = malicious_addr.into(); let message = PubsubMessage::AnnounceFile(vec![file]); // failed to verify signature diff --git a/node/router/src/metrics.rs b/node/router/src/metrics.rs index a6cf06c..d291c14 100644 --- a/node/router/src/metrics.rs +++ b/node/router/src/metrics.rs @@ -74,12 +74,11 @@ lazy_static::lazy_static! { pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD: PubsubMsgHandleMetrics = PubsubMsgHandleMetrics::new("announce_shard"); // libp2p_event_handler: find & announce file - pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "qps"); - pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_find_file", "latency", 1024); - pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "timeout"); + pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: PubsubMsgHandleMetrics = PubsubMsgHandleMetrics::new("find_file"); pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "store"); pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_CACHE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "cache"); pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_FORWARD: Arc = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "forward"); + pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE: Arc = register_meter_with_group("router_libp2p_handle_pubsub_announce_file", "qps"); pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_LATENCY: Arc = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_announce_file", "latency", 1024); pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE_TIMEOUT: Arc = register_meter_with_group("router_libp2p_handle_pubsub_announce_file", "timeout"); diff --git a/node/shared_types/src/lib.rs b/node/shared_types/src/lib.rs index 18f75f9..1043858 100644 --- a/node/shared_types/src/lib.rs +++ b/node/shared_types/src/lib.rs @@ -409,6 +409,15 @@ pub struct ShardConfig { pub shard_id: usize, } +impl Default for ShardConfig { + fn default() -> Self { + ShardConfig { + num_shard: 1, + shard_id: 0, + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, DeriveEncode, DeriveDecode)] pub struct ShardedFile { pub tx_id: TxID, diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 7c4f885..0865b3e 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -10,7 +10,7 @@ use network::{ PeerAction, PeerId, PubsubMessage, SyncId as RequestId, }; use rand::Rng; -use shared_types::{timestamp_now, ChunkArrayWithProof, ShardedFile, TxID, CHUNK_SIZE}; +use shared_types::{ChunkArrayWithProof, ShardedFile, TxID, CHUNK_SIZE}; use ssz::Encode; use std::{sync::Arc, time::Instant}; use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE}; @@ -220,25 +220,27 @@ impl SerialSyncController { .into(), ) } else { - PubsubMessage::FindFile(FindFile { - tx_id: self.tx_id, - num_shard: shard_config.num_shard, - shard_id: shard_config.shard_id, - neighbors_only: self.config.neighbors_only, - timestamp: timestamp_now(), - }) + PubsubMessage::FindFile( + FindFile { + tx_id: self.tx_id, + maybe_shard_config: Some(shard_config.into()), + } + .into(), + ) }; self.ctx.publish(msg); } fn publish_find_chunks(&self) { - self.ctx.publish(PubsubMessage::FindChunks(FindChunks { - tx_id: self.tx_id, - index_start: self.goal.index_start, - index_end: self.goal.index_end, - timestamp: timestamp_now(), - })); + self.ctx.publish(PubsubMessage::FindChunks( + FindChunks { + tx_id: self.tx_id, + index_start: self.goal.index_start, + index_end: self.goal.index_end, + } + .into(), + )); } /// Dial to peers in `Found` state, so that `Connecting` or `Connected` peers cover diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 15f18ba..90e1c31 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -13,9 +13,7 @@ use network::{ rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, NetworkSender, PeerId, PeerRequestId, PubsubMessage, SyncId as RequestId, }; -use shared_types::{ - bytes_to_chunks, timestamp_now, ChunkArrayWithProof, ShardedFile, Transaction, TxID, -}; +use shared_types::{bytes_to_chunks, ChunkArrayWithProof, ShardedFile, Transaction, TxID}; use std::sync::atomic::Ordering; use std::{ cmp, @@ -580,9 +578,7 @@ impl SyncService { async fn on_find_file(&mut self, tx_seq: u64) -> Result<()> { // file already exists - if self.store.check_tx_completed(tx_seq).await? - || self.store.check_tx_pruned(tx_seq).await? - { + if self.store.get_store().get_tx_status(tx_seq)?.is_some() { return Ok(()); } // broadcast find file @@ -590,14 +586,13 @@ impl SyncService { Some(tx) => tx, None => bail!("Transaction not found"), }; - let shard_config = self.store.get_store().get_shard_config(); - self.ctx.publish(PubsubMessage::FindFile(FindFile { - tx_id: tx.id(), - num_shard: shard_config.num_shard, - shard_id: shard_config.shard_id, - neighbors_only: false, - timestamp: timestamp_now(), - })); + self.ctx.publish(PubsubMessage::FindFile( + FindFile { + tx_id: tx.id(), + maybe_shard_config: None, + } + .into(), + )); Ok(()) } @@ -646,10 +641,8 @@ impl SyncService { }; // file already exists - if self.store.check_tx_completed(tx_seq).await? - || self.store.check_tx_pruned(tx_seq).await? - { - bail!("File already exists"); + if let Some(status) = self.store.get_store().get_tx_status(tx_seq)? { + bail!("File already exists [{:?}]", status); } let (index_start, index_end, all_chunks) = match maybe_range { @@ -724,21 +717,12 @@ impl SyncService { return; } - // File already exists and ignore the AnnounceFile message - match self.store.check_tx_completed(tx_seq).await { - Ok(true) => return, - Ok(false) => {} + // File already exists or pruned, just ignore the AnnounceFile message + match self.store.get_store().get_tx_status(tx_seq) { + Ok(Some(_)) => return, + Ok(None) => {} Err(err) => { - error!(%tx_seq, %err, "Failed to check if file finalized"); - return; - } - } - - match self.store.check_tx_pruned(tx_seq).await { - Ok(true) => return, - Ok(false) => {} - Err(err) => { - error!(%tx_seq, %err, "Failed to check if file pruned"); + error!(%tx_seq, %err, "Failed to get tx status"); return; } }