Compare commits

..

1 Commits

Author SHA1 Message Date
peilun-conflux
b9684a4a3f
Merge f43f76dd42 into 82fef674ed 2024-08-22 10:42:33 +08:00
7 changed files with 33 additions and 55 deletions

View File

@ -25,7 +25,6 @@ pub mod types;
pub use config::gossip_max_size; pub use config::gossip_max_size;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Instant;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use shared_types::TxID; use shared_types::TxID;
@ -98,8 +97,8 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F
/// Application level requests sent to the network. /// Application level requests sent to the network.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub enum RequestId { pub enum RequestId {
Router(Instant), Router,
Sync(Instant, SyncId), Sync(SyncId),
} }
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]

View File

@ -1,5 +1,4 @@
use std::net::IpAddr; use std::net::IpAddr;
use std::time::Instant;
use std::{ops::Neg, sync::Arc}; use std::{ops::Neg, sync::Arc};
use chunk_pool::ChunkPoolMessage; use chunk_pool::ChunkPoolMessage;
@ -154,7 +153,7 @@ impl Libp2pEventHandler {
self.send_to_network(NetworkMessage::SendRequest { self.send_to_network(NetworkMessage::SendRequest {
peer_id, peer_id,
request_id: RequestId::Router(Instant::now()), request_id: RequestId::Router,
request: Request::Status(status_message), request: Request::Status(status_message),
}); });
@ -239,22 +238,12 @@ impl Libp2pEventHandler {
match response { match response {
Response::Status(status_message) => { Response::Status(status_message) => {
debug!(%peer_id, ?status_message, "Received Status response"); debug!(%peer_id, ?status_message, "Received Status response");
match request_id {
RequestId::Router(since) => {
metrics::LIBP2P_HANDLE_RESPONSE_STATUS.mark(1);
metrics::LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY.update_since(since);
}
_ => unreachable!("All status response belong to router"),
}
self.on_status_response(peer_id, status_message); self.on_status_response(peer_id, status_message);
metrics::LIBP2P_HANDLE_RESPONSE_STATUS.mark(1);
} }
Response::Chunks(response) => { Response::Chunks(response) => {
let request_id = match request_id { let request_id = match request_id {
RequestId::Sync(since, sync_id) => { RequestId::Sync(sync_id) => sync_id,
metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS.mark(1);
metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY.update_since(since);
sync_id
}
_ => unreachable!("All Chunks responses belong to sync"), _ => unreachable!("All Chunks responses belong to sync"),
}; };
@ -263,6 +252,8 @@ impl Libp2pEventHandler {
request_id, request_id,
response, response,
}); });
metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS.mark(1);
} }
Response::DataByHash(_) => { Response::DataByHash(_) => {
// ignore // ignore
@ -274,13 +265,11 @@ impl Libp2pEventHandler {
self.peers.write().await.update(&peer_id); self.peers.write().await.update(&peer_id);
// Check if the failed RPC belongs to sync // Check if the failed RPC belongs to sync
if let RequestId::Sync(since, request_id) = request_id { if let RequestId::Sync(request_id) = request_id {
self.send_to_sync(SyncMessage::RpcError { self.send_to_sync(SyncMessage::RpcError {
peer_id, peer_id,
request_id, request_id,
}); });
metrics::LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY.update_since(since);
} }
metrics::LIBP2P_HANDLE_RESPONSE_ERROR.mark(1); metrics::LIBP2P_HANDLE_RESPONSE_ERROR.mark(1);
@ -883,7 +872,7 @@ mod tests {
}) => { }) => {
assert_eq!(peer_id, expected_peer_id); assert_eq!(peer_id, expected_peer_id);
assert!(matches!(request, Request::Status(..))); assert!(matches!(request, Request::Status(..)));
assert!(matches!(request_id, RequestId::Router(..))) assert!(matches!(request_id, RequestId::Router))
} }
Ok(_) => panic!("Unexpected network message type received"), Ok(_) => panic!("Unexpected network message type received"),
Err(e) => panic!("No network message received: {:?}", e), Err(e) => panic!("No network message received: {:?}", e),
@ -1049,7 +1038,7 @@ mod tests {
handler handler
.on_rpc_response( .on_rpc_response(
alice, alice,
RequestId::Sync(Instant::now(), SyncId::SerialSync { tx_id: id }), RequestId::Sync(SyncId::SerialSync { tx_id: id }),
Response::Chunks(data.clone()), Response::Chunks(data.clone()),
) )
.await; .await;
@ -1077,10 +1066,7 @@ mod tests {
let alice = PeerId::random(); let alice = PeerId::random();
let id = TxID::random_hash(555); let id = TxID::random_hash(555);
handler handler
.on_rpc_error( .on_rpc_error(alice, RequestId::Sync(SyncId::SerialSync { tx_id: id }))
alice,
RequestId::Sync(Instant::now(), SyncId::SerialSync { tx_id: id }),
)
.await; .await;
match ctx.sync_recv.try_recv() { match ctx.sync_recv.try_recv() {

View File

@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use metrics::{register_meter, register_meter_with_group, Histogram, Meter, Sample}; use metrics::{register_meter, Histogram, Meter, Sample};
lazy_static::lazy_static! { lazy_static::lazy_static! {
// service // service
@ -11,10 +11,10 @@ lazy_static::lazy_static! {
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "all"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "already"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer_already");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "ok"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer_new_ok");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "fail"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer_new_fail");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp"); pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
@ -25,17 +25,14 @@ lazy_static::lazy_static! {
// libp2p_event_handler // libp2p_event_handler
pub static ref LIBP2P_SEND_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_send_status"); pub static ref LIBP2P_SEND_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_send_status");
pub static ref LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "outgoing"); pub static ref LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_connected_outgoing");
pub static ref LIBP2P_HANDLE_PEER_CONNECTED_INCOMING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "incoming"); pub static ref LIBP2P_HANDLE_PEER_CONNECTED_INCOMING: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_connected_incoming");
pub static ref LIBP2P_HANDLE_PEER_DISCONNECTED: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_disconnected"); pub static ref LIBP2P_HANDLE_PEER_DISCONNECTED: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_disconnected");
pub static ref LIBP2P_HANDLE_REQUEST_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_status"); pub static ref LIBP2P_HANDLE_REQUEST_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_status");
pub static ref LIBP2P_HANDLE_REQUEST_GET_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_get_chunks"); pub static ref LIBP2P_HANDLE_REQUEST_GET_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_get_chunks");
pub static ref LIBP2P_HANDLE_RESPONSE_STATUS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_status", "qps"); pub static ref LIBP2P_HANDLE_RESPONSE_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_handle_response_status");
pub static ref LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_status", "latency", 1024); pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_response_get_chunks");
pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_get_chunks", "qps"); pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter("router_libp2p_handle_response_error");
pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_get_chunks", "latency", 1024);
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_error", "qps");
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024);
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_file"); pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_file");
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_chunks"); pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_chunks");
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_announce_file"); pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_announce_file");

View File

@ -18,12 +18,10 @@ pub struct FileSyncGoal {
pub index_start: u64, pub index_start: u64,
/// Chunk index to sync to (exclusive). /// Chunk index to sync to (exclusive).
pub index_end: u64, pub index_end: u64,
/// `true` if we are syncing all the needed data of this file.
pub all_chunks: bool,
} }
impl FileSyncGoal { impl FileSyncGoal {
pub fn new(num_chunks: u64, index_start: u64, index_end: u64, all_chunks: bool) -> Self { pub fn new(num_chunks: u64, index_start: u64, index_end: u64) -> Self {
assert!( assert!(
index_start < index_end && index_end <= num_chunks, index_start < index_end && index_end <= num_chunks,
"invalid index_end" "invalid index_end"
@ -32,16 +30,15 @@ impl FileSyncGoal {
num_chunks, num_chunks,
index_start, index_start,
index_end, index_end,
all_chunks,
} }
} }
pub fn new_file(num_chunks: u64) -> Self { pub fn new_file(num_chunks: u64) -> Self {
Self::new(num_chunks, 0, num_chunks, true) Self::new(num_chunks, 0, num_chunks)
} }
pub fn is_all_chunks(&self) -> bool { pub fn is_all_chunks(&self) -> bool {
self.all_chunks self.index_start == 0 && self.index_end == self.num_chunks
} }
} }

View File

@ -139,7 +139,7 @@ impl SerialSyncController {
if let Some((start, end)) = maybe_range { if let Some((start, end)) = maybe_range {
// Sync new chunks regardless of previously downloaded file or chunks. // Sync new chunks regardless of previously downloaded file or chunks.
// It's up to client to avoid duplicated chunks sync. // It's up to client to avoid duplicated chunks sync.
self.goal = FileSyncGoal::new(self.goal.num_chunks, start, end, false); self.goal = FileSyncGoal::new(self.goal.num_chunks, start, end);
self.next_chunk = start; self.next_chunk = start;
} else if self.goal.is_all_chunks() { } else if self.goal.is_all_chunks() {
// retry the failed file sync at break point // retry the failed file sync at break point
@ -258,8 +258,7 @@ impl SerialSyncController {
// request next chunk array // request next chunk array
let from_chunk = self.next_chunk; let from_chunk = self.next_chunk;
let to_chunk = std::cmp::min(from_chunk + PORA_CHUNK_SIZE as u64, self.goal.index_end); let to_chunk = std::cmp::min(from_chunk + PORA_CHUNK_SIZE as u64, self.goal.index_end);
let request_id = let request_id = network::RequestId::Sync(RequestId::SerialSync { tx_id: self.tx_id });
network::RequestId::Sync(Instant::now(), RequestId::SerialSync { tx_id: self.tx_id });
// TODO: It's possible that we read it while `nex_tx_seq - 1` is still being committed. // TODO: It's possible that we read it while `nex_tx_seq - 1` is still being committed.
// We can wait for its commitment, but this will slow down this state machine. // We can wait for its commitment, but this will slow down this state machine.
// Or we can use `next_tx_seq - 2`, but for a restarted node without receiving new // Or we can use `next_tx_seq - 2`, but for a restarted node without receiving new
@ -884,7 +883,7 @@ mod tests {
); );
match request_id { match request_id {
network::RequestId::Sync(_, sync_id) => match sync_id { network::RequestId::Sync(sync_id) => match sync_id {
network::SyncId::SerialSync { tx_id } => { network::SyncId::SerialSync { tx_id } => {
assert_eq!(tx_id, controller.tx_id); assert_eq!(tx_id, controller.tx_id);
} }

View File

@ -635,8 +635,8 @@ impl SyncService {
bail!("File already exists"); bail!("File already exists");
} }
let (index_start, index_end, all_chunks) = match maybe_range { let (index_start, index_end) = match maybe_range {
Some((start, end)) => (start, end, false), Some((start, end)) => (start, end),
None => { None => {
let start = match Self::tx_sync_start_index(&self.store, &tx).await? { let start = match Self::tx_sync_start_index(&self.store, &tx).await? {
Some(s) => s, Some(s) => s,
@ -646,7 +646,7 @@ impl SyncService {
return Ok(()); return Ok(());
} }
}; };
(start, num_chunks, true) (start, num_chunks)
} }
}; };
@ -658,7 +658,7 @@ impl SyncService {
self.config, self.config,
tx.id(), tx.id(),
tx.start_entry_index(), tx.start_entry_index(),
FileSyncGoal::new(num_chunks, index_start, index_end, all_chunks), FileSyncGoal::new(num_chunks, index_start, index_end),
self.ctx.clone(), self.ctx.clone(),
self.store.clone(), self.store.clone(),
self.file_location_cache.clone(), self.file_location_cache.clone(),
@ -1750,7 +1750,7 @@ mod tests {
}; };
let sync_id = match request_id { let sync_id = match request_id {
network::RequestId::Sync(_, sync_id) => sync_id, network::RequestId::Sync(sync_id) => sync_id,
_ => unreachable!("All Chunks responses belong to sync"), _ => unreachable!("All Chunks responses belong to sync"),
}; };

View File

@ -68,7 +68,7 @@ class ZgsNode(TestNode):
os.mkdir(self.data_dir) os.mkdir(self.data_dir)
log_config_path = os.path.join(self.data_dir, self.config["log_config_file"]) log_config_path = os.path.join(self.data_dir, self.config["log_config_file"])
with open(log_config_path, "w") as f: with open(log_config_path, "w") as f:
f.write("trace,hyper=info,h2=info") f.write("debug,hyper=info,h2=info")
initialize_toml_config(self.config_file, self.config) initialize_toml_config(self.config_file, self.config)