Compare commits

...

3 Commits

Author SHA1 Message Date
0xroy
bba779a2b8
Merge fd9c033176 into 1de7afec14 2024-11-12 17:26:20 +08:00
Bo QIU
1de7afec14
Add more metrics for network unbounded channel (#264)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* Add metrics for file finalization in chunk pool

* Add metrics for network unbounded channel
2024-11-12 17:25:49 +08:00
Roy Lu
fd9c033176 Updated README 2024-10-23 08:52:56 -07:00
18 changed files with 95 additions and 126 deletions

3
Cargo.lock generated
View File

@ -912,7 +912,9 @@ dependencies = [
"anyhow",
"async-lock 2.8.0",
"hashlink 0.8.4",
"lazy_static",
"log_entry_sync",
"metrics",
"network",
"shared_types",
"storage-async",
@ -5030,6 +5032,7 @@ dependencies = [
name = "network"
version = "0.2.0"
dependencies = [
"channel",
"directory",
"dirs 4.0.0",
"discv5",

View File

@ -2,69 +2,32 @@
## Overview
0G Storage is the storage layer for the ZeroGravity data availability (DA) system. The 0G Storage layer holds three important features:
0G Storage is a decentralized data storage system designed to address the challenges of high-throughput and low-latency data storage and retrieval, in areas such as AI and gaming.
* Buit-in - It is natively built into the ZeroGravity DA system for data storage and retrieval.
* General purpose - It is designed to support atomic transactions, mutable kv stores as well as archive log systems to enable wide range of applications with various data types.
* Incentive - Instead of being just a decentralized database, 0G Storage introduces PoRA mining algorithm to incentivize storage network participants.
In addition, it forms the storage layer for the 0G data availability (DA) system, with the cross-layer integration abstracted away from Rollup and AppChain builders.
To dive deep into the technical details, continue reading [0G Storage Spec.](docs/)
## System Architecture
## Integration
0G Storage consists of two main components:
We provide a [SDK](https://github.com/0glabs/0g-js-storage-sdk) for users to easily integrate 0G Storage in their applications with the following features:
1. **Data Publishing Lane**: Ensures quick data availability and verification through the 0G Consensus network.
2. **Data Storage Lane**: Manages large data transfers and storage using an erasure-coding mechanism for redundancy and reliability.
* File Merkle Tree Class
* Flow Contract Types
* RPC methods support
* File upload
* Support browser environment
* Tests for different environments (In Progress)
* File download (In Progress)
Across the two lanes, 0G Storage supports the following features:
## Deployment
* **General Purpose Design**: Supports atomic transactions, mutable key-value stores, and archive log systems, enabling a wide range of applications with various data types.
* **Incentivized Participation**: Utilizes the PoRA (Proof of Random Access) mining algorithm to incentivize storage network participants.
Please refer to [Deployment](docs/run.md) page for detailed steps to compile and start a 0G Storage node.
For in-depth technical details about 0G Storage, please read our [Intro to 0G Storage](https://docs.0g.ai/og-storage).
## Test
## Documentation
### Prerequisites
- If you want to run a node, please refer to the [Running a Node](https://docs.0g.ai/run-a-node/storage-node) guide.
- If you want build a project using 0G storage, please refer to the [0G Storage SDK](https://docs.0g.ai/build-with-0g/storage-sdk) guide.
* Required python version: 3.8, 3.9, 3.10, higher version is not guaranteed (e.g. failed to install `pysha3`).
* Install dependencies under root folder: `pip3 install -r requirements.txt`
## Support and Additional Resources
We want to do everything we can to help you be successful while working on your contribution and projects. Here you'll find various resources and communities that may help you complete a project or contribute to 0G.
### Dependencies
Python test framework will launch blockchain fullnodes at local for storage node to interact with. There are 2 kinds of fullnodes supported:
* Conflux eSpace node (by default).
* BSC node (geth).
For Conflux eSpace node, the test framework will automatically compile the binary at runtime, and copy the binary to `tests/tmp` folder. For BSC node, the test framework will automatically download the latest version binary from [github](https://github.com/bnb-chain/bsc/releases) to `tests/tmp` folder.
Alternatively, you could also manually copy specific version binaries (conflux or geth) to the `tests/tmp` folder. Note, do **NOT** copy released conflux binary on github, since block height of some CIPs are hardcoded.
For testing, it's also dependent on the following repos:
* [0G Storage Contract](https://github.com/0glabs/0g-storage-contracts): It essentially provides two abi interfaces for 0G Storage Node to interact with the on-chain contracts.
* ZgsFlow: It contains apis to submit chunk data.
* PoraMine: It contains apis to submit PoRA answers.
* [0G Storage Client](https://github.com/0glabs/0g-storage-client): It is used to interact with certain 0G Storage Nodes to upload/download files.
### Run Tests
Go to the `tests` folder and run the following command to run all tests:
```
python test_all.py
```
or, run any single test, e.g.
```
python sync_test.py
```
## Contributing
To make contributions to the project, please follow the guidelines [here](contributing.md).
### Communities
- [0G Telegram](https://t.me/web3_0glabs)
- [0G Discord](https://discord.com/invite/0glabs)

View File

@ -13,3 +13,5 @@ tokio = { version = "1.19.2", features = ["sync"] }
async-lock = "2.5.0"
hashlink = "0.8.0"
tracing = "0.1.35"
lazy_static = "1.4.0"
metrics = { workspace = true }

View File

@ -1,11 +1,16 @@
use super::mem_pool::MemoryChunkPool;
use crate::mem_pool::FileID;
use anyhow::Result;
use network::NetworkMessage;
use metrics::{Histogram, Sample};
use network::{NetworkMessage, NetworkSender};
use shared_types::{ChunkArray, FileProof};
use std::{sync::Arc, time::SystemTime};
use std::{sync::Arc, time::Instant};
use storage_async::{ShardConfig, Store};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::UnboundedReceiver;
lazy_static::lazy_static! {
pub static ref FINALIZE_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("chunk_pool_finalize_file_latency", 1024);
}
/// Handle the cached file when uploaded completely and verified from blockchain.
/// Generally, the file will be persisted into log store.
@ -13,7 +18,7 @@ pub struct ChunkPoolHandler {
receiver: UnboundedReceiver<ChunkPoolMessage>,
mem_pool: Arc<MemoryChunkPool>,
log_store: Arc<Store>,
sender: UnboundedSender<NetworkMessage>,
sender: NetworkSender,
}
impl ChunkPoolHandler {
@ -21,7 +26,7 @@ impl ChunkPoolHandler {
receiver: UnboundedReceiver<ChunkPoolMessage>,
mem_pool: Arc<MemoryChunkPool>,
log_store: Arc<Store>,
sender: UnboundedSender<NetworkMessage>,
sender: NetworkSender,
) -> Self {
ChunkPoolHandler {
receiver,
@ -68,7 +73,7 @@ impl ChunkPoolHandler {
}
}
let start = SystemTime::now();
let start = Instant::now();
if !self
.log_store
.finalize_tx_with_hash(id.tx_id.seq, id.tx_id.hash)
@ -77,8 +82,9 @@ impl ChunkPoolHandler {
return Ok(false);
}
let elapsed = start.elapsed()?;
let elapsed = start.elapsed();
debug!(?id, ?elapsed, "Transaction finalized");
FINALIZE_FILE_LATENCY.update_since(start);
// always remove file from pool after transaction finalized
self.mem_pool.remove_file(&id.root).await;

View File

@ -29,7 +29,7 @@ impl Config {
pub fn unbounded(
config: Config,
log_store: Arc<storage_async::Store>,
network_send: tokio::sync::mpsc::UnboundedSender<network::NetworkMessage>,
network_send: network::NetworkSender,
) -> (Arc<MemoryChunkPool>, ChunkPoolHandler) {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();

View File

@ -3,13 +3,12 @@ use crate::monitor::Monitor;
use crate::sealer::Sealer;
use crate::submitter::Submitter;
use crate::{config::MinerConfig, mine::PoraService, watcher::MineContextWatcher};
use network::NetworkMessage;
use network::NetworkSender;
use std::sync::Arc;
use std::time::Duration;
use storage::config::ShardConfig;
use storage_async::Store;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
#[derive(Clone, Debug)]
pub enum MinerMessage {
@ -29,7 +28,7 @@ pub struct MineService;
impl MineService {
pub async fn spawn(
executor: task_executor::TaskExecutor,
_network_send: mpsc::UnboundedSender<NetworkMessage>,
_network_send: NetworkSender,
config: MinerConfig,
store: Arc<Store>,
) -> Result<broadcast::Sender<MinerMessage>, String> {

View File

@ -41,6 +41,7 @@ if-addrs = "0.10.1"
slog = "2.7.0"
igd = "0.12.1"
duration-str = "0.5.1"
channel = { path = "../../common/channel" }
[dependencies.libp2p]
version = "0.45.1"

View File

@ -159,3 +159,10 @@ pub enum NetworkMessage {
udp_socket: Option<SocketAddr>,
},
}
pub type NetworkSender = channel::metrics::Sender<NetworkMessage>;
pub type NetworkReceiver = channel::metrics::Receiver<NetworkMessage>;
pub fn new_network_channel() -> (NetworkSender, NetworkReceiver) {
channel::metrics::unbounded_channel("network")
}

View File

@ -3,10 +3,9 @@
//! Currently supported strategies:
//! - UPnP
use crate::{NetworkConfig, NetworkMessage};
use crate::{NetworkConfig, NetworkMessage, NetworkSender};
use if_addrs::get_if_addrs;
use std::net::{IpAddr, SocketAddr, SocketAddrV4};
use tokio::sync::mpsc;
/// Configuration required to construct the UPnP port mappings.
pub struct UPnPConfig {
@ -36,10 +35,7 @@ impl UPnPConfig {
}
/// Attempts to construct external port mappings with UPnP.
pub fn construct_upnp_mappings(
config: UPnPConfig,
network_send: mpsc::UnboundedSender<NetworkMessage>,
) {
pub fn construct_upnp_mappings(config: UPnPConfig, network_send: NetworkSender) {
info!("UPnP Attempting to initialise routes");
match igd::search_gateway(Default::default()) {
Err(e) => info!(error = %e, "UPnP not available"),

View File

@ -4,7 +4,7 @@ use crate::discovery::enr;
use crate::multiaddr::Protocol;
use crate::rpc::{GoodbyeReason, RPCResponseErrorCode, ReqId};
use crate::types::{error, GossipKind};
use crate::{EnrExt, NetworkMessage};
use crate::{EnrExt, NetworkSender};
use crate::{NetworkConfig, NetworkGlobals, PeerAction, ReportSource};
use futures::prelude::*;
use libp2p::core::{
@ -21,7 +21,6 @@ use std::io::prelude::*;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};
@ -60,7 +59,7 @@ pub struct Context<'a> {
impl<AppReqId: ReqId> Service<AppReqId> {
pub async fn new(
executor: task_executor::TaskExecutor,
network_sender: UnboundedSender<NetworkMessage>,
network_sender: NetworkSender,
ctx: Context<'_>,
) -> error::Result<(Arc<NetworkGlobals>, Keypair, Self)> {
trace!("Libp2p Service starting");

View File

@ -1,6 +1,7 @@
#![cfg(test)]
use libp2p::gossipsub::GossipsubConfigBuilder;
use network::new_network_channel;
use network::Enr;
use network::EnrExt;
use network::Multiaddr;
@ -22,7 +23,6 @@ pub mod swarm;
type ReqId = usize;
use tempfile::Builder as TempBuilder;
use tokio::sync::mpsc::unbounded_channel;
#[allow(unused)]
pub struct Libp2pInstance(LibP2PService<ReqId>, exit_future::Signal);
@ -72,7 +72,7 @@ pub async fn build_libp2p_instance(rt: Weak<Runtime>, boot_nodes: Vec<Enr>) -> L
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx);
let libp2p_context = network::Context { config: &config };
let (sender, _) = unbounded_channel();
let (sender, _) = new_network_channel();
Libp2pInstance(
LibP2PService::new(executor, sender, libp2p_context)
.await

View File

@ -16,7 +16,7 @@ use network::{
Keypair, MessageAcceptance, MessageId, NetworkGlobals, NetworkMessage, PeerId, PeerRequestId,
PublicKey, PubsubMessage, Request, RequestId, Response,
};
use network::{Multiaddr, PeerAction, ReportSource};
use network::{Multiaddr, NetworkSender, PeerAction, ReportSource};
use shared_types::{bytes_to_chunks, timestamp_now, NetworkIdentity, TxID};
use storage::config::ShardConfig;
use storage_async::Store;
@ -88,7 +88,7 @@ pub struct Libp2pEventHandler {
/// A collection of global variables, accessible outside of the network service.
network_globals: Arc<NetworkGlobals>,
/// A channel to the router service.
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_send: NetworkSender,
/// A channel to the syncing service.
sync_send: SyncSender,
/// A channel to the RPC chunk pool service.
@ -112,7 +112,7 @@ impl Libp2pEventHandler {
pub fn new(
config: Config,
network_globals: Arc<NetworkGlobals>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_send: NetworkSender,
sync_send: SyncSender,
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
local_keypair: Keypair,
@ -1010,10 +1010,12 @@ mod tests {
use network::{
discovery::{CombinedKey, ConnectionId},
discv5::enr::EnrBuilder,
new_network_channel,
rpc::{GetChunksRequest, StatusMessage, SubstreamId},
types::FindFile,
CombinedKeyExt, Keypair, MessageAcceptance, MessageId, Multiaddr, NetworkGlobals,
NetworkMessage, PeerId, PubsubMessage, Request, RequestId, Response, SyncId,
NetworkMessage, NetworkReceiver, PeerId, PubsubMessage, Request, RequestId, Response,
SyncId,
};
use shared_types::{timestamp_now, ChunkArray, ChunkArrayWithProof, FlowRangeProof, TxID};
use storage::{
@ -1035,8 +1037,8 @@ mod tests {
runtime: TestRuntime,
network_globals: Arc<NetworkGlobals>,
keypair: Keypair,
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
network_send: NetworkSender,
network_recv: NetworkReceiver,
sync_send: SyncSender,
sync_recv: SyncReceiver,
chunk_pool_send: mpsc::UnboundedSender<ChunkPoolMessage>,
@ -1050,7 +1052,7 @@ mod tests {
fn default() -> Self {
let runtime = TestRuntime::default();
let (network_globals, keypair) = Context::new_network_globals();
let (network_send, network_recv) = mpsc::unbounded_channel();
let (network_send, network_recv) = new_network_channel();
let (sync_send, sync_recv) = channel::Channel::unbounded("test");
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();

View File

@ -5,11 +5,9 @@ use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use futures::{channel::mpsc::Sender, prelude::*};
use miner::MinerMessage;
use network::types::NewFile;
use network::PubsubMessage;
use network::{
BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, RequestId,
Service as LibP2PService, Swarm,
types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage,
NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm,
};
use pruner::PrunerMessage;
use shared_types::timestamp_now;
@ -33,7 +31,7 @@ pub struct RouterService {
network_globals: Arc<NetworkGlobals>,
/// The receiver channel for Zgs to communicate with the network service.
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
network_recv: NetworkReceiver,
/// The receiver channel for Zgs to communicate with the pruner service.
pruner_recv: Option<mpsc::UnboundedReceiver<PrunerMessage>>,
@ -57,8 +55,8 @@ impl RouterService {
executor: task_executor::TaskExecutor,
libp2p: LibP2PService<RequestId>,
network_globals: Arc<NetworkGlobals>,
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_recv: NetworkReceiver,
network_send: NetworkSender,
sync_send: SyncSender,
_miner_send: Option<broadcast::Sender<MinerMessage>>,
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,

View File

@ -17,15 +17,13 @@ use file_location_cache::FileLocationCache;
use futures::channel::mpsc::Sender;
use jsonrpsee::core::RpcResult;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
use network::NetworkGlobals;
use network::NetworkMessage;
use network::{NetworkGlobals, NetworkMessage, NetworkSender};
use std::error::Error;
use std::sync::Arc;
use storage_async::Store;
use sync::{SyncRequest, SyncResponse, SyncSender};
use task_executor::ShutdownReason;
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use zgs::RpcServer as ZgsRpcServer;
use zgs_miner::MinerMessage;
@ -42,7 +40,7 @@ pub struct Context {
pub config: RPCConfig,
pub file_location_cache: Arc<FileLocationCache>,
pub network_globals: Arc<NetworkGlobals>,
pub network_send: UnboundedSender<NetworkMessage>,
pub network_send: NetworkSender,
pub sync_send: SyncSender,
pub chunk_pool: Arc<MemoryChunkPool>,
pub log_store: Arc<Store>,

View File

@ -4,8 +4,8 @@ use file_location_cache::FileLocationCache;
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
use network::{
self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId,
Service as LibP2PService,
self, new_network_channel, Keypair, NetworkConfig, NetworkGlobals, NetworkReceiver,
NetworkSender, RequestId, Service as LibP2PService,
};
use pruner::{Pruner, PrunerConfig, PrunerMessage};
use router::RouterService;
@ -27,15 +27,12 @@ macro_rules! require {
}
struct NetworkComponents {
send: mpsc::UnboundedSender<NetworkMessage>,
send: NetworkSender,
globals: Arc<NetworkGlobals>,
keypair: Keypair,
// note: these will be owned by the router service
owned: Option<(
LibP2PService<RequestId>,
mpsc::UnboundedReceiver<NetworkMessage>,
)>,
owned: Option<(LibP2PService<RequestId>, NetworkReceiver)>,
}
struct SyncComponents {
@ -144,7 +141,7 @@ impl ClientBuilder {
let service_context = network::Context { config };
// construct communication channel
let (send, recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (send, recv) = new_network_channel();
// launch libp2p service
let (globals, keypair, libp2p) =

View File

@ -1,12 +1,11 @@
use network::{NetworkMessage, PeerAction, PeerId, PubsubMessage, ReportSource};
use tokio::sync::mpsc;
use network::{NetworkMessage, NetworkSender, PeerAction, PeerId, PubsubMessage, ReportSource};
pub struct SyncNetworkContext {
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_send: NetworkSender,
}
impl SyncNetworkContext {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>) -> Self {
pub fn new(network_send: NetworkSender) -> Self {
Self { network_send }
}

View File

@ -750,13 +750,13 @@ mod tests {
use crate::test_util::create_2_store;
use crate::test_util::tests::create_file_location_cache;
use libp2p::identity;
use network::{new_network_channel, NetworkReceiver};
use network::{ReportSource, Request};
use storage::log_store::log_manager::LogConfig;
use storage::log_store::log_manager::LogManager;
use storage::log_store::LogStoreRead;
use storage::H256;
use task_executor::{test_utils::TestRuntime, TaskExecutor};
use tokio::sync::mpsc::{self, UnboundedReceiver};
#[test]
fn test_status() {
@ -1649,7 +1649,7 @@ mod tests {
fn create_default_controller(
task_executor: TaskExecutor,
peer_id: Option<PeerId>,
) -> (SerialSyncController, UnboundedReceiver<NetworkMessage>) {
) -> (SerialSyncController, NetworkReceiver) {
let tx_id = TxID {
seq: 0,
hash: H256::random(),
@ -1668,8 +1668,8 @@ mod tests {
store: Arc<LogManager>,
tx_id: TxID,
num_chunks: usize,
) -> (SerialSyncController, UnboundedReceiver<NetworkMessage>) {
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
) -> (SerialSyncController, NetworkReceiver) {
let (network_send, network_recv) = new_network_channel();
let ctx = Arc::new(SyncNetworkContext::new(network_send));
let peer_id = match peer_id {

View File

@ -10,10 +10,9 @@ use libp2p::swarm::DialError;
use log_entry_sync::LogSyncEvent;
use network::rpc::methods::FileAnnouncement;
use network::types::{AnnounceChunks, FindFile, NewFile};
use network::PubsubMessage;
use network::{
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
PeerRequestId, SyncId as RequestId,
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, NetworkSender,
PeerId, PeerRequestId, PubsubMessage, SyncId as RequestId,
};
use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, Transaction, TxID};
use std::sync::atomic::Ordering;
@ -27,7 +26,7 @@ use storage::error::Result as StorageResult;
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
use storage::log_store::Store as LogStore;
use storage_async::Store;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::sync::{broadcast, oneshot};
pub type SyncSender = channel::Sender<SyncMessage, SyncRequest, SyncResponse>;
pub type SyncReceiver = channel::Receiver<SyncMessage, SyncRequest, SyncResponse>;
@ -142,7 +141,7 @@ pub struct SyncService {
impl SyncService {
pub async fn spawn(
executor: task_executor::TaskExecutor,
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_send: NetworkSender,
store: Arc<dyn LogStore>,
file_location_cache: Arc<FileLocationCache>,
event_recv: broadcast::Receiver<LogSyncEvent>,
@ -163,7 +162,7 @@ impl SyncService {
pub async fn spawn_with_config(
config: Config,
executor: task_executor::TaskExecutor,
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_send: NetworkSender,
store: Arc<dyn LogStore>,
file_location_cache: Arc<FileLocationCache>,
event_recv: broadcast::Receiver<LogSyncEvent>,
@ -897,7 +896,9 @@ mod tests {
use crate::test_util::tests::create_file_location_cache;
use libp2p::identity;
use network::discovery::ConnectionId;
use network::new_network_channel;
use network::rpc::SubstreamId;
use network::NetworkReceiver;
use network::ReportSource;
use shared_types::ChunkArray;
use shared_types::Transaction;
@ -909,8 +910,6 @@ mod tests {
use storage::log_store::LogStoreRead;
use storage::H256;
use task_executor::test_utils::TestRuntime;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
struct TestSyncRuntime {
runtime: TestRuntime,
@ -924,8 +923,8 @@ mod tests {
init_peer_id: PeerId,
file_location_cache: Arc<FileLocationCache>,
network_send: UnboundedSender<NetworkMessage>,
network_recv: UnboundedReceiver<NetworkMessage>,
network_send: NetworkSender,
network_recv: NetworkReceiver,
event_send: broadcast::Sender<LogSyncEvent>,
catch_up_end_recv: Option<oneshot::Receiver<()>>,
}
@ -942,7 +941,7 @@ mod tests {
let (store, peer_store, txs, data) = create_2_store(chunk_counts);
let init_data = data[0].clone();
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (network_send, network_recv) = new_network_channel();
let (event_send, _) = broadcast::channel(16);
let (_, catch_up_end_recv) = oneshot::channel();
@ -1006,7 +1005,7 @@ mod tests {
let file_location_cache: Arc<FileLocationCache> =
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (network_send, mut network_recv) = new_network_channel();
let (_, sync_recv) = channel::Channel::unbounded("test");
let mut sync = SyncService {
@ -1035,7 +1034,7 @@ mod tests {
let file_location_cache: Arc<FileLocationCache> =
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (network_send, mut network_recv) = new_network_channel();
let (_, sync_recv) = channel::Channel::unbounded("test");
let mut sync = SyncService {
@ -1354,7 +1353,7 @@ mod tests {
let file_location_cache: Arc<FileLocationCache> =
create_file_location_cache(init_peer_id, vec![]);
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (network_send, mut network_recv) = new_network_channel();
let (_event_send, event_recv) = broadcast::channel(16);
let (_, catch_up_end_recv) = oneshot::channel();
let sync_send = SyncService::spawn_with_config(
@ -1781,7 +1780,7 @@ mod tests {
}
async fn receive_chunk_request(
network_recv: &mut UnboundedReceiver<NetworkMessage>,
network_recv: &mut NetworkReceiver,
sync_send: &SyncSender,
peer_store: Arc<LogManager>,
init_peer_id: PeerId,