Compare commits

4 Commits

Author SHA1 Message Date
0g-peterzhb
27366a5331
add docker support (#241)
2024-11-25 10:26:11 +08:00
0xroy
1f4d56b645
Update README (#286)
* Updated README

* Edited README
2024-11-25 10:24:38 +08:00
0g-peterzhb
d0aad154da
remove unnecessary backtrace (#280) 2024-11-25 10:23:56 +08:00
Bo QIU
b9e6431a4d
Add shard config in STATUS message and only dail to shard config matched peers (#285)
* Add shard config in status message

* verify shard config for status message

* Notify peer connected to sync layer after status message exchanged

* Do not dial to shard config mismatched peers

* Upgrade network protocol version

* disconnect peer instead of ban peer if shard config mismatch

* Add python test for TCP connection by shard config
2024-11-25 10:15:30 +08:00
24 changed files with 274 additions and 113 deletions

6
DockerfileStandard Normal file
View File

@ -0,0 +1,6 @@
FROM rust
VOLUME ["/data"]
COPY . .
RUN apt-get update && apt-get install -y clang cmake build-essential pkg-config libssl-dev
RUN cargo build --release
CMD ["./target/release/zgs_node", "--config", "run/config-testnet-standard.toml", "--log", "run/log_config"]

6
DockerfileTurbo Normal file
View File

@ -0,0 +1,6 @@
FROM rust
VOLUME ["/data"]
COPY . .
RUN apt-get update && apt-get install -y clang cmake build-essential pkg-config libssl-dev
RUN cargo build --release
CMD ["./target/release/zgs_node", "--config", "run/config-testnet-turbo.toml", "--log", "run/log_config"]

View File

@ -2,34 +2,31 @@
## Overview
0G Storage is the storage layer for the ZeroGravity data availability (DA) system. The 0G Storage layer holds three important features:
0G Storage is a decentralized data storage system designed to address the challenges of high-throughput and low-latency data storage and retrieval, particularly for areas such as AI.
* Built-in - It is natively built into the ZeroGravity DA system for data storage and retrieval.
* General purpose - It is designed to support atomic transactions, mutable kv stores as well as archive log systems to enable wide range of applications with various data types.
* Incentive - Instead of being just a decentralized database, 0G Storage introduces PoRA mining algorithm to incentivize storage network participants.
## System Architecture
To dive deep into the technical details, continue reading [0G Storage Spec.](docs/)
0G Storage consists of two main components:
## Integration
1. **Data Publishing Lane**: Ensures fast Merkle tree data root commitment and verification through 0G Chain.
2. **Data Storage Lane**: Manages large data transfers and storage using an erasure-coding mechanism for redundancy and sharding for parallel processing.
We provide a [SDK](https://github.com/0glabs/0g-js-storage-sdk) for users to easily integrate 0G Storage in their applications with the following features:
Across the two lanes, 0G Storage supports the following features:
* File Merkle Tree Class
* Flow Contract Types
* RPC methods support
* File upload
* Support browser environment
* Tests for different environments (In Progress)
* File download (In Progress)
* **General Purpose Design**: Supports atomic transactions, mutable key-value stores, and archive log systems, enabling a wide range of applications with various data types.
* **Validated Incentivization**: Utilizes the PoRA (Proof of Random Access) mining algorithm to mitigate the data outsourcing issue and to ensure rewards are distributed to nodes that contribute to the storage network.
## Deployment
For in-depth technical details about 0G Storage, please read our [Intro to 0G Storage](https://docs.0g.ai/og-storage).
Please refer to [Deployment](docs/run.md) page for detailed steps to compile and start a 0G Storage node.
## Documentation
## Test
- If you want to run a node, please refer to the [Running a Node](https://docs.0g.ai/run-a-node/storage-node) guide.
- If you want to conduct local testing, please refer to the [Onebox Testing](https://github.com/0glabs/0g-storage-node/blob/main/docs/onebox-test.md) guide.
- If you want to build a project using 0G Storage, please refer to the [0G Storage SDK](https://docs.0g.ai/build-with-0g/storage-sdk) guide.
Please refer to the [One Box Test](docs/onebox-test.md) page for local testing purpose.
## Support and Additional Resources
We want to do everything we can to help you be successful while working on your contribution and projects. Here you'll find various resources and communities that may help you complete a project or contribute to 0G.
## Contributing
To make contributions to the project, please follow the guidelines [here](contributing.md).
### Communities
- [0G Telegram](https://t.me/web3_0glabs)
- [0G Discord](https://discord.com/invite/0glabs)

View File

@ -96,8 +96,10 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F
/// Defines the current P2P protocol version.
/// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which made network traffic too heavy.
/// - v2: Publish NewFile to neighbors only and announce file via RPC message.
/// - v3: Add shard config in Status message.
pub const PROTOCOL_VERSION_V1: [u8; 3] = [0, 1, 1];
pub const PROTOCOL_VERSION_V2: [u8; 3] = [0, 2, 1];
pub const PROTOCOL_VERSION_V3: [u8; 3] = [0, 3, 0];
/// Application level requests sent to the network.
#[derive(Debug, Clone, Copy)]
@ -150,6 +152,8 @@ pub enum NetworkMessage {
},
/// Start dialing a new peer.
DialPeer { address: Multiaddr, peer_id: PeerId },
/// Disconnect a peer.
DisconnectPeer { peer_id: PeerId },
/// Notify that new file stored in db.
AnnounceLocalFile { tx_id: TxID },
/// Called if a known external TCP socket address has been updated.
@ -165,5 +169,5 @@ pub type NetworkSender = channel::metrics::Sender<NetworkMessage>;
pub type NetworkReceiver = channel::metrics::Receiver<NetworkMessage>;
pub fn new_network_channel() -> (NetworkSender, NetworkReceiver) {
channel::metrics::unbounded_channel("network")
channel::metrics::unbounded_channel("network_channel")
}

View File

@ -1,6 +1,7 @@
use std::time::Duration;
use std::{fmt::Debug, sync::Arc, time::Duration};
use duration_str::deserialize_duration;
use libp2p::PeerId;
use serde::{Deserialize, Serialize};
/// The time in seconds between re-status's peers.
@ -16,7 +17,7 @@ pub const DEFAULT_PING_INTERVAL_INBOUND: u64 = 20;
pub const DEFAULT_TARGET_PEERS: usize = 50;
/// Configurations for the PeerManager.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
/* Peer count related configurations */
@ -40,6 +41,9 @@ pub struct Config {
pub ping_interval_inbound: u64,
/// Interval between PING events for peers dialed by us.
pub ping_interval_outbound: u64,
#[serde(skip)]
pub filters: Filters,
}
impl Default for Config {
@ -52,6 +56,29 @@ impl Default for Config {
status_interval: DEFAULT_STATUS_INTERVAL,
ping_interval_inbound: DEFAULT_PING_INTERVAL_INBOUND,
ping_interval_outbound: DEFAULT_PING_INTERVAL_OUTBOUND,
filters: Default::default(),
}
}
}
#[derive(Clone)]
pub struct Filters {
/// Decides whether to dial the specified peer.
pub dial_peer_filter: Option<Arc<dyn Fn(&PeerId) -> bool + Sync + Send + 'static>>,
}
impl Default for Filters {
fn default() -> Self {
Filters {
dial_peer_filter: None,
}
}
}
impl Debug for Filters {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Filters")
.field("dial_peer_filter", &self.dial_peer_filter.is_some())
.finish()
}
}
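
The optional dial filter introduced in this hunk can be illustrated with a small, self-contained sketch. It is not the node's code: `PeerId` is replaced by a plain `u64` stand-in, `DialPeerFilter` is an alias introduced here for readability, and `filter_dial_candidates` is a hypothetical helper that mirrors the `retain` call added to the peer manager.

```rust
use std::sync::Arc;

/// Stand-in for `libp2p::PeerId` so the sketch compiles without dependencies (assumption).
type PeerId = u64;

/// Alias for the filter type used in the `Filters` struct (name introduced for this sketch).
type DialPeerFilter = Arc<dyn Fn(&PeerId) -> bool + Sync + Send + 'static>;

/// Same shape as `Filters` in the diff; `Default` can be derived because
/// `Option<T>` defaults to `None` for any `T`.
#[derive(Clone, Default)]
pub struct Filters {
    /// Decides whether to dial the specified peer.
    pub dial_peer_filter: Option<DialPeerFilter>,
}

/// Hypothetical helper: keeps only the candidates accepted by the filter.
/// When no filter is configured, every candidate is kept.
pub fn filter_dial_candidates(filters: &Filters, mut to_dial: Vec<PeerId>) -> Vec<PeerId> {
    if let Some(filter) = filters.dial_peer_filter.as_ref() {
        to_dial.retain(|peer_id| filter(peer_id));
    }
    to_dial
}
```

Storing the closure behind an `Arc` keeps the struct cheap to clone, which is presumably why `Config` drops `Copy` in this change: a trait object cannot be `Copy`.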

View File

@ -69,6 +69,8 @@ pub struct PeerManager {
discovery_enabled: bool,
/// Keeps track if the current instance is reporting metrics or not.
metrics_enabled: bool,
filters: config::Filters,
}
/// The events that the `PeerManager` outputs (requests).
@ -108,6 +110,7 @@ impl PeerManager {
status_interval,
ping_interval_inbound,
ping_interval_outbound,
filters,
} = cfg;
// Set up the peer manager heartbeat interval
@ -123,6 +126,7 @@ impl PeerManager {
heartbeat,
discovery_enabled,
metrics_enabled,
filters,
})
}
@ -277,6 +281,10 @@ impl PeerManager {
}
}
if let Some(dial_peer_filter) = self.filters.dial_peer_filter.clone() {
to_dial_peers.retain(|peer_id| dial_peer_filter(peer_id));
}
// Queue another discovery if we need to
self.maintain_peer_count(to_dial_peers.len());
@ -693,7 +701,7 @@ impl PeerManager {
}
// Gracefully disconnects a peer without banning them.
fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
pub fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
self.events
.push(PeerManagerEvent::DisconnectPeer(peer_id, reason));
self.network_globals

View File

@ -35,7 +35,7 @@ pub struct PeerDBConfig {
pub allowed_negative_gossipsub_factor: f32,
/// The time we allow peers to be in the dialing state in our PeerDb before we revert them to a disconnected state.
#[serde(deserialize_with = "deserialize_duration")]
pub dail_timeout: Duration,
pub dial_timeout: Duration,
}
impl Default for PeerDBConfig {
@ -45,7 +45,7 @@ impl Default for PeerDBConfig {
max_banned_peers: 1000,
banned_peers_per_ip_threshold: 5,
allowed_negative_gossipsub_factor: 0.1,
dail_timeout: Duration::from_secs(15),
dial_timeout: Duration::from_secs(15),
}
}
}
@ -339,7 +339,7 @@ impl PeerDB {
.iter()
.filter_map(|(peer_id, info)| {
if let PeerConnectionStatus::Dialing { since } = info.connection_status() {
if (*since) + self.config.dail_timeout < std::time::Instant::now() {
if (*since) + self.config.dial_timeout < std::time::Instant::now() {
return Some(*peer_id);
}
}

View File

@ -399,9 +399,7 @@ mod tests {
use std::io::Write;
fn status_message() -> StatusMessage {
StatusMessage {
data: Default::default(),
}
Default::default()
}
fn ping_message() -> Ping {
@ -570,10 +568,7 @@ mod tests {
assert_eq!(stream_identifier.len(), 10);
// Status message is 84 bytes uncompressed. `max_compressed_len` is 32 + 84 + 84/6 = 130.
let status_message_bytes = StatusMessage {
data: Default::default(),
}
.as_ssz_bytes();
let status_message_bytes = StatusMessage::default().as_ssz_bytes();
let mut uvi_codec: Uvi<usize> = Uvi::default();
let mut dst = BytesMut::with_capacity(1024);

View File

@ -69,9 +69,13 @@ impl ToString for ErrorType {
/* Requests */
/// The STATUS request/response handshake message.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq, Default)]
pub struct StatusMessage {
pub data: NetworkIdentity,
// shard config
pub num_shard: usize,
pub shard_id: usize,
}
/// The PING request/response message.

View File

@ -23,14 +23,10 @@ fn test_status_rpc() {
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt)).await;
// Dummy STATUS RPC message
let rpc_request = Request::Status(StatusMessage {
data: Default::default(),
});
let rpc_request = Request::Status(Default::default());
// Dummy STATUS RPC message
let rpc_response = Response::Status(StatusMessage {
data: Default::default(),
});
let rpc_response = Response::Status(Default::default());
// build the sender future
let sender_future = async {

View File

@ -172,8 +172,11 @@ impl Libp2pEventHandler {
}
pub fn send_status(&self, peer_id: PeerId) {
let shard_config = self.store.get_store().get_shard_config();
let status_message = StatusMessage {
data: self.network_globals.network_id(),
num_shard: shard_config.num_shard,
shard_id: shard_config.shard_id,
};
debug!(%peer_id, ?status_message, "Sending Status request");
@ -191,7 +194,6 @@ impl Libp2pEventHandler {
if outgoing {
self.send_status(peer_id);
self.send_to_sync(SyncMessage::PeerConnected { peer_id });
metrics::LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING.mark(1);
} else {
metrics::LIBP2P_HANDLE_PEER_CONNECTED_INCOMING.mark(1);
@ -254,8 +256,11 @@ impl Libp2pEventHandler {
debug!(%peer_id, ?status, "Received Status request");
let network_id = self.network_globals.network_id();
let shard_config = self.store.get_store().get_shard_config();
let status_message = StatusMessage {
data: network_id.clone(),
num_shard: shard_config.num_shard,
shard_id: shard_config.shard_id,
};
debug!(%peer_id, ?status_message, "Sending Status response");
@ -264,12 +269,18 @@ impl Libp2pEventHandler {
id: request_id,
response: Response::Status(status_message),
});
self.on_status_message(peer_id, status, network_id);
if self.verify_status_message(peer_id, status, network_id, &shard_config) {
self.send_to_sync(SyncMessage::PeerConnected { peer_id });
}
}
fn on_status_response(&self, peer_id: PeerId, status: StatusMessage) {
let network_id = self.network_globals.network_id();
self.on_status_message(peer_id, status, network_id);
let shard_config = self.store.get_store().get_shard_config();
if self.verify_status_message(peer_id, status, network_id, &shard_config) {
self.send_to_sync(SyncMessage::PeerConnected { peer_id });
}
}
pub async fn on_rpc_response(
@ -950,21 +961,54 @@ impl Libp2pEventHandler {
MessageAcceptance::Accept
}
fn on_status_message(
fn verify_status_message(
&self,
peer_id: PeerId,
status: StatusMessage,
network_id: NetworkIdentity,
) {
shard_config: &ShardConfig,
) -> bool {
if status.data != network_id {
warn!(%peer_id, ?network_id, ?status.data, "Report peer with incompatible network id");
self.send_to_network(NetworkMessage::ReportPeer {
peer_id,
action: PeerAction::Fatal,
source: ReportSource::Gossipsub,
source: ReportSource::RPC,
msg: "Incompatible network id in StatusMessage",
})
});
return false;
}
let peer_shard_config = match ShardConfig::new(status.shard_id, status.num_shard) {
Ok(v) => v,
Err(err) => {
warn!(%peer_id, ?status, ?err, "Report peer with invalid shard config");
self.send_to_network(NetworkMessage::ReportPeer {
peer_id,
action: PeerAction::Fatal,
source: ReportSource::RPC,
msg: "Invalid shard config in StatusMessage",
});
return false;
}
};
self.file_location_cache
.insert_peer_config(peer_id, peer_shard_config);
if !peer_shard_config.intersect(shard_config) {
info!(%peer_id, ?shard_config, ?status, "Report peer with mismatched shard config");
self.send_to_network(NetworkMessage::ReportPeer {
peer_id,
action: PeerAction::LowToleranceError,
source: ReportSource::RPC,
msg: "Shard config mismatch in StatusMessage",
});
self.send_to_network(NetworkMessage::DisconnectPeer { peer_id });
return false;
}
true
}
async fn publish_file(&self, tx_id: TxID) -> Option<bool> {
@ -1184,10 +1228,7 @@ mod tests {
assert_eq!(handler.peers.read().await.size(), 1);
ctx.assert_status_request(alice);
assert!(matches!(
ctx.sync_recv.try_recv(),
Ok(Notification(SyncMessage::PeerConnected {peer_id})) if peer_id == alice
));
assert!(matches!(ctx.sync_recv.try_recv(), Err(TryRecvError::Empty)));
}
#[tokio::test]
@ -1216,6 +1257,8 @@ mod tests {
let req_id = (ConnectionId::new(4), SubstreamId(12));
let request = Request::Status(StatusMessage {
data: Default::default(),
num_shard: 1,
shard_id: 0,
});
handler.on_rpc_request(alice, req_id, request).await;
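
The decision order in `verify_status_message` (network id first, then shard config validity, then shard overlap) can be sketched as a pure function. Everything below is a simplified illustration under stated assumptions: the network identity is reduced to a `u64`, `StatusOutcome` is a hypothetical enum standing in for the messages sent to the network layer, and the `ShardConfig` validation and `intersect` rules assume power-of-two shard counts with data assigned by index modulo `num_shard`. The real `ShardConfig` in the repository may behave differently.

```rust
/// Simplified shard config; the validation rules are assumptions for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ShardConfig {
    pub shard_id: usize,
    pub num_shard: usize,
}

impl ShardConfig {
    pub fn new(shard_id: usize, num_shard: usize) -> Result<Self, String> {
        // `is_power_of_two` is false for 0, so a zero shard count is rejected too.
        if !num_shard.is_power_of_two() {
            return Err(format!("num_shard must be a power of two, got {num_shard}"));
        }
        if shard_id >= num_shard {
            return Err(format!("shard_id {shard_id} out of range for {num_shard} shards"));
        }
        Ok(Self { shard_id, num_shard })
    }

    /// Assumed rule: two configs overlap when their shard ids agree modulo
    /// the smaller shard count.
    pub fn intersect(&self, other: &ShardConfig) -> bool {
        let n = self.num_shard.min(other.num_shard);
        self.shard_id % n == other.shard_id % n
    }
}

/// Hypothetical summary of what the handler does with a peer's status.
#[derive(Debug, PartialEq, Eq)]
pub enum StatusOutcome {
    /// Peer passed all checks; the sync layer may be notified.
    Accept,
    /// Peer is reported with a fatal action.
    ReportFatal(&'static str),
    /// Peer is reported with low tolerance and disconnected, not banned.
    ReportAndDisconnect(&'static str),
}

pub fn verify_status(
    local_network_id: u64,
    peer_network_id: u64,
    local_shard: &ShardConfig,
    peer_shard_id: usize,
    peer_num_shard: usize,
) -> StatusOutcome {
    if peer_network_id != local_network_id {
        return StatusOutcome::ReportFatal("Incompatible network id in StatusMessage");
    }
    let peer_shard = match ShardConfig::new(peer_shard_id, peer_num_shard) {
        Ok(v) => v,
        Err(_) => return StatusOutcome::ReportFatal("Invalid shard config in StatusMessage"),
    };
    if !peer_shard.intersect(local_shard) {
        return StatusOutcome::ReportAndDisconnect("Shard config mismatch in StatusMessage");
    }
    StatusOutcome::Accept
}
```

Under these assumptions, a node at shard position `0/4` accepts a peer at `0/2` or `0/1` but disconnects from a peer at `1/4`, which matches the scenario exercised by the Python test in this change.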

View File

@ -11,16 +11,14 @@ lazy_static::lazy_static! {
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "all");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "already");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "ok");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "fail");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "all");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "already");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "ok");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "fail");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
pub static ref SERVICE_EXPIRED_PEERS: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers", 1024);
pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_OK: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_ok", 1024);
pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_fail", 1024);
// libp2p_event_handler

View File

@ -5,6 +5,8 @@ use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use futures::{channel::mpsc::Sender, prelude::*};
use miner::MinerMessage;
use network::rpc::GoodbyeReason;
use network::PeerId;
use network::{
types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage,
NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm,
@ -309,27 +311,30 @@ impl RouterService {
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER.mark(1);
}
NetworkMessage::DialPeer { address, peer_id } => {
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER.mark(1);
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER.mark(1);
if self.libp2p.swarm.is_connected(&peer_id) {
self.libp2p_event_handler
.send_to_sync(SyncMessage::PeerConnected { peer_id });
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY.mark(1);
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY.mark(1);
} else {
match Swarm::dial(&mut self.libp2p.swarm, address.clone()) {
Ok(()) => {
debug!(%address, "Dialing libp2p peer");
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK.mark(1);
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK.mark(1);
}
Err(err) => {
info!(%address, error = ?err, "Failed to dial peer");
self.libp2p_event_handler
.send_to_sync(SyncMessage::DailFailed { peer_id, err });
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL.mark(1);
.send_to_sync(SyncMessage::DialFailed { peer_id, err });
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL.mark(1);
}
};
}
}
NetworkMessage::DisconnectPeer { peer_id } => {
self.disconnect_peer(peer_id);
}
NetworkMessage::AnnounceLocalFile { tx_id } => {
let shard_config = self.store.get_shard_config();
let msg = PubsubMessage::NewFile(NewFile {
@ -399,24 +404,16 @@ impl RouterService {
debug!(%num_expired_peers, "Heartbeat, remove expired peers")
}
let mut num_succeeded = 0;
let mut num_failed = 0;
for peer_id in expired_peers {
// async operation, once peer disconnected, swarm event `PeerDisconnected`
// will be polled to handle in advance.
match self.libp2p.swarm.disconnect_peer_id(peer_id) {
Ok(_) => {
debug!(%peer_id, "Peer expired and disconnect it");
num_succeeded += 1;
}
Err(_) => {
debug!(%peer_id, "Peer expired but failed to disconnect");
num_failed += 1;
}
}
self.disconnect_peer(peer_id);
}
}
fn disconnect_peer(&mut self, peer_id: PeerId) {
let pm = self.libp2p.swarm.behaviour_mut().peer_manager_mut();
if pm.is_connected(&peer_id) {
pm.disconnect_peer(peer_id, GoodbyeReason::IrrelevantNetwork);
}
metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_OK.update(num_succeeded);
metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL.update(num_failed);
}
}

View File

@ -134,11 +134,21 @@ impl ClientBuilder {
}
/// Starts the networking stack.
pub async fn with_network(mut self, config: &NetworkConfig) -> Result<Self, String> {
pub async fn with_network(mut self, mut config: NetworkConfig) -> Result<Self, String> {
let executor = require!("network", self, runtime_context).clone().executor;
let store = require!("network", self, store).clone();
let file_location_cache = require!("network", self, file_location_cache).clone();
// only dial peers whose shard config intersects with the local one
config.peer_manager.filters.dial_peer_filter = Some(Arc::new(move |peer_id| {
match file_location_cache.get_peer_config(peer_id) {
Some(v) => store.get_shard_config().intersect(&v),
None => true,
}
}));
// construct the libp2p service context
let service_context = network::Context { config };
let service_context = network::Context { config: &config };
// construct communication channel
let (send, recv) = new_network_channel();

View File

@ -39,18 +39,17 @@ impl ZgsConfig {
.await
.map_err(|e| format!("Unable to get chain id: {:?}", e))?
.as_u64();
let network_protocol_version = if self.sync.neighbors_only {
network::PROTOCOL_VERSION_V2
} else {
network::PROTOCOL_VERSION_V1
};
let local_network_id = NetworkIdentity {
chain_id,
flow_address,
p2p_protocol_version: ProtocolVersion {
major: network_protocol_version[0],
minor: network_protocol_version[1],
build: network_protocol_version[2],
major: network::PROTOCOL_VERSION_V3[0],
minor: network::PROTOCOL_VERSION_V3[1],
build: if self.sync.neighbors_only {
network::PROTOCOL_VERSION_V3[2] + 1
} else {
network::PROTOCOL_VERSION_V3[2]
},
},
};
network_config.network_id = local_network_id.clone();
@ -110,7 +109,7 @@ impl ZgsConfig {
network_config.private = self.network_private;
network_config.peer_db = self.network_peer_db;
network_config.peer_manager = self.network_peer_manager;
network_config.peer_manager = self.network_peer_manager.clone();
network_config.disable_enr_network_id = self.discv5_disable_enr_network_id;
Ok(network_config)
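
The version selection in this hunk can be summarized with a short sketch. The constant is copied from the diff; `ProtocolVersion` here is a simplified stand-in for the node's type, and `local_protocol_version` is a hypothetical helper that does not exist in the repository.

```rust
/// Version triple copied from the `PROTOCOL_VERSION_V3` constant in this diff.
pub const PROTOCOL_VERSION_V3: [u8; 3] = [0, 3, 0];

/// Simplified stand-in for the node's `ProtocolVersion` type (assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ProtocolVersion {
    pub major: u8,
    pub minor: u8,
    pub build: u8,
}

/// Hypothetical helper: derives the advertised version from `neighbors_only`.
pub fn local_protocol_version(neighbors_only: bool) -> ProtocolVersion {
    let [major, minor, build] = PROTOCOL_VERSION_V3;
    ProtocolVersion {
        major,
        minor,
        // Nodes running with `neighbors_only` bump the build number by one.
        build: if neighbors_only { build + 1 } else { build },
    }
}
```

With this scheme a node advertises `0.3.1` when `neighbors_only` is set and `0.3.0` otherwise. If the network identity comparison includes the build number, nodes with different `neighbors_only` settings would treat each other as incompatible.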

View File

@ -26,7 +26,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
.with_log_sync(log_sync_config)
.await?
.with_file_location_cache(config.file_location_cache)
.with_network(&network_config)
.with_network(network_config)
.await?
.with_chunk_pool(chunk_pool_config)
.await?

View File

@ -2,7 +2,6 @@
extern crate tracing;
use anyhow::bail;
use backtrace::Backtrace;
use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
};
@ -140,9 +139,6 @@ impl Store {
{
let store = self.store.clone();
let (tx, rx) = oneshot::channel();
let mut backtrace = Backtrace::new();
let frames = backtrace.frames().to_vec();
backtrace = frames.into();
self.executor.spawn_blocking(
move || {
@ -150,7 +146,6 @@ impl Store {
let res = f(&*store);
if tx.send(res).is_err() {
warn!("Backtrace: {:?}", backtrace);
error!("Unable to complete async storage operation: the receiver dropped");
}
},

View File

@ -194,7 +194,7 @@ impl SyncPeers {
ctx.report_peer(
*peer_id,
PeerAction::LowToleranceError,
"Dail timeout",
"Dial timeout",
);
}

View File

@ -232,7 +232,7 @@ impl SerialSyncController {
/// Dial to peers in `Found` state, so that `Connecting` or `Connected` peers cover
/// data in all shards.
fn try_connect(&mut self) {
let mut num_peers_dailed = 0;
let mut num_peers_dialed = 0;
// select a random peer
while !self
@ -256,10 +256,10 @@ impl SerialSyncController {
self.peers
.update_state(&peer_id, PeerState::Found, PeerState::Connecting);
num_peers_dailed += 1;
num_peers_dialed += 1;
}
info!(%self.tx_seq, %num_peers_dailed, "Connecting peers");
info!(%self.tx_seq, %num_peers_dialed, "Connecting peers");
self.state = SyncState::ConnectingPeers {
origin: self.since,
@ -358,14 +358,14 @@ impl SerialSyncController {
.update_state_force(&peer_id, PeerState::Connected);
}
pub fn on_dail_failed(&mut self, peer_id: PeerId, err: &DialError) {
pub fn on_dial_failed(&mut self, peer_id: PeerId, err: &DialError) {
match err {
DialError::ConnectionLimit(_) => {
if let Some(true) =
self.peers
.update_state(&peer_id, PeerState::Connecting, PeerState::Found)
{
info!(%self.tx_seq, %peer_id, "Failed to dail peer due to outgoing connection limitation");
info!(%self.tx_seq, %peer_id, "Failed to dial peer due to outgoing connection limitation");
self.state = SyncState::AwaitingOutgoingConnection {
since: Instant::now().into(),
};
@ -377,7 +377,7 @@ impl SerialSyncController {
PeerState::Connecting,
PeerState::Disconnected,
) {
info!(%self.tx_seq, %peer_id, %err, "Failed to dail peer");
info!(%self.tx_seq, %peer_id, %err, "Failed to dial peer");
self.state = SyncState::Idle;
}
}

View File

@ -33,7 +33,7 @@ pub type SyncReceiver = channel::Receiver<SyncMessage, SyncRequest, SyncResponse
#[derive(Debug)]
pub enum SyncMessage {
DailFailed {
DialFailed {
peer_id: PeerId,
err: DialError,
},
@ -227,8 +227,8 @@ impl SyncService {
trace!("Sync received message {:?}", msg);
match msg {
SyncMessage::DailFailed { peer_id, err } => {
self.on_dail_failed(peer_id, err);
SyncMessage::DialFailed { peer_id, err } => {
self.on_dial_failed(peer_id, err);
}
SyncMessage::PeerConnected { peer_id } => {
self.on_peer_connected(peer_id);
@ -369,11 +369,11 @@ impl SyncService {
}
}
fn on_dail_failed(&mut self, peer_id: PeerId, err: DialError) {
info!(%peer_id, ?err, "Dail to peer failed");
fn on_dial_failed(&mut self, peer_id: PeerId, err: DialError) {
info!(%peer_id, ?err, "Dial to peer failed");
for controller in self.controllers.values_mut() {
controller.on_dail_failed(peer_id, &err);
controller.on_dial_failed(peer_id, &err);
controller.transition();
}
}

View File

@ -245,7 +245,7 @@ neighbors_only = true
# Maximum number of continuous failures to terminate a file sync.
# max_request_failures = 5
# Timeout to dail peers.
# Timeout to dial peers.
# peer_connect_timeout = "15s"
# Timeout to disconnect peers.

View File

@ -257,7 +257,7 @@ neighbors_only = true
# Maximum number of continuous failures to terminate a file sync.
# max_request_failures = 5
# Timeout to dail peers.
# Timeout to dial peers.
# peer_connect_timeout = "15s"
# Timeout to disconnect peers.

View File

@ -259,7 +259,7 @@ neighbors_only = true
# Maximum number of continuous failures to terminate a file sync.
# max_request_failures = 5
# Timeout to dail peers.
# Timeout to dial peers.
# peer_connect_timeout = "15s"
# Timeout to disconnect peers.

View File

@ -0,0 +1,76 @@
#!/usr/bin/env python3

import os
import time

from config.node_config import ZGS_KEY_FILE, ZGS_NODEID
from test_framework.test_framework import TestFramework
from utility.utils import p2p_port


class NetworkTcpShardTest(TestFramework):
    """
    This is to test TCP connections to peers that are found via UDP discovery
    but have mismatched shard configs.
    """

    def setup_params(self):
        # 1 bootnode and 2 community nodes
        self.num_nodes = 3

        # setup for node 0 as bootnode
        self.zgs_node_key_files = [ZGS_KEY_FILE]
        bootnode_port = p2p_port(0)
        self.zgs_node_configs[0] = {
            # enable UDP discovery relevant configs
            "network_enr_address": "127.0.0.1",
            "network_enr_tcp_port": bootnode_port,
            "network_enr_udp_port": bootnode_port,

            # disable trusted nodes
            "network_libp2p_nodes": [],

            # custom shard config
            "shard_position": "0/4"
        }

        # setup node 1 & 2 as community nodes
        bootnodes = [f"/ip4/127.0.0.1/udp/{bootnode_port}/p2p/{ZGS_NODEID}"]
        for i in range(1, self.num_nodes):
            self.zgs_node_configs[i] = {
                # enable UDP discovery relevant configs
                "network_enr_address": "127.0.0.1",
                "network_enr_tcp_port": p2p_port(i),
                "network_enr_udp_port": p2p_port(i),

                # disable trusted nodes and enable bootnodes
                "network_libp2p_nodes": [],
                "network_boot_nodes": bootnodes,

                # custom shard config
                "shard_position": f"{i}/4"
            }

    def run_test(self):
        timeout_secs = 10

        for iter in range(timeout_secs):
            time.sleep(1)
            self.log.info("==================================== iter %s", iter)

            for i in range(self.num_nodes):
                info = self.nodes[i].rpc.admin_getNetworkInfo()
                self.log.info(
                    "Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)",
                    i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"],
                )

                # Check the final state on the last iteration only. The loop
                # variable here must be `iter`: the node index `i` never
                # reaches `timeout_secs - 1`, so comparing `i` would skip
                # the assertions entirely.
                if iter == timeout_secs - 1:
                    assert info["totalPeers"] == self.num_nodes - 1
                    assert info["bannedPeers"] == 0
                    assert info["disconnectedPeers"] == self.num_nodes - 1
                    assert info["connectedPeers"] == 0

        self.log.info("====================================")
        self.log.info("All nodes discovered but not connected to each other")


if __name__ == "__main__":
    NetworkTcpShardTest().main()