From d12bf66129a9754e1264dd48442eacc72994c8f9 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Wed, 23 Oct 2024 17:05:55 +0800 Subject: [PATCH] Publish NewFile message when any file finalized --- node/router/src/service.rs | 26 ++++++++++++++++---------- node/sync/src/controllers/serial.rs | 2 ++ node/sync/src/service.rs | 5 ++++- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/node/router/src/service.rs b/node/router/src/service.rs index 938c2c3..94bbeb5 100644 --- a/node/router/src/service.rs +++ b/node/router/src/service.rs @@ -5,11 +5,14 @@ use chunk_pool::ChunkPoolMessage; use file_location_cache::FileLocationCache; use futures::{channel::mpsc::Sender, prelude::*}; use miner::MinerMessage; +use network::types::NewFile; +use network::PubsubMessage; use network::{ BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, RequestId, Service as LibP2PService, Swarm, }; use pruner::PrunerMessage; +use shared_types::timestamp_now; use std::sync::Arc; use storage::log_store::Store as LogStore; use storage_async::Store; @@ -44,6 +47,8 @@ pub struct RouterService { /// Stores potentially created UPnP mappings to be removed on shutdown. (TCP port and UDP /// port). upnp_mappings: (Option, Option), + + store: Arc, } impl RouterService { @@ -63,7 +68,6 @@ impl RouterService { local_keypair: Keypair, config: Config, ) { - let store = Store::new(store, executor.clone()); let peers = Arc::new(RwLock::new(PeerManager::new(config.clone()))); // create the network service and spawn the task @@ -81,11 +85,12 @@ impl RouterService { sync_send, chunk_pool_send, local_keypair, - store, + Store::new(store.clone(), executor.clone()), file_location_cache, peers, ), upnp_mappings: (None, None), + store, }; // spawn service @@ -328,14 +333,15 @@ impl RouterService { } } NetworkMessage::AnnounceLocalFile { tx_id } => { - if self - .libp2p_event_handler - .publish_file(tx_id) - .await - .is_some() - { - metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1); - } + let shard_config = self.store.get_shard_config(); + let msg = PubsubMessage::NewFile(NewFile { + tx_id, + num_shard: shard_config.num_shard, + shard_id: shard_config.shard_id, + timestamp: timestamp_now(), + }); + self.libp2p.swarm.behaviour_mut().publish(vec![msg]); + metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1); } NetworkMessage::UPnPMappingEstablished { tcp_socket, diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index dcf724a..6b4f5e4 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -545,6 +545,8 @@ impl SerialSyncController { info!(%self.tx_seq, "Succeeded to finalize file"); self.state = SyncState::Completed; metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0); + self.ctx + .send(NetworkMessage::AnnounceLocalFile { tx_id: self.tx_id }); } Ok(false) => { warn!(?self.tx_id, %self.tx_seq, "Transaction reverted during finalize_tx"); diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 7efba4e..fee3f52 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -642,7 +642,10 @@ impl SyncService { Some(s) => s, None => { debug!(%tx.seq, "No more data needed"); - self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await?; + if self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await? { + self.ctx + .send(NetworkMessage::AnnounceLocalFile { tx_id: tx.id() }); + } return Ok(()); } };