Compare commits

...

5 Commits

Author           SHA1         Message                                                        Date

peilun-conflux   90e3fb744d   Merge 8b870a069c into f14a1b5975                               2024-08-22 07:05:13 +00:00
Peilun Li        8b870a069c   nit.                                                           2024-08-22 15:05:03 +08:00
Bo QIU           f14a1b5975   add timestamp in p2p rpc request id for latency stat (#167)   2024-08-22 14:34:10 +08:00
                              * add timestamp in p2p rpc request id for latency stat
                              * fix test compilation error
Peilun Li        09ea9d7d7d   Fix test.                                                      2024-08-22 14:25:06 +08:00
Peilun Li        f43f76dd42   Set sync start index based on data in db.                     2024-08-21 19:31:46 +08:00
8 changed files with 124 additions and 55 deletions

View File

@@ -25,6 +25,7 @@ pub mod types;
 pub use config::gossip_max_size;
 use std::net::SocketAddr;
+use std::time::Instant;

 use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
 use shared_types::TxID;
@@ -97,8 +98,8 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F
 /// Application level requests sent to the network.
 #[derive(Debug, Clone, Copy)]
 pub enum RequestId {
-    Router,
-    Sync(SyncId),
+    Router(Instant),
+    Sync(Instant, SyncId),
 }

 #[derive(Debug, Clone, Copy)]
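The change above is the heart of the PR: both RequestId variants now carry the Instant at which the request was created, so a response handler can compute round-trip latency without keeping a separate map of pending requests. A minimal sketch of the pattern, with hypothetical names rather than this repo's types:

use std::time::Instant;

#[derive(Debug, Clone, Copy)]
enum RequestId {
    Router(Instant),
    Sync(Instant, u64), // the real enum carries a SyncId here
}

fn on_response(request_id: RequestId) {
    // Extract the creation time regardless of the variant...
    let since = match request_id {
        RequestId::Router(since) => since,
        RequestId::Sync(since, _) => since,
    };
    // ...and its elapsed time is the request/response round trip.
    println!("round trip: {:?}", since.elapsed());
}

fn main() {
    on_response(RequestId::Router(Instant::now()));
}

Because Instant is Copy, the enum keeps its Clone, Copy derives and request IDs can still be passed around by value.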

View File

@@ -1,4 +1,5 @@
 use std::net::IpAddr;
+use std::time::Instant;
 use std::{ops::Neg, sync::Arc};

 use chunk_pool::ChunkPoolMessage;
@@ -153,7 +154,7 @@ impl Libp2pEventHandler {
         self.send_to_network(NetworkMessage::SendRequest {
             peer_id,
-            request_id: RequestId::Router,
+            request_id: RequestId::Router(Instant::now()),
             request: Request::Status(status_message),
         });
@@ -238,12 +239,22 @@ impl Libp2pEventHandler {
         match response {
             Response::Status(status_message) => {
                 debug!(%peer_id, ?status_message, "Received Status response");
-                self.on_status_response(peer_id, status_message);
-                metrics::LIBP2P_HANDLE_RESPONSE_STATUS.mark(1);
+                match request_id {
+                    RequestId::Router(since) => {
+                        metrics::LIBP2P_HANDLE_RESPONSE_STATUS.mark(1);
+                        metrics::LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY.update_since(since);
+                    }
+                    _ => unreachable!("All status response belong to router"),
+                }
+                self.on_status_response(peer_id, status_message);
             }
             Response::Chunks(response) => {
                 let request_id = match request_id {
-                    RequestId::Sync(sync_id) => sync_id,
+                    RequestId::Sync(since, sync_id) => {
+                        metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS.mark(1);
+                        metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY.update_since(since);
+                        sync_id
+                    }
                     _ => unreachable!("All Chunks responses belong to sync"),
                 };
@@ -252,8 +263,6 @@
                     request_id,
                     response,
                 });
-                metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS.mark(1);
             }
             Response::DataByHash(_) => {
                 // ignore
@@ -265,11 +274,13 @@
         self.peers.write().await.update(&peer_id);

         // Check if the failed RPC belongs to sync
-        if let RequestId::Sync(request_id) = request_id {
+        if let RequestId::Sync(since, request_id) = request_id {
             self.send_to_sync(SyncMessage::RpcError {
                 peer_id,
                 request_id,
             });
+            metrics::LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY.update_since(since);
         }

         metrics::LIBP2P_HANDLE_RESPONSE_ERROR.mark(1);
@@ -872,7 +883,7 @@ mod tests {
             }) => {
                 assert_eq!(peer_id, expected_peer_id);
                 assert!(matches!(request, Request::Status(..)));
-                assert!(matches!(request_id, RequestId::Router))
+                assert!(matches!(request_id, RequestId::Router(..)))
             }
             Ok(_) => panic!("Unexpected network message type received"),
             Err(e) => panic!("No network message received: {:?}", e),
@@ -1038,7 +1049,7 @@ mod tests {
         handler
             .on_rpc_response(
                 alice,
-                RequestId::Sync(SyncId::SerialSync { tx_id: id }),
+                RequestId::Sync(Instant::now(), SyncId::SerialSync { tx_id: id }),
                 Response::Chunks(data.clone()),
             )
             .await;
@@ -1066,7 +1077,10 @@ mod tests {
         let alice = PeerId::random();
         let id = TxID::random_hash(555);
         handler
-            .on_rpc_error(alice, RequestId::Sync(SyncId::SerialSync { tx_id: id }))
+            .on_rpc_error(
+                alice,
+                RequestId::Sync(Instant::now(), SyncId::SerialSync { tx_id: id }),
+            )
             .await;

         match ctx.sync_recv.try_recv() {

View File

@@ -1,6 +1,6 @@
 use std::sync::Arc;

-use metrics::{register_meter, Histogram, Meter, Sample};
+use metrics::{register_meter, register_meter_with_group, Histogram, Meter, Sample};

 lazy_static::lazy_static! {
     // service
@@ -11,10 +11,10 @@ lazy_static::lazy_static! {
     pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
     pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
     pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
-    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer");
-    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer_already");
-    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer_new_ok");
-    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter("router_service_route_network_message_dail_peer_new_fail");
+    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "all");
+    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "already");
+    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "ok");
+    pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "fail");
     pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
     pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
@@ -25,14 +25,17 @@ lazy_static::lazy_static! {
     // libp2p_event_handler
     pub static ref LIBP2P_SEND_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_send_status");
-    pub static ref LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_connected_outgoing");
-    pub static ref LIBP2P_HANDLE_PEER_CONNECTED_INCOMING: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_connected_incoming");
+    pub static ref LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "outgoing");
+    pub static ref LIBP2P_HANDLE_PEER_CONNECTED_INCOMING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "incoming");
     pub static ref LIBP2P_HANDLE_PEER_DISCONNECTED: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_disconnected");
     pub static ref LIBP2P_HANDLE_REQUEST_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_status");
     pub static ref LIBP2P_HANDLE_REQUEST_GET_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_get_chunks");
-    pub static ref LIBP2P_HANDLE_RESPONSE_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_handle_response_status");
-    pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_response_get_chunks");
-    pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter("router_libp2p_handle_response_error");
+    pub static ref LIBP2P_HANDLE_RESPONSE_STATUS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_status", "qps");
+    pub static ref LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_status", "latency", 1024);
+    pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_get_chunks", "qps");
+    pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_get_chunks", "latency", 1024);
+    pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_error", "qps");
+    pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024);
     pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_file");
     pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_chunks");
     pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_announce_file");
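Several meters here also migrate from flat register_meter names to register_meter_with_group, which, as used throughout this diff, groups related series such as a throughput meter and a latency histogram under one base metric name. A sketch of the convention, using only the calls visible in this file (the metrics crate is the project's own, so treat these signatures as read from this diff rather than from published docs):

use std::sync::Arc;
use metrics::{register_meter_with_group, Histogram, Meter, Sample};

lazy_static::lazy_static! {
    // Two series under the single logical name "example_rpc":
    // a "qps" meter and an exponentially decaying "latency" histogram.
    pub static ref EXAMPLE_RPC: Arc<dyn Meter> =
        register_meter_with_group("example_rpc", "qps");
    pub static ref EXAMPLE_RPC_LATENCY: Arc<dyn Histogram> =
        Sample::ExpDecay(0.015).register_with_group("example_rpc", "latency", 1024);
}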

View File

@@ -609,9 +609,9 @@
                 .get_tx_by_seq_number(last_tx_seq)?
                 .expect("tx missing");
             let mut current_len = initial_data.leaves();
-            let expected_len = (last_tx.start_entry_index + last_tx.num_entries() as u64)
-                / PORA_CHUNK_SIZE as u64;
-            match expected_len.cmp(&(current_len as u64)) {
+            let expected_len =
+                sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64);
+            match expected_len.cmp(&(current_len)) {
                 Ordering::Less => {
                     bail!(
                         "Unexpected DB: merkle tree larger than the known data size,\
@@ -634,10 +634,9 @@
                     let previous_tx = tx_store
                         .get_tx_by_seq_number(last_tx_seq - 1)?
                         .expect("tx missing");
-                    let expected_len = ((previous_tx.start_entry_index
-                        + previous_tx.num_entries() as u64)
-                        / PORA_CHUNK_SIZE as u64)
-                        as usize;
+                    let expected_len = sector_to_segment(
+                        previous_tx.start_entry_index + previous_tx.num_entries() as u64,
+                    );
                     if current_len > expected_len {
                         while let Some((subtree_depth, _)) = initial_data.subtree_list.pop()
                         {
@@ -737,13 +736,13 @@
         maybe_tx_seq: Option<u64>,
     ) -> Result<FlowProof> {
         let merkle = self.merkle.read_recursive();
-        let chunk_index = flow_index / PORA_CHUNK_SIZE as u64;
+        let seg_index = sector_to_segment(flow_index);
         let top_proof = match maybe_tx_seq {
-            None => merkle.pora_chunks_merkle.gen_proof(chunk_index as usize)?,
+            None => merkle.pora_chunks_merkle.gen_proof(seg_index)?,
             Some(tx_seq) => merkle
                 .pora_chunks_merkle
                 .at_version(tx_seq)?
-                .gen_proof(chunk_index as usize)?,
+                .gen_proof(seg_index)?,
         };

         // TODO(zz): Maybe we can decide that all proofs are at the PoRA chunk level, so
@@ -753,11 +752,11 @@
         // and `flow_index` must be within a complete PoRA chunk. For possible future usages,
         // we'll need to find the flow length at the given root and load a partial chunk
         // if `flow_index` is in the last chunk.
-        let sub_proof = if chunk_index as usize != merkle.pora_chunks_merkle.leaves() - 1
+        let sub_proof = if seg_index != merkle.pora_chunks_merkle.leaves() - 1
             || merkle.last_chunk_merkle.leaves() == 0
         {
             self.flow_store
-                .gen_proof_in_batch(chunk_index as usize, flow_index as usize % PORA_CHUNK_SIZE)?
+                .gen_proof_in_batch(seg_index, flow_index as usize % PORA_CHUNK_SIZE)?
         } else {
             match maybe_tx_seq {
                 None => merkle
@@ -1236,3 +1235,11 @@ pub fn tx_subtree_root_list_padded(data: &[u8]) -> Vec<(usize, DataRoot)> {
     root_list
 }
+
+pub fn sector_to_segment(sector_index: u64) -> usize {
+    (sector_index / PORA_CHUNK_SIZE as u64) as usize
+}
+
+pub fn segment_to_sector(segment_index: usize) -> usize {
+    segment_index * PORA_CHUNK_SIZE
+}
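The two helpers added at the end give the recurring PORA_CHUNK_SIZE arithmetic a name: a segment is PORA_CHUNK_SIZE sectors, so sector-to-segment is integer division and segment-to-sector is multiplication. A hypothetical check (the concrete value of PORA_CHUNK_SIZE is not shown in this diff; 1024 is assumed here purely for illustration):

const PORA_CHUNK_SIZE: usize = 1024; // assumed value, for illustration only

fn sector_to_segment(sector_index: u64) -> usize {
    (sector_index / PORA_CHUNK_SIZE as u64) as usize
}

fn segment_to_sector(segment_index: usize) -> usize {
    segment_index * PORA_CHUNK_SIZE
}

fn main() {
    assert_eq!(sector_to_segment(2049), 2); // sector 2049 lies in segment 2
    assert_eq!(segment_to_sector(2), 2048); // segment 2 starts at sector 2048
}

Note the conversion is lossy in one direction: segment_to_sector(sector_to_segment(s)) rounds s down to its segment boundary.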

View File

@@ -18,10 +18,12 @@ pub struct FileSyncGoal {
     pub index_start: u64,
     /// Chunk index to sync to (exclusive).
     pub index_end: u64,
+    /// `true` if we are syncing all the needed data of this file.
+    pub all_chunks: bool,
 }

 impl FileSyncGoal {
-    pub fn new(num_chunks: u64, index_start: u64, index_end: u64) -> Self {
+    pub fn new(num_chunks: u64, index_start: u64, index_end: u64, all_chunks: bool) -> Self {
         assert!(
             index_start < index_end && index_end <= num_chunks,
             "invalid index_end"
@@ -30,15 +32,16 @@ impl FileSyncGoal {
             num_chunks,
             index_start,
             index_end,
+            all_chunks,
         }
     }

     pub fn new_file(num_chunks: u64) -> Self {
-        Self::new(num_chunks, 0, num_chunks)
+        Self::new(num_chunks, 0, num_chunks, true)
     }

     pub fn is_all_chunks(&self) -> bool {
-        self.index_start == 0 && self.index_end == self.num_chunks
+        self.all_chunks
     }
 }
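Here is_all_chunks becomes an explicit flag rather than the positional test index_start == 0 && index_end == num_chunks. The service-side change later in this diff shows why: a whole-file sync can now start at the first missing sector rather than sector 0, so the old positional test would misclassify it as a partial range sync. A reduced sketch of the distinction (toy model, not this repo's type):

// Reduced model of the struct above, for illustration.
struct FileSyncGoal {
    num_chunks: u64,
    index_start: u64,
    index_end: u64,
    all_chunks: bool,
}

fn main() {
    // A node that already holds the first half of the file resumes a
    // whole-file sync at sector 4: still "all chunks" in intent...
    let goal = FileSyncGoal { num_chunks: 8, index_start: 4, index_end: 8, all_chunks: true };
    assert!(goal.all_chunks);
    // ...but the old positional test would have called it a partial sync.
    assert!(!(goal.index_start == 0 && goal.index_end == goal.num_chunks));
}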

View File

@@ -12,7 +12,7 @@ use network::{
 use rand::Rng;
 use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE};
 use std::{sync::Arc, time::Instant};
-use storage::log_store::log_manager::PORA_CHUNK_SIZE;
+use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
 use storage_async::Store;

 #[derive(Clone, Debug, PartialEq, Eq)]
@@ -139,7 +139,7 @@ impl SerialSyncController {
         if let Some((start, end)) = maybe_range {
             // Sync new chunks regardless of previously downloaded file or chunks.
             // It's up to client to avoid duplicated chunks sync.
-            self.goal = FileSyncGoal::new(self.goal.num_chunks, start, end);
+            self.goal = FileSyncGoal::new(self.goal.num_chunks, start, end, false);
             self.next_chunk = start;
         } else if self.goal.is_all_chunks() {
             // retry the failed file sync at break point
@@ -258,7 +258,8 @@ impl SerialSyncController {
         // request next chunk array
         let from_chunk = self.next_chunk;
         let to_chunk = std::cmp::min(from_chunk + PORA_CHUNK_SIZE as u64, self.goal.index_end);
-        let request_id = network::RequestId::Sync(RequestId::SerialSync { tx_id: self.tx_id });
+        let request_id =
+            network::RequestId::Sync(Instant::now(), RequestId::SerialSync { tx_id: self.tx_id });

         // TODO: It's possible that we read it while `nex_tx_seq - 1` is still being committed.
         // We can wait for its commitment, but this will slow down this state machine.
         // Or we can use `next_tx_seq - 2`, but for a restarted node without receiving new
@@ -478,10 +479,10 @@
         metrics::SERIAL_SYNC_SEGMENT_LATENCY.update_since(since.0);

         let shard_config = self.store.get_store().flow().get_shard_config();
-        let next_chunk = shard_config.next_segment_index(
-            (from_chunk / PORA_CHUNK_SIZE as u64) as usize,
-            (self.tx_start_chunk_in_flow / PORA_CHUNK_SIZE as u64) as usize,
-        ) * PORA_CHUNK_SIZE;
+        let next_chunk = segment_to_sector(shard_config.next_segment_index(
+            sector_to_segment(from_chunk),
+            sector_to_segment(self.tx_start_chunk_in_flow),
+        ));

         // store in db
         match self
             .store
@@ -575,12 +576,11 @@
     /// Randomly select a `Connected` peer to sync chunks.
     fn select_peer_for_request(&self, request: &GetChunksRequest) -> Option<PeerId> {
-        let segment_index =
-            (request.index_start + self.tx_start_chunk_in_flow) / PORA_CHUNK_SIZE as u64;
+        let segment_index = sector_to_segment(request.index_start + self.tx_start_chunk_in_flow);
         let mut peers = self.peers.filter_peers(vec![PeerState::Connected]);

         peers.retain(|peer_id| match self.peers.shard_config(peer_id) {
-            Some(v) => v.in_range(segment_index),
+            Some(v) => v.in_range(segment_index as u64),
             None => false,
         });
@@ -884,7 +884,7 @@
             );

             match request_id {
-                network::RequestId::Sync(sync_id) => match sync_id {
+                network::RequestId::Sync(_, sync_id) => match sync_id {
                     network::SyncId::SerialSync { tx_id } => {
                         assert_eq!(tx_id, controller.tx_id);
                     }

View File

@@ -4,7 +4,7 @@ use crate::controllers::{
     FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState,
 };
 use crate::{Config, SyncServiceState};
-use anyhow::{bail, Result};
+use anyhow::{anyhow, bail, Result};
 use file_location_cache::FileLocationCache;
 use libp2p::swarm::DialError;
 use log_entry_sync::LogSyncEvent;
@@ -14,14 +14,16 @@ use network::{
     rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
     PeerRequestId, SyncId as RequestId,
 };
-use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, TxID};
+use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, Transaction, TxID};
 use std::sync::atomic::Ordering;
 use std::{
+    cmp,
     collections::{hash_map::Entry, HashMap},
     sync::Arc,
 };
 use storage::config::ShardConfig;
 use storage::error::Result as StorageResult;
+use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
 use storage::log_store::Store as LogStore;
 use storage_async::Store;
 use tokio::sync::{broadcast, mpsc, oneshot};
@@ -633,9 +635,19 @@
             bail!("File already exists");
         }

-        let (index_start, index_end) = match maybe_range {
-            Some((start, end)) => (start, end),
-            None => (0, num_chunks),
+        let (index_start, index_end, all_chunks) = match maybe_range {
+            Some((start, end)) => (start, end, false),
+            None => {
+                let start = match Self::tx_sync_start_index(&self.store, &tx).await? {
+                    Some(s) => s,
+                    None => {
+                        debug!(%tx.seq, "No more data needed");
+                        self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await?;
+                        return Ok(());
+                    }
+                };
+                (start, num_chunks, true)
+            }
         };

         if index_start >= index_end || index_end > num_chunks {
@@ -646,7 +658,7 @@
             self.config,
             tx.id(),
             tx.start_entry_index(),
-            FileSyncGoal::new(num_chunks, index_start, index_end),
+            FileSyncGoal::new(num_chunks, index_start, index_end, all_chunks),
             self.ctx.clone(),
             self.store.clone(),
             self.file_location_cache.clone(),
@@ -793,6 +805,35 @@
                 self.controllers.remove(&tx_seq);
             }
         }
     }

+    async fn tx_sync_start_index(store: &Store, tx: &Transaction) -> Result<Option<u64>> {
+        let shard_config = store.get_store().flow().get_shard_config();
+        let start_segment = sector_to_segment(tx.start_entry_index());
+        let end =
+            bytes_to_chunks(usize::try_from(tx.size).map_err(|e| anyhow!("tx size e={}", e))?);
+        let mut start = if shard_config.in_range(start_segment as u64) {
+            0
+        } else {
+            segment_to_sector(shard_config.next_segment_index(0, start_segment))
+        };
+        while start < end {
+            if store
+                .get_chunks_by_tx_and_index_range(
+                    tx.seq,
+                    start,
+                    cmp::min(start + PORA_CHUNK_SIZE, end),
+                )
+                .await?
+                .is_none()
+            {
+                return Ok(Some(start as u64));
+            }
+            start = segment_to_sector(
+                shard_config.next_segment_index(sector_to_segment(start as u64), start_segment),
+            );
+        }
+
+        Ok(None)
+    }
 }

 #[cfg(test)]
@@ -1709,7 +1750,7 @@
         };

         let sync_id = match request_id {
-            network::RequestId::Sync(sync_id) => sync_id,
+            network::RequestId::Sync(_, sync_id) => sync_id,
             _ => unreachable!("All Chunks responses belong to sync"),
         };
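The new tx_sync_start_index above walks the file's segments, skipping those outside this node's shard, and returns the first sector of the first segment missing from the db; None means every needed segment is already stored, so the tx is finalized without any sync (this is the "Set sync start index based on data in db" commit). The shape of the scan, reduced to a toy where needed_segments stands in for ShardConfig's iteration and have for the db lookup (both are hypothetical stand-ins, not this repo's API):

const SECTORS_PER_SEGMENT: u64 = 1024; // assumed value, for illustration only

// Return the first sector we still need, or None if everything is stored.
fn first_missing_sector(needed_segments: &[u64], have: impl Fn(u64) -> bool) -> Option<u64> {
    needed_segments
        .iter()
        .copied()
        .find(|&seg| !have(seg))
        .map(|seg| seg * SECTORS_PER_SEGMENT)
}

fn main() {
    // This shard needs segments 0 and 2; segment 0 is already in the db,
    // so sync should resume at the first sector of segment 2.
    let resume = first_missing_sector(&[0, 2], |seg| seg == 0);
    assert_eq!(resume, Some(2048));
}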

View File

@@ -68,7 +68,7 @@ class ZgsNode(TestNode):
         os.mkdir(self.data_dir)
         log_config_path = os.path.join(self.data_dir, self.config["log_config_file"])
         with open(log_config_path, "w") as f:
-            f.write("debug,hyper=info,h2=info")
+            f.write("trace,hyper=info,h2=info")

         initialize_toml_config(self.config_file, self.config)