Publish NewFile message when any file finalized

This commit is contained in:
boqiu 2024-10-23 17:05:55 +08:00
parent e06936f381
commit d12bf66129
3 changed files with 22 additions and 11 deletions

View File

@ -5,11 +5,14 @@ use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use futures::{channel::mpsc::Sender, prelude::*};
use miner::MinerMessage;
use network::types::NewFile;
use network::PubsubMessage;
use network::{
BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, RequestId,
Service as LibP2PService, Swarm,
};
use pruner::PrunerMessage;
use shared_types::timestamp_now;
use std::sync::Arc;
use storage::log_store::Store as LogStore;
use storage_async::Store;
@ -44,6 +47,8 @@ pub struct RouterService {
/// Stores potentially created UPnP mappings to be removed on shutdown. (TCP port and UDP
/// port).
upnp_mappings: (Option<u16>, Option<u16>),
store: Arc<dyn LogStore>,
}
impl RouterService {
@ -63,7 +68,6 @@ impl RouterService {
local_keypair: Keypair,
config: Config,
) {
let store = Store::new(store, executor.clone());
let peers = Arc::new(RwLock::new(PeerManager::new(config.clone())));
// create the network service and spawn the task
@ -81,11 +85,12 @@ impl RouterService {
sync_send,
chunk_pool_send,
local_keypair,
store,
Store::new(store.clone(), executor.clone()),
file_location_cache,
peers,
),
upnp_mappings: (None, None),
store,
};
// spawn service
@ -328,14 +333,15 @@ impl RouterService {
}
}
NetworkMessage::AnnounceLocalFile { tx_id } => {
if self
.libp2p_event_handler
.publish_file(tx_id)
.await
.is_some()
{
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1);
}
let shard_config = self.store.get_shard_config();
let msg = PubsubMessage::NewFile(NewFile {
tx_id,
num_shard: shard_config.num_shard,
shard_id: shard_config.shard_id,
timestamp: timestamp_now(),
});
self.libp2p.swarm.behaviour_mut().publish(vec![msg]);
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1);
}
NetworkMessage::UPnPMappingEstablished {
tcp_socket,

View File

@ -545,6 +545,8 @@ impl SerialSyncController {
info!(%self.tx_seq, "Succeeded to finalize file");
self.state = SyncState::Completed;
metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0);
self.ctx
.send(NetworkMessage::AnnounceLocalFile { tx_id: self.tx_id });
}
Ok(false) => {
warn!(?self.tx_id, %self.tx_seq, "Transaction reverted during finalize_tx");

View File

@ -642,7 +642,10 @@ impl SyncService {
Some(s) => s,
None => {
debug!(%tx.seq, "No more data needed");
self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await?;
if self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await? {
self.ctx
.send(NetworkMessage::AnnounceLocalFile { tx_id: tx.id() });
}
return Ok(());
}
};