mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-12-26 08:15:17 +00:00
Compare commits
2 Commits
992deb0979
...
d792cc9979
Author | SHA1 | Date | |
---|---|---|---|
|
d792cc9979 | ||
|
1de7afec14 |
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -912,7 +912,9 @@ dependencies = [
|
|||||||
"anyhow",
|
"anyhow",
|
||||||
"async-lock 2.8.0",
|
"async-lock 2.8.0",
|
||||||
"hashlink 0.8.4",
|
"hashlink 0.8.4",
|
||||||
|
"lazy_static",
|
||||||
"log_entry_sync",
|
"log_entry_sync",
|
||||||
|
"metrics",
|
||||||
"network",
|
"network",
|
||||||
"shared_types",
|
"shared_types",
|
||||||
"storage-async",
|
"storage-async",
|
||||||
@ -5030,6 +5032,7 @@ dependencies = [
|
|||||||
name = "network"
|
name = "network"
|
||||||
version = "0.2.0"
|
version = "0.2.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"channel",
|
||||||
"directory",
|
"directory",
|
||||||
"dirs 4.0.0",
|
"dirs 4.0.0",
|
||||||
"discv5",
|
"discv5",
|
||||||
|
@ -13,3 +13,5 @@ tokio = { version = "1.19.2", features = ["sync"] }
|
|||||||
async-lock = "2.5.0"
|
async-lock = "2.5.0"
|
||||||
hashlink = "0.8.0"
|
hashlink = "0.8.0"
|
||||||
tracing = "0.1.35"
|
tracing = "0.1.35"
|
||||||
|
lazy_static = "1.4.0"
|
||||||
|
metrics = { workspace = true }
|
||||||
|
@ -1,11 +1,16 @@
|
|||||||
use super::mem_pool::MemoryChunkPool;
|
use super::mem_pool::MemoryChunkPool;
|
||||||
use crate::mem_pool::FileID;
|
use crate::mem_pool::FileID;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use network::NetworkMessage;
|
use metrics::{Histogram, Sample};
|
||||||
|
use network::{NetworkMessage, NetworkSender};
|
||||||
use shared_types::{ChunkArray, FileProof};
|
use shared_types::{ChunkArray, FileProof};
|
||||||
use std::{sync::Arc, time::SystemTime};
|
use std::{sync::Arc, time::Instant};
|
||||||
use storage_async::{ShardConfig, Store};
|
use storage_async::{ShardConfig, Store};
|
||||||
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
use tokio::sync::mpsc::UnboundedReceiver;
|
||||||
|
|
||||||
|
lazy_static::lazy_static! {
|
||||||
|
pub static ref FINALIZE_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("chunk_pool_finalize_file_latency", 1024);
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle the cached file when uploaded completely and verified from blockchain.
|
/// Handle the cached file when uploaded completely and verified from blockchain.
|
||||||
/// Generally, the file will be persisted into log store.
|
/// Generally, the file will be persisted into log store.
|
||||||
@ -13,7 +18,7 @@ pub struct ChunkPoolHandler {
|
|||||||
receiver: UnboundedReceiver<ChunkPoolMessage>,
|
receiver: UnboundedReceiver<ChunkPoolMessage>,
|
||||||
mem_pool: Arc<MemoryChunkPool>,
|
mem_pool: Arc<MemoryChunkPool>,
|
||||||
log_store: Arc<Store>,
|
log_store: Arc<Store>,
|
||||||
sender: UnboundedSender<NetworkMessage>,
|
sender: NetworkSender,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ChunkPoolHandler {
|
impl ChunkPoolHandler {
|
||||||
@ -21,7 +26,7 @@ impl ChunkPoolHandler {
|
|||||||
receiver: UnboundedReceiver<ChunkPoolMessage>,
|
receiver: UnboundedReceiver<ChunkPoolMessage>,
|
||||||
mem_pool: Arc<MemoryChunkPool>,
|
mem_pool: Arc<MemoryChunkPool>,
|
||||||
log_store: Arc<Store>,
|
log_store: Arc<Store>,
|
||||||
sender: UnboundedSender<NetworkMessage>,
|
sender: NetworkSender,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
ChunkPoolHandler {
|
ChunkPoolHandler {
|
||||||
receiver,
|
receiver,
|
||||||
@ -68,7 +73,7 @@ impl ChunkPoolHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let start = SystemTime::now();
|
let start = Instant::now();
|
||||||
if !self
|
if !self
|
||||||
.log_store
|
.log_store
|
||||||
.finalize_tx_with_hash(id.tx_id.seq, id.tx_id.hash)
|
.finalize_tx_with_hash(id.tx_id.seq, id.tx_id.hash)
|
||||||
@ -77,8 +82,9 @@ impl ChunkPoolHandler {
|
|||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
let elapsed = start.elapsed()?;
|
let elapsed = start.elapsed();
|
||||||
debug!(?id, ?elapsed, "Transaction finalized");
|
debug!(?id, ?elapsed, "Transaction finalized");
|
||||||
|
FINALIZE_FILE_LATENCY.update_since(start);
|
||||||
|
|
||||||
// always remove file from pool after transaction finalized
|
// always remove file from pool after transaction finalized
|
||||||
self.mem_pool.remove_file(&id.root).await;
|
self.mem_pool.remove_file(&id.root).await;
|
||||||
|
@ -29,7 +29,7 @@ impl Config {
|
|||||||
pub fn unbounded(
|
pub fn unbounded(
|
||||||
config: Config,
|
config: Config,
|
||||||
log_store: Arc<storage_async::Store>,
|
log_store: Arc<storage_async::Store>,
|
||||||
network_send: tokio::sync::mpsc::UnboundedSender<network::NetworkMessage>,
|
network_send: network::NetworkSender,
|
||||||
) -> (Arc<MemoryChunkPool>, ChunkPoolHandler) {
|
) -> (Arc<MemoryChunkPool>, ChunkPoolHandler) {
|
||||||
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
|
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
@ -3,13 +3,12 @@ use crate::monitor::Monitor;
|
|||||||
use crate::sealer::Sealer;
|
use crate::sealer::Sealer;
|
||||||
use crate::submitter::Submitter;
|
use crate::submitter::Submitter;
|
||||||
use crate::{config::MinerConfig, mine::PoraService, watcher::MineContextWatcher};
|
use crate::{config::MinerConfig, mine::PoraService, watcher::MineContextWatcher};
|
||||||
use network::NetworkMessage;
|
use network::NetworkSender;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use storage::config::ShardConfig;
|
use storage::config::ShardConfig;
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum MinerMessage {
|
pub enum MinerMessage {
|
||||||
@ -29,7 +28,7 @@ pub struct MineService;
|
|||||||
impl MineService {
|
impl MineService {
|
||||||
pub async fn spawn(
|
pub async fn spawn(
|
||||||
executor: task_executor::TaskExecutor,
|
executor: task_executor::TaskExecutor,
|
||||||
_network_send: mpsc::UnboundedSender<NetworkMessage>,
|
_network_send: NetworkSender,
|
||||||
config: MinerConfig,
|
config: MinerConfig,
|
||||||
store: Arc<Store>,
|
store: Arc<Store>,
|
||||||
) -> Result<broadcast::Sender<MinerMessage>, String> {
|
) -> Result<broadcast::Sender<MinerMessage>, String> {
|
||||||
|
@ -41,6 +41,7 @@ if-addrs = "0.10.1"
|
|||||||
slog = "2.7.0"
|
slog = "2.7.0"
|
||||||
igd = "0.12.1"
|
igd = "0.12.1"
|
||||||
duration-str = "0.5.1"
|
duration-str = "0.5.1"
|
||||||
|
channel = { path = "../../common/channel" }
|
||||||
|
|
||||||
[dependencies.libp2p]
|
[dependencies.libp2p]
|
||||||
version = "0.45.1"
|
version = "0.45.1"
|
||||||
|
@ -159,3 +159,10 @@ pub enum NetworkMessage {
|
|||||||
udp_socket: Option<SocketAddr>,
|
udp_socket: Option<SocketAddr>,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub type NetworkSender = channel::metrics::Sender<NetworkMessage>;
|
||||||
|
pub type NetworkReceiver = channel::metrics::Receiver<NetworkMessage>;
|
||||||
|
|
||||||
|
pub fn new_network_channel() -> (NetworkSender, NetworkReceiver) {
|
||||||
|
channel::metrics::unbounded_channel("network")
|
||||||
|
}
|
||||||
|
@ -3,10 +3,9 @@
|
|||||||
//! Currently supported strategies:
|
//! Currently supported strategies:
|
||||||
//! - UPnP
|
//! - UPnP
|
||||||
|
|
||||||
use crate::{NetworkConfig, NetworkMessage};
|
use crate::{NetworkConfig, NetworkMessage, NetworkSender};
|
||||||
use if_addrs::get_if_addrs;
|
use if_addrs::get_if_addrs;
|
||||||
use std::net::{IpAddr, SocketAddr, SocketAddrV4};
|
use std::net::{IpAddr, SocketAddr, SocketAddrV4};
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
/// Configuration required to construct the UPnP port mappings.
|
/// Configuration required to construct the UPnP port mappings.
|
||||||
pub struct UPnPConfig {
|
pub struct UPnPConfig {
|
||||||
@ -36,10 +35,7 @@ impl UPnPConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Attempts to construct external port mappings with UPnP.
|
/// Attempts to construct external port mappings with UPnP.
|
||||||
pub fn construct_upnp_mappings(
|
pub fn construct_upnp_mappings(config: UPnPConfig, network_send: NetworkSender) {
|
||||||
config: UPnPConfig,
|
|
||||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
|
||||||
) {
|
|
||||||
info!("UPnP Attempting to initialise routes");
|
info!("UPnP Attempting to initialise routes");
|
||||||
match igd::search_gateway(Default::default()) {
|
match igd::search_gateway(Default::default()) {
|
||||||
Err(e) => info!(error = %e, "UPnP not available"),
|
Err(e) => info!(error = %e, "UPnP not available"),
|
||||||
|
@ -4,7 +4,7 @@ use crate::discovery::enr;
|
|||||||
use crate::multiaddr::Protocol;
|
use crate::multiaddr::Protocol;
|
||||||
use crate::rpc::{GoodbyeReason, RPCResponseErrorCode, ReqId};
|
use crate::rpc::{GoodbyeReason, RPCResponseErrorCode, ReqId};
|
||||||
use crate::types::{error, GossipKind};
|
use crate::types::{error, GossipKind};
|
||||||
use crate::{EnrExt, NetworkMessage};
|
use crate::{EnrExt, NetworkSender};
|
||||||
use crate::{NetworkConfig, NetworkGlobals, PeerAction, ReportSource};
|
use crate::{NetworkConfig, NetworkGlobals, PeerAction, ReportSource};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use libp2p::core::{
|
use libp2p::core::{
|
||||||
@ -21,7 +21,6 @@ use std::io::prelude::*;
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
|
||||||
|
|
||||||
use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};
|
use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};
|
||||||
|
|
||||||
@ -60,7 +59,7 @@ pub struct Context<'a> {
|
|||||||
impl<AppReqId: ReqId> Service<AppReqId> {
|
impl<AppReqId: ReqId> Service<AppReqId> {
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
executor: task_executor::TaskExecutor,
|
executor: task_executor::TaskExecutor,
|
||||||
network_sender: UnboundedSender<NetworkMessage>,
|
network_sender: NetworkSender,
|
||||||
ctx: Context<'_>,
|
ctx: Context<'_>,
|
||||||
) -> error::Result<(Arc<NetworkGlobals>, Keypair, Self)> {
|
) -> error::Result<(Arc<NetworkGlobals>, Keypair, Self)> {
|
||||||
trace!("Libp2p Service starting");
|
trace!("Libp2p Service starting");
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
#![cfg(test)]
|
#![cfg(test)]
|
||||||
|
|
||||||
use libp2p::gossipsub::GossipsubConfigBuilder;
|
use libp2p::gossipsub::GossipsubConfigBuilder;
|
||||||
|
use network::new_network_channel;
|
||||||
use network::Enr;
|
use network::Enr;
|
||||||
use network::EnrExt;
|
use network::EnrExt;
|
||||||
use network::Multiaddr;
|
use network::Multiaddr;
|
||||||
@ -22,7 +23,6 @@ pub mod swarm;
|
|||||||
type ReqId = usize;
|
type ReqId = usize;
|
||||||
|
|
||||||
use tempfile::Builder as TempBuilder;
|
use tempfile::Builder as TempBuilder;
|
||||||
use tokio::sync::mpsc::unbounded_channel;
|
|
||||||
|
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub struct Libp2pInstance(LibP2PService<ReqId>, exit_future::Signal);
|
pub struct Libp2pInstance(LibP2PService<ReqId>, exit_future::Signal);
|
||||||
@ -72,7 +72,7 @@ pub async fn build_libp2p_instance(rt: Weak<Runtime>, boot_nodes: Vec<Enr>) -> L
|
|||||||
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
|
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
|
||||||
let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx);
|
let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx);
|
||||||
let libp2p_context = network::Context { config: &config };
|
let libp2p_context = network::Context { config: &config };
|
||||||
let (sender, _) = unbounded_channel();
|
let (sender, _) = new_network_channel();
|
||||||
Libp2pInstance(
|
Libp2pInstance(
|
||||||
LibP2PService::new(executor, sender, libp2p_context)
|
LibP2PService::new(executor, sender, libp2p_context)
|
||||||
.await
|
.await
|
||||||
|
@ -16,7 +16,7 @@ use network::{
|
|||||||
Keypair, MessageAcceptance, MessageId, NetworkGlobals, NetworkMessage, PeerId, PeerRequestId,
|
Keypair, MessageAcceptance, MessageId, NetworkGlobals, NetworkMessage, PeerId, PeerRequestId,
|
||||||
PublicKey, PubsubMessage, Request, RequestId, Response,
|
PublicKey, PubsubMessage, Request, RequestId, Response,
|
||||||
};
|
};
|
||||||
use network::{Multiaddr, PeerAction, ReportSource};
|
use network::{Multiaddr, NetworkSender, PeerAction, ReportSource};
|
||||||
use shared_types::{bytes_to_chunks, timestamp_now, NetworkIdentity, TxID};
|
use shared_types::{bytes_to_chunks, timestamp_now, NetworkIdentity, TxID};
|
||||||
use storage::config::ShardConfig;
|
use storage::config::ShardConfig;
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
@ -88,7 +88,7 @@ pub struct Libp2pEventHandler {
|
|||||||
/// A collection of global variables, accessible outside of the network service.
|
/// A collection of global variables, accessible outside of the network service.
|
||||||
network_globals: Arc<NetworkGlobals>,
|
network_globals: Arc<NetworkGlobals>,
|
||||||
/// A channel to the router service.
|
/// A channel to the router service.
|
||||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
network_send: NetworkSender,
|
||||||
/// A channel to the syncing service.
|
/// A channel to the syncing service.
|
||||||
sync_send: SyncSender,
|
sync_send: SyncSender,
|
||||||
/// A channel to the RPC chunk pool service.
|
/// A channel to the RPC chunk pool service.
|
||||||
@ -112,7 +112,7 @@ impl Libp2pEventHandler {
|
|||||||
pub fn new(
|
pub fn new(
|
||||||
config: Config,
|
config: Config,
|
||||||
network_globals: Arc<NetworkGlobals>,
|
network_globals: Arc<NetworkGlobals>,
|
||||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
network_send: NetworkSender,
|
||||||
sync_send: SyncSender,
|
sync_send: SyncSender,
|
||||||
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
|
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
|
||||||
local_keypair: Keypair,
|
local_keypair: Keypair,
|
||||||
@ -1010,10 +1010,12 @@ mod tests {
|
|||||||
use network::{
|
use network::{
|
||||||
discovery::{CombinedKey, ConnectionId},
|
discovery::{CombinedKey, ConnectionId},
|
||||||
discv5::enr::EnrBuilder,
|
discv5::enr::EnrBuilder,
|
||||||
|
new_network_channel,
|
||||||
rpc::{GetChunksRequest, StatusMessage, SubstreamId},
|
rpc::{GetChunksRequest, StatusMessage, SubstreamId},
|
||||||
types::FindFile,
|
types::FindFile,
|
||||||
CombinedKeyExt, Keypair, MessageAcceptance, MessageId, Multiaddr, NetworkGlobals,
|
CombinedKeyExt, Keypair, MessageAcceptance, MessageId, Multiaddr, NetworkGlobals,
|
||||||
NetworkMessage, PeerId, PubsubMessage, Request, RequestId, Response, SyncId,
|
NetworkMessage, NetworkReceiver, PeerId, PubsubMessage, Request, RequestId, Response,
|
||||||
|
SyncId,
|
||||||
};
|
};
|
||||||
use shared_types::{timestamp_now, ChunkArray, ChunkArrayWithProof, FlowRangeProof, TxID};
|
use shared_types::{timestamp_now, ChunkArray, ChunkArrayWithProof, FlowRangeProof, TxID};
|
||||||
use storage::{
|
use storage::{
|
||||||
@ -1035,8 +1037,8 @@ mod tests {
|
|||||||
runtime: TestRuntime,
|
runtime: TestRuntime,
|
||||||
network_globals: Arc<NetworkGlobals>,
|
network_globals: Arc<NetworkGlobals>,
|
||||||
keypair: Keypair,
|
keypair: Keypair,
|
||||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
network_send: NetworkSender,
|
||||||
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
|
network_recv: NetworkReceiver,
|
||||||
sync_send: SyncSender,
|
sync_send: SyncSender,
|
||||||
sync_recv: SyncReceiver,
|
sync_recv: SyncReceiver,
|
||||||
chunk_pool_send: mpsc::UnboundedSender<ChunkPoolMessage>,
|
chunk_pool_send: mpsc::UnboundedSender<ChunkPoolMessage>,
|
||||||
@ -1050,7 +1052,7 @@ mod tests {
|
|||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
let runtime = TestRuntime::default();
|
let runtime = TestRuntime::default();
|
||||||
let (network_globals, keypair) = Context::new_network_globals();
|
let (network_globals, keypair) = Context::new_network_globals();
|
||||||
let (network_send, network_recv) = mpsc::unbounded_channel();
|
let (network_send, network_recv) = new_network_channel();
|
||||||
let (sync_send, sync_recv) = channel::Channel::unbounded("test");
|
let (sync_send, sync_recv) = channel::Channel::unbounded("test");
|
||||||
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
|
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
@ -5,11 +5,9 @@ use chunk_pool::ChunkPoolMessage;
|
|||||||
use file_location_cache::FileLocationCache;
|
use file_location_cache::FileLocationCache;
|
||||||
use futures::{channel::mpsc::Sender, prelude::*};
|
use futures::{channel::mpsc::Sender, prelude::*};
|
||||||
use miner::MinerMessage;
|
use miner::MinerMessage;
|
||||||
use network::types::NewFile;
|
|
||||||
use network::PubsubMessage;
|
|
||||||
use network::{
|
use network::{
|
||||||
BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, RequestId,
|
types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage,
|
||||||
Service as LibP2PService, Swarm,
|
NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm,
|
||||||
};
|
};
|
||||||
use pruner::PrunerMessage;
|
use pruner::PrunerMessage;
|
||||||
use shared_types::timestamp_now;
|
use shared_types::timestamp_now;
|
||||||
@ -33,7 +31,7 @@ pub struct RouterService {
|
|||||||
network_globals: Arc<NetworkGlobals>,
|
network_globals: Arc<NetworkGlobals>,
|
||||||
|
|
||||||
/// The receiver channel for Zgs to communicate with the network service.
|
/// The receiver channel for Zgs to communicate with the network service.
|
||||||
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
|
network_recv: NetworkReceiver,
|
||||||
|
|
||||||
/// The receiver channel for Zgs to communicate with the pruner service.
|
/// The receiver channel for Zgs to communicate with the pruner service.
|
||||||
pruner_recv: Option<mpsc::UnboundedReceiver<PrunerMessage>>,
|
pruner_recv: Option<mpsc::UnboundedReceiver<PrunerMessage>>,
|
||||||
@ -57,8 +55,8 @@ impl RouterService {
|
|||||||
executor: task_executor::TaskExecutor,
|
executor: task_executor::TaskExecutor,
|
||||||
libp2p: LibP2PService<RequestId>,
|
libp2p: LibP2PService<RequestId>,
|
||||||
network_globals: Arc<NetworkGlobals>,
|
network_globals: Arc<NetworkGlobals>,
|
||||||
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
|
network_recv: NetworkReceiver,
|
||||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
network_send: NetworkSender,
|
||||||
sync_send: SyncSender,
|
sync_send: SyncSender,
|
||||||
_miner_send: Option<broadcast::Sender<MinerMessage>>,
|
_miner_send: Option<broadcast::Sender<MinerMessage>>,
|
||||||
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
|
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
|
||||||
|
@ -17,15 +17,13 @@ use file_location_cache::FileLocationCache;
|
|||||||
use futures::channel::mpsc::Sender;
|
use futures::channel::mpsc::Sender;
|
||||||
use jsonrpsee::core::RpcResult;
|
use jsonrpsee::core::RpcResult;
|
||||||
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
|
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
|
||||||
use network::NetworkGlobals;
|
use network::{NetworkGlobals, NetworkMessage, NetworkSender};
|
||||||
use network::NetworkMessage;
|
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
use sync::{SyncRequest, SyncResponse, SyncSender};
|
use sync::{SyncRequest, SyncResponse, SyncSender};
|
||||||
use task_executor::ShutdownReason;
|
use task_executor::ShutdownReason;
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
|
||||||
use zgs::RpcServer as ZgsRpcServer;
|
use zgs::RpcServer as ZgsRpcServer;
|
||||||
use zgs_miner::MinerMessage;
|
use zgs_miner::MinerMessage;
|
||||||
|
|
||||||
@ -42,7 +40,7 @@ pub struct Context {
|
|||||||
pub config: RPCConfig,
|
pub config: RPCConfig,
|
||||||
pub file_location_cache: Arc<FileLocationCache>,
|
pub file_location_cache: Arc<FileLocationCache>,
|
||||||
pub network_globals: Arc<NetworkGlobals>,
|
pub network_globals: Arc<NetworkGlobals>,
|
||||||
pub network_send: UnboundedSender<NetworkMessage>,
|
pub network_send: NetworkSender,
|
||||||
pub sync_send: SyncSender,
|
pub sync_send: SyncSender,
|
||||||
pub chunk_pool: Arc<MemoryChunkPool>,
|
pub chunk_pool: Arc<MemoryChunkPool>,
|
||||||
pub log_store: Arc<Store>,
|
pub log_store: Arc<Store>,
|
||||||
|
@ -4,8 +4,8 @@ use file_location_cache::FileLocationCache;
|
|||||||
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
|
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
|
||||||
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
|
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
|
||||||
use network::{
|
use network::{
|
||||||
self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId,
|
self, new_network_channel, Keypair, NetworkConfig, NetworkGlobals, NetworkReceiver,
|
||||||
Service as LibP2PService,
|
NetworkSender, RequestId, Service as LibP2PService,
|
||||||
};
|
};
|
||||||
use pruner::{Pruner, PrunerConfig, PrunerMessage};
|
use pruner::{Pruner, PrunerConfig, PrunerMessage};
|
||||||
use router::RouterService;
|
use router::RouterService;
|
||||||
@ -27,15 +27,12 @@ macro_rules! require {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct NetworkComponents {
|
struct NetworkComponents {
|
||||||
send: mpsc::UnboundedSender<NetworkMessage>,
|
send: NetworkSender,
|
||||||
globals: Arc<NetworkGlobals>,
|
globals: Arc<NetworkGlobals>,
|
||||||
keypair: Keypair,
|
keypair: Keypair,
|
||||||
|
|
||||||
// note: these will be owned by the router service
|
// note: these will be owned by the router service
|
||||||
owned: Option<(
|
owned: Option<(LibP2PService<RequestId>, NetworkReceiver)>,
|
||||||
LibP2PService<RequestId>,
|
|
||||||
mpsc::UnboundedReceiver<NetworkMessage>,
|
|
||||||
)>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SyncComponents {
|
struct SyncComponents {
|
||||||
@ -144,7 +141,7 @@ impl ClientBuilder {
|
|||||||
let service_context = network::Context { config };
|
let service_context = network::Context { config };
|
||||||
|
|
||||||
// construct communication channel
|
// construct communication channel
|
||||||
let (send, recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
let (send, recv) = new_network_channel();
|
||||||
|
|
||||||
// launch libp2p service
|
// launch libp2p service
|
||||||
let (globals, keypair, libp2p) =
|
let (globals, keypair, libp2p) =
|
||||||
|
@ -1,12 +1,11 @@
|
|||||||
use network::{NetworkMessage, PeerAction, PeerId, PubsubMessage, ReportSource};
|
use network::{NetworkMessage, NetworkSender, PeerAction, PeerId, PubsubMessage, ReportSource};
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
pub struct SyncNetworkContext {
|
pub struct SyncNetworkContext {
|
||||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
network_send: NetworkSender,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SyncNetworkContext {
|
impl SyncNetworkContext {
|
||||||
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>) -> Self {
|
pub fn new(network_send: NetworkSender) -> Self {
|
||||||
Self { network_send }
|
Self { network_send }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -750,13 +750,13 @@ mod tests {
|
|||||||
use crate::test_util::create_2_store;
|
use crate::test_util::create_2_store;
|
||||||
use crate::test_util::tests::create_file_location_cache;
|
use crate::test_util::tests::create_file_location_cache;
|
||||||
use libp2p::identity;
|
use libp2p::identity;
|
||||||
|
use network::{new_network_channel, NetworkReceiver};
|
||||||
use network::{ReportSource, Request};
|
use network::{ReportSource, Request};
|
||||||
use storage::log_store::log_manager::LogConfig;
|
use storage::log_store::log_manager::LogConfig;
|
||||||
use storage::log_store::log_manager::LogManager;
|
use storage::log_store::log_manager::LogManager;
|
||||||
use storage::log_store::LogStoreRead;
|
use storage::log_store::LogStoreRead;
|
||||||
use storage::H256;
|
use storage::H256;
|
||||||
use task_executor::{test_utils::TestRuntime, TaskExecutor};
|
use task_executor::{test_utils::TestRuntime, TaskExecutor};
|
||||||
use tokio::sync::mpsc::{self, UnboundedReceiver};
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_status() {
|
fn test_status() {
|
||||||
@ -1649,7 +1649,7 @@ mod tests {
|
|||||||
fn create_default_controller(
|
fn create_default_controller(
|
||||||
task_executor: TaskExecutor,
|
task_executor: TaskExecutor,
|
||||||
peer_id: Option<PeerId>,
|
peer_id: Option<PeerId>,
|
||||||
) -> (SerialSyncController, UnboundedReceiver<NetworkMessage>) {
|
) -> (SerialSyncController, NetworkReceiver) {
|
||||||
let tx_id = TxID {
|
let tx_id = TxID {
|
||||||
seq: 0,
|
seq: 0,
|
||||||
hash: H256::random(),
|
hash: H256::random(),
|
||||||
@ -1668,8 +1668,8 @@ mod tests {
|
|||||||
store: Arc<LogManager>,
|
store: Arc<LogManager>,
|
||||||
tx_id: TxID,
|
tx_id: TxID,
|
||||||
num_chunks: usize,
|
num_chunks: usize,
|
||||||
) -> (SerialSyncController, UnboundedReceiver<NetworkMessage>) {
|
) -> (SerialSyncController, NetworkReceiver) {
|
||||||
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
let (network_send, network_recv) = new_network_channel();
|
||||||
let ctx = Arc::new(SyncNetworkContext::new(network_send));
|
let ctx = Arc::new(SyncNetworkContext::new(network_send));
|
||||||
|
|
||||||
let peer_id = match peer_id {
|
let peer_id = match peer_id {
|
||||||
|
@ -10,10 +10,9 @@ use libp2p::swarm::DialError;
|
|||||||
use log_entry_sync::LogSyncEvent;
|
use log_entry_sync::LogSyncEvent;
|
||||||
use network::rpc::methods::FileAnnouncement;
|
use network::rpc::methods::FileAnnouncement;
|
||||||
use network::types::{AnnounceChunks, FindFile, NewFile};
|
use network::types::{AnnounceChunks, FindFile, NewFile};
|
||||||
use network::PubsubMessage;
|
|
||||||
use network::{
|
use network::{
|
||||||
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
|
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, NetworkSender,
|
||||||
PeerRequestId, SyncId as RequestId,
|
PeerId, PeerRequestId, PubsubMessage, SyncId as RequestId,
|
||||||
};
|
};
|
||||||
use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, Transaction, TxID};
|
use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, Transaction, TxID};
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
@ -27,7 +26,7 @@ use storage::error::Result as StorageResult;
|
|||||||
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
|
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
|
||||||
use storage::log_store::Store as LogStore;
|
use storage::log_store::Store as LogStore;
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
use tokio::sync::{broadcast, mpsc, oneshot};
|
use tokio::sync::{broadcast, oneshot};
|
||||||
|
|
||||||
pub type SyncSender = channel::Sender<SyncMessage, SyncRequest, SyncResponse>;
|
pub type SyncSender = channel::Sender<SyncMessage, SyncRequest, SyncResponse>;
|
||||||
pub type SyncReceiver = channel::Receiver<SyncMessage, SyncRequest, SyncResponse>;
|
pub type SyncReceiver = channel::Receiver<SyncMessage, SyncRequest, SyncResponse>;
|
||||||
@ -142,7 +141,7 @@ pub struct SyncService {
|
|||||||
impl SyncService {
|
impl SyncService {
|
||||||
pub async fn spawn(
|
pub async fn spawn(
|
||||||
executor: task_executor::TaskExecutor,
|
executor: task_executor::TaskExecutor,
|
||||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
network_send: NetworkSender,
|
||||||
store: Arc<dyn LogStore>,
|
store: Arc<dyn LogStore>,
|
||||||
file_location_cache: Arc<FileLocationCache>,
|
file_location_cache: Arc<FileLocationCache>,
|
||||||
event_recv: broadcast::Receiver<LogSyncEvent>,
|
event_recv: broadcast::Receiver<LogSyncEvent>,
|
||||||
@ -163,7 +162,7 @@ impl SyncService {
|
|||||||
pub async fn spawn_with_config(
|
pub async fn spawn_with_config(
|
||||||
config: Config,
|
config: Config,
|
||||||
executor: task_executor::TaskExecutor,
|
executor: task_executor::TaskExecutor,
|
||||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
network_send: NetworkSender,
|
||||||
store: Arc<dyn LogStore>,
|
store: Arc<dyn LogStore>,
|
||||||
file_location_cache: Arc<FileLocationCache>,
|
file_location_cache: Arc<FileLocationCache>,
|
||||||
event_recv: broadcast::Receiver<LogSyncEvent>,
|
event_recv: broadcast::Receiver<LogSyncEvent>,
|
||||||
@ -897,7 +896,9 @@ mod tests {
|
|||||||
use crate::test_util::tests::create_file_location_cache;
|
use crate::test_util::tests::create_file_location_cache;
|
||||||
use libp2p::identity;
|
use libp2p::identity;
|
||||||
use network::discovery::ConnectionId;
|
use network::discovery::ConnectionId;
|
||||||
|
use network::new_network_channel;
|
||||||
use network::rpc::SubstreamId;
|
use network::rpc::SubstreamId;
|
||||||
|
use network::NetworkReceiver;
|
||||||
use network::ReportSource;
|
use network::ReportSource;
|
||||||
use shared_types::ChunkArray;
|
use shared_types::ChunkArray;
|
||||||
use shared_types::Transaction;
|
use shared_types::Transaction;
|
||||||
@ -909,8 +910,6 @@ mod tests {
|
|||||||
use storage::log_store::LogStoreRead;
|
use storage::log_store::LogStoreRead;
|
||||||
use storage::H256;
|
use storage::H256;
|
||||||
use task_executor::test_utils::TestRuntime;
|
use task_executor::test_utils::TestRuntime;
|
||||||
use tokio::sync::mpsc::UnboundedReceiver;
|
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
|
||||||
|
|
||||||
struct TestSyncRuntime {
|
struct TestSyncRuntime {
|
||||||
runtime: TestRuntime,
|
runtime: TestRuntime,
|
||||||
@ -924,8 +923,8 @@ mod tests {
|
|||||||
init_peer_id: PeerId,
|
init_peer_id: PeerId,
|
||||||
file_location_cache: Arc<FileLocationCache>,
|
file_location_cache: Arc<FileLocationCache>,
|
||||||
|
|
||||||
network_send: UnboundedSender<NetworkMessage>,
|
network_send: NetworkSender,
|
||||||
network_recv: UnboundedReceiver<NetworkMessage>,
|
network_recv: NetworkReceiver,
|
||||||
event_send: broadcast::Sender<LogSyncEvent>,
|
event_send: broadcast::Sender<LogSyncEvent>,
|
||||||
catch_up_end_recv: Option<oneshot::Receiver<()>>,
|
catch_up_end_recv: Option<oneshot::Receiver<()>>,
|
||||||
}
|
}
|
||||||
@ -942,7 +941,7 @@ mod tests {
|
|||||||
let (store, peer_store, txs, data) = create_2_store(chunk_counts);
|
let (store, peer_store, txs, data) = create_2_store(chunk_counts);
|
||||||
let init_data = data[0].clone();
|
let init_data = data[0].clone();
|
||||||
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
|
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
|
||||||
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
let (network_send, network_recv) = new_network_channel();
|
||||||
let (event_send, _) = broadcast::channel(16);
|
let (event_send, _) = broadcast::channel(16);
|
||||||
let (_, catch_up_end_recv) = oneshot::channel();
|
let (_, catch_up_end_recv) = oneshot::channel();
|
||||||
|
|
||||||
@ -1006,7 +1005,7 @@ mod tests {
|
|||||||
let file_location_cache: Arc<FileLocationCache> =
|
let file_location_cache: Arc<FileLocationCache> =
|
||||||
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
|
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
|
||||||
|
|
||||||
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
let (network_send, mut network_recv) = new_network_channel();
|
||||||
let (_, sync_recv) = channel::Channel::unbounded("test");
|
let (_, sync_recv) = channel::Channel::unbounded("test");
|
||||||
|
|
||||||
let mut sync = SyncService {
|
let mut sync = SyncService {
|
||||||
@ -1035,7 +1034,7 @@ mod tests {
|
|||||||
let file_location_cache: Arc<FileLocationCache> =
|
let file_location_cache: Arc<FileLocationCache> =
|
||||||
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
|
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
|
||||||
|
|
||||||
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
let (network_send, mut network_recv) = new_network_channel();
|
||||||
let (_, sync_recv) = channel::Channel::unbounded("test");
|
let (_, sync_recv) = channel::Channel::unbounded("test");
|
||||||
|
|
||||||
let mut sync = SyncService {
|
let mut sync = SyncService {
|
||||||
@ -1354,7 +1353,7 @@ mod tests {
|
|||||||
let file_location_cache: Arc<FileLocationCache> =
|
let file_location_cache: Arc<FileLocationCache> =
|
||||||
create_file_location_cache(init_peer_id, vec![]);
|
create_file_location_cache(init_peer_id, vec![]);
|
||||||
|
|
||||||
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
let (network_send, mut network_recv) = new_network_channel();
|
||||||
let (_event_send, event_recv) = broadcast::channel(16);
|
let (_event_send, event_recv) = broadcast::channel(16);
|
||||||
let (_, catch_up_end_recv) = oneshot::channel();
|
let (_, catch_up_end_recv) = oneshot::channel();
|
||||||
let sync_send = SyncService::spawn_with_config(
|
let sync_send = SyncService::spawn_with_config(
|
||||||
@ -1781,7 +1780,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_chunk_request(
|
async fn receive_chunk_request(
|
||||||
network_recv: &mut UnboundedReceiver<NetworkMessage>,
|
network_recv: &mut NetworkReceiver,
|
||||||
sync_send: &SyncSender,
|
sync_send: &SyncSender,
|
||||||
peer_store: Arc<LogManager>,
|
peer_store: Arc<LogManager>,
|
||||||
init_peer_id: PeerId,
|
init_peer_id: PeerId,
|
||||||
|
Loading…
Reference in New Issue
Block a user