mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-23 13:36:08 +00:00
Check network id in status and ban incompatible peers. (#159)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
* Check network id in status and ban incompatible peers. * Revert debug changes. * Request chain id from the blockchain server.
This commit is contained in:
parent
03286ebd78
commit
12d0c6b675
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -8552,6 +8552,7 @@ dependencies = [
|
||||
"duration-str",
|
||||
"error-chain",
|
||||
"ethereum-types 0.14.1",
|
||||
"ethers",
|
||||
"exit-future",
|
||||
"file_location_cache",
|
||||
"futures",
|
||||
|
@ -37,6 +37,7 @@ serde = { version = "1.0.137", features = ["derive"] }
|
||||
duration-str = "0.5.1"
|
||||
config = "0.13.1"
|
||||
public-ip = "0.2"
|
||||
ethers = "2.0.14"
|
||||
|
||||
[dependencies.libp2p]
|
||||
version = "0.45.1"
|
||||
|
@ -11,6 +11,7 @@ use libp2p::gossipsub::{
|
||||
use libp2p::Multiaddr;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
use shared_types::NetworkIdentity;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
@ -122,6 +123,9 @@ pub struct Config {
|
||||
|
||||
/// Whether metrics are enabled.
|
||||
pub metrics_enabled: bool,
|
||||
|
||||
/// The id of the storage network.
|
||||
pub network_id: NetworkIdentity,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
@ -199,6 +203,7 @@ impl Default for Config {
|
||||
shutdown_after_sync: false,
|
||||
topics: Vec::new(),
|
||||
metrics_enabled: false,
|
||||
network_id: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -391,7 +391,9 @@ mod tests {
|
||||
use std::io::Write;
|
||||
|
||||
fn status_message() -> StatusMessage {
|
||||
StatusMessage { data: 1 }
|
||||
StatusMessage {
|
||||
data: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn ping_message() -> Ping {
|
||||
@ -560,7 +562,10 @@ mod tests {
|
||||
assert_eq!(stream_identifier.len(), 10);
|
||||
|
||||
// Status message is 84 bytes uncompressed. `max_compressed_len` is 32 + 84 + 84/6 = 130.
|
||||
let status_message_bytes = StatusMessage { data: 1 }.as_ssz_bytes();
|
||||
let status_message_bytes = StatusMessage {
|
||||
data: Default::default(),
|
||||
}
|
||||
.as_ssz_bytes();
|
||||
|
||||
let mut uvi_codec: Uvi<usize> = Uvi::default();
|
||||
let mut dst = BytesMut::with_capacity(1024);
|
||||
|
@ -9,7 +9,7 @@ use ssz_types::{
|
||||
use std::ops::Deref;
|
||||
use strum::IntoStaticStr;
|
||||
pub type Hash256 = ethereum_types::H256;
|
||||
use shared_types::{ChunkArrayWithProof, TxID};
|
||||
use shared_types::{ChunkArrayWithProof, NetworkIdentity, TxID};
|
||||
|
||||
pub use ssz_types::{typenum, typenum::Unsigned, BitList, BitVector, FixedVector};
|
||||
|
||||
@ -71,7 +71,7 @@ impl ToString for ErrorType {
|
||||
/// The STATUS request/response handshake message.
|
||||
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct StatusMessage {
|
||||
pub data: u64,
|
||||
pub data: NetworkIdentity,
|
||||
}
|
||||
|
||||
/// The PING request/response message.
|
||||
|
@ -84,6 +84,7 @@ impl<AppReqId: ReqId> Service<AppReqId> {
|
||||
.iter()
|
||||
.map(|x| PeerId::from(x.clone()))
|
||||
.collect(),
|
||||
config.network_id.clone(),
|
||||
));
|
||||
|
||||
// try and construct UPnP port mappings if required.
|
||||
|
@ -4,6 +4,7 @@ use crate::Client;
|
||||
use crate::EnrExt;
|
||||
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
|
||||
use parking_lot::RwLock;
|
||||
use shared_types::NetworkIdentity;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::atomic::{AtomicU16, Ordering};
|
||||
|
||||
@ -22,10 +23,19 @@ pub struct NetworkGlobals {
|
||||
pub peers: RwLock<PeerDB>,
|
||||
/// The current gossipsub topic subscriptions.
|
||||
pub gossipsub_subscriptions: RwLock<HashSet<GossipTopic>>,
|
||||
|
||||
/// The id of the storage network.
|
||||
pub network_id: RwLock<NetworkIdentity>,
|
||||
}
|
||||
|
||||
impl NetworkGlobals {
|
||||
pub fn new(enr: Enr, tcp_port: u16, udp_port: u16, trusted_peers: Vec<PeerId>) -> Self {
|
||||
pub fn new(
|
||||
enr: Enr,
|
||||
tcp_port: u16,
|
||||
udp_port: u16,
|
||||
trusted_peers: Vec<PeerId>,
|
||||
network_id: NetworkIdentity,
|
||||
) -> Self {
|
||||
NetworkGlobals {
|
||||
local_enr: RwLock::new(enr.clone()),
|
||||
peer_id: RwLock::new(enr.peer_id()),
|
||||
@ -34,6 +44,7 @@ impl NetworkGlobals {
|
||||
listen_port_udp: AtomicU16::new(udp_port),
|
||||
peers: RwLock::new(PeerDB::new(trusted_peers)),
|
||||
gossipsub_subscriptions: RwLock::new(HashSet::new()),
|
||||
network_id: RwLock::new(network_id),
|
||||
}
|
||||
}
|
||||
|
||||
@ -63,6 +74,10 @@ impl NetworkGlobals {
|
||||
self.listen_port_udp.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn network_id(&self) -> NetworkIdentity {
|
||||
self.network_id.read().clone()
|
||||
}
|
||||
|
||||
/// Returns the number of libp2p connected peers.
|
||||
pub fn connected_peers(&self) -> usize {
|
||||
self.peers.read().connected_peer_ids().count()
|
||||
@ -95,6 +110,6 @@ impl NetworkGlobals {
|
||||
let enr_key: discv5::enr::CombinedKey =
|
||||
discv5::enr::CombinedKey::from_libp2p(&keypair).unwrap();
|
||||
let enr = discv5::enr::EnrBuilder::new("v4").build(&enr_key).unwrap();
|
||||
NetworkGlobals::new(enr, 9000, 9000, vec![])
|
||||
NetworkGlobals::new(enr, 9000, 9000, vec![], Default::default())
|
||||
}
|
||||
}
|
||||
|
@ -23,10 +23,14 @@ fn test_status_rpc() {
|
||||
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt)).await;
|
||||
|
||||
// Dummy STATUS RPC message
|
||||
let rpc_request = Request::Status(StatusMessage { data: 2 });
|
||||
let rpc_request = Request::Status(StatusMessage {
|
||||
data: Default::default(),
|
||||
});
|
||||
|
||||
// Dummy STATUS RPC message
|
||||
let rpc_response = Response::Status(StatusMessage { data: 3 });
|
||||
let rpc_response = Response::Status(StatusMessage {
|
||||
data: Default::default(),
|
||||
});
|
||||
|
||||
// build the sender future
|
||||
let sender_future = async {
|
||||
|
@ -5,7 +5,6 @@ use chunk_pool::ChunkPoolMessage;
|
||||
use file_location_cache::FileLocationCache;
|
||||
use network::multiaddr::Protocol;
|
||||
use network::types::{AnnounceShardConfig, SignedAnnounceShardConfig};
|
||||
use network::Multiaddr;
|
||||
use network::{
|
||||
rpc::StatusMessage,
|
||||
types::{
|
||||
@ -15,7 +14,8 @@ use network::{
|
||||
Keypair, MessageAcceptance, MessageId, NetworkGlobals, NetworkMessage, PeerId, PeerRequestId,
|
||||
PublicKey, PubsubMessage, Request, RequestId, Response,
|
||||
};
|
||||
use shared_types::{bytes_to_chunks, timestamp_now, TxID};
|
||||
use network::{Multiaddr, PeerAction, ReportSource};
|
||||
use shared_types::{bytes_to_chunks, timestamp_now, NetworkIdentity, TxID};
|
||||
use storage::config::ShardConfig;
|
||||
use storage_async::Store;
|
||||
use sync::{SyncMessage, SyncSender};
|
||||
@ -139,7 +139,9 @@ impl Libp2pEventHandler {
|
||||
}
|
||||
|
||||
pub fn send_status(&self, peer_id: PeerId) {
|
||||
let status_message = StatusMessage { data: 123 }; // dummy status message
|
||||
let status_message = StatusMessage {
|
||||
data: self.network_globals.network_id(),
|
||||
}; // dummy status message
|
||||
debug!(%peer_id, ?status_message, "Sending Status request");
|
||||
|
||||
self.send_to_network(NetworkMessage::SendRequest {
|
||||
@ -191,7 +193,10 @@ impl Libp2pEventHandler {
|
||||
fn on_status_request(&self, peer_id: PeerId, request_id: PeerRequestId, status: StatusMessage) {
|
||||
debug!(%peer_id, ?status, "Received Status request");
|
||||
|
||||
let status_message = StatusMessage { data: 456 }; // dummy status message
|
||||
let network_id = self.network_globals.network_id();
|
||||
let status_message = StatusMessage {
|
||||
data: network_id.clone(),
|
||||
}; // dummy status message
|
||||
debug!(%peer_id, ?status_message, "Sending Status response");
|
||||
|
||||
self.send_to_network(NetworkMessage::SendResponse {
|
||||
@ -199,6 +204,12 @@ impl Libp2pEventHandler {
|
||||
id: request_id,
|
||||
response: Response::Status(status_message),
|
||||
});
|
||||
self.on_status_message(peer_id, status, network_id);
|
||||
}
|
||||
|
||||
fn on_status_response(&self, peer_id: PeerId, status: StatusMessage) {
|
||||
let network_id = self.network_globals.network_id();
|
||||
self.on_status_message(peer_id, status, network_id);
|
||||
}
|
||||
|
||||
pub async fn on_rpc_response(
|
||||
@ -212,6 +223,7 @@ impl Libp2pEventHandler {
|
||||
match response {
|
||||
Response::Status(status_message) => {
|
||||
debug!(%peer_id, ?status_message, "Received Status response");
|
||||
self.on_status_response(peer_id, status_message);
|
||||
}
|
||||
Response::Chunks(response) => {
|
||||
let request_id = match request_id {
|
||||
@ -671,6 +683,23 @@ impl Libp2pEventHandler {
|
||||
|
||||
MessageAcceptance::Accept
|
||||
}
|
||||
|
||||
fn on_status_message(
|
||||
&self,
|
||||
peer_id: PeerId,
|
||||
status: StatusMessage,
|
||||
network_id: NetworkIdentity,
|
||||
) {
|
||||
if status.data != network_id {
|
||||
warn!(%peer_id, ?network_id, ?status.data, "Report peer with incompatible network id");
|
||||
self.send_to_network(NetworkMessage::ReportPeer {
|
||||
peer_id,
|
||||
action: PeerAction::Fatal,
|
||||
source: ReportSource::Gossipsub,
|
||||
msg: "Incompatible network id in StatusMessage",
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -762,7 +791,8 @@ mod tests {
|
||||
let keypair = Keypair::generate_secp256k1();
|
||||
let enr_key = CombinedKey::from_libp2p(&keypair).unwrap();
|
||||
let enr = EnrBuilder::new("v4").build(&enr_key).unwrap();
|
||||
let network_globals = NetworkGlobals::new(enr, 30000, 30000, vec![]);
|
||||
let network_globals =
|
||||
NetworkGlobals::new(enr, 30000, 30000, vec![], Default::default());
|
||||
|
||||
let listen_addr: Multiaddr = "/ip4/127.0.0.1/tcp/30000".parse().unwrap();
|
||||
network_globals.listen_multiaddrs.write().push(listen_addr);
|
||||
@ -876,7 +906,9 @@ mod tests {
|
||||
|
||||
let alice = PeerId::random();
|
||||
let req_id = (ConnectionId::new(4), SubstreamId(12));
|
||||
let request = Request::Status(StatusMessage { data: 412 });
|
||||
let request = Request::Status(StatusMessage {
|
||||
data: Default::default(),
|
||||
});
|
||||
handler.on_rpc_request(alice, req_id, request).await;
|
||||
|
||||
match ctx.network_recv.try_recv() {
|
||||
|
@ -4,7 +4,7 @@ use anyhow::{anyhow, bail, Error};
|
||||
use append_merkle::{
|
||||
AppendMerkleTree, Proof as RawProof, RangeProof as RawRangeProof, Sha3Algorithm,
|
||||
};
|
||||
use ethereum_types::{H256, U256};
|
||||
use ethereum_types::{Address, H256, U256};
|
||||
use merkle_light::merkle::MerkleTree;
|
||||
use merkle_light::proof::Proof as RawFileProof;
|
||||
use merkle_light::{hash::Algorithm, merkle::next_pow2};
|
||||
@ -366,3 +366,14 @@ impl TryFrom<FileProof> for FlowProof {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(
|
||||
DeriveEncode, DeriveDecode, Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize,
|
||||
)]
|
||||
pub struct NetworkIdentity {
|
||||
/// The chain id of the blockchain network.
|
||||
pub chain_id: u64,
|
||||
|
||||
/// The address of the deployed Flow contract on the blockchain.
|
||||
pub flow_address: Address,
|
||||
}
|
||||
|
@ -2,11 +2,13 @@
|
||||
|
||||
use crate::ZgsConfig;
|
||||
use ethereum_types::{H256, U256};
|
||||
use ethers::prelude::{Http, Middleware, Provider};
|
||||
use log_entry_sync::{CacheConfig, ContractAddress, LogSyncConfig};
|
||||
use miner::MinerConfig;
|
||||
use network::NetworkConfig;
|
||||
use pruner::PrunerConfig;
|
||||
use rpc::RPCConfig;
|
||||
use shared_types::NetworkIdentity;
|
||||
use std::net::IpAddr;
|
||||
use std::time::Duration;
|
||||
use storage::config::ShardConfig;
|
||||
@ -25,6 +27,21 @@ impl ZgsConfig {
|
||||
network_config.libp2p_port = self.network_libp2p_port;
|
||||
network_config.disable_discovery = self.network_disable_discovery;
|
||||
network_config.discovery_port = self.network_discovery_port;
|
||||
let flow_address = self
|
||||
.log_contract_address
|
||||
.parse::<ContractAddress>()
|
||||
.map_err(|e| format!("Unable to parse log_contract_address: {:?}", e))?;
|
||||
let provider = Provider::<Http>::try_from(&self.blockchain_rpc_endpoint)
|
||||
.map_err(|e| format!("Can not parse blockchain endpoint: {:?}", e))?;
|
||||
let chain_id = provider
|
||||
.get_chainid()
|
||||
.await
|
||||
.map_err(|e| format!("Unable to get chain id: {:?}", e))?
|
||||
.as_u64();
|
||||
network_config.network_id = NetworkIdentity {
|
||||
chain_id,
|
||||
flow_address,
|
||||
};
|
||||
|
||||
if !self.network_disable_discovery {
|
||||
network_config.enr_tcp_port = Some(self.network_enr_tcp_port);
|
||||
|
@ -1,6 +1,6 @@
|
||||
use super::*;
|
||||
use core::num::NonZeroUsize;
|
||||
use ethereum_types::{H256, U128, U256};
|
||||
use ethereum_types::{H160, H256, U128, U256};
|
||||
use smallvec::SmallVec;
|
||||
use std::sync::Arc;
|
||||
|
||||
@ -277,6 +277,27 @@ impl Decode for H256 {
|
||||
}
|
||||
}
|
||||
|
||||
impl Decode for H160 {
|
||||
fn is_ssz_fixed_len() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn ssz_fixed_len() -> usize {
|
||||
20
|
||||
}
|
||||
|
||||
fn from_ssz_bytes(bytes: &[u8]) -> Result<Self, DecodeError> {
|
||||
let len = bytes.len();
|
||||
let expected = <Self as Decode>::ssz_fixed_len();
|
||||
|
||||
if len != expected {
|
||||
Err(DecodeError::InvalidByteLength { len, expected })
|
||||
} else {
|
||||
Ok(H160::from_slice(bytes))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Decode for U256 {
|
||||
fn is_ssz_fixed_len() -> bool {
|
||||
true
|
||||
|
@ -1,6 +1,6 @@
|
||||
use super::*;
|
||||
use core::num::NonZeroUsize;
|
||||
use ethereum_types::{H256, U128, U256};
|
||||
use ethereum_types::{H160, H256, U128, U256};
|
||||
use smallvec::SmallVec;
|
||||
use std::sync::Arc;
|
||||
|
||||
@ -323,6 +323,24 @@ impl Encode for H256 {
|
||||
}
|
||||
}
|
||||
|
||||
impl Encode for H160 {
|
||||
fn is_ssz_fixed_len() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn ssz_fixed_len() -> usize {
|
||||
20
|
||||
}
|
||||
|
||||
fn ssz_bytes_len(&self) -> usize {
|
||||
20
|
||||
}
|
||||
|
||||
fn ssz_append(&self, buf: &mut Vec<u8>) {
|
||||
buf.extend_from_slice(self.as_bytes());
|
||||
}
|
||||
}
|
||||
|
||||
impl Encode for U256 {
|
||||
fn is_ssz_fixed_len() -> bool {
|
||||
true
|
||||
|
Loading…
Reference in New Issue
Block a user