Filter out file announcement of private ip (#113)

* Filter out file announcement of private ip

* add log for listen address
This commit is contained in:
Bo QIU 2024-07-08 18:45:55 +08:00 committed by GitHub
parent 84f455d1d4
commit 68b5d27728
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 108 additions and 42 deletions

2
Cargo.lock generated
View File

@ -6136,6 +6136,7 @@ dependencies = [
"channel",
"chrono",
"chunk_pool",
"duration-str",
"error-chain",
"file_location_cache",
"futures",
@ -6144,6 +6145,7 @@ dependencies = [
"network",
"pruner",
"rand 0.8.5",
"serde",
"shared_types",
"storage",
"storage-async",

View File

@ -21,6 +21,8 @@ chunk_pool = { path = "../chunk_pool" }
tokio = { version = "1.19.2", features = ["full"] }
tracing = "0.1.35"
rand = "0.8.5"
serde = { version = "1.0.137", features = ["derive"] }
duration-str = "0.5.1"
[dev-dependencies]
channel = { path = "../../common/channel" }

View File

@ -5,28 +5,42 @@ mod libp2p_event_handler;
mod peer_manager;
mod service;
use duration_str::deserialize_duration;
use network::Multiaddr;
use serde::Deserialize;
use std::time::Duration;
pub use crate::service::RouterService;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct Config {
pub heartbeat_interval_secs: u64,
pub idle_time_secs: u64,
#[serde(deserialize_with = "deserialize_duration")]
pub heartbeat_interval: Duration,
#[serde(deserialize_with = "deserialize_duration")]
pub idle_time: Duration,
pub max_idle_incoming_peers: usize,
pub max_idle_outgoing_peers: usize,
pub libp2p_nodes: Vec<Multiaddr>,
pub private_ip_enabled: bool,
}
impl Default for Config {
fn default() -> Self {
Self {
heartbeat_interval_secs: 5,
idle_time_secs: 180,
heartbeat_interval: Duration::from_secs(5),
idle_time: Duration::from_secs(180),
max_idle_incoming_peers: 12,
max_idle_outgoing_peers: 20,
libp2p_nodes: vec![],
private_ip_enabled: false,
}
}
}
impl Config {
pub fn with_private_ip_enabled(mut self, enabled: bool) -> Self {
self.private_ip_enabled = enabled;
self
}
}

View File

@ -2,7 +2,9 @@ use std::{ops::Neg, sync::Arc};
use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use network::multiaddr::Protocol;
use network::types::{AnnounceShardConfig, SignedAnnounceShardConfig};
use network::Multiaddr;
use network::{
rpc::StatusMessage,
types::{
@ -20,6 +22,7 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{mpsc, RwLock};
use crate::peer_manager::PeerManager;
use crate::Config;
lazy_static::lazy_static! {
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(2);
@ -65,6 +68,7 @@ fn verify_signature(msg: &dyn HasSignature, peer_id: &PeerId, propagation_source
}
pub struct Libp2pEventHandler {
config: Config,
/// A collection of global variables, accessible outside of the network service.
network_globals: Arc<NetworkGlobals>,
/// A channel to the router service.
@ -86,6 +90,7 @@ pub struct Libp2pEventHandler {
impl Libp2pEventHandler {
#[allow(clippy::too_many_arguments)]
pub fn new(
config: Config,
network_globals: Arc<NetworkGlobals>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
sync_send: SyncSender,
@ -96,6 +101,7 @@ impl Libp2pEventHandler {
peers: Arc<RwLock<PeerManager>>,
) -> Self {
Self {
config,
network_globals,
network_send,
sync_send,
@ -257,16 +263,47 @@ impl Libp2pEventHandler {
}
}
fn get_listen_addr(&self) -> Option<Multiaddr> {
let listen_addrs = self.network_globals.listen_multiaddrs.read();
if self.config.private_ip_enabled {
listen_addrs.first().cloned()
} else {
listen_addrs
.iter()
.find(|&x| Self::contains_public_ip(x))
.cloned()
}
}
fn contains_public_ip(addr: &Multiaddr) -> bool {
for c in addr.iter() {
match c {
Protocol::Ip4(ip4_addr) => {
return !ip4_addr.is_broadcast()
&& !ip4_addr.is_documentation()
&& !ip4_addr.is_link_local()
&& !ip4_addr.is_loopback()
&& !ip4_addr.is_multicast()
&& !ip4_addr.is_private()
&& !ip4_addr.is_unspecified()
}
Protocol::Ip6(ip6_addr) => {
return !ip6_addr.is_loopback()
&& !ip6_addr.is_multicast()
&& !ip6_addr.is_unspecified()
}
_ => {}
}
}
false
}
pub async fn construct_announce_file_message(&self, tx_id: TxID) -> Option<PubsubMessage> {
let peer_id = *self.network_globals.peer_id.read();
let addr = match self.network_globals.listen_multiaddrs.read().first() {
Some(addr) => addr.clone(),
None => {
error!("No listen address available");
return None;
}
};
let addr = self.get_listen_addr()?;
let timestamp = timestamp_now();
let shard_config = self.store.get_store().flow().get_shard_config();
@ -298,13 +335,7 @@ impl Libp2pEventHandler {
shard_config: ShardConfig,
) -> Option<PubsubMessage> {
let peer_id = *self.network_globals.peer_id.read();
let addr = match self.network_globals.listen_multiaddrs.read().first() {
Some(addr) => addr.clone(),
None => {
error!("No listen address available");
return None;
}
};
let addr = self.get_listen_addr()?;
let timestamp = timestamp_now();
let msg = AnnounceShardConfig {
@ -377,12 +408,7 @@ impl Libp2pEventHandler {
index_end: u64,
) -> Option<PubsubMessage> {
let peer_id = *self.network_globals.peer_id.read();
let addr = self
.network_globals
.listen_multiaddrs
.read()
.first()?
.clone();
let addr = self.get_listen_addr()?;
let timestamp = timestamp_now();
let msg = AnnounceChunks {
@ -473,6 +499,12 @@ impl Libp2pEventHandler {
return MessageAcceptance::Reject;
}
// verify public ip address if required
let addr = msg.at.clone().into();
if !self.config.private_ip_enabled && !Self::contains_public_ip(&addr) {
return MessageAcceptance::Reject;
}
// propagate gossip to peers
let d = duration_since(msg.resend_timestamp);
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_FILE_TIMEOUT {
@ -484,7 +516,7 @@ impl Libp2pEventHandler {
self.send_to_sync(SyncMessage::AnnounceFileGossip {
tx_id: msg.tx_id,
peer_id: msg.peer_id.clone().into(),
addr: msg.at.clone().into(),
addr,
});
// insert message to cache
@ -503,6 +535,12 @@ impl Libp2pEventHandler {
return MessageAcceptance::Reject;
}
// verify public ip address if required
let addr = msg.at.clone().into();
if !self.config.private_ip_enabled && !Self::contains_public_ip(&addr) {
return MessageAcceptance::Reject;
}
// propagate gossip to peers
let d = duration_since(msg.resend_timestamp);
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_SHARD_CONFIG_TIMEOUT {
@ -518,7 +556,7 @@ impl Libp2pEventHandler {
self.send_to_sync(SyncMessage::AnnounceShardConfig {
shard_config,
peer_id: msg.peer_id.clone().into(),
addr: msg.at.clone().into(),
addr,
});
// insert message to cache
@ -538,6 +576,12 @@ impl Libp2pEventHandler {
return MessageAcceptance::Reject;
}
// verify public ip address if required
let addr = msg.at.clone().into();
if !self.config.private_ip_enabled && !Self::contains_public_ip(&addr) {
return MessageAcceptance::Reject;
}
// propagate gossip to peers
let d = duration_since(msg.resend_timestamp);
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_FILE_TIMEOUT {
@ -625,6 +669,7 @@ mod tests {
impl Context {
fn new_handler(&self) -> Libp2pEventHandler {
Libp2pEventHandler::new(
Config::default().with_private_ip_enabled(true),
self.network_globals.clone(),
self.network_send.clone(),
self.sync_send.clone(),

View File

@ -2,7 +2,7 @@ use crate::Config;
use network::PeerId;
use rand::seq::IteratorRandom;
use std::collections::HashMap;
use std::time::Instant;
use std::time::{Duration, Instant};
/// Connected peer info.
struct PeerInfo {
@ -20,8 +20,8 @@ impl PeerInfo {
}
}
fn elapsed_secs(&self) -> u64 {
self.since.elapsed().as_secs()
fn elapsed(&self) -> Duration {
self.since.elapsed()
}
}
@ -98,7 +98,7 @@ impl PeerManager {
.peers
.iter()
.filter(|(_, peer)| {
peer.outgoing == outgoing && peer.elapsed_secs() >= self.config.idle_time_secs
peer.outgoing == outgoing && peer.elapsed() >= self.config.idle_time
})
.map(|(peer_id, _)| *peer_id)
.collect();
@ -116,10 +116,7 @@ impl PeerManager {
#[cfg(test)]
mod tests {
use std::{
ops::Sub,
time::{Duration, Instant},
};
use std::{ops::Sub, time::Instant};
use network::PeerId;
@ -202,10 +199,9 @@ mod tests {
}
// change timestamp for all peers
let idle_timeout = Duration::from_secs(config.idle_time_secs);
for peer_id in peers.iter() {
let peer = manager.peers.get_mut(peer_id).unwrap();
peer.since = Instant::now().sub(idle_timeout);
peer.since = Instant::now().sub(config.idle_time);
}
assert_eq!(

View File

@ -10,7 +10,6 @@ use network::{
};
use pruner::PrunerMessage;
use std::sync::Arc;
use std::time::Duration;
use storage::log_store::Store as LogStore;
use storage_async::Store;
use sync::{SyncMessage, SyncSender};
@ -68,13 +67,14 @@ impl RouterService {
// create the network service and spawn the task
let router = RouterService {
config,
config: config.clone(),
libp2p,
network_globals: network_globals.clone(),
network_recv,
pruner_recv,
peers: peers.clone(),
libp2p_event_handler: Libp2pEventHandler::new(
config,
network_globals,
network_send,
sync_send,
@ -94,7 +94,7 @@ impl RouterService {
}
async fn main(mut self, mut shutdown_sender: Sender<ShutdownReason>) {
let mut heartbeat = interval(Duration::from_secs(self.config.heartbeat_interval_secs));
let mut heartbeat = interval(self.config.heartbeat_interval);
loop {
tokio::select! {
@ -194,6 +194,7 @@ impl RouterService {
}
},
Libp2pEvent::NewListenAddr(multiaddr) => {
info!(?multiaddr, "New listen address");
self.network_globals
.listen_multiaddrs
.write()

View File

@ -180,7 +180,7 @@ impl ZgsConfig {
}
pub fn router_config(&self, network_config: &NetworkConfig) -> Result<router::Config, String> {
let mut router_config = router::Config::default();
let mut router_config = self.router.clone();
router_config.libp2p_nodes = network_config.libp2p_nodes.to_vec();
Ok(router_config)
}

View File

@ -87,6 +87,9 @@ build_config! {
pub struct ZgsConfig {
pub raw_conf: RawConfiguration,
// router config, configured by [router] section by `config` crate.
pub router: router::Config,
// sync config, configured by [sync] section by `config` crate.
pub sync: sync::Config,
}

View File

@ -3,6 +3,9 @@ from web3 import Web3
ZGS_CONFIG = {
"log_config_file": "log_config",
"confirmation_block_count": 1,
"router": {
"private_ip_enabled": True,
}
}
BSC_CONFIG = dict(