mirror of
				https://github.com/0glabs/0g-storage-node.git
				synced 2025-11-04 00:27:39 +00:00 
			
		
		
		
	add timestamp in p2p rpc request id for latency stat (#167)
	
		
			
	
		
	
	
		
	
		
			Some checks are pending
		
		
	
	
		
			
				
	
				abi-consistent-check / build-and-compare (push) Waiting to run
				
			
		
			
				
	
				code-coverage / unittest-cov (push) Waiting to run
				
			
		
			
				
	
				rust / check (push) Waiting to run
				
			
		
			
				
	
				rust / test (push) Waiting to run
				
			
		
			
				
	
				rust / lints (push) Waiting to run
				
			
		
			
				
	
				functional-test / test (push) Waiting to run
				
			
		
		
	
	
				
					
				
			
		
			Some checks are pending
		
		
	
	abi-consistent-check / build-and-compare (push) Waiting to run
				
			code-coverage / unittest-cov (push) Waiting to run
				
			rust / check (push) Waiting to run
				
			rust / test (push) Waiting to run
				
			rust / lints (push) Waiting to run
				
			functional-test / test (push) Waiting to run
				
			* add timestamp in p2p rpc request id for latency stat * fix test compilation error
This commit is contained in:
		
							parent
							
								
									82fef674ed
								
							
						
					
					
						commit
						f14a1b5975
					
				@ -25,6 +25,7 @@ pub mod types;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
pub use config::gossip_max_size;
 | 
					pub use config::gossip_max_size;
 | 
				
			||||||
use std::net::SocketAddr;
 | 
					use std::net::SocketAddr;
 | 
				
			||||||
 | 
					use std::time::Instant;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
 | 
					use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
 | 
				
			||||||
use shared_types::TxID;
 | 
					use shared_types::TxID;
 | 
				
			||||||
@ -97,8 +98,8 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F
 | 
				
			|||||||
/// Application level requests sent to the network.
 | 
					/// Application level requests sent to the network.
 | 
				
			||||||
#[derive(Debug, Clone, Copy)]
 | 
					#[derive(Debug, Clone, Copy)]
 | 
				
			||||||
pub enum RequestId {
 | 
					pub enum RequestId {
 | 
				
			||||||
    Router,
 | 
					    Router(Instant),
 | 
				
			||||||
    Sync(SyncId),
 | 
					    Sync(Instant, SyncId),
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[derive(Debug, Clone, Copy)]
 | 
					#[derive(Debug, Clone, Copy)]
 | 
				
			||||||
 | 
				
			|||||||
@ -1,4 +1,5 @@
 | 
				
			|||||||
use std::net::IpAddr;
 | 
					use std::net::IpAddr;
 | 
				
			||||||
 | 
					use std::time::Instant;
 | 
				
			||||||
use std::{ops::Neg, sync::Arc};
 | 
					use std::{ops::Neg, sync::Arc};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use chunk_pool::ChunkPoolMessage;
 | 
					use chunk_pool::ChunkPoolMessage;
 | 
				
			||||||
@ -153,7 +154,7 @@ impl Libp2pEventHandler {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        self.send_to_network(NetworkMessage::SendRequest {
 | 
					        self.send_to_network(NetworkMessage::SendRequest {
 | 
				
			||||||
            peer_id,
 | 
					            peer_id,
 | 
				
			||||||
            request_id: RequestId::Router,
 | 
					            request_id: RequestId::Router(Instant::now()),
 | 
				
			||||||
            request: Request::Status(status_message),
 | 
					            request: Request::Status(status_message),
 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -238,12 +239,22 @@ impl Libp2pEventHandler {
 | 
				
			|||||||
        match response {
 | 
					        match response {
 | 
				
			||||||
            Response::Status(status_message) => {
 | 
					            Response::Status(status_message) => {
 | 
				
			||||||
                debug!(%peer_id, ?status_message, "Received Status response");
 | 
					                debug!(%peer_id, ?status_message, "Received Status response");
 | 
				
			||||||
                self.on_status_response(peer_id, status_message);
 | 
					                match request_id {
 | 
				
			||||||
 | 
					                    RequestId::Router(since) => {
 | 
				
			||||||
                        metrics::LIBP2P_HANDLE_RESPONSE_STATUS.mark(1);
 | 
					                        metrics::LIBP2P_HANDLE_RESPONSE_STATUS.mark(1);
 | 
				
			||||||
 | 
					                        metrics::LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY.update_since(since);
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                    _ => unreachable!("All status response belong to router"),
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					                self.on_status_response(peer_id, status_message);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            Response::Chunks(response) => {
 | 
					            Response::Chunks(response) => {
 | 
				
			||||||
                let request_id = match request_id {
 | 
					                let request_id = match request_id {
 | 
				
			||||||
                    RequestId::Sync(sync_id) => sync_id,
 | 
					                    RequestId::Sync(since, sync_id) => {
 | 
				
			||||||
 | 
					                        metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS.mark(1);
 | 
				
			||||||
 | 
					                        metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY.update_since(since);
 | 
				
			||||||
 | 
					                        sync_id
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
                    _ => unreachable!("All Chunks responses belong to sync"),
 | 
					                    _ => unreachable!("All Chunks responses belong to sync"),
 | 
				
			||||||
                };
 | 
					                };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -252,8 +263,6 @@ impl Libp2pEventHandler {
 | 
				
			|||||||
                    request_id,
 | 
					                    request_id,
 | 
				
			||||||
                    response,
 | 
					                    response,
 | 
				
			||||||
                });
 | 
					                });
 | 
				
			||||||
 | 
					 | 
				
			||||||
                metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS.mark(1);
 | 
					 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            Response::DataByHash(_) => {
 | 
					            Response::DataByHash(_) => {
 | 
				
			||||||
                // ignore
 | 
					                // ignore
 | 
				
			||||||
@ -265,11 +274,13 @@ impl Libp2pEventHandler {
 | 
				
			|||||||
        self.peers.write().await.update(&peer_id);
 | 
					        self.peers.write().await.update(&peer_id);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // Check if the failed RPC belongs to sync
 | 
					        // Check if the failed RPC belongs to sync
 | 
				
			||||||
        if let RequestId::Sync(request_id) = request_id {
 | 
					        if let RequestId::Sync(since, request_id) = request_id {
 | 
				
			||||||
            self.send_to_sync(SyncMessage::RpcError {
 | 
					            self.send_to_sync(SyncMessage::RpcError {
 | 
				
			||||||
                peer_id,
 | 
					                peer_id,
 | 
				
			||||||
                request_id,
 | 
					                request_id,
 | 
				
			||||||
            });
 | 
					            });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            metrics::LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY.update_since(since);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        metrics::LIBP2P_HANDLE_RESPONSE_ERROR.mark(1);
 | 
					        metrics::LIBP2P_HANDLE_RESPONSE_ERROR.mark(1);
 | 
				
			||||||
@ -872,7 +883,7 @@ mod tests {
 | 
				
			|||||||
                }) => {
 | 
					                }) => {
 | 
				
			||||||
                    assert_eq!(peer_id, expected_peer_id);
 | 
					                    assert_eq!(peer_id, expected_peer_id);
 | 
				
			||||||
                    assert!(matches!(request, Request::Status(..)));
 | 
					                    assert!(matches!(request, Request::Status(..)));
 | 
				
			||||||
                    assert!(matches!(request_id, RequestId::Router))
 | 
					                    assert!(matches!(request_id, RequestId::Router(..)))
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                Ok(_) => panic!("Unexpected network message type received"),
 | 
					                Ok(_) => panic!("Unexpected network message type received"),
 | 
				
			||||||
                Err(e) => panic!("No network message received: {:?}", e),
 | 
					                Err(e) => panic!("No network message received: {:?}", e),
 | 
				
			||||||
@ -1038,7 +1049,7 @@ mod tests {
 | 
				
			|||||||
        handler
 | 
					        handler
 | 
				
			||||||
            .on_rpc_response(
 | 
					            .on_rpc_response(
 | 
				
			||||||
                alice,
 | 
					                alice,
 | 
				
			||||||
                RequestId::Sync(SyncId::SerialSync { tx_id: id }),
 | 
					                RequestId::Sync(Instant::now(), SyncId::SerialSync { tx_id: id }),
 | 
				
			||||||
                Response::Chunks(data.clone()),
 | 
					                Response::Chunks(data.clone()),
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
            .await;
 | 
					            .await;
 | 
				
			||||||
@ -1066,7 +1077,10 @@ mod tests {
 | 
				
			|||||||
        let alice = PeerId::random();
 | 
					        let alice = PeerId::random();
 | 
				
			||||||
        let id = TxID::random_hash(555);
 | 
					        let id = TxID::random_hash(555);
 | 
				
			||||||
        handler
 | 
					        handler
 | 
				
			||||||
            .on_rpc_error(alice, RequestId::Sync(SyncId::SerialSync { tx_id: id }))
 | 
					            .on_rpc_error(
 | 
				
			||||||
 | 
					                alice,
 | 
				
			||||||
 | 
					                RequestId::Sync(Instant::now(), SyncId::SerialSync { tx_id: id }),
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
            .await;
 | 
					            .await;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        match ctx.sync_recv.try_recv() {
 | 
					        match ctx.sync_recv.try_recv() {
 | 
				
			||||||
 | 
				
			|||||||
@ -1,6 +1,6 @@
 | 
				
			|||||||
use std::sync::Arc;
 | 
					use std::sync::Arc;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use metrics::{register_meter, Histogram, Meter, Sample};
 | 
					use metrics::{register_meter, register_meter_with_group, Histogram, Meter, Sample};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
lazy_static::lazy_static! {
 | 
					lazy_static::lazy_static! {
 | 
				
			||||||
    // service
 | 
					    // service
 | 
				
			||||||
@ -11,10 +11,10 @@ lazy_static::lazy_static! {
 | 
				
			|||||||
    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
 | 
					    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
 | 
				
			||||||
    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
 | 
					    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
 | 
				
			||||||
    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
 | 
					    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
 | 
				
			||||||
    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer");
 | 
					    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "all");
 | 
				
			||||||
    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer_already");
 | 
					    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "already");
 | 
				
			||||||
    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer_new_ok");
 | 
					    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "ok");
 | 
				
			||||||
    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer_new_fail");
 | 
					    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "fail");
 | 
				
			||||||
    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
 | 
					    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
 | 
				
			||||||
    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
 | 
					    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -25,14 +25,17 @@ lazy_static::lazy_static! {
 | 
				
			|||||||
    // libp2p_event_handler
 | 
					    // libp2p_event_handler
 | 
				
			||||||
    pub static ref LIBP2P_SEND_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_send_status");
 | 
					    pub static ref LIBP2P_SEND_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_send_status");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub static ref LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_connected_outgoing");
 | 
					    pub static ref LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "outgoing");
 | 
				
			||||||
    pub static ref LIBP2P_HANDLE_PEER_CONNECTED_INCOMING: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_connected_incoming");
 | 
					    pub static ref LIBP2P_HANDLE_PEER_CONNECTED_INCOMING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "incoming");
 | 
				
			||||||
    pub static ref LIBP2P_HANDLE_PEER_DISCONNECTED: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_disconnected");
 | 
					    pub static ref LIBP2P_HANDLE_PEER_DISCONNECTED: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_disconnected");
 | 
				
			||||||
    pub static ref LIBP2P_HANDLE_REQUEST_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_status");
 | 
					    pub static ref LIBP2P_HANDLE_REQUEST_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_status");
 | 
				
			||||||
    pub static ref LIBP2P_HANDLE_REQUEST_GET_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_get_chunks");
 | 
					    pub static ref LIBP2P_HANDLE_REQUEST_GET_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_get_chunks");
 | 
				
			||||||
    pub static ref LIBP2P_HANDLE_RESPONSE_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_handle_response_status");
 | 
					    pub static ref LIBP2P_HANDLE_RESPONSE_STATUS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_status", "qps");
 | 
				
			||||||
    pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_response_get_chunks");
 | 
					    pub static ref LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_status", "latency", 1024);
 | 
				
			||||||
    pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter("router_libp2p_handle_response_error");
 | 
					    pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_get_chunks", "qps");
 | 
				
			||||||
 | 
					    pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_get_chunks", "latency", 1024);
 | 
				
			||||||
 | 
					    pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_error", "qps");
 | 
				
			||||||
 | 
					    pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024);
 | 
				
			||||||
    pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_file");
 | 
					    pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_file");
 | 
				
			||||||
    pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_chunks");
 | 
					    pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_chunks");
 | 
				
			||||||
    pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_announce_file");
 | 
					    pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_announce_file");
 | 
				
			||||||
 | 
				
			|||||||
@ -258,7 +258,8 @@ impl SerialSyncController {
 | 
				
			|||||||
        // request next chunk array
 | 
					        // request next chunk array
 | 
				
			||||||
        let from_chunk = self.next_chunk;
 | 
					        let from_chunk = self.next_chunk;
 | 
				
			||||||
        let to_chunk = std::cmp::min(from_chunk + PORA_CHUNK_SIZE as u64, self.goal.index_end);
 | 
					        let to_chunk = std::cmp::min(from_chunk + PORA_CHUNK_SIZE as u64, self.goal.index_end);
 | 
				
			||||||
        let request_id = network::RequestId::Sync(RequestId::SerialSync { tx_id: self.tx_id });
 | 
					        let request_id =
 | 
				
			||||||
 | 
					            network::RequestId::Sync(Instant::now(), RequestId::SerialSync { tx_id: self.tx_id });
 | 
				
			||||||
        // TODO: It's possible that we read it while `nex_tx_seq - 1` is still being committed.
 | 
					        // TODO: It's possible that we read it while `nex_tx_seq - 1` is still being committed.
 | 
				
			||||||
        // We can wait for its commitment, but this will slow down this state machine.
 | 
					        // We can wait for its commitment, but this will slow down this state machine.
 | 
				
			||||||
        // Or we can use `next_tx_seq - 2`, but for a restarted node without receiving new
 | 
					        // Or we can use `next_tx_seq - 2`, but for a restarted node without receiving new
 | 
				
			||||||
@ -884,7 +885,7 @@ mod tests {
 | 
				
			|||||||
                    );
 | 
					                    );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    match request_id {
 | 
					                    match request_id {
 | 
				
			||||||
                        network::RequestId::Sync(sync_id) => match sync_id {
 | 
					                        network::RequestId::Sync(_, sync_id) => match sync_id {
 | 
				
			||||||
                            network::SyncId::SerialSync { tx_id } => {
 | 
					                            network::SyncId::SerialSync { tx_id } => {
 | 
				
			||||||
                                assert_eq!(tx_id, controller.tx_id);
 | 
					                                assert_eq!(tx_id, controller.tx_id);
 | 
				
			||||||
                            }
 | 
					                            }
 | 
				
			||||||
 | 
				
			|||||||
@ -1709,7 +1709,7 @@ mod tests {
 | 
				
			|||||||
                    };
 | 
					                    };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    let sync_id = match request_id {
 | 
					                    let sync_id = match request_id {
 | 
				
			||||||
                        network::RequestId::Sync(sync_id) => sync_id,
 | 
					                        network::RequestId::Sync(_, sync_id) => sync_id,
 | 
				
			||||||
                        _ => unreachable!("All Chunks responses belong to sync"),
 | 
					                        _ => unreachable!("All Chunks responses belong to sync"),
 | 
				
			||||||
                    };
 | 
					                    };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user