mirror of
				https://github.com/0glabs/0g-storage-node.git
				synced 2025-11-04 08:37:27 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			198 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			198 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
mod common;
 | 
						|
use std::{
 | 
						|
    collections::{HashMap, HashSet},
 | 
						|
    sync::Arc,
 | 
						|
};
 | 
						|
 | 
						|
use common::{
 | 
						|
    behaviour::{CallTraceBehaviour, MockBehaviour},
 | 
						|
    swarm,
 | 
						|
};
 | 
						|
use network::{
 | 
						|
    peer_manager::{config::Config, peerdb::PeerDBConfig, PeerManagerEvent},
 | 
						|
    NetworkGlobals, PeerAction, PeerInfo, PeerManager, ReportSource,
 | 
						|
};
 | 
						|
 | 
						|
use futures::StreamExt;
 | 
						|
use libp2p::{
 | 
						|
    core::either::EitherError,
 | 
						|
    swarm::SwarmEvent,
 | 
						|
    swarm::{handler::DummyConnectionHandler, DummyBehaviour, KeepAlive, Swarm},
 | 
						|
    NetworkBehaviour,
 | 
						|
};
 | 
						|
 | 
						|
use tracing::debug;
 | 
						|
 | 
						|
/// Struct that mimics the lighthouse_network::Service with respect to handling peer manager
 | 
						|
/// events.
 | 
						|
// TODO: make this a real struct for more accurate testing.
 | 
						|
struct Service {
 | 
						|
    swarm: Swarm<Behaviour>,
 | 
						|
}
 | 
						|
 | 
						|
impl Service {
 | 
						|
    async fn select_next_some(&mut self) -> SwarmEvent<Ev, EitherError<void::Void, void::Void>> {
 | 
						|
        let ev = self.swarm.select_next_some().await;
 | 
						|
        match &ev {
 | 
						|
            SwarmEvent::Behaviour(Ev(PeerManagerEvent::Banned(peer_id, _addr_vec))) => {
 | 
						|
                self.swarm.ban_peer_id(*peer_id);
 | 
						|
            }
 | 
						|
            SwarmEvent::Behaviour(Ev(PeerManagerEvent::UnBanned(peer_id, _addr_vec))) => {
 | 
						|
                self.swarm.unban_peer_id(*peer_id);
 | 
						|
            }
 | 
						|
            SwarmEvent::Behaviour(Ev(PeerManagerEvent::DisconnectPeer(peer_id, _reason))) => {
 | 
						|
                // directly disconnect here.
 | 
						|
                let _ = self.swarm.disconnect_peer_id(*peer_id);
 | 
						|
            }
 | 
						|
            _ => {}
 | 
						|
        }
 | 
						|
        ev
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
#[derive(Debug)]
 | 
						|
struct Ev(PeerManagerEvent);
 | 
						|
impl From<void::Void> for Ev {
 | 
						|
    fn from(_: void::Void) -> Self {
 | 
						|
        unreachable!("No events are emitted")
 | 
						|
    }
 | 
						|
}
 | 
						|
impl From<PeerManagerEvent> for Ev {
 | 
						|
    fn from(ev: PeerManagerEvent) -> Self {
 | 
						|
        Ev(ev)
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
#[derive(NetworkBehaviour)]
 | 
						|
#[behaviour(out_event = "Ev")]
 | 
						|
struct Behaviour {
 | 
						|
    pm_call_trace: CallTraceBehaviour<PeerManager>,
 | 
						|
    sibling: MockBehaviour,
 | 
						|
}
 | 
						|
 | 
						|
impl Behaviour {
 | 
						|
    fn new(pm: PeerManager) -> Self {
 | 
						|
        Behaviour {
 | 
						|
            pm_call_trace: CallTraceBehaviour::new(pm),
 | 
						|
            sibling: MockBehaviour::new(DummyConnectionHandler {
 | 
						|
                // The peer manager votes No, so we make sure the combined handler stays alive this
 | 
						|
                // way.
 | 
						|
                keep_alive: KeepAlive::Yes,
 | 
						|
            }),
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
#[tokio::test]
 | 
						|
async fn banned_peers_consistency() {
 | 
						|
    let globals: Arc<NetworkGlobals> = Arc::new(NetworkGlobals::new_test_globals());
 | 
						|
 | 
						|
    // Build the peer manager.
 | 
						|
    let (mut pm_service, pm_addr) = {
 | 
						|
        let pm_config = Config {
 | 
						|
            discovery_enabled: false,
 | 
						|
            ..Default::default()
 | 
						|
        };
 | 
						|
        let pm = PeerManager::new(pm_config, globals.clone()).await.unwrap();
 | 
						|
        let mut pm_swarm = swarm::new_test_swarm(Behaviour::new(pm));
 | 
						|
        let pm_addr = swarm::bind_listener(&mut pm_swarm).await;
 | 
						|
        let service = Service { swarm: pm_swarm };
 | 
						|
        (service, pm_addr)
 | 
						|
    };
 | 
						|
 | 
						|
    let excess_banned_peers = 15;
 | 
						|
    let peers_to_ban = PeerDBConfig::default().max_banned_peers + excess_banned_peers;
 | 
						|
 | 
						|
    // Build all the dummy peers needed.
 | 
						|
    let (mut swarm_pool, peers) = {
 | 
						|
        let mut pool = swarm::SwarmPool::with_capacity(peers_to_ban);
 | 
						|
        let mut peers = HashSet::with_capacity(peers_to_ban);
 | 
						|
        for _ in 0..peers_to_ban {
 | 
						|
            let mut peer_swarm =
 | 
						|
                swarm::new_test_swarm(DummyBehaviour::with_keep_alive(KeepAlive::Yes));
 | 
						|
            let _peer_addr = swarm::bind_listener(&mut peer_swarm).await;
 | 
						|
            // It is ok to dial all at the same time since the swarm handles an event at a time.
 | 
						|
            peer_swarm.dial(pm_addr.clone()).unwrap();
 | 
						|
            let peer_id = pool.insert(peer_swarm);
 | 
						|
            peers.insert(peer_id);
 | 
						|
        }
 | 
						|
        (pool, peers)
 | 
						|
    };
 | 
						|
 | 
						|
    // we track banned peers at the swarm level here since there is no access to that info.
 | 
						|
    let mut swarm_banned_peers = HashMap::with_capacity(peers_to_ban);
 | 
						|
    let mut peers_unbanned = 0;
 | 
						|
    let timeout = tokio::time::sleep(tokio::time::Duration::from_secs(30));
 | 
						|
    futures::pin_mut!(timeout);
 | 
						|
 | 
						|
    loop {
 | 
						|
        // poll the pm and dummy swarms.
 | 
						|
        tokio::select! {
 | 
						|
            pm_event = pm_service.select_next_some() => {
 | 
						|
                debug!("[PM] {:?}", pm_event);
 | 
						|
                match pm_event {
 | 
						|
                    SwarmEvent::Behaviour(Ev(ev)) => match ev {
 | 
						|
                        PeerManagerEvent::Banned(peer_id, _) => {
 | 
						|
                            let has_been_unbanned = false;
 | 
						|
                            swarm_banned_peers.insert(peer_id, has_been_unbanned);
 | 
						|
                        }
 | 
						|
                        PeerManagerEvent::UnBanned(peer_id, _) => {
 | 
						|
                            *swarm_banned_peers.get_mut(&peer_id).expect("Unbanned peer must be banned first") = true;
 | 
						|
                            peers_unbanned += 1;
 | 
						|
                        }
 | 
						|
                        _ => {}
 | 
						|
                    }
 | 
						|
                    SwarmEvent::ConnectionEstablished {
 | 
						|
                        peer_id,
 | 
						|
                        endpoint: _,
 | 
						|
                        num_established: _,
 | 
						|
                        concurrent_dial_errors: _,
 | 
						|
                    } => {
 | 
						|
                        assert!(peers.contains(&peer_id));
 | 
						|
                        // now we report the peer as banned.
 | 
						|
                        pm_service
 | 
						|
                            .swarm
 | 
						|
                            .behaviour_mut()
 | 
						|
                            .pm_call_trace
 | 
						|
                            .inner()
 | 
						|
                            .report_peer(
 | 
						|
                                &peer_id,
 | 
						|
                                PeerAction::Fatal,
 | 
						|
                                ReportSource::Processor,
 | 
						|
                                None,
 | 
						|
                                ""
 | 
						|
                            );
 | 
						|
                    },
 | 
						|
                    _ => {}
 | 
						|
                }
 | 
						|
            }
 | 
						|
            Some((_peer_id, _peer_ev)) = swarm_pool.next() => {
 | 
						|
                // we need to poll the swarms to keep the peers going
 | 
						|
            }
 | 
						|
            _ = timeout.as_mut() => {
 | 
						|
                panic!("Test timeout.")
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
        if peers_unbanned == excess_banned_peers {
 | 
						|
            let pdb = globals.peers.read();
 | 
						|
            let inconsistencies = swarm_banned_peers
 | 
						|
                .into_iter()
 | 
						|
                .map(|(peer_id, was_unbanned)| {
 | 
						|
                    was_unbanned
 | 
						|
                        != pdb.peer_info(&peer_id).map_or(
 | 
						|
                            false, /* We forgot about a banned peer */
 | 
						|
                            PeerInfo::is_banned,
 | 
						|
                        )
 | 
						|
                });
 | 
						|
            assert_eq!(
 | 
						|
                inconsistencies
 | 
						|
                    .filter(|is_consistent| *is_consistent)
 | 
						|
                    .count(),
 | 
						|
                peers_to_ban
 | 
						|
            );
 | 
						|
            return;
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 |