Compare commits

...

3 Commits

Author SHA1 Message Date
peilun-conflux
d792cc9979
Merge 07704fedc9 into 1de7afec14 2024-11-12 17:26:12 +08:00
Bo QIU
1de7afec14
Add more metrics for network unbounded channel (#264)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* Add metrics for file finalization in chunk pool

* Add metrics for network unbounded channel
2024-11-12 17:25:49 +08:00
Peilun Li
07704fedc9 fix: wait for async storage to complete before receiving the result. 2024-10-30 18:51:45 +08:00
18 changed files with 92 additions and 82 deletions

3
Cargo.lock generated
View File

@ -912,7 +912,9 @@ dependencies = [
"anyhow", "anyhow",
"async-lock 2.8.0", "async-lock 2.8.0",
"hashlink 0.8.4", "hashlink 0.8.4",
"lazy_static",
"log_entry_sync", "log_entry_sync",
"metrics",
"network", "network",
"shared_types", "shared_types",
"storage-async", "storage-async",
@ -5030,6 +5032,7 @@ dependencies = [
name = "network" name = "network"
version = "0.2.0" version = "0.2.0"
dependencies = [ dependencies = [
"channel",
"directory", "directory",
"dirs 4.0.0", "dirs 4.0.0",
"discv5", "discv5",

View File

@ -13,3 +13,5 @@ tokio = { version = "1.19.2", features = ["sync"] }
async-lock = "2.5.0" async-lock = "2.5.0"
hashlink = "0.8.0" hashlink = "0.8.0"
tracing = "0.1.35" tracing = "0.1.35"
lazy_static = "1.4.0"
metrics = { workspace = true }

View File

@ -1,11 +1,16 @@
use super::mem_pool::MemoryChunkPool; use super::mem_pool::MemoryChunkPool;
use crate::mem_pool::FileID; use crate::mem_pool::FileID;
use anyhow::Result; use anyhow::Result;
use network::NetworkMessage; use metrics::{Histogram, Sample};
use network::{NetworkMessage, NetworkSender};
use shared_types::{ChunkArray, FileProof}; use shared_types::{ChunkArray, FileProof};
use std::{sync::Arc, time::SystemTime}; use std::{sync::Arc, time::Instant};
use storage_async::{ShardConfig, Store}; use storage_async::{ShardConfig, Store};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::UnboundedReceiver;
lazy_static::lazy_static! {
pub static ref FINALIZE_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("chunk_pool_finalize_file_latency", 1024);
}
/// Handle the cached file when uploaded completely and verified from blockchain. /// Handle the cached file when uploaded completely and verified from blockchain.
/// Generally, the file will be persisted into log store. /// Generally, the file will be persisted into log store.
@ -13,7 +18,7 @@ pub struct ChunkPoolHandler {
receiver: UnboundedReceiver<ChunkPoolMessage>, receiver: UnboundedReceiver<ChunkPoolMessage>,
mem_pool: Arc<MemoryChunkPool>, mem_pool: Arc<MemoryChunkPool>,
log_store: Arc<Store>, log_store: Arc<Store>,
sender: UnboundedSender<NetworkMessage>, sender: NetworkSender,
} }
impl ChunkPoolHandler { impl ChunkPoolHandler {
@ -21,7 +26,7 @@ impl ChunkPoolHandler {
receiver: UnboundedReceiver<ChunkPoolMessage>, receiver: UnboundedReceiver<ChunkPoolMessage>,
mem_pool: Arc<MemoryChunkPool>, mem_pool: Arc<MemoryChunkPool>,
log_store: Arc<Store>, log_store: Arc<Store>,
sender: UnboundedSender<NetworkMessage>, sender: NetworkSender,
) -> Self { ) -> Self {
ChunkPoolHandler { ChunkPoolHandler {
receiver, receiver,
@ -68,7 +73,7 @@ impl ChunkPoolHandler {
} }
} }
let start = SystemTime::now(); let start = Instant::now();
if !self if !self
.log_store .log_store
.finalize_tx_with_hash(id.tx_id.seq, id.tx_id.hash) .finalize_tx_with_hash(id.tx_id.seq, id.tx_id.hash)
@ -77,8 +82,9 @@ impl ChunkPoolHandler {
return Ok(false); return Ok(false);
} }
let elapsed = start.elapsed()?; let elapsed = start.elapsed();
debug!(?id, ?elapsed, "Transaction finalized"); debug!(?id, ?elapsed, "Transaction finalized");
FINALIZE_FILE_LATENCY.update_since(start);
// always remove file from pool after transaction finalized // always remove file from pool after transaction finalized
self.mem_pool.remove_file(&id.root).await; self.mem_pool.remove_file(&id.root).await;

View File

@ -29,7 +29,7 @@ impl Config {
pub fn unbounded( pub fn unbounded(
config: Config, config: Config,
log_store: Arc<storage_async::Store>, log_store: Arc<storage_async::Store>,
network_send: tokio::sync::mpsc::UnboundedSender<network::NetworkMessage>, network_send: network::NetworkSender,
) -> (Arc<MemoryChunkPool>, ChunkPoolHandler) { ) -> (Arc<MemoryChunkPool>, ChunkPoolHandler) {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();

View File

@ -3,13 +3,12 @@ use crate::monitor::Monitor;
use crate::sealer::Sealer; use crate::sealer::Sealer;
use crate::submitter::Submitter; use crate::submitter::Submitter;
use crate::{config::MinerConfig, mine::PoraService, watcher::MineContextWatcher}; use crate::{config::MinerConfig, mine::PoraService, watcher::MineContextWatcher};
use network::NetworkMessage; use network::NetworkSender;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use storage::config::ShardConfig; use storage::config::ShardConfig;
use storage_async::Store; use storage_async::Store;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::mpsc;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum MinerMessage { pub enum MinerMessage {
@ -29,7 +28,7 @@ pub struct MineService;
impl MineService { impl MineService {
pub async fn spawn( pub async fn spawn(
executor: task_executor::TaskExecutor, executor: task_executor::TaskExecutor,
_network_send: mpsc::UnboundedSender<NetworkMessage>, _network_send: NetworkSender,
config: MinerConfig, config: MinerConfig,
store: Arc<Store>, store: Arc<Store>,
) -> Result<broadcast::Sender<MinerMessage>, String> { ) -> Result<broadcast::Sender<MinerMessage>, String> {

View File

@ -41,6 +41,7 @@ if-addrs = "0.10.1"
slog = "2.7.0" slog = "2.7.0"
igd = "0.12.1" igd = "0.12.1"
duration-str = "0.5.1" duration-str = "0.5.1"
channel = { path = "../../common/channel" }
[dependencies.libp2p] [dependencies.libp2p]
version = "0.45.1" version = "0.45.1"

View File

@ -159,3 +159,10 @@ pub enum NetworkMessage {
udp_socket: Option<SocketAddr>, udp_socket: Option<SocketAddr>,
}, },
} }
pub type NetworkSender = channel::metrics::Sender<NetworkMessage>;
pub type NetworkReceiver = channel::metrics::Receiver<NetworkMessage>;
pub fn new_network_channel() -> (NetworkSender, NetworkReceiver) {
channel::metrics::unbounded_channel("network")
}

View File

@ -3,10 +3,9 @@
//! Currently supported strategies: //! Currently supported strategies:
//! - UPnP //! - UPnP
use crate::{NetworkConfig, NetworkMessage}; use crate::{NetworkConfig, NetworkMessage, NetworkSender};
use if_addrs::get_if_addrs; use if_addrs::get_if_addrs;
use std::net::{IpAddr, SocketAddr, SocketAddrV4}; use std::net::{IpAddr, SocketAddr, SocketAddrV4};
use tokio::sync::mpsc;
/// Configuration required to construct the UPnP port mappings. /// Configuration required to construct the UPnP port mappings.
pub struct UPnPConfig { pub struct UPnPConfig {
@ -36,10 +35,7 @@ impl UPnPConfig {
} }
/// Attempts to construct external port mappings with UPnP. /// Attempts to construct external port mappings with UPnP.
pub fn construct_upnp_mappings( pub fn construct_upnp_mappings(config: UPnPConfig, network_send: NetworkSender) {
config: UPnPConfig,
network_send: mpsc::UnboundedSender<NetworkMessage>,
) {
info!("UPnP Attempting to initialise routes"); info!("UPnP Attempting to initialise routes");
match igd::search_gateway(Default::default()) { match igd::search_gateway(Default::default()) {
Err(e) => info!(error = %e, "UPnP not available"), Err(e) => info!(error = %e, "UPnP not available"),

View File

@ -4,7 +4,7 @@ use crate::discovery::enr;
use crate::multiaddr::Protocol; use crate::multiaddr::Protocol;
use crate::rpc::{GoodbyeReason, RPCResponseErrorCode, ReqId}; use crate::rpc::{GoodbyeReason, RPCResponseErrorCode, ReqId};
use crate::types::{error, GossipKind}; use crate::types::{error, GossipKind};
use crate::{EnrExt, NetworkMessage}; use crate::{EnrExt, NetworkSender};
use crate::{NetworkConfig, NetworkGlobals, PeerAction, ReportSource}; use crate::{NetworkConfig, NetworkGlobals, PeerAction, ReportSource};
use futures::prelude::*; use futures::prelude::*;
use libp2p::core::{ use libp2p::core::{
@ -21,7 +21,6 @@ use std::io::prelude::*;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS}; use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};
@ -60,7 +59,7 @@ pub struct Context<'a> {
impl<AppReqId: ReqId> Service<AppReqId> { impl<AppReqId: ReqId> Service<AppReqId> {
pub async fn new( pub async fn new(
executor: task_executor::TaskExecutor, executor: task_executor::TaskExecutor,
network_sender: UnboundedSender<NetworkMessage>, network_sender: NetworkSender,
ctx: Context<'_>, ctx: Context<'_>,
) -> error::Result<(Arc<NetworkGlobals>, Keypair, Self)> { ) -> error::Result<(Arc<NetworkGlobals>, Keypair, Self)> {
trace!("Libp2p Service starting"); trace!("Libp2p Service starting");

View File

@ -1,6 +1,7 @@
#![cfg(test)] #![cfg(test)]
use libp2p::gossipsub::GossipsubConfigBuilder; use libp2p::gossipsub::GossipsubConfigBuilder;
use network::new_network_channel;
use network::Enr; use network::Enr;
use network::EnrExt; use network::EnrExt;
use network::Multiaddr; use network::Multiaddr;
@ -22,7 +23,6 @@ pub mod swarm;
type ReqId = usize; type ReqId = usize;
use tempfile::Builder as TempBuilder; use tempfile::Builder as TempBuilder;
use tokio::sync::mpsc::unbounded_channel;
#[allow(unused)] #[allow(unused)]
pub struct Libp2pInstance(LibP2PService<ReqId>, exit_future::Signal); pub struct Libp2pInstance(LibP2PService<ReqId>, exit_future::Signal);
@ -72,7 +72,7 @@ pub async fn build_libp2p_instance(rt: Weak<Runtime>, boot_nodes: Vec<Enr>) -> L
let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx); let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx);
let libp2p_context = network::Context { config: &config }; let libp2p_context = network::Context { config: &config };
let (sender, _) = unbounded_channel(); let (sender, _) = new_network_channel();
Libp2pInstance( Libp2pInstance(
LibP2PService::new(executor, sender, libp2p_context) LibP2PService::new(executor, sender, libp2p_context)
.await .await

View File

@ -16,7 +16,7 @@ use network::{
Keypair, MessageAcceptance, MessageId, NetworkGlobals, NetworkMessage, PeerId, PeerRequestId, Keypair, MessageAcceptance, MessageId, NetworkGlobals, NetworkMessage, PeerId, PeerRequestId,
PublicKey, PubsubMessage, Request, RequestId, Response, PublicKey, PubsubMessage, Request, RequestId, Response,
}; };
use network::{Multiaddr, PeerAction, ReportSource}; use network::{Multiaddr, NetworkSender, PeerAction, ReportSource};
use shared_types::{bytes_to_chunks, timestamp_now, NetworkIdentity, TxID}; use shared_types::{bytes_to_chunks, timestamp_now, NetworkIdentity, TxID};
use storage::config::ShardConfig; use storage::config::ShardConfig;
use storage_async::Store; use storage_async::Store;
@ -88,7 +88,7 @@ pub struct Libp2pEventHandler {
/// A collection of global variables, accessible outside of the network service. /// A collection of global variables, accessible outside of the network service.
network_globals: Arc<NetworkGlobals>, network_globals: Arc<NetworkGlobals>,
/// A channel to the router service. /// A channel to the router service.
network_send: mpsc::UnboundedSender<NetworkMessage>, network_send: NetworkSender,
/// A channel to the syncing service. /// A channel to the syncing service.
sync_send: SyncSender, sync_send: SyncSender,
/// A channel to the RPC chunk pool service. /// A channel to the RPC chunk pool service.
@ -112,7 +112,7 @@ impl Libp2pEventHandler {
pub fn new( pub fn new(
config: Config, config: Config,
network_globals: Arc<NetworkGlobals>, network_globals: Arc<NetworkGlobals>,
network_send: mpsc::UnboundedSender<NetworkMessage>, network_send: NetworkSender,
sync_send: SyncSender, sync_send: SyncSender,
chunk_pool_send: UnboundedSender<ChunkPoolMessage>, chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
local_keypair: Keypair, local_keypair: Keypair,
@ -1010,10 +1010,12 @@ mod tests {
use network::{ use network::{
discovery::{CombinedKey, ConnectionId}, discovery::{CombinedKey, ConnectionId},
discv5::enr::EnrBuilder, discv5::enr::EnrBuilder,
new_network_channel,
rpc::{GetChunksRequest, StatusMessage, SubstreamId}, rpc::{GetChunksRequest, StatusMessage, SubstreamId},
types::FindFile, types::FindFile,
CombinedKeyExt, Keypair, MessageAcceptance, MessageId, Multiaddr, NetworkGlobals, CombinedKeyExt, Keypair, MessageAcceptance, MessageId, Multiaddr, NetworkGlobals,
NetworkMessage, PeerId, PubsubMessage, Request, RequestId, Response, SyncId, NetworkMessage, NetworkReceiver, PeerId, PubsubMessage, Request, RequestId, Response,
SyncId,
}; };
use shared_types::{timestamp_now, ChunkArray, ChunkArrayWithProof, FlowRangeProof, TxID}; use shared_types::{timestamp_now, ChunkArray, ChunkArrayWithProof, FlowRangeProof, TxID};
use storage::{ use storage::{
@ -1035,8 +1037,8 @@ mod tests {
runtime: TestRuntime, runtime: TestRuntime,
network_globals: Arc<NetworkGlobals>, network_globals: Arc<NetworkGlobals>,
keypair: Keypair, keypair: Keypair,
network_send: mpsc::UnboundedSender<NetworkMessage>, network_send: NetworkSender,
network_recv: mpsc::UnboundedReceiver<NetworkMessage>, network_recv: NetworkReceiver,
sync_send: SyncSender, sync_send: SyncSender,
sync_recv: SyncReceiver, sync_recv: SyncReceiver,
chunk_pool_send: mpsc::UnboundedSender<ChunkPoolMessage>, chunk_pool_send: mpsc::UnboundedSender<ChunkPoolMessage>,
@ -1050,7 +1052,7 @@ mod tests {
fn default() -> Self { fn default() -> Self {
let runtime = TestRuntime::default(); let runtime = TestRuntime::default();
let (network_globals, keypair) = Context::new_network_globals(); let (network_globals, keypair) = Context::new_network_globals();
let (network_send, network_recv) = mpsc::unbounded_channel(); let (network_send, network_recv) = new_network_channel();
let (sync_send, sync_recv) = channel::Channel::unbounded("test"); let (sync_send, sync_recv) = channel::Channel::unbounded("test");
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel(); let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();

View File

@ -5,11 +5,9 @@ use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache; use file_location_cache::FileLocationCache;
use futures::{channel::mpsc::Sender, prelude::*}; use futures::{channel::mpsc::Sender, prelude::*};
use miner::MinerMessage; use miner::MinerMessage;
use network::types::NewFile;
use network::PubsubMessage;
use network::{ use network::{
BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, RequestId, types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage,
Service as LibP2PService, Swarm, NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm,
}; };
use pruner::PrunerMessage; use pruner::PrunerMessage;
use shared_types::timestamp_now; use shared_types::timestamp_now;
@ -33,7 +31,7 @@ pub struct RouterService {
network_globals: Arc<NetworkGlobals>, network_globals: Arc<NetworkGlobals>,
/// The receiver channel for Zgs to communicate with the network service. /// The receiver channel for Zgs to communicate with the network service.
network_recv: mpsc::UnboundedReceiver<NetworkMessage>, network_recv: NetworkReceiver,
/// The receiver channel for Zgs to communicate with the pruner service. /// The receiver channel for Zgs to communicate with the pruner service.
pruner_recv: Option<mpsc::UnboundedReceiver<PrunerMessage>>, pruner_recv: Option<mpsc::UnboundedReceiver<PrunerMessage>>,
@ -57,8 +55,8 @@ impl RouterService {
executor: task_executor::TaskExecutor, executor: task_executor::TaskExecutor,
libp2p: LibP2PService<RequestId>, libp2p: LibP2PService<RequestId>,
network_globals: Arc<NetworkGlobals>, network_globals: Arc<NetworkGlobals>,
network_recv: mpsc::UnboundedReceiver<NetworkMessage>, network_recv: NetworkReceiver,
network_send: mpsc::UnboundedSender<NetworkMessage>, network_send: NetworkSender,
sync_send: SyncSender, sync_send: SyncSender,
_miner_send: Option<broadcast::Sender<MinerMessage>>, _miner_send: Option<broadcast::Sender<MinerMessage>>,
chunk_pool_send: UnboundedSender<ChunkPoolMessage>, chunk_pool_send: UnboundedSender<ChunkPoolMessage>,

View File

@ -17,15 +17,13 @@ use file_location_cache::FileLocationCache;
use futures::channel::mpsc::Sender; use futures::channel::mpsc::Sender;
use jsonrpsee::core::RpcResult; use jsonrpsee::core::RpcResult;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle}; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
use network::NetworkGlobals; use network::{NetworkGlobals, NetworkMessage, NetworkSender};
use network::NetworkMessage;
use std::error::Error; use std::error::Error;
use std::sync::Arc; use std::sync::Arc;
use storage_async::Store; use storage_async::Store;
use sync::{SyncRequest, SyncResponse, SyncSender}; use sync::{SyncRequest, SyncResponse, SyncSender};
use task_executor::ShutdownReason; use task_executor::ShutdownReason;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use zgs::RpcServer as ZgsRpcServer; use zgs::RpcServer as ZgsRpcServer;
use zgs_miner::MinerMessage; use zgs_miner::MinerMessage;
@ -42,7 +40,7 @@ pub struct Context {
pub config: RPCConfig, pub config: RPCConfig,
pub file_location_cache: Arc<FileLocationCache>, pub file_location_cache: Arc<FileLocationCache>,
pub network_globals: Arc<NetworkGlobals>, pub network_globals: Arc<NetworkGlobals>,
pub network_send: UnboundedSender<NetworkMessage>, pub network_send: NetworkSender,
pub sync_send: SyncSender, pub sync_send: SyncSender,
pub chunk_pool: Arc<MemoryChunkPool>, pub chunk_pool: Arc<MemoryChunkPool>,
pub log_store: Arc<Store>, pub log_store: Arc<Store>,

View File

@ -4,8 +4,8 @@ use file_location_cache::FileLocationCache;
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager}; use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig}; use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
use network::{ use network::{
self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId, self, new_network_channel, Keypair, NetworkConfig, NetworkGlobals, NetworkReceiver,
Service as LibP2PService, NetworkSender, RequestId, Service as LibP2PService,
}; };
use pruner::{Pruner, PrunerConfig, PrunerMessage}; use pruner::{Pruner, PrunerConfig, PrunerMessage};
use router::RouterService; use router::RouterService;
@ -27,15 +27,12 @@ macro_rules! require {
} }
struct NetworkComponents { struct NetworkComponents {
send: mpsc::UnboundedSender<NetworkMessage>, send: NetworkSender,
globals: Arc<NetworkGlobals>, globals: Arc<NetworkGlobals>,
keypair: Keypair, keypair: Keypair,
// note: these will be owned by the router service // note: these will be owned by the router service
owned: Option<( owned: Option<(LibP2PService<RequestId>, NetworkReceiver)>,
LibP2PService<RequestId>,
mpsc::UnboundedReceiver<NetworkMessage>,
)>,
} }
struct SyncComponents { struct SyncComponents {
@ -144,7 +141,7 @@ impl ClientBuilder {
let service_context = network::Context { config }; let service_context = network::Context { config };
// construct communication channel // construct communication channel
let (send, recv) = mpsc::unbounded_channel::<NetworkMessage>(); let (send, recv) = new_network_channel();
// launch libp2p service // launch libp2p service
let (globals, keypair, libp2p) = let (globals, keypair, libp2p) =

View File

@ -1,7 +1,7 @@
#[macro_use] #[macro_use]
extern crate tracing; extern crate tracing;
use anyhow::bail; use anyhow::{anyhow, bail};
use shared_types::{ use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction, Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
}; };
@ -140,17 +140,21 @@ impl Store {
let store = self.store.clone(); let store = self.store.clone();
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.executor.spawn_blocking( self.executor
move || { .spawn_blocking_handle(
// FIXME(zz): Not all functions need `write`. Refactor store usage. move || {
let res = f(&*store); // FIXME(zz): Not all functions need `write`. Refactor store usage.
let res = f(&*store);
if tx.send(res).is_err() { if tx.send(res).is_err() {
error!("Unable to complete async storage operation: the receiver dropped"); error!("Unable to complete async storage operation: the receiver dropped");
} }
}, },
WORKER_TASK_NAME, WORKER_TASK_NAME,
); )
.ok_or(anyhow!("Unable to spawn async storage work"))?
.await
.map_err(|e| anyhow!("join error: e={:?}", e))?;
rx.await rx.await
.unwrap_or_else(|_| bail!(error::Error::Custom("Receiver error".to_string()))) .unwrap_or_else(|_| bail!(error::Error::Custom("Receiver error".to_string())))

View File

@ -1,12 +1,11 @@
use network::{NetworkMessage, PeerAction, PeerId, PubsubMessage, ReportSource}; use network::{NetworkMessage, NetworkSender, PeerAction, PeerId, PubsubMessage, ReportSource};
use tokio::sync::mpsc;
pub struct SyncNetworkContext { pub struct SyncNetworkContext {
network_send: mpsc::UnboundedSender<NetworkMessage>, network_send: NetworkSender,
} }
impl SyncNetworkContext { impl SyncNetworkContext {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>) -> Self { pub fn new(network_send: NetworkSender) -> Self {
Self { network_send } Self { network_send }
} }

View File

@ -750,13 +750,13 @@ mod tests {
use crate::test_util::create_2_store; use crate::test_util::create_2_store;
use crate::test_util::tests::create_file_location_cache; use crate::test_util::tests::create_file_location_cache;
use libp2p::identity; use libp2p::identity;
use network::{new_network_channel, NetworkReceiver};
use network::{ReportSource, Request}; use network::{ReportSource, Request};
use storage::log_store::log_manager::LogConfig; use storage::log_store::log_manager::LogConfig;
use storage::log_store::log_manager::LogManager; use storage::log_store::log_manager::LogManager;
use storage::log_store::LogStoreRead; use storage::log_store::LogStoreRead;
use storage::H256; use storage::H256;
use task_executor::{test_utils::TestRuntime, TaskExecutor}; use task_executor::{test_utils::TestRuntime, TaskExecutor};
use tokio::sync::mpsc::{self, UnboundedReceiver};
#[test] #[test]
fn test_status() { fn test_status() {
@ -1649,7 +1649,7 @@ mod tests {
fn create_default_controller( fn create_default_controller(
task_executor: TaskExecutor, task_executor: TaskExecutor,
peer_id: Option<PeerId>, peer_id: Option<PeerId>,
) -> (SerialSyncController, UnboundedReceiver<NetworkMessage>) { ) -> (SerialSyncController, NetworkReceiver) {
let tx_id = TxID { let tx_id = TxID {
seq: 0, seq: 0,
hash: H256::random(), hash: H256::random(),
@ -1668,8 +1668,8 @@ mod tests {
store: Arc<LogManager>, store: Arc<LogManager>,
tx_id: TxID, tx_id: TxID,
num_chunks: usize, num_chunks: usize,
) -> (SerialSyncController, UnboundedReceiver<NetworkMessage>) { ) -> (SerialSyncController, NetworkReceiver) {
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>(); let (network_send, network_recv) = new_network_channel();
let ctx = Arc::new(SyncNetworkContext::new(network_send)); let ctx = Arc::new(SyncNetworkContext::new(network_send));
let peer_id = match peer_id { let peer_id = match peer_id {

View File

@ -10,10 +10,9 @@ use libp2p::swarm::DialError;
use log_entry_sync::LogSyncEvent; use log_entry_sync::LogSyncEvent;
use network::rpc::methods::FileAnnouncement; use network::rpc::methods::FileAnnouncement;
use network::types::{AnnounceChunks, FindFile, NewFile}; use network::types::{AnnounceChunks, FindFile, NewFile};
use network::PubsubMessage;
use network::{ use network::{
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId, rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, NetworkSender,
PeerRequestId, SyncId as RequestId, PeerId, PeerRequestId, PubsubMessage, SyncId as RequestId,
}; };
use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, Transaction, TxID}; use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, Transaction, TxID};
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -27,7 +26,7 @@ use storage::error::Result as StorageResult;
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE}; use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
use storage::log_store::Store as LogStore; use storage::log_store::Store as LogStore;
use storage_async::Store; use storage_async::Store;
use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::sync::{broadcast, oneshot};
pub type SyncSender = channel::Sender<SyncMessage, SyncRequest, SyncResponse>; pub type SyncSender = channel::Sender<SyncMessage, SyncRequest, SyncResponse>;
pub type SyncReceiver = channel::Receiver<SyncMessage, SyncRequest, SyncResponse>; pub type SyncReceiver = channel::Receiver<SyncMessage, SyncRequest, SyncResponse>;
@ -142,7 +141,7 @@ pub struct SyncService {
impl SyncService { impl SyncService {
pub async fn spawn( pub async fn spawn(
executor: task_executor::TaskExecutor, executor: task_executor::TaskExecutor,
network_send: mpsc::UnboundedSender<NetworkMessage>, network_send: NetworkSender,
store: Arc<dyn LogStore>, store: Arc<dyn LogStore>,
file_location_cache: Arc<FileLocationCache>, file_location_cache: Arc<FileLocationCache>,
event_recv: broadcast::Receiver<LogSyncEvent>, event_recv: broadcast::Receiver<LogSyncEvent>,
@ -163,7 +162,7 @@ impl SyncService {
pub async fn spawn_with_config( pub async fn spawn_with_config(
config: Config, config: Config,
executor: task_executor::TaskExecutor, executor: task_executor::TaskExecutor,
network_send: mpsc::UnboundedSender<NetworkMessage>, network_send: NetworkSender,
store: Arc<dyn LogStore>, store: Arc<dyn LogStore>,
file_location_cache: Arc<FileLocationCache>, file_location_cache: Arc<FileLocationCache>,
event_recv: broadcast::Receiver<LogSyncEvent>, event_recv: broadcast::Receiver<LogSyncEvent>,
@ -897,7 +896,9 @@ mod tests {
use crate::test_util::tests::create_file_location_cache; use crate::test_util::tests::create_file_location_cache;
use libp2p::identity; use libp2p::identity;
use network::discovery::ConnectionId; use network::discovery::ConnectionId;
use network::new_network_channel;
use network::rpc::SubstreamId; use network::rpc::SubstreamId;
use network::NetworkReceiver;
use network::ReportSource; use network::ReportSource;
use shared_types::ChunkArray; use shared_types::ChunkArray;
use shared_types::Transaction; use shared_types::Transaction;
@ -909,8 +910,6 @@ mod tests {
use storage::log_store::LogStoreRead; use storage::log_store::LogStoreRead;
use storage::H256; use storage::H256;
use task_executor::test_utils::TestRuntime; use task_executor::test_utils::TestRuntime;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
struct TestSyncRuntime { struct TestSyncRuntime {
runtime: TestRuntime, runtime: TestRuntime,
@ -924,8 +923,8 @@ mod tests {
init_peer_id: PeerId, init_peer_id: PeerId,
file_location_cache: Arc<FileLocationCache>, file_location_cache: Arc<FileLocationCache>,
network_send: UnboundedSender<NetworkMessage>, network_send: NetworkSender,
network_recv: UnboundedReceiver<NetworkMessage>, network_recv: NetworkReceiver,
event_send: broadcast::Sender<LogSyncEvent>, event_send: broadcast::Sender<LogSyncEvent>,
catch_up_end_recv: Option<oneshot::Receiver<()>>, catch_up_end_recv: Option<oneshot::Receiver<()>>,
} }
@ -942,7 +941,7 @@ mod tests {
let (store, peer_store, txs, data) = create_2_store(chunk_counts); let (store, peer_store, txs, data) = create_2_store(chunk_counts);
let init_data = data[0].clone(); let init_data = data[0].clone();
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id(); let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>(); let (network_send, network_recv) = new_network_channel();
let (event_send, _) = broadcast::channel(16); let (event_send, _) = broadcast::channel(16);
let (_, catch_up_end_recv) = oneshot::channel(); let (_, catch_up_end_recv) = oneshot::channel();
@ -1006,7 +1005,7 @@ mod tests {
let file_location_cache: Arc<FileLocationCache> = let file_location_cache: Arc<FileLocationCache> =
create_file_location_cache(init_peer_id, vec![txs[0].id()]); create_file_location_cache(init_peer_id, vec![txs[0].id()]);
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>(); let (network_send, mut network_recv) = new_network_channel();
let (_, sync_recv) = channel::Channel::unbounded("test"); let (_, sync_recv) = channel::Channel::unbounded("test");
let mut sync = SyncService { let mut sync = SyncService {
@ -1035,7 +1034,7 @@ mod tests {
let file_location_cache: Arc<FileLocationCache> = let file_location_cache: Arc<FileLocationCache> =
create_file_location_cache(init_peer_id, vec![txs[0].id()]); create_file_location_cache(init_peer_id, vec![txs[0].id()]);
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>(); let (network_send, mut network_recv) = new_network_channel();
let (_, sync_recv) = channel::Channel::unbounded("test"); let (_, sync_recv) = channel::Channel::unbounded("test");
let mut sync = SyncService { let mut sync = SyncService {
@ -1354,7 +1353,7 @@ mod tests {
let file_location_cache: Arc<FileLocationCache> = let file_location_cache: Arc<FileLocationCache> =
create_file_location_cache(init_peer_id, vec![]); create_file_location_cache(init_peer_id, vec![]);
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>(); let (network_send, mut network_recv) = new_network_channel();
let (_event_send, event_recv) = broadcast::channel(16); let (_event_send, event_recv) = broadcast::channel(16);
let (_, catch_up_end_recv) = oneshot::channel(); let (_, catch_up_end_recv) = oneshot::channel();
let sync_send = SyncService::spawn_with_config( let sync_send = SyncService::spawn_with_config(
@ -1781,7 +1780,7 @@ mod tests {
} }
async fn receive_chunk_request( async fn receive_chunk_request(
network_recv: &mut UnboundedReceiver<NetworkMessage>, network_recv: &mut NetworkReceiver,
sync_send: &SyncSender, sync_send: &SyncSender,
peer_store: Arc<LogManager>, peer_store: Arc<LogManager>,
init_peer_id: PeerId, init_peer_id: PeerId,