mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Compare commits
4 Commits
b9684a4a3f
...
90e3fb744d
Author | SHA1 | Date | |
---|---|---|---|
![]() |
90e3fb744d | ||
![]() |
8b870a069c | ||
![]() |
f14a1b5975 | ||
![]() |
09ea9d7d7d |
@ -25,6 +25,7 @@ pub mod types;
|
||||
|
||||
pub use config::gossip_max_size;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Instant;
|
||||
|
||||
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
|
||||
use shared_types::TxID;
|
||||
@ -97,8 +98,8 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F
|
||||
/// Application level requests sent to the network.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum RequestId {
|
||||
Router,
|
||||
Sync(SyncId),
|
||||
Router(Instant),
|
||||
Sync(Instant, SyncId),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
|
@ -1,4 +1,5 @@
|
||||
use std::net::IpAddr;
|
||||
use std::time::Instant;
|
||||
use std::{ops::Neg, sync::Arc};
|
||||
|
||||
use chunk_pool::ChunkPoolMessage;
|
||||
@ -153,7 +154,7 @@ impl Libp2pEventHandler {
|
||||
|
||||
self.send_to_network(NetworkMessage::SendRequest {
|
||||
peer_id,
|
||||
request_id: RequestId::Router,
|
||||
request_id: RequestId::Router(Instant::now()),
|
||||
request: Request::Status(status_message),
|
||||
});
|
||||
|
||||
@ -238,12 +239,22 @@ impl Libp2pEventHandler {
|
||||
match response {
|
||||
Response::Status(status_message) => {
|
||||
debug!(%peer_id, ?status_message, "Received Status response");
|
||||
match request_id {
|
||||
RequestId::Router(since) => {
|
||||
metrics::LIBP2P_HANDLE_RESPONSE_STATUS.mark(1);
|
||||
metrics::LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY.update_since(since);
|
||||
}
|
||||
_ => unreachable!("All status response belong to router"),
|
||||
}
|
||||
self.on_status_response(peer_id, status_message);
|
||||
metrics::LIBP2P_HANDLE_RESPONSE_STATUS.mark(1);
|
||||
}
|
||||
Response::Chunks(response) => {
|
||||
let request_id = match request_id {
|
||||
RequestId::Sync(sync_id) => sync_id,
|
||||
RequestId::Sync(since, sync_id) => {
|
||||
metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS.mark(1);
|
||||
metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY.update_since(since);
|
||||
sync_id
|
||||
}
|
||||
_ => unreachable!("All Chunks responses belong to sync"),
|
||||
};
|
||||
|
||||
@ -252,8 +263,6 @@ impl Libp2pEventHandler {
|
||||
request_id,
|
||||
response,
|
||||
});
|
||||
|
||||
metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS.mark(1);
|
||||
}
|
||||
Response::DataByHash(_) => {
|
||||
// ignore
|
||||
@ -265,11 +274,13 @@ impl Libp2pEventHandler {
|
||||
self.peers.write().await.update(&peer_id);
|
||||
|
||||
// Check if the failed RPC belongs to sync
|
||||
if let RequestId::Sync(request_id) = request_id {
|
||||
if let RequestId::Sync(since, request_id) = request_id {
|
||||
self.send_to_sync(SyncMessage::RpcError {
|
||||
peer_id,
|
||||
request_id,
|
||||
});
|
||||
|
||||
metrics::LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY.update_since(since);
|
||||
}
|
||||
|
||||
metrics::LIBP2P_HANDLE_RESPONSE_ERROR.mark(1);
|
||||
@ -872,7 +883,7 @@ mod tests {
|
||||
}) => {
|
||||
assert_eq!(peer_id, expected_peer_id);
|
||||
assert!(matches!(request, Request::Status(..)));
|
||||
assert!(matches!(request_id, RequestId::Router))
|
||||
assert!(matches!(request_id, RequestId::Router(..)))
|
||||
}
|
||||
Ok(_) => panic!("Unexpected network message type received"),
|
||||
Err(e) => panic!("No network message received: {:?}", e),
|
||||
@ -1038,7 +1049,7 @@ mod tests {
|
||||
handler
|
||||
.on_rpc_response(
|
||||
alice,
|
||||
RequestId::Sync(SyncId::SerialSync { tx_id: id }),
|
||||
RequestId::Sync(Instant::now(), SyncId::SerialSync { tx_id: id }),
|
||||
Response::Chunks(data.clone()),
|
||||
)
|
||||
.await;
|
||||
@ -1066,7 +1077,10 @@ mod tests {
|
||||
let alice = PeerId::random();
|
||||
let id = TxID::random_hash(555);
|
||||
handler
|
||||
.on_rpc_error(alice, RequestId::Sync(SyncId::SerialSync { tx_id: id }))
|
||||
.on_rpc_error(
|
||||
alice,
|
||||
RequestId::Sync(Instant::now(), SyncId::SerialSync { tx_id: id }),
|
||||
)
|
||||
.await;
|
||||
|
||||
match ctx.sync_recv.try_recv() {
|
||||
|
@ -1,6 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use metrics::{register_meter, Histogram, Meter, Sample};
|
||||
use metrics::{register_meter, register_meter_with_group, Histogram, Meter, Sample};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
// service
|
||||
@ -11,10 +11,10 @@ lazy_static::lazy_static! {
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer_already");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer_new_ok");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer_new_fail");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "all");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "already");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "ok");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "fail");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
|
||||
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
|
||||
|
||||
@ -25,14 +25,17 @@ lazy_static::lazy_static! {
|
||||
// libp2p_event_handler
|
||||
pub static ref LIBP2P_SEND_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_send_status");
|
||||
|
||||
pub static ref LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_connected_outgoing");
|
||||
pub static ref LIBP2P_HANDLE_PEER_CONNECTED_INCOMING: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_connected_incoming");
|
||||
pub static ref LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "outgoing");
|
||||
pub static ref LIBP2P_HANDLE_PEER_CONNECTED_INCOMING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "incoming");
|
||||
pub static ref LIBP2P_HANDLE_PEER_DISCONNECTED: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_disconnected");
|
||||
pub static ref LIBP2P_HANDLE_REQUEST_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_status");
|
||||
pub static ref LIBP2P_HANDLE_REQUEST_GET_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_get_chunks");
|
||||
pub static ref LIBP2P_HANDLE_RESPONSE_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_handle_response_status");
|
||||
pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_response_get_chunks");
|
||||
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter("router_libp2p_handle_response_error");
|
||||
pub static ref LIBP2P_HANDLE_RESPONSE_STATUS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_status", "qps");
|
||||
pub static ref LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_status", "latency", 1024);
|
||||
pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_get_chunks", "qps");
|
||||
pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_get_chunks", "latency", 1024);
|
||||
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_error", "qps");
|
||||
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024);
|
||||
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_file");
|
||||
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_chunks");
|
||||
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_announce_file");
|
||||
|
@ -18,10 +18,12 @@ pub struct FileSyncGoal {
|
||||
pub index_start: u64,
|
||||
/// Chunk index to sync to (exclusive).
|
||||
pub index_end: u64,
|
||||
/// `true` if we are syncing all the needed data of this file.
|
||||
pub all_chunks: bool,
|
||||
}
|
||||
|
||||
impl FileSyncGoal {
|
||||
pub fn new(num_chunks: u64, index_start: u64, index_end: u64) -> Self {
|
||||
pub fn new(num_chunks: u64, index_start: u64, index_end: u64, all_chunks: bool) -> Self {
|
||||
assert!(
|
||||
index_start < index_end && index_end <= num_chunks,
|
||||
"invalid index_end"
|
||||
@ -30,15 +32,16 @@ impl FileSyncGoal {
|
||||
num_chunks,
|
||||
index_start,
|
||||
index_end,
|
||||
all_chunks,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_file(num_chunks: u64) -> Self {
|
||||
Self::new(num_chunks, 0, num_chunks)
|
||||
Self::new(num_chunks, 0, num_chunks, true)
|
||||
}
|
||||
|
||||
pub fn is_all_chunks(&self) -> bool {
|
||||
self.index_start == 0 && self.index_end == self.num_chunks
|
||||
self.all_chunks
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,7 +139,7 @@ impl SerialSyncController {
|
||||
if let Some((start, end)) = maybe_range {
|
||||
// Sync new chunks regardless of previously downloaded file or chunks.
|
||||
// It's up to client to avoid duplicated chunks sync.
|
||||
self.goal = FileSyncGoal::new(self.goal.num_chunks, start, end);
|
||||
self.goal = FileSyncGoal::new(self.goal.num_chunks, start, end, false);
|
||||
self.next_chunk = start;
|
||||
} else if self.goal.is_all_chunks() {
|
||||
// retry the failed file sync at break point
|
||||
@ -258,7 +258,8 @@ impl SerialSyncController {
|
||||
// request next chunk array
|
||||
let from_chunk = self.next_chunk;
|
||||
let to_chunk = std::cmp::min(from_chunk + PORA_CHUNK_SIZE as u64, self.goal.index_end);
|
||||
let request_id = network::RequestId::Sync(RequestId::SerialSync { tx_id: self.tx_id });
|
||||
let request_id =
|
||||
network::RequestId::Sync(Instant::now(), RequestId::SerialSync { tx_id: self.tx_id });
|
||||
// TODO: It's possible that we read it while `nex_tx_seq - 1` is still being committed.
|
||||
// We can wait for its commitment, but this will slow down this state machine.
|
||||
// Or we can use `next_tx_seq - 2`, but for a restarted node without receiving new
|
||||
@ -883,7 +884,7 @@ mod tests {
|
||||
);
|
||||
|
||||
match request_id {
|
||||
network::RequestId::Sync(sync_id) => match sync_id {
|
||||
network::RequestId::Sync(_, sync_id) => match sync_id {
|
||||
network::SyncId::SerialSync { tx_id } => {
|
||||
assert_eq!(tx_id, controller.tx_id);
|
||||
}
|
||||
|
@ -635,8 +635,8 @@ impl SyncService {
|
||||
bail!("File already exists");
|
||||
}
|
||||
|
||||
let (index_start, index_end) = match maybe_range {
|
||||
Some((start, end)) => (start, end),
|
||||
let (index_start, index_end, all_chunks) = match maybe_range {
|
||||
Some((start, end)) => (start, end, false),
|
||||
None => {
|
||||
let start = match Self::tx_sync_start_index(&self.store, &tx).await? {
|
||||
Some(s) => s,
|
||||
@ -646,7 +646,7 @@ impl SyncService {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
(start, num_chunks)
|
||||
(start, num_chunks, true)
|
||||
}
|
||||
};
|
||||
|
||||
@ -658,7 +658,7 @@ impl SyncService {
|
||||
self.config,
|
||||
tx.id(),
|
||||
tx.start_entry_index(),
|
||||
FileSyncGoal::new(num_chunks, index_start, index_end),
|
||||
FileSyncGoal::new(num_chunks, index_start, index_end, all_chunks),
|
||||
self.ctx.clone(),
|
||||
self.store.clone(),
|
||||
self.file_location_cache.clone(),
|
||||
@ -1750,7 +1750,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let sync_id = match request_id {
|
||||
network::RequestId::Sync(sync_id) => sync_id,
|
||||
network::RequestId::Sync(_, sync_id) => sync_id,
|
||||
_ => unreachable!("All Chunks responses belong to sync"),
|
||||
};
|
||||
|
||||
|
@ -68,7 +68,7 @@ class ZgsNode(TestNode):
|
||||
os.mkdir(self.data_dir)
|
||||
log_config_path = os.path.join(self.data_dir, self.config["log_config_file"])
|
||||
with open(log_config_path, "w") as f:
|
||||
f.write("debug,hyper=info,h2=info")
|
||||
f.write("trace,hyper=info,h2=info")
|
||||
|
||||
initialize_toml_config(self.config_file, self.config)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user