//! Session and packet handling for the Discv5 Discovery service. //! //! The [`Handler`] is responsible for establishing and maintaining sessions with //! connected/discovered nodes. Each node, identified by it's [`NodeId`] is associated with a //! `Session`. This service drives the handshakes for establishing the sessions and associated //! logic for sending/requesting initial connections/ENR's to/from unknown peers. //! //! The [`Handler`] also manages the timeouts for each request and reports back RPC failures, //! and received messages. Messages are encrypted and decrypted using the //! associated `Session` for each node. //! //! An ongoing established connection is abstractly represented by a `Session`. A node that provides an ENR with an //! IP address/port that doesn't match the source, is considered invalid. A node that doesn't know //! their external contactable addresses should set their ENR IP field to `None`. //! //! The Handler also routinely checks the timeouts for banned nodes and removes them from the //! banned list once their ban expires. //! //! # Usage //! //! Interacting with a handler is done via channels. A Handler is spawned using the [`Handler::spawn`] //! function. This returns an exit channel, a sending and receiving channel respectively. If the //! exit channel is dropped or fired, the handler task gets shutdown. //! //! Requests to the handler can be made via the sending channel using a [`HandlerRequest`]. //! Responses come by the receiving channel in the form of a [`HandlerResponse`]. use crate::{ config::Discv5Config, discv5::PERMIT_BAN_LIST, error::{Discv5Error, RequestError}, packet::{ChallengeData, IdNonce, MessageNonce, Packet, PacketKind}, rpc::{Message, Request, RequestBody, RequestId, Response, ResponseBody}, socket, socket::{FilterConfig, Socket}, Enr, }; use enr::{CombinedKey, NodeId}; use futures::prelude::*; use parking_lot::RwLock; use std::{ collections::HashMap, convert::TryFrom, default::Default, net::SocketAddr, sync::{atomic::Ordering, Arc}, time::{Duration, Instant}, }; use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, trace, warn}; mod crypto; mod hashmap_delay; mod session; mod tests; pub use crate::node_info::{NodeAddress, NodeContact}; use crate::metrics::METRICS; use crate::lru_time_cache::LruTimeCache; use hashmap_delay::HashMapDelay; use session::Session; // The time interval to check banned peer timeouts and unban peers when the timeout has elapsed (in // seconds). const BANNED_NODES_CHECK: u64 = 300; // Check every 5 minutes. /// Events sent to the handler to be executed. #[derive(Debug, Clone, PartialEq)] #[allow(clippy::large_enum_variant)] pub enum HandlerRequest { /// Sends a `Request` to a `NodeContact`. A `NodeContact` is an abstract type /// that allows for either an ENR to be sent or a `Raw` type which represents an `SocketAddr`, /// `PublicKey` and `NodeId`. This type can be created from MultiAddrs and MultiAddr strings /// for some keys. /// /// This permits us to send messages to nodes without knowing their ENR. In this case their ENR /// will be requested during the handshake. /// /// A Request is flagged and permits responses through the packet filter. /// /// Note: To update an ENR for an unknown node, we request a FINDNODE with distance 0 to the /// `NodeContact` we know of. Request(NodeContact, Box), /// Send a response to a received request to a particular node. /// /// The handler does not keep state of requests, so the application layer must send the /// response back to the `NodeAddress` from which the request was received. Response(NodeAddress, Box), /// A Random packet has been received and we have requested the application layer to inform /// us what the highest known ENR is for this node. /// The `WhoAreYouRef` is sent out in the `HandlerResponse::WhoAreYou` event and should /// be returned here to submit the application's response. WhoAreYou(WhoAreYouRef, Option), } /// The outputs provided by the `Handler`. #[derive(Debug, Clone, PartialEq)] pub enum HandlerResponse { /// A session has been established with a node. /// /// A session is only considered established once we have received a signed ENR from the /// node and received messages from it's `SocketAddr` matching it's ENR fields. Established(Enr, ConnectionDirection), /// A Request has been received. Request(NodeAddress, Box), /// A Response has been received. Response(NodeAddress, Box), /// An unknown source has requested information from us. Return the reference with the known /// ENR of this node (if known). See the `HandlerRequest::WhoAreYou` variant. WhoAreYou(WhoAreYouRef), /// An RPC request failed. /// /// This returns the request ID and an error indicating why the request failed. RequestFailed(RequestId, RequestError), } /// How we connected to the node. #[derive(PartialEq, Eq, Debug, Copy, Clone)] pub enum ConnectionDirection { /// The node contacted us. Incoming, /// We contacted the node. Outgoing, } /// A reference for the application layer to send back when the handler requests any known /// ENR for the NodeContact. #[derive(Debug, Clone, PartialEq)] pub struct WhoAreYouRef(pub NodeAddress, MessageNonce); #[derive(Debug)] /// A Challenge (WHOAREYOU) object used to handle and send WHOAREYOU requests. pub struct Challenge { /// The challenge data received from the node. data: ChallengeData, /// The remote's ENR if we know it. We can receive a challenge from an unknown node. remote_enr: Option, } /// A request to a node that we are waiting for a response. #[derive(Debug)] pub(crate) struct RequestCall { contact: NodeContact, /// The raw discv5 packet sent. packet: Packet, /// The unencrypted message. Required if need to re-encrypt and re-send. request: Request, /// Handshakes attempted. handshake_sent: bool, /// The number of times this request has been re-sent. retries: u8, /// If we receive a Nodes Response with a total greater than 1. This keeps track of the /// remaining responses expected. remaining_responses: Option, /// Signifies if we are initiating the session with a random packet. This is only used to /// determine the connection direction of the session. initiating_session: bool, } impl RequestCall { fn new( contact: NodeContact, packet: Packet, request: Request, initiating_session: bool, ) -> Self { RequestCall { contact, packet, request, handshake_sent: false, retries: 1, remaining_responses: None, initiating_session, } } fn id(&self) -> &RequestId { &self.request.id } } /// Process to handle handshakes and sessions established from raw RPC communications between nodes. pub struct Handler { /// Configuration for the discv5 service. request_retries: u8, /// The local node id to save unnecessary read locks on the ENR. The NodeID should not change /// during the operation of the server. node_id: NodeId, /// The local ENR. enr: Arc>, /// The key to sign the ENR and set up encrypted communication with peers. key: Arc>, /// Pending raw requests. A list of raw messages we are awaiting a response from the remote. active_requests: HashMapDelay, // WHOAREYOU messages do not include the source node id. We therefore maintain another // mapping of active_requests via message_nonce. This allows us to match WHOAREYOU // requests with active requests sent. /// A mapping of all pending active raw requests message nonces to their NodeAddress. active_requests_nonce_mapping: HashMap, /// The expected responses by SocketAddr which allows packets to pass the underlying filter. filter_expected_responses: Arc>>, /// Requests awaiting a handshake completion. pending_requests: HashMap>, /// Currently in-progress handshakes with peers. active_challenges: LruTimeCache, /// Established sessions with peers. sessions: LruTimeCache, /// The channel that receives requests from the application layer. inbound_channel: mpsc::UnboundedReceiver, /// The channel to send responses to the application layer. outbound_channel: mpsc::Sender, /// The listening socket to filter out any attempted requests to self. listen_socket: SocketAddr, /// The discovery v5 UDP socket tasks. socket: Socket, /// Exit channel to shutdown the handler. exit: oneshot::Receiver<()>, } type HandlerReturn = ( oneshot::Sender<()>, mpsc::UnboundedSender, mpsc::Receiver, ); impl Handler { /// A new Session service which instantiates the UDP socket send/recv tasks. pub async fn spawn( enr: Arc>, key: Arc>, listen_socket: SocketAddr, config: Discv5Config, ) -> Result { let (exit_sender, exit) = oneshot::channel(); // create the channels to send/receive messages from the application let (inbound_send, inbound_channel) = mpsc::unbounded_channel(); let (outbound_channel, outbound_recv) = mpsc::channel(50); // Creates a SocketConfig to pass to the underlying UDP socket tasks. // Lets the underlying filter know that we are expecting a packet from this source. let filter_expected_responses = Arc::new(RwLock::new(HashMap::new())); // The local node id let node_id = enr.read().node_id(); // enable the packet filter if required let filter_config = FilterConfig { enabled: config.enable_packet_filter, rate_limiter: config.filter_rate_limiter.clone(), max_nodes_per_ip: config.filter_max_nodes_per_ip, max_bans_per_ip: config.filter_max_bans_per_ip, }; let socket_config = socket::SocketConfig { executor: config.executor.clone().expect("Executor must exist"), socket_addr: listen_socket, filter_config, local_node_id: node_id, expected_responses: filter_expected_responses.clone(), ban_duration: config.ban_duration, }; // Attempt to bind to the socket before spinning up the send/recv tasks. let socket = socket::Socket::new_socket(&socket_config.socket_addr).await?; config .executor .clone() .expect("Executor must be present") .spawn(Box::pin(async move { let socket = match socket::Socket::new(socket, socket_config) { Ok(v) => v, Err(e) => { error!("Could not bind UDP socket. {}", e); return; } }; let mut handler = Handler { request_retries: config.request_retries, node_id, enr, key, active_requests: HashMapDelay::new(config.request_timeout), active_requests_nonce_mapping: HashMap::new(), pending_requests: HashMap::new(), filter_expected_responses, sessions: LruTimeCache::new( config.session_timeout, Some(config.session_cache_capacity), ), active_challenges: LruTimeCache::new(config.request_timeout * 2, None), inbound_channel, outbound_channel, listen_socket, socket, exit, }; debug!("Handler Starting"); handler.start().await; })); Ok((exit_sender, inbound_send, outbound_recv)) } /// The main execution loop for the handler. async fn start(&mut self) { let mut banned_nodes_check = tokio::time::interval(Duration::from_secs(BANNED_NODES_CHECK)); loop { tokio::select! { Some(handler_request) = self.inbound_channel.recv() => { match handler_request { HandlerRequest::Request(contact, request) => { let id = request.id.clone(); if let Err(request_error) = self.send_request(contact, *request).await { // If the sending failed report to the application let _ = self.outbound_channel.send(HandlerResponse::RequestFailed(id, request_error)).await; } } HandlerRequest::Response(dst, response) => self.send_response(dst, *response).await, HandlerRequest::WhoAreYou(wru_ref, enr) => self.send_challenge(wru_ref, enr).await, } } Some(inbound_packet) = self.socket.recv.recv() => { self.process_inbound_packet(inbound_packet).await; } Some(Ok((node_address, pending_request))) = self.active_requests.next() => { self.handle_request_timeout(node_address, pending_request).await; } _ = banned_nodes_check.tick() => self.unban_nodes_check(), // Unban nodes that are past the timeout _ = &mut self.exit => { return; } } } } /// Processes an inbound decoded packet. async fn process_inbound_packet(&mut self, inbound_packet: socket::InboundPacket) { let message_nonce = inbound_packet.header.message_nonce; match inbound_packet.header.kind { PacketKind::WhoAreYou { enr_seq, .. } => { let challenge_data = ChallengeData::try_from(inbound_packet.authenticated_data.as_slice()) .expect("Must be correct size"); self.handle_challenge( inbound_packet.src_address, message_nonce, enr_seq, challenge_data, ) .await } PacketKind::Handshake { src_id, id_nonce_sig, ephem_pubkey, enr_record, } => { let node_address = NodeAddress { socket_addr: inbound_packet.src_address, node_id: src_id, }; self.handle_auth_message( node_address, message_nonce, &id_nonce_sig, &ephem_pubkey, enr_record, &inbound_packet.message, &inbound_packet.authenticated_data, // This is required for authenticated data in decryption. ) .await } PacketKind::Message { src_id } => { let node_address = NodeAddress { socket_addr: inbound_packet.src_address, node_id: src_id, }; self.handle_message( node_address, message_nonce, &inbound_packet.message, &inbound_packet.authenticated_data, ) .await } } } fn remove_expected_response(&mut self, socket_addr: SocketAddr) { if let std::collections::hash_map::Entry::Occupied(mut entry) = self.filter_expected_responses.write().entry(socket_addr) { let count = entry.get_mut(); *count = count.saturating_sub(1); if count == &0 { entry.remove(); } } } fn add_expected_response(&mut self, socket_addr: SocketAddr) { *self .filter_expected_responses .write() .entry(socket_addr) .or_default() += 1; } /// A request has timed out. async fn handle_request_timeout( &mut self, node_address: NodeAddress, mut request_call: RequestCall, ) { if request_call.retries >= self.request_retries { trace!("Request timed out with {}", node_address); // Remove the request from the awaiting packet_filter // Remove the associated nonce mapping. self.active_requests_nonce_mapping .remove(request_call.packet.message_nonce()); self.remove_expected_response(node_address.socket_addr); // The request has timed out. We keep any established session for future use. self.fail_request(request_call, RequestError::Timeout, false) .await; } else { // increment the request retry count and restart the timeout trace!( "Resending message: {} to {}", request_call.request, node_address ); self.send(node_address.clone(), request_call.packet.clone()) .await; request_call.retries += 1; self.active_requests.insert(node_address, request_call); } } /// Sends a `Request` to a node. async fn send_request( &mut self, contact: NodeContact, request: Request, ) -> Result<(), RequestError> { let node_address = contact .node_address() .map_err(|e| RequestError::InvalidEnr(e.into()))?; if node_address.socket_addr == self.listen_socket { debug!("Filtered request to self"); return Err(RequestError::SelfRequest); } // If there is already an active request for this node, add to pending requests if self.active_requests.get(&node_address).is_some() { trace!("Request queued for node: {}", node_address); self.pending_requests .entry(node_address) .or_insert_with(Vec::new) .push((contact, request)); return Ok(()); } let (packet, initiating_session) = { if let Some(session) = self.sessions.get_mut(&node_address) { // Encrypt the message and send let packet = session .encrypt_message(self.node_id, &request.clone().encode()) .map_err(|e| RequestError::EncryptionFailed(format!("{:?}", e)))?; (packet, false) } else { // No session exists, start a new handshake trace!( "Starting session. Sending random packet to: {}", node_address ); let packet = Packet::new_random(&self.node_id).map_err(RequestError::EntropyFailure)?; // We are initiating a new session (packet, true) } }; let call = RequestCall::new(contact, packet.clone(), request, initiating_session); // let the filter know we are expecting a response self.add_expected_response(node_address.socket_addr); let nonce = *packet.message_nonce(); self.send(node_address.clone(), packet).await; self.active_requests_nonce_mapping .insert(nonce, node_address.clone()); self.active_requests.insert(node_address, call); Ok(()) } /// Sends an RPC Response. async fn send_response(&mut self, node_address: NodeAddress, response: Response) { // Check for an established session if let Some(session) = self.sessions.get_mut(&node_address) { // Encrypt the message and send let packet = match session.encrypt_message(self.node_id, &response.encode()) { Ok(packet) => packet, Err(e) => { warn!("Could not encrypt response: {:?}", e); return; } }; self.send(node_address, packet).await; } else { // Either the session is being established or has expired. We simply drop the // response in this case. warn!( "Session is not established. Dropping response {} for node: {}", response, node_address.node_id ); } } /// This is called in response to a `HandlerResponse::WhoAreYou` event. The applications finds the /// highest known ENR for a node then we respond to the node with a WHOAREYOU packet. async fn send_challenge(&mut self, wru_ref: WhoAreYouRef, remote_enr: Option) { let node_address = wru_ref.0; let message_nonce = wru_ref.1; if self.active_challenges.peek(&node_address).is_some() { warn!("WHOAREYOU already sent. {}", node_address); return; } // Ignore this request if the session is already established if self.sessions.get(&node_address).is_some() { trace!( "Session already established. WHOAREYOU not sent to {}", node_address ); return; } // It could be the case we have sent an ENR with an active request, however we consider // these independent as this is in response to an unknown packet. If the ENR it not in our // table (remote_enr is None) then we re-request the ENR to keep the session up to date. // send the challenge let enr_seq = remote_enr.clone().map_or_else(|| 0, |enr| enr.seq()); let id_nonce: IdNonce = rand::random(); let packet = Packet::new_whoareyou(message_nonce, id_nonce, enr_seq); let challenge_data = ChallengeData::try_from(packet.authenticated_data().as_slice()) .expect("Must be the correct challenge size"); debug!("Sending WHOAREYOU to {}", node_address); self.send(node_address.clone(), packet).await; self.active_challenges.insert( node_address, Challenge { data: challenge_data, remote_enr, }, ); } /* Packet Handling */ /// Handles a WHOAREYOU packet that was received from the network. async fn handle_challenge( &mut self, src_address: SocketAddr, request_nonce: MessageNonce, enr_seq: u64, challenge_data: ChallengeData, ) { // Check that this challenge matches a known active request. // If this message passes all the requisite checks, a request call is returned. let mut request_call = { // Check for an active request let node_address = match self.active_requests_nonce_mapping.remove(&request_nonce) { Some(addr) => addr, None => { trace!("Received a WHOAREYOU packet that references an unknown or expired request. Source {}, message_nonce {}", src_address, hex::encode(request_nonce)); return; } }; // Verify that the src_addresses match if node_address.socket_addr != src_address { trace!("Received a WHOAREYOU packet for a message with a non-expected source. Source {}, expected_source: {} message_nonce {}", src_address, node_address.socket_addr, hex::encode(request_nonce)); // add the mapping back self.active_requests_nonce_mapping .insert(request_nonce, node_address); return; } // Obtain the request from the mapping. This must exist, otherwise there is a // serious coding error. The active_requests_nonce_mapping and active_requests // mappings should be 1 to 1. match self.active_requests.remove(&node_address) { Some(request_call) => request_call, None => { error!("Active request mappings are not in sync. Message_id {}, node_address {} doesn't exist in active request mapping", hex::encode(request_nonce), node_address); // NOTE: Both mappings are removed in this case. return; } } }; // double check the message nonces match if request_call.packet.message_nonce() != &request_nonce { // This could theoretically happen if a peer uses the same node id across // different connections. warn!("Received a WHOAREYOU from a non expected source. Source: {}, message_nonce {} , expected_nonce: {}", request_call.contact, hex::encode(request_call.packet.message_nonce()), hex::encode(request_nonce)); // NOTE: Both mappings are removed in this case. return; } trace!( "Received a WHOAREYOU packet response. Source: {}", request_call.contact ); // We do not allow multiple WHOAREYOU packets for a single challenge request. If we have // already sent a WHOAREYOU ourselves, we drop sessions who send us a WHOAREYOU in // response. if request_call.handshake_sent { warn!( "Authentication response already sent. Dropping session. Node: {}", request_call.contact ); self.fail_request(request_call, RequestError::InvalidRemotePacket, true) .await; return; } // Encrypt the message with an auth header and respond // First if a new version of our ENR is requested, obtain it for the header let updated_enr = if enr_seq < self.enr.read().seq() { Some(self.enr.read().clone()) } else { None }; // Generate a new session and authentication packet let (auth_packet, mut session) = match Session::encrypt_with_header( &request_call.contact, self.key.clone(), updated_enr, &self.node_id, &challenge_data, &(request_call.request.clone().encode()), ) { Ok(v) => v, Err(e) => { error!("Could not generate a session. Error: {:?}", e); self.fail_request(request_call, RequestError::InvalidRemotePacket, true) .await; return; } }; // There are two quirks with an established session at this point. // 1. We may not know the ENR if we dialed this node with a NodeContact::Raw. In this case // we need to set up a request to find the ENR and wait for a response before we // officially call this node established. // 2. The challenge here could be to an already established session. If so, we need to // update the existing session to attempt to decrypt future messages with the new keys // and update the keys internally upon successful decryption. // // We handle both of these cases here. // Check if we know the ENR, if not request it and flag the session as awaiting an ENR. // // All sent requests must have an associated node_id. Therefore the following // must not panic. let node_address = request_call .contact .node_address() .expect("All sent requests must have a node address"); match request_call.contact.clone() { NodeContact::Enr(enr) => { // NOTE: Here we decide if the session is outgoing or ingoing. The condition for an // outgoing session is that we originally sent a RANDOM packet (signifying we did // not have a session for a request) and the packet is not a PING (we are not // trying to update an old session that may have expired. let connection_direction = { match (&request_call.initiating_session, &request_call.request.body) { (true, RequestBody::Ping { .. }) => ConnectionDirection::Incoming, (true, _) => ConnectionDirection::Outgoing, (false, _) => ConnectionDirection::Incoming, } }; // We already know the ENR. Send the handshake response packet trace!("Sending Authentication response to node: {}", node_address); request_call.packet = auth_packet.clone(); request_call.handshake_sent = true; request_call.initiating_session = false; // Reinsert the request_call self.insert_active_request(request_call); // Send the actual packet to the send task. self.send(node_address.clone(), auth_packet).await; // Notify the application that the session has been established self.outbound_channel .send(HandlerResponse::Established(*enr, connection_direction)) .await .unwrap_or_else(|e| warn!("Error with sending channel: {}", e)); } NodeContact::Raw { .. } => { // Don't know the ENR. Establish the session, but request an ENR also // Send the Auth response let contact = request_call.contact.clone(); trace!( "Sending Authentication response to node: {}", request_call .contact .node_address() .expect("Sanitized contact") ); request_call.packet = auth_packet.clone(); request_call.handshake_sent = true; // Reinsert the request_call self.insert_active_request(request_call); self.send(node_address.clone(), auth_packet).await; let id = RequestId::random(); let request = Request { id: id.clone(), body: RequestBody::FindNode { distances: vec![0] }, }; session.awaiting_enr = Some(id); let _ = self.send_request(contact, request).await; } } self.new_session(node_address, session); } /// Verifies a Node ENR to it's observed address. If it fails, any associated session is also /// considered failed. If it succeeds, we notify the application. fn verify_enr(&self, enr: &Enr, node_address: &NodeAddress) -> bool { // If the ENR does not match the observed IP addresses, we consider the Session // failed. enr.node_id() == node_address.node_id && (enr.udp_socket().is_none() || enr.udp_socket() == Some(node_address.socket_addr)) } /// Handle a message that contains an authentication header. #[allow(clippy::too_many_arguments)] async fn handle_auth_message( &mut self, node_address: NodeAddress, message_nonce: MessageNonce, id_nonce_sig: &[u8], ephem_pubkey: &[u8], enr_record: Option, message: &[u8], authenticated_data: &[u8], ) { // Needs to match an outgoing challenge packet (so we have the required nonce to be signed). If it doesn't we drop the packet. // This will lead to future outgoing challenges if they proceed to send further encrypted // packets. trace!( "Received an Authentication header message from: {}", node_address ); if let Some(challenge) = self.active_challenges.remove(&node_address) { match Session::establish_from_challenge( self.key.clone(), &self.node_id, &node_address.node_id, challenge, id_nonce_sig, ephem_pubkey, enr_record, ) { Ok((session, enr)) => { // Receiving an AuthResponse must give us an up-to-date view of the node ENR. // Verify the ENR is valid if self.verify_enr(&enr, &node_address) { // Session is valid // Notify the application // The session established here are from WHOAREYOU packets that we sent. // This occurs when a node established a connection with us. let _ = self .outbound_channel .send(HandlerResponse::Established( enr, ConnectionDirection::Incoming, )) .await; self.new_session(node_address.clone(), session); self.handle_message( node_address, message_nonce, message, authenticated_data, ) .await; } else { // IP's or NodeAddress don't match. Drop the session. warn!( "Session has invalid ENR. Enr socket: {:?}, {}", enr.udp_socket(), node_address ); self.fail_session(&node_address, RequestError::InvalidRemoteEnr, true) .await; } } Err(Discv5Error::InvalidChallengeSignature(challenge)) => { warn!( "Authentication header contained invalid signature. Ignoring packet from: {}", node_address ); // insert back the challenge self.active_challenges.insert(node_address, challenge); } Err(e) => { warn!( "Invalid Authentication header. Dropping session. Error: {:?}", e ); self.fail_session(&node_address, RequestError::InvalidRemotePacket, true) .await; } } } else { warn!( "Received an authenticated header without a matching WHOAREYOU request. {}", node_address ); } } async fn send_next_request(&mut self, node_address: NodeAddress) { // ensure we are not over writing any existing requests if self.active_requests.get(&node_address).is_none() { if let std::collections::hash_map::Entry::Occupied(mut entry) = self.pending_requests.entry(node_address) { // If it exists, there must be a request here let request = entry.get_mut().remove(0); if entry.get().is_empty() { entry.remove(); } trace!("Sending next awaiting message. Node: {}", request.0); let _ = self.send_request(request.0, request.1).await; } } } /// Handle a standard message that does not contain an authentication header. #[allow(clippy::single_match)] async fn handle_message( &mut self, node_address: NodeAddress, message_nonce: MessageNonce, message: &[u8], authenticated_data: &[u8], ) { // check if we have an available session if let Some(session) = self.sessions.get_mut(&node_address) { // attempt to decrypt and process the message. let message = match session.decrypt_message(message_nonce, message, authenticated_data) { Ok(m) => match Message::decode(&m) { Ok(p) => p, Err(e) => { warn!("Failed to decode message. Error: {:?}, {}", e, node_address); return; } }, Err(e) => { // We have a session, but the message could not be decrypted. It is likely the node // sending this message has dropped their session. In this case, this message is a // Random packet and we should reply with a WHOAREYOU. // This means we need to drop the current session and re-establish. trace!("Decryption failed. Error {}", e); debug!( "Message from node: {} is not encrypted with known session keys.", node_address ); self.fail_session(&node_address, RequestError::InvalidRemotePacket, true) .await; // If we haven't already sent a WhoAreYou, // spawn a WHOAREYOU event to check for highest known ENR // Update the cache time and remove expired entries. if self.active_challenges.peek(&node_address).is_none() { let whoareyou_ref = WhoAreYouRef(node_address, message_nonce); let _ = self .outbound_channel .send(HandlerResponse::WhoAreYou(whoareyou_ref)) .await; } else { trace!("WHOAREYOU packet already sent: {}", node_address); } return; } }; trace!("Received message from: {}", node_address); // Remove any associated request from pending_request match message { Message::Request(request) => { // report the request to the application let _ = self .outbound_channel .send(HandlerResponse::Request(node_address, Box::new(request))) .await; } Message::Response(response) => { // Sessions could be awaiting an ENR response. Check if this response matches // these if let Some(request_id) = session.awaiting_enr.as_ref() { if &response.id == request_id { session.awaiting_enr = None; match response.body { ResponseBody::Nodes { mut nodes, .. } => { // Received the requested ENR if let Some(enr) = nodes.pop() { if self.verify_enr(&enr, &node_address) { // Notify the application // This can occur when we try to dial a node without an // ENR. In this case we have attempted to establish the // connection, so this is an outgoing connection. let _ = self .outbound_channel .send(HandlerResponse::Established( enr, ConnectionDirection::Outgoing, )) .await; return; } } } _ => {} } debug!("Session failed invalid ENR response"); self.fail_session(&node_address, RequestError::InvalidRemoteEnr, true) .await; return; } } // Handle standard responses self.handle_response(node_address, response).await; } } } else { // no session exists trace!("Received a message without a session. {}", node_address); trace!("Requesting a WHOAREYOU packet to be sent."); // spawn a WHOAREYOU event to check for highest known ENR let whoareyou_ref = WhoAreYouRef(node_address, message_nonce); let _ = self .outbound_channel .send(HandlerResponse::WhoAreYou(whoareyou_ref)) .await; } } /// Handles a response to a request. Re-inserts the request call if the response is a multiple /// Nodes response. async fn handle_response(&mut self, node_address: NodeAddress, response: Response) { // Find a matching request, if any if let Some(mut request_call) = self.active_requests.remove(&node_address) { if request_call.id() != &response.id { trace!( "Received an RPC Response to an unknown request. Likely late response. {}", node_address ); // add the request back and reset the timer self.active_requests.insert(node_address, request_call); return; } // The response matches a request // Check to see if this is a Nodes response, in which case we may require to wait for // extra responses if let ResponseBody::Nodes { total, .. } = response.body { if total > 1 { // This is a multi-response Nodes response if let Some(remaining_responses) = request_call.remaining_responses.as_mut() { *remaining_responses -= 1; if remaining_responses != &0 { // more responses remaining, add back the request and send the response // add back the request and send the response self.active_requests .insert(node_address.clone(), request_call); let _ = self .outbound_channel .send(HandlerResponse::Response(node_address, Box::new(response))) .await; return; } } else { // This is the first instance request_call.remaining_responses = Some(total - 1); // add back the request and send the response self.active_requests .insert(node_address.clone(), request_call); let _ = self .outbound_channel .send(HandlerResponse::Response(node_address, Box::new(response))) .await; return; } } } // Remove the associated nonce mapping. self.active_requests_nonce_mapping .remove(request_call.packet.message_nonce()); // Remove the expected response self.remove_expected_response(node_address.socket_addr); // The request matches report the response let _ = self .outbound_channel .send(HandlerResponse::Response( node_address.clone(), Box::new(response), )) .await; self.send_next_request(node_address).await; } else { // This is likely a late response and we have already failed the request. These get // dropped here. trace!("Late response from node: {}", node_address); } } /// Inserts a request and associated auth_tag mapping. fn insert_active_request(&mut self, request_call: RequestCall) { let node_address = request_call .contact .node_address() .expect("Can only add requests with a valid destination"); // adds the mapping of message nonce to node address self.active_requests_nonce_mapping .insert(*request_call.packet.message_nonce(), node_address.clone()); self.active_requests.insert(node_address, request_call); } fn new_session(&mut self, node_address: NodeAddress, session: Session) { if let Some(current_session) = self.sessions.get_mut(&node_address) { current_session.update(session); } else { self.sessions.insert(node_address, session); METRICS .active_sessions .store(self.sessions.len(), Ordering::Relaxed); } } /// A request has failed. async fn fail_request( &mut self, request_call: RequestCall, error: RequestError, remove_session: bool, ) { // The Request has expired, remove the session. // Remove the associated nonce mapping. self.active_requests_nonce_mapping .remove(request_call.packet.message_nonce()); // Fail the current request let request_id = request_call.request.id; let _ = self .outbound_channel .send(HandlerResponse::RequestFailed(request_id, error.clone())) .await; let node_address = request_call .contact .node_address() .expect("All Request calls have been sanitized"); self.fail_session(&node_address, error, remove_session) .await; } /// Removes a session and updates associated metrics and fields. async fn fail_session( &mut self, node_address: &NodeAddress, error: RequestError, remove_session: bool, ) { if remove_session { self.sessions.remove(node_address); METRICS .active_sessions .store(self.sessions.len(), Ordering::Relaxed); } for request in self .pending_requests .remove(node_address) .unwrap_or_else(Vec::new) { let _ = self .outbound_channel .send(HandlerResponse::RequestFailed(request.1.id, error.clone())) .await; } } /// Sends a packet to the send handler to be encoded and sent. async fn send(&mut self, node_address: NodeAddress, packet: Packet) { let outbound_packet = socket::OutboundPacket { node_address, packet, }; let _ = self.socket.send.send(outbound_packet).await; } /// Check if any banned nodes have served their time and unban them. fn unban_nodes_check(&self) { PERMIT_BAN_LIST .write() .ban_ips .retain(|_, time| time.is_none() || Some(Instant::now()) < *time); PERMIT_BAN_LIST .write() .ban_nodes .retain(|_, time| time.is_none() || Some(Instant::now()) < *time); } }