mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-12-25 15:55:17 +00:00
Compare commits
No commits in common. "27366a5331d4c3a442af9b85275d1330843da7a7" and "bfe434972d9135b7ad2a4378f94429f2bcb9f5bd" have entirely different histories.
27366a5331
...
bfe434972d
@ -1,6 +0,0 @@
|
|||||||
FROM rust
|
|
||||||
VOLUME ["/data"]
|
|
||||||
COPY . .
|
|
||||||
RUN apt-get update && apt-get install -y clang cmake build-essential pkg-config libssl-dev
|
|
||||||
RUN cargo build --release
|
|
||||||
CMD ["./target/release/zgs_node", "--config", "run/config-testnet-standard.toml", "--log", "run/log_config"]
|
|
@ -1,6 +0,0 @@
|
|||||||
FROM rust
|
|
||||||
VOLUME ["/data"]
|
|
||||||
COPY . .
|
|
||||||
RUN apt-get update && apt-get install -y clang cmake build-essential pkg-config libssl-dev
|
|
||||||
RUN cargo build --release
|
|
||||||
CMD ["./target/release/zgs_node", "--config", "run/config-testnet-turbo.toml", "--log", "run/log_config"]
|
|
39
README.md
39
README.md
@ -2,31 +2,34 @@
|
|||||||
|
|
||||||
## Overview
|
## Overview
|
||||||
|
|
||||||
0G Storage is a decentralized data storage system designed to address the challenges of high-throughput and low-latency data storage and retrieval, particularly for areas such as AI.
|
0G Storage is the storage layer for the ZeroGravity data availability (DA) system. The 0G Storage layer holds three important features:
|
||||||
|
|
||||||
## System Architecture
|
* Built-in - It is natively built into the ZeroGravity DA system for data storage and retrieval.
|
||||||
|
* General purpose - It is designed to support atomic transactions, mutable kv stores as well as archive log systems to enable wide range of applications with various data types.
|
||||||
|
* Incentive - Instead of being just a decentralized database, 0G Storage introduces PoRA mining algorithm to incentivize storage network participants.
|
||||||
|
|
||||||
0G Storage consists of two main components:
|
To dive deep into the technical details, continue reading [0G Storage Spec.](docs/)
|
||||||
|
|
||||||
1. **Data Publishing Lane**: Ensures fast Merkle treedata root commitment and verification through 0G Chain.
|
## Integration
|
||||||
2. **Data Storage Lane**: Manages large data transfers and storage using an erasure-coding mechanism for redundancy and sharding for parallel processing.
|
|
||||||
|
|
||||||
Across the two lanes, 0G Storage supports the following features:
|
We provide a [SDK](https://github.com/0glabs/0g-js-storage-sdk) for users to easily integrate 0G Storage in their applications with the following features:
|
||||||
|
|
||||||
* **General Purpose Design**: Supports atomic transactions, mutable key-value stores, and archive log systems, enabling a wide range of applications with various data types.
|
* File Merkle Tree Class
|
||||||
* **Validated Incentivization**: Utilizes the PoRA (Proof of Random Access) mining algorithm to mitigate data outsourcing issue and to ensure rewards are distributed to nodes who contribute to the storage network.
|
* Flow Contract Types
|
||||||
|
* RPC methods support
|
||||||
|
* File upload
|
||||||
|
* Support browser environment
|
||||||
|
* Tests for different environments (In Progress)
|
||||||
|
* File download (In Progress)
|
||||||
|
|
||||||
For in-depth technical details about 0G Storage, please read our [Intro to 0G Storage](https://docs.0g.ai/og-storage).
|
## Deployment
|
||||||
|
|
||||||
## Documentation
|
Please refer to [Deployment](docs/run.md) page for detailed steps to compile and start a 0G Storage node.
|
||||||
|
|
||||||
- If you want to run a node, please refer to the [Running a Node](https://docs.0g.ai/run-a-node/storage-node) guide.
|
## Test
|
||||||
- If you want to conduct local testing, please refer to [Onebox Testing](https://github.com/0glabs/0g-storage-node/blob/main/docs/onebox-test.md) guide.
|
|
||||||
- If you want build a project using 0G storage, please refer to the [0G Storage SDK](https://docs.0g.ai/build-with-0g/storage-sdk) guide.
|
|
||||||
|
|
||||||
## Support and Additional Resources
|
Please refer to the [One Box Test](docs/onebox-test.md) page for local testing purpose.
|
||||||
We want to do everything we can to help you be successful while working on your contribution and projects. Here you'll find various resources and communities that may help you complete a project or contribute to 0G.
|
|
||||||
|
|
||||||
### Communities
|
## Contributing
|
||||||
- [0G Telegram](https://t.me/web3_0glabs)
|
|
||||||
- [0G Discord](https://discord.com/invite/0glabs)
|
To make contributions to the project, please follow the guidelines [here](contributing.md).
|
||||||
|
@ -96,10 +96,8 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F
|
|||||||
/// Defines the current P2P protocol version.
|
/// Defines the current P2P protocol version.
|
||||||
/// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused network too heavey.
|
/// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused network too heavey.
|
||||||
/// - v2: Publish NewFile to neighbors only and announce file via RPC message.
|
/// - v2: Publish NewFile to neighbors only and announce file via RPC message.
|
||||||
/// - v3: Add shard config in Status message.
|
|
||||||
pub const PROTOCOL_VERSION_V1: [u8; 3] = [0, 1, 1];
|
pub const PROTOCOL_VERSION_V1: [u8; 3] = [0, 1, 1];
|
||||||
pub const PROTOCOL_VERSION_V2: [u8; 3] = [0, 2, 1];
|
pub const PROTOCOL_VERSION_V2: [u8; 3] = [0, 2, 1];
|
||||||
pub const PROTOCOL_VERSION_V3: [u8; 3] = [0, 3, 0];
|
|
||||||
|
|
||||||
/// Application level requests sent to the network.
|
/// Application level requests sent to the network.
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
@ -152,8 +150,6 @@ pub enum NetworkMessage {
|
|||||||
},
|
},
|
||||||
/// Start dialing a new peer.
|
/// Start dialing a new peer.
|
||||||
DialPeer { address: Multiaddr, peer_id: PeerId },
|
DialPeer { address: Multiaddr, peer_id: PeerId },
|
||||||
/// Disconnect a peer.
|
|
||||||
DisconnectPeer { peer_id: PeerId },
|
|
||||||
/// Notify that new file stored in db.
|
/// Notify that new file stored in db.
|
||||||
AnnounceLocalFile { tx_id: TxID },
|
AnnounceLocalFile { tx_id: TxID },
|
||||||
/// Called if a known external TCP socket address has been updated.
|
/// Called if a known external TCP socket address has been updated.
|
||||||
@ -169,5 +165,5 @@ pub type NetworkSender = channel::metrics::Sender<NetworkMessage>;
|
|||||||
pub type NetworkReceiver = channel::metrics::Receiver<NetworkMessage>;
|
pub type NetworkReceiver = channel::metrics::Receiver<NetworkMessage>;
|
||||||
|
|
||||||
pub fn new_network_channel() -> (NetworkSender, NetworkReceiver) {
|
pub fn new_network_channel() -> (NetworkSender, NetworkReceiver) {
|
||||||
channel::metrics::unbounded_channel("network_channel")
|
channel::metrics::unbounded_channel("network")
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
use std::{fmt::Debug, sync::Arc, time::Duration};
|
use std::time::Duration;
|
||||||
|
|
||||||
use duration_str::deserialize_duration;
|
use duration_str::deserialize_duration;
|
||||||
use libp2p::PeerId;
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
/// The time in seconds between re-status's peers.
|
/// The time in seconds between re-status's peers.
|
||||||
@ -17,7 +16,7 @@ pub const DEFAULT_PING_INTERVAL_INBOUND: u64 = 20;
|
|||||||
pub const DEFAULT_TARGET_PEERS: usize = 50;
|
pub const DEFAULT_TARGET_PEERS: usize = 50;
|
||||||
|
|
||||||
/// Configurations for the PeerManager.
|
/// Configurations for the PeerManager.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
/* Peer count related configurations */
|
/* Peer count related configurations */
|
||||||
@ -41,9 +40,6 @@ pub struct Config {
|
|||||||
pub ping_interval_inbound: u64,
|
pub ping_interval_inbound: u64,
|
||||||
/// Interval between PING events for peers dialed by us.
|
/// Interval between PING events for peers dialed by us.
|
||||||
pub ping_interval_outbound: u64,
|
pub ping_interval_outbound: u64,
|
||||||
|
|
||||||
#[serde(skip)]
|
|
||||||
pub filters: Filters,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
@ -56,29 +52,6 @@ impl Default for Config {
|
|||||||
status_interval: DEFAULT_STATUS_INTERVAL,
|
status_interval: DEFAULT_STATUS_INTERVAL,
|
||||||
ping_interval_inbound: DEFAULT_PING_INTERVAL_INBOUND,
|
ping_interval_inbound: DEFAULT_PING_INTERVAL_INBOUND,
|
||||||
ping_interval_outbound: DEFAULT_PING_INTERVAL_OUTBOUND,
|
ping_interval_outbound: DEFAULT_PING_INTERVAL_OUTBOUND,
|
||||||
filters: Default::default(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[derive(Clone)]
|
|
||||||
|
|
||||||
pub struct Filters {
|
|
||||||
/// Decide whether to dial to specified peer.
|
|
||||||
pub dial_peer_filter: Option<Arc<dyn Fn(&PeerId) -> bool + Sync + Send + 'static>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for Filters {
|
|
||||||
fn default() -> Self {
|
|
||||||
Filters {
|
|
||||||
dial_peer_filter: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Debug for Filters {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
f.debug_struct("Filters")
|
|
||||||
.field("dial_peer_filter", &self.dial_peer_filter.is_some())
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -69,8 +69,6 @@ pub struct PeerManager {
|
|||||||
discovery_enabled: bool,
|
discovery_enabled: bool,
|
||||||
/// Keeps track if the current instance is reporting metrics or not.
|
/// Keeps track if the current instance is reporting metrics or not.
|
||||||
metrics_enabled: bool,
|
metrics_enabled: bool,
|
||||||
|
|
||||||
filters: config::Filters,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The events that the `PeerManager` outputs (requests).
|
/// The events that the `PeerManager` outputs (requests).
|
||||||
@ -110,7 +108,6 @@ impl PeerManager {
|
|||||||
status_interval,
|
status_interval,
|
||||||
ping_interval_inbound,
|
ping_interval_inbound,
|
||||||
ping_interval_outbound,
|
ping_interval_outbound,
|
||||||
filters,
|
|
||||||
} = cfg;
|
} = cfg;
|
||||||
|
|
||||||
// Set up the peer manager heartbeat interval
|
// Set up the peer manager heartbeat interval
|
||||||
@ -126,7 +123,6 @@ impl PeerManager {
|
|||||||
heartbeat,
|
heartbeat,
|
||||||
discovery_enabled,
|
discovery_enabled,
|
||||||
metrics_enabled,
|
metrics_enabled,
|
||||||
filters,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -281,10 +277,6 @@ impl PeerManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(dial_peer_filter) = self.filters.dial_peer_filter.clone() {
|
|
||||||
to_dial_peers.retain(|peer_id| dial_peer_filter(peer_id));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Queue another discovery if we need to
|
// Queue another discovery if we need to
|
||||||
self.maintain_peer_count(to_dial_peers.len());
|
self.maintain_peer_count(to_dial_peers.len());
|
||||||
|
|
||||||
@ -701,7 +693,7 @@ impl PeerManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Gracefully disconnects a peer without banning them.
|
// Gracefully disconnects a peer without banning them.
|
||||||
pub fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
|
fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
|
||||||
self.events
|
self.events
|
||||||
.push(PeerManagerEvent::DisconnectPeer(peer_id, reason));
|
.push(PeerManagerEvent::DisconnectPeer(peer_id, reason));
|
||||||
self.network_globals
|
self.network_globals
|
||||||
|
@ -35,7 +35,7 @@ pub struct PeerDBConfig {
|
|||||||
pub allowed_negative_gossipsub_factor: f32,
|
pub allowed_negative_gossipsub_factor: f32,
|
||||||
/// The time we allow peers to be in the dialing state in our PeerDb before we revert them to a disconnected state.
|
/// The time we allow peers to be in the dialing state in our PeerDb before we revert them to a disconnected state.
|
||||||
#[serde(deserialize_with = "deserialize_duration")]
|
#[serde(deserialize_with = "deserialize_duration")]
|
||||||
pub dial_timeout: Duration,
|
pub dail_timeout: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for PeerDBConfig {
|
impl Default for PeerDBConfig {
|
||||||
@ -45,7 +45,7 @@ impl Default for PeerDBConfig {
|
|||||||
max_banned_peers: 1000,
|
max_banned_peers: 1000,
|
||||||
banned_peers_per_ip_threshold: 5,
|
banned_peers_per_ip_threshold: 5,
|
||||||
allowed_negative_gossipsub_factor: 0.1,
|
allowed_negative_gossipsub_factor: 0.1,
|
||||||
dial_timeout: Duration::from_secs(15),
|
dail_timeout: Duration::from_secs(15),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -339,7 +339,7 @@ impl PeerDB {
|
|||||||
.iter()
|
.iter()
|
||||||
.filter_map(|(peer_id, info)| {
|
.filter_map(|(peer_id, info)| {
|
||||||
if let PeerConnectionStatus::Dialing { since } = info.connection_status() {
|
if let PeerConnectionStatus::Dialing { since } = info.connection_status() {
|
||||||
if (*since) + self.config.dial_timeout < std::time::Instant::now() {
|
if (*since) + self.config.dail_timeout < std::time::Instant::now() {
|
||||||
return Some(*peer_id);
|
return Some(*peer_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -399,7 +399,9 @@ mod tests {
|
|||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
|
||||||
fn status_message() -> StatusMessage {
|
fn status_message() -> StatusMessage {
|
||||||
Default::default()
|
StatusMessage {
|
||||||
|
data: Default::default(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn ping_message() -> Ping {
|
fn ping_message() -> Ping {
|
||||||
@ -568,7 +570,10 @@ mod tests {
|
|||||||
assert_eq!(stream_identifier.len(), 10);
|
assert_eq!(stream_identifier.len(), 10);
|
||||||
|
|
||||||
// Status message is 84 bytes uncompressed. `max_compressed_len` is 32 + 84 + 84/6 = 130.
|
// Status message is 84 bytes uncompressed. `max_compressed_len` is 32 + 84 + 84/6 = 130.
|
||||||
let status_message_bytes = StatusMessage::default().as_ssz_bytes();
|
let status_message_bytes = StatusMessage {
|
||||||
|
data: Default::default(),
|
||||||
|
}
|
||||||
|
.as_ssz_bytes();
|
||||||
|
|
||||||
let mut uvi_codec: Uvi<usize> = Uvi::default();
|
let mut uvi_codec: Uvi<usize> = Uvi::default();
|
||||||
let mut dst = BytesMut::with_capacity(1024);
|
let mut dst = BytesMut::with_capacity(1024);
|
||||||
|
@ -69,13 +69,9 @@ impl ToString for ErrorType {
|
|||||||
/* Requests */
|
/* Requests */
|
||||||
|
|
||||||
/// The STATUS request/response handshake message.
|
/// The STATUS request/response handshake message.
|
||||||
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq, Default)]
|
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
|
||||||
pub struct StatusMessage {
|
pub struct StatusMessage {
|
||||||
pub data: NetworkIdentity,
|
pub data: NetworkIdentity,
|
||||||
|
|
||||||
// shard config
|
|
||||||
pub num_shard: usize,
|
|
||||||
pub shard_id: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The PING request/response message.
|
/// The PING request/response message.
|
||||||
|
@ -23,10 +23,14 @@ fn test_status_rpc() {
|
|||||||
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt)).await;
|
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt)).await;
|
||||||
|
|
||||||
// Dummy STATUS RPC message
|
// Dummy STATUS RPC message
|
||||||
let rpc_request = Request::Status(Default::default());
|
let rpc_request = Request::Status(StatusMessage {
|
||||||
|
data: Default::default(),
|
||||||
|
});
|
||||||
|
|
||||||
// Dummy STATUS RPC message
|
// Dummy STATUS RPC message
|
||||||
let rpc_response = Response::Status(Default::default());
|
let rpc_response = Response::Status(StatusMessage {
|
||||||
|
data: Default::default(),
|
||||||
|
});
|
||||||
|
|
||||||
// build the sender future
|
// build the sender future
|
||||||
let sender_future = async {
|
let sender_future = async {
|
||||||
|
@ -172,11 +172,8 @@ impl Libp2pEventHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn send_status(&self, peer_id: PeerId) {
|
pub fn send_status(&self, peer_id: PeerId) {
|
||||||
let shard_config = self.store.get_store().get_shard_config();
|
|
||||||
let status_message = StatusMessage {
|
let status_message = StatusMessage {
|
||||||
data: self.network_globals.network_id(),
|
data: self.network_globals.network_id(),
|
||||||
num_shard: shard_config.num_shard,
|
|
||||||
shard_id: shard_config.shard_id,
|
|
||||||
};
|
};
|
||||||
debug!(%peer_id, ?status_message, "Sending Status request");
|
debug!(%peer_id, ?status_message, "Sending Status request");
|
||||||
|
|
||||||
@ -194,6 +191,7 @@ impl Libp2pEventHandler {
|
|||||||
|
|
||||||
if outgoing {
|
if outgoing {
|
||||||
self.send_status(peer_id);
|
self.send_status(peer_id);
|
||||||
|
self.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
||||||
metrics::LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING.mark(1);
|
metrics::LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING.mark(1);
|
||||||
} else {
|
} else {
|
||||||
metrics::LIBP2P_HANDLE_PEER_CONNECTED_INCOMING.mark(1);
|
metrics::LIBP2P_HANDLE_PEER_CONNECTED_INCOMING.mark(1);
|
||||||
@ -256,11 +254,8 @@ impl Libp2pEventHandler {
|
|||||||
debug!(%peer_id, ?status, "Received Status request");
|
debug!(%peer_id, ?status, "Received Status request");
|
||||||
|
|
||||||
let network_id = self.network_globals.network_id();
|
let network_id = self.network_globals.network_id();
|
||||||
let shard_config = self.store.get_store().get_shard_config();
|
|
||||||
let status_message = StatusMessage {
|
let status_message = StatusMessage {
|
||||||
data: network_id.clone(),
|
data: network_id.clone(),
|
||||||
num_shard: shard_config.num_shard,
|
|
||||||
shard_id: shard_config.shard_id,
|
|
||||||
};
|
};
|
||||||
debug!(%peer_id, ?status_message, "Sending Status response");
|
debug!(%peer_id, ?status_message, "Sending Status response");
|
||||||
|
|
||||||
@ -269,18 +264,12 @@ impl Libp2pEventHandler {
|
|||||||
id: request_id,
|
id: request_id,
|
||||||
response: Response::Status(status_message),
|
response: Response::Status(status_message),
|
||||||
});
|
});
|
||||||
|
self.on_status_message(peer_id, status, network_id);
|
||||||
if self.verify_status_message(peer_id, status, network_id, &shard_config) {
|
|
||||||
self.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_status_response(&self, peer_id: PeerId, status: StatusMessage) {
|
fn on_status_response(&self, peer_id: PeerId, status: StatusMessage) {
|
||||||
let network_id = self.network_globals.network_id();
|
let network_id = self.network_globals.network_id();
|
||||||
let shard_config = self.store.get_store().get_shard_config();
|
self.on_status_message(peer_id, status, network_id);
|
||||||
if self.verify_status_message(peer_id, status, network_id, &shard_config) {
|
|
||||||
self.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn on_rpc_response(
|
pub async fn on_rpc_response(
|
||||||
@ -961,54 +950,21 @@ impl Libp2pEventHandler {
|
|||||||
MessageAcceptance::Accept
|
MessageAcceptance::Accept
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_status_message(
|
fn on_status_message(
|
||||||
&self,
|
&self,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
status: StatusMessage,
|
status: StatusMessage,
|
||||||
network_id: NetworkIdentity,
|
network_id: NetworkIdentity,
|
||||||
shard_config: &ShardConfig,
|
) {
|
||||||
) -> bool {
|
|
||||||
if status.data != network_id {
|
if status.data != network_id {
|
||||||
warn!(%peer_id, ?network_id, ?status.data, "Report peer with incompatible network id");
|
warn!(%peer_id, ?network_id, ?status.data, "Report peer with incompatible network id");
|
||||||
self.send_to_network(NetworkMessage::ReportPeer {
|
self.send_to_network(NetworkMessage::ReportPeer {
|
||||||
peer_id,
|
peer_id,
|
||||||
action: PeerAction::Fatal,
|
action: PeerAction::Fatal,
|
||||||
source: ReportSource::RPC,
|
source: ReportSource::Gossipsub,
|
||||||
msg: "Incompatible network id in StatusMessage",
|
msg: "Incompatible network id in StatusMessage",
|
||||||
});
|
})
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let peer_shard_config = match ShardConfig::new(status.shard_id, status.num_shard) {
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(err) => {
|
|
||||||
warn!(%peer_id, ?status, ?err, "Report peer with invalid shard config");
|
|
||||||
self.send_to_network(NetworkMessage::ReportPeer {
|
|
||||||
peer_id,
|
|
||||||
action: PeerAction::Fatal,
|
|
||||||
source: ReportSource::RPC,
|
|
||||||
msg: "Invalid shard config in StatusMessage",
|
|
||||||
});
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
self.file_location_cache
|
|
||||||
.insert_peer_config(peer_id, peer_shard_config);
|
|
||||||
|
|
||||||
if !peer_shard_config.intersect(shard_config) {
|
|
||||||
info!(%peer_id, ?shard_config, ?status, "Report peer with mismatched shard config");
|
|
||||||
self.send_to_network(NetworkMessage::ReportPeer {
|
|
||||||
peer_id,
|
|
||||||
action: PeerAction::LowToleranceError,
|
|
||||||
source: ReportSource::RPC,
|
|
||||||
msg: "Shard config mismatch in StatusMessage",
|
|
||||||
});
|
|
||||||
self.send_to_network(NetworkMessage::DisconnectPeer { peer_id });
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn publish_file(&self, tx_id: TxID) -> Option<bool> {
|
async fn publish_file(&self, tx_id: TxID) -> Option<bool> {
|
||||||
@ -1228,7 +1184,10 @@ mod tests {
|
|||||||
|
|
||||||
assert_eq!(handler.peers.read().await.size(), 1);
|
assert_eq!(handler.peers.read().await.size(), 1);
|
||||||
ctx.assert_status_request(alice);
|
ctx.assert_status_request(alice);
|
||||||
assert!(matches!(ctx.sync_recv.try_recv(), Err(TryRecvError::Empty)));
|
assert!(matches!(
|
||||||
|
ctx.sync_recv.try_recv(),
|
||||||
|
Ok(Notification(SyncMessage::PeerConnected {peer_id})) if peer_id == alice
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@ -1257,8 +1216,6 @@ mod tests {
|
|||||||
let req_id = (ConnectionId::new(4), SubstreamId(12));
|
let req_id = (ConnectionId::new(4), SubstreamId(12));
|
||||||
let request = Request::Status(StatusMessage {
|
let request = Request::Status(StatusMessage {
|
||||||
data: Default::default(),
|
data: Default::default(),
|
||||||
num_shard: 1,
|
|
||||||
shard_id: 0,
|
|
||||||
});
|
});
|
||||||
handler.on_rpc_request(alice, req_id, request).await;
|
handler.on_rpc_request(alice, req_id, request).await;
|
||||||
|
|
||||||
|
@ -11,14 +11,16 @@ lazy_static::lazy_static! {
|
|||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "all");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "all");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "already");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "already");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "ok");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "ok");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dial_peer", "fail");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "fail");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
|
||||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
|
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
|
||||||
|
|
||||||
pub static ref SERVICE_EXPIRED_PEERS: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers", 1024);
|
pub static ref SERVICE_EXPIRED_PEERS: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers", 1024);
|
||||||
|
pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_OK: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_ok", 1024);
|
||||||
|
pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_fail", 1024);
|
||||||
|
|
||||||
// libp2p_event_handler
|
// libp2p_event_handler
|
||||||
|
|
||||||
|
@ -5,8 +5,6 @@ use chunk_pool::ChunkPoolMessage;
|
|||||||
use file_location_cache::FileLocationCache;
|
use file_location_cache::FileLocationCache;
|
||||||
use futures::{channel::mpsc::Sender, prelude::*};
|
use futures::{channel::mpsc::Sender, prelude::*};
|
||||||
use miner::MinerMessage;
|
use miner::MinerMessage;
|
||||||
use network::rpc::GoodbyeReason;
|
|
||||||
use network::PeerId;
|
|
||||||
use network::{
|
use network::{
|
||||||
types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage,
|
types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage,
|
||||||
NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm,
|
NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm,
|
||||||
@ -311,30 +309,27 @@ impl RouterService {
|
|||||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER.mark(1);
|
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER.mark(1);
|
||||||
}
|
}
|
||||||
NetworkMessage::DialPeer { address, peer_id } => {
|
NetworkMessage::DialPeer { address, peer_id } => {
|
||||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER.mark(1);
|
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER.mark(1);
|
||||||
|
|
||||||
if self.libp2p.swarm.is_connected(&peer_id) {
|
if self.libp2p.swarm.is_connected(&peer_id) {
|
||||||
self.libp2p_event_handler
|
self.libp2p_event_handler
|
||||||
.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
.send_to_sync(SyncMessage::PeerConnected { peer_id });
|
||||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_ALREADY.mark(1);
|
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY.mark(1);
|
||||||
} else {
|
} else {
|
||||||
match Swarm::dial(&mut self.libp2p.swarm, address.clone()) {
|
match Swarm::dial(&mut self.libp2p.swarm, address.clone()) {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
debug!(%address, "Dialing libp2p peer");
|
debug!(%address, "Dialing libp2p peer");
|
||||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_OK.mark(1);
|
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK.mark(1);
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
info!(%address, error = ?err, "Failed to dial peer");
|
info!(%address, error = ?err, "Failed to dial peer");
|
||||||
self.libp2p_event_handler
|
self.libp2p_event_handler
|
||||||
.send_to_sync(SyncMessage::DialFailed { peer_id, err });
|
.send_to_sync(SyncMessage::DailFailed { peer_id, err });
|
||||||
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DIAL_PEER_NEW_FAIL.mark(1);
|
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL.mark(1);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
NetworkMessage::DisconnectPeer { peer_id } => {
|
|
||||||
self.disconnect_peer(peer_id);
|
|
||||||
}
|
|
||||||
NetworkMessage::AnnounceLocalFile { tx_id } => {
|
NetworkMessage::AnnounceLocalFile { tx_id } => {
|
||||||
let shard_config = self.store.get_shard_config();
|
let shard_config = self.store.get_shard_config();
|
||||||
let msg = PubsubMessage::NewFile(NewFile {
|
let msg = PubsubMessage::NewFile(NewFile {
|
||||||
@ -404,16 +399,24 @@ impl RouterService {
|
|||||||
debug!(%num_expired_peers, "Heartbeat, remove expired peers")
|
debug!(%num_expired_peers, "Heartbeat, remove expired peers")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut num_succeeded = 0;
|
||||||
|
let mut num_failed = 0;
|
||||||
for peer_id in expired_peers {
|
for peer_id in expired_peers {
|
||||||
self.disconnect_peer(peer_id);
|
// async operation, once peer disconnected, swarm event `PeerDisconnected`
|
||||||
}
|
// will be polled to handle in advance.
|
||||||
}
|
match self.libp2p.swarm.disconnect_peer_id(peer_id) {
|
||||||
|
Ok(_) => {
|
||||||
fn disconnect_peer(&mut self, peer_id: PeerId) {
|
debug!(%peer_id, "Peer expired and disconnect it");
|
||||||
let pm = self.libp2p.swarm.behaviour_mut().peer_manager_mut();
|
num_succeeded += 1;
|
||||||
if pm.is_connected(&peer_id) {
|
}
|
||||||
pm.disconnect_peer(peer_id, GoodbyeReason::IrrelevantNetwork);
|
Err(_) => {
|
||||||
|
debug!(%peer_id, "Peer expired but failed to disconnect");
|
||||||
|
num_failed += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_OK.update(num_succeeded);
|
||||||
|
metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL.update(num_failed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,21 +134,11 @@ impl ClientBuilder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Starts the networking stack.
|
/// Starts the networking stack.
|
||||||
pub async fn with_network(mut self, mut config: NetworkConfig) -> Result<Self, String> {
|
pub async fn with_network(mut self, config: &NetworkConfig) -> Result<Self, String> {
|
||||||
let executor = require!("network", self, runtime_context).clone().executor;
|
let executor = require!("network", self, runtime_context).clone().executor;
|
||||||
let store = require!("network", self, store).clone();
|
|
||||||
let file_location_cache = require!("network", self, file_location_cache).clone();
|
|
||||||
|
|
||||||
// only dial to peers that shard config matched
|
|
||||||
config.peer_manager.filters.dial_peer_filter = Some(Arc::new(move |peer_id| {
|
|
||||||
match file_location_cache.get_peer_config(peer_id) {
|
|
||||||
Some(v) => store.get_shard_config().intersect(&v),
|
|
||||||
None => true,
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
|
|
||||||
// construct the libp2p service context
|
// construct the libp2p service context
|
||||||
let service_context = network::Context { config: &config };
|
let service_context = network::Context { config };
|
||||||
|
|
||||||
// construct communication channel
|
// construct communication channel
|
||||||
let (send, recv) = new_network_channel();
|
let (send, recv) = new_network_channel();
|
||||||
|
@ -39,17 +39,18 @@ impl ZgsConfig {
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| format!("Unable to get chain id: {:?}", e))?
|
.map_err(|e| format!("Unable to get chain id: {:?}", e))?
|
||||||
.as_u64();
|
.as_u64();
|
||||||
|
let network_protocol_version = if self.sync.neighbors_only {
|
||||||
|
network::PROTOCOL_VERSION_V2
|
||||||
|
} else {
|
||||||
|
network::PROTOCOL_VERSION_V1
|
||||||
|
};
|
||||||
let local_network_id = NetworkIdentity {
|
let local_network_id = NetworkIdentity {
|
||||||
chain_id,
|
chain_id,
|
||||||
flow_address,
|
flow_address,
|
||||||
p2p_protocol_version: ProtocolVersion {
|
p2p_protocol_version: ProtocolVersion {
|
||||||
major: network::PROTOCOL_VERSION_V3[0],
|
major: network_protocol_version[0],
|
||||||
minor: network::PROTOCOL_VERSION_V3[1],
|
minor: network_protocol_version[1],
|
||||||
build: if self.sync.neighbors_only {
|
build: network_protocol_version[2],
|
||||||
network::PROTOCOL_VERSION_V3[2] + 1
|
|
||||||
} else {
|
|
||||||
network::PROTOCOL_VERSION_V3[2]
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
network_config.network_id = local_network_id.clone();
|
network_config.network_id = local_network_id.clone();
|
||||||
@ -109,7 +110,7 @@ impl ZgsConfig {
|
|||||||
network_config.private = self.network_private;
|
network_config.private = self.network_private;
|
||||||
|
|
||||||
network_config.peer_db = self.network_peer_db;
|
network_config.peer_db = self.network_peer_db;
|
||||||
network_config.peer_manager = self.network_peer_manager.clone();
|
network_config.peer_manager = self.network_peer_manager;
|
||||||
network_config.disable_enr_network_id = self.discv5_disable_enr_network_id;
|
network_config.disable_enr_network_id = self.discv5_disable_enr_network_id;
|
||||||
|
|
||||||
Ok(network_config)
|
Ok(network_config)
|
||||||
|
@ -26,7 +26,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
|
|||||||
.with_log_sync(log_sync_config)
|
.with_log_sync(log_sync_config)
|
||||||
.await?
|
.await?
|
||||||
.with_file_location_cache(config.file_location_cache)
|
.with_file_location_cache(config.file_location_cache)
|
||||||
.with_network(network_config)
|
.with_network(&network_config)
|
||||||
.await?
|
.await?
|
||||||
.with_chunk_pool(chunk_pool_config)
|
.with_chunk_pool(chunk_pool_config)
|
||||||
.await?
|
.await?
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
extern crate tracing;
|
extern crate tracing;
|
||||||
|
|
||||||
use anyhow::bail;
|
use anyhow::bail;
|
||||||
|
use backtrace::Backtrace;
|
||||||
use shared_types::{
|
use shared_types::{
|
||||||
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
|
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
|
||||||
};
|
};
|
||||||
@ -139,6 +140,9 @@ impl Store {
|
|||||||
{
|
{
|
||||||
let store = self.store.clone();
|
let store = self.store.clone();
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let mut backtrace = Backtrace::new();
|
||||||
|
let frames = backtrace.frames().to_vec();
|
||||||
|
backtrace = frames.into();
|
||||||
|
|
||||||
self.executor.spawn_blocking(
|
self.executor.spawn_blocking(
|
||||||
move || {
|
move || {
|
||||||
@ -146,6 +150,7 @@ impl Store {
|
|||||||
let res = f(&*store);
|
let res = f(&*store);
|
||||||
|
|
||||||
if tx.send(res).is_err() {
|
if tx.send(res).is_err() {
|
||||||
|
warn!("Backtrace: {:?}", backtrace);
|
||||||
error!("Unable to complete async storage operation: the receiver dropped");
|
error!("Unable to complete async storage operation: the receiver dropped");
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -194,7 +194,7 @@ impl SyncPeers {
|
|||||||
ctx.report_peer(
|
ctx.report_peer(
|
||||||
*peer_id,
|
*peer_id,
|
||||||
PeerAction::LowToleranceError,
|
PeerAction::LowToleranceError,
|
||||||
"Dial timeout",
|
"Dail timeout",
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,7 +232,7 @@ impl SerialSyncController {
|
|||||||
/// Dial to peers in `Found` state, so that `Connecting` or `Connected` peers cover
|
/// Dial to peers in `Found` state, so that `Connecting` or `Connected` peers cover
|
||||||
/// data in all shards.
|
/// data in all shards.
|
||||||
fn try_connect(&mut self) {
|
fn try_connect(&mut self) {
|
||||||
let mut num_peers_dialed = 0;
|
let mut num_peers_dailed = 0;
|
||||||
|
|
||||||
// select a random peer
|
// select a random peer
|
||||||
while !self
|
while !self
|
||||||
@ -256,10 +256,10 @@ impl SerialSyncController {
|
|||||||
self.peers
|
self.peers
|
||||||
.update_state(&peer_id, PeerState::Found, PeerState::Connecting);
|
.update_state(&peer_id, PeerState::Found, PeerState::Connecting);
|
||||||
|
|
||||||
num_peers_dialed += 1;
|
num_peers_dailed += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(%self.tx_seq, %num_peers_dialed, "Connecting peers");
|
info!(%self.tx_seq, %num_peers_dailed, "Connecting peers");
|
||||||
|
|
||||||
self.state = SyncState::ConnectingPeers {
|
self.state = SyncState::ConnectingPeers {
|
||||||
origin: self.since,
|
origin: self.since,
|
||||||
@ -358,14 +358,14 @@ impl SerialSyncController {
|
|||||||
.update_state_force(&peer_id, PeerState::Connected);
|
.update_state_force(&peer_id, PeerState::Connected);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn on_dial_failed(&mut self, peer_id: PeerId, err: &DialError) {
|
pub fn on_dail_failed(&mut self, peer_id: PeerId, err: &DialError) {
|
||||||
match err {
|
match err {
|
||||||
DialError::ConnectionLimit(_) => {
|
DialError::ConnectionLimit(_) => {
|
||||||
if let Some(true) =
|
if let Some(true) =
|
||||||
self.peers
|
self.peers
|
||||||
.update_state(&peer_id, PeerState::Connecting, PeerState::Found)
|
.update_state(&peer_id, PeerState::Connecting, PeerState::Found)
|
||||||
{
|
{
|
||||||
info!(%self.tx_seq, %peer_id, "Failed to dial peer due to outgoing connection limitation");
|
info!(%self.tx_seq, %peer_id, "Failed to dail peer due to outgoing connection limitation");
|
||||||
self.state = SyncState::AwaitingOutgoingConnection {
|
self.state = SyncState::AwaitingOutgoingConnection {
|
||||||
since: Instant::now().into(),
|
since: Instant::now().into(),
|
||||||
};
|
};
|
||||||
@ -377,7 +377,7 @@ impl SerialSyncController {
|
|||||||
PeerState::Connecting,
|
PeerState::Connecting,
|
||||||
PeerState::Disconnected,
|
PeerState::Disconnected,
|
||||||
) {
|
) {
|
||||||
info!(%self.tx_seq, %peer_id, %err, "Failed to dial peer");
|
info!(%self.tx_seq, %peer_id, %err, "Failed to dail peer");
|
||||||
self.state = SyncState::Idle;
|
self.state = SyncState::Idle;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,7 @@ pub type SyncReceiver = channel::Receiver<SyncMessage, SyncRequest, SyncResponse
|
|||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum SyncMessage {
|
pub enum SyncMessage {
|
||||||
DialFailed {
|
DailFailed {
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
err: DialError,
|
err: DialError,
|
||||||
},
|
},
|
||||||
@ -227,8 +227,8 @@ impl SyncService {
|
|||||||
trace!("Sync received message {:?}", msg);
|
trace!("Sync received message {:?}", msg);
|
||||||
|
|
||||||
match msg {
|
match msg {
|
||||||
SyncMessage::DialFailed { peer_id, err } => {
|
SyncMessage::DailFailed { peer_id, err } => {
|
||||||
self.on_dial_failed(peer_id, err);
|
self.on_dail_failed(peer_id, err);
|
||||||
}
|
}
|
||||||
SyncMessage::PeerConnected { peer_id } => {
|
SyncMessage::PeerConnected { peer_id } => {
|
||||||
self.on_peer_connected(peer_id);
|
self.on_peer_connected(peer_id);
|
||||||
@ -369,11 +369,11 @@ impl SyncService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_dial_failed(&mut self, peer_id: PeerId, err: DialError) {
|
fn on_dail_failed(&mut self, peer_id: PeerId, err: DialError) {
|
||||||
info!(%peer_id, ?err, "Dial to peer failed");
|
info!(%peer_id, ?err, "Dail to peer failed");
|
||||||
|
|
||||||
for controller in self.controllers.values_mut() {
|
for controller in self.controllers.values_mut() {
|
||||||
controller.on_dial_failed(peer_id, &err);
|
controller.on_dail_failed(peer_id, &err);
|
||||||
controller.transition();
|
controller.transition();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -245,7 +245,7 @@ neighbors_only = true
|
|||||||
# Maximum number of continous failures to terminate a file sync.
|
# Maximum number of continous failures to terminate a file sync.
|
||||||
# max_request_failures = 5
|
# max_request_failures = 5
|
||||||
|
|
||||||
# Timeout to dial peers.
|
# Timeout to dail peers.
|
||||||
# peer_connect_timeout = "15s"
|
# peer_connect_timeout = "15s"
|
||||||
|
|
||||||
# Timeout to disconnect peers.
|
# Timeout to disconnect peers.
|
||||||
|
@ -257,7 +257,7 @@ neighbors_only = true
|
|||||||
# Maximum number of continous failures to terminate a file sync.
|
# Maximum number of continous failures to terminate a file sync.
|
||||||
# max_request_failures = 5
|
# max_request_failures = 5
|
||||||
|
|
||||||
# Timeout to dial peers.
|
# Timeout to dail peers.
|
||||||
# peer_connect_timeout = "15s"
|
# peer_connect_timeout = "15s"
|
||||||
|
|
||||||
# Timeout to disconnect peers.
|
# Timeout to disconnect peers.
|
||||||
|
@ -259,7 +259,7 @@ neighbors_only = true
|
|||||||
# Maximum number of continous failures to terminate a file sync.
|
# Maximum number of continous failures to terminate a file sync.
|
||||||
# max_request_failures = 5
|
# max_request_failures = 5
|
||||||
|
|
||||||
# Timeout to dial peers.
|
# Timeout to dail peers.
|
||||||
# peer_connect_timeout = "15s"
|
# peer_connect_timeout = "15s"
|
||||||
|
|
||||||
# Timeout to disconnect peers.
|
# Timeout to disconnect peers.
|
||||||
|
@ -1,76 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
import os
|
|
||||||
import time
|
|
||||||
|
|
||||||
from config.node_config import ZGS_KEY_FILE, ZGS_NODEID
|
|
||||||
from test_framework.test_framework import TestFramework
|
|
||||||
from utility.utils import p2p_port
|
|
||||||
|
|
||||||
class NetworkTcpShardTest(TestFramework):
|
|
||||||
"""
|
|
||||||
This is to test TCP connection for shard config mismatched peers of UDP discovery.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def setup_params(self):
|
|
||||||
# 1 bootnode and 2 community nodes
|
|
||||||
self.num_nodes = 3
|
|
||||||
|
|
||||||
# setup for node 0 as bootnode
|
|
||||||
self.zgs_node_key_files = [ZGS_KEY_FILE]
|
|
||||||
bootnode_port = p2p_port(0)
|
|
||||||
self.zgs_node_configs[0] = {
|
|
||||||
# enable UDP discovery relevant configs
|
|
||||||
"network_enr_address": "127.0.0.1",
|
|
||||||
"network_enr_tcp_port": bootnode_port,
|
|
||||||
"network_enr_udp_port": bootnode_port,
|
|
||||||
|
|
||||||
# disable trusted nodes
|
|
||||||
"network_libp2p_nodes": [],
|
|
||||||
|
|
||||||
# custom shard config
|
|
||||||
"shard_position": "0/4"
|
|
||||||
}
|
|
||||||
|
|
||||||
# setup node 1 & 2 as community nodes
|
|
||||||
bootnodes = [f"/ip4/127.0.0.1/udp/{bootnode_port}/p2p/{ZGS_NODEID}"]
|
|
||||||
for i in range(1, self.num_nodes):
|
|
||||||
self.zgs_node_configs[i] = {
|
|
||||||
# enable UDP discovery relevant configs
|
|
||||||
"network_enr_address": "127.0.0.1",
|
|
||||||
"network_enr_tcp_port": p2p_port(i),
|
|
||||||
"network_enr_udp_port": p2p_port(i),
|
|
||||||
|
|
||||||
# disable trusted nodes and enable bootnodes
|
|
||||||
"network_libp2p_nodes": [],
|
|
||||||
"network_boot_nodes": bootnodes,
|
|
||||||
|
|
||||||
# custom shard config
|
|
||||||
"shard_position": f"{i}/4"
|
|
||||||
}
|
|
||||||
|
|
||||||
def run_test(self):
|
|
||||||
timeout_secs = 10
|
|
||||||
|
|
||||||
for iter in range(timeout_secs):
|
|
||||||
time.sleep(1)
|
|
||||||
self.log.info("==================================== iter %s", iter)
|
|
||||||
|
|
||||||
for i in range(self.num_nodes):
|
|
||||||
info = self.nodes[i].rpc.admin_getNetworkInfo()
|
|
||||||
self.log.info(
|
|
||||||
"Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)",
|
|
||||||
i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"],
|
|
||||||
)
|
|
||||||
|
|
||||||
if i == timeout_secs - 1:
|
|
||||||
assert info["totalPeers"] == self.num_nodes - 1
|
|
||||||
assert info["bannedPeers"] == 0
|
|
||||||
assert info["disconnectedPeers"] == self.num_nodes - 1
|
|
||||||
assert info["connectedPeers"] == 0
|
|
||||||
|
|
||||||
self.log.info("====================================")
|
|
||||||
self.log.info("All nodes discovered but not connected for each other")
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
NetworkTcpShardTest().main()
|
|
Loading…
Reference in New Issue
Block a user