From 2bc402f94b8b27cd4ff4339313ebc18705c587d1 Mon Sep 17 00:00:00 2001 From: Bo QIU <35757521+boqiu@users.noreply.github.com> Date: Fri, 12 Jul 2024 14:26:23 +0800 Subject: [PATCH] Enhance sync logs (#122) * enhance sync logs * fix lint --- node/sync/src/controllers/peers.rs | 1 + node/sync/src/controllers/serial.rs | 89 +++++++++++++++++++---------- 2 files changed, 59 insertions(+), 31 deletions(-) diff --git a/node/sync/src/controllers/peers.rs b/node/sync/src/controllers/peers.rs index cc166a9..d48f03c 100644 --- a/node/sync/src/controllers/peers.rs +++ b/node/sync/src/controllers/peers.rs @@ -160,6 +160,7 @@ impl SyncPeers { .collect() } + #[cfg(test)] pub fn count(&self, states: &[PeerState]) -> usize { self.peers .values() diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 2ec58f5..25ef92d 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -19,7 +19,7 @@ use storage::log_store::log_manager::PORA_CHUNK_SIZE; use storage_async::Store; pub const MAX_CHUNKS_TO_REQUEST: u64 = 2 * 1024; -const MAX_REQUEST_FAILURES: usize = 100; +const MAX_REQUEST_FAILURES: usize = 5; const PEER_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(5); const WAIT_OUTGOING_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); @@ -161,14 +161,16 @@ impl SerialSyncController { self.peers.transition(); } + /// Find more peers to sync chunks. Return whether `FindFile` pubsub message published, fn try_find_peers(&mut self) { - info!(%self.tx_seq, "Finding peers"); - - if self.goal.is_all_chunks() { - self.publish_find_file(); + let (published, num_new_peers) = if self.goal.is_all_chunks() { + self.publish_find_file() } else { self.publish_find_chunks(); - } + (true, 0) + }; + + info!(%self.tx_seq, %published, %num_new_peers, "Finding peers"); self.state = SyncState::FindingPeers { origin: self.since, @@ -176,32 +178,37 @@ impl SerialSyncController { }; } - fn publish_find_file(&mut self) { + fn publish_find_file(&mut self) -> (bool, usize) { // try from cache - let mut found_new_peer = false; + let mut num_new_peers = 0; for announcement in self.file_location_cache.get_all(self.tx_id) { // make sure peer_id is part of the address let peer_id: PeerId = announcement.peer_id.clone().into(); let mut addr: Multiaddr = announcement.at.clone().into(); addr.push(Protocol::P2p(peer_id.into())); - found_new_peer = self.on_peer_found(peer_id, addr) || found_new_peer; + + if self.on_peer_found(peer_id, addr) { + num_new_peers += 1; + } } - if found_new_peer + if num_new_peers > 0 && self.peers.all_shards_available(vec![ PeerState::Found, PeerState::Connecting, PeerState::Connected, ]) { - return; + return (false, num_new_peers); } self.ctx.publish(PubsubMessage::FindFile(FindFile { tx_id: self.tx_id, timestamp: timestamp_now(), })); + + (true, num_new_peers) } fn publish_find_chunks(&self) { @@ -213,7 +220,11 @@ impl SerialSyncController { })); } + /// Dial to peers in `Found` state, so that `Connecting` or `Connected` peers cover + /// data in all shards. fn try_connect(&mut self) { + let mut num_peers_dailed = 0; + // select a random peer while !self .peers @@ -230,17 +241,23 @@ impl SerialSyncController { }; // connect to peer - info!(%self.tx_seq, %peer_id, %address, "Attempting to connect to peer"); + debug!(%self.tx_seq, %peer_id, %address, "Attempting to connect to peer"); self.ctx.send(NetworkMessage::DialPeer { address, peer_id }); self.peers .update_state(&peer_id, PeerState::Found, PeerState::Connecting); + + num_peers_dailed += 1; } + + info!(%self.tx_seq, %num_peers_dailed, "Connecting peers"); + self.state = SyncState::ConnectingPeers { since: Instant::now().into(), }; } + /// Randomly select a peer to sync the next segment. fn try_request_next(&mut self) { // request next chunk array let from_chunk = self.next_chunk; @@ -267,6 +284,9 @@ impl SerialSyncController { request_id, request: network::Request::GetChunks(request), }); + + info!(%self.tx_seq, %from_chunk, %to_chunk, %peer_id, "Sent request to get chunks"); + self.state = SyncState::Downloading { peer_id, from_chunk, @@ -276,6 +296,7 @@ impl SerialSyncController { } fn ban_peer(&mut self, peer_id: PeerId, reason: &'static str) { + debug!(%self.tx_seq, %peer_id, %reason, "Ban peer"); self.ctx.ban_peer(peer_id, reason); self.peers @@ -296,7 +317,7 @@ impl SerialSyncController { false } } else { - debug!(%self.tx_seq, %peer_id, %addr, "No shard config found"); + debug!(%self.tx_seq, %peer_id, %addr, "Found peer without shard config"); false } } @@ -320,7 +341,7 @@ impl SerialSyncController { PeerState::Connecting, PeerState::Disconnected, ) { - info!(%self.tx_seq, %peer_id, "Failed to dail peer"); + info!(%self.tx_seq, %peer_id, %err, "Failed to dail peer"); self.state = SyncState::Idle; } } @@ -370,11 +391,11 @@ impl SerialSyncController { true } _ => { - // FIXME(zz). Delayed response can enter this. + // Delayed response can enter this. warn!(%self.tx_seq, %from_peer_id, ?self.state, "Got response in unexpected state"); self.ctx.report_peer( from_peer_id, - PeerAction::HighToleranceError, + PeerAction::LowToleranceError, "Sync state mismatch", ); true @@ -383,20 +404,23 @@ impl SerialSyncController { } pub async fn on_response(&mut self, from_peer_id: PeerId, response: ChunkArrayWithProof) { + debug!(%self.tx_seq, %from_peer_id, "Received RPC response"); if self.handle_on_response_mismatch(from_peer_id) { return; } - let (from_chunk, to_chunk) = match self.state { + let (from_chunk, to_chunk, since) = match self.state { SyncState::Downloading { - peer_id: _peer_id, from_chunk, to_chunk, + since, .. - } => (from_chunk, to_chunk), + } => (from_chunk, to_chunk, since), _ => return, }; + debug!(%self.tx_seq, %from_peer_id, %from_chunk, %to_chunk, ?since, "Received RPC response from expected peer"); + debug_assert!(from_chunk < to_chunk, "Invalid chunk boundaries"); // invalid chunk array size: ban and re-request @@ -408,14 +432,16 @@ impl SerialSyncController { return; } - // invalid chunk range: ban and re-request + // invalid chunk range: may be response timeout, just ignore it let start_index = response.chunks.start_index; let end_index = start_index + (data_len / CHUNK_SIZE) as u64; if start_index != from_chunk || end_index != to_chunk { - // FIXME(zz): Possible for relayed response. warn!(%self.tx_seq, "Invalid chunk response range, expected={from_chunk}..{to_chunk}, actual={start_index}..{end_index}"); - // self.ban_peer(from_peer_id, "Invalid chunk response range"); - // self.state = SyncState::Idle; + self.ctx.report_peer( + from_peer_id, + PeerAction::LowToleranceError, + "Got response with unexpected chunk range", + ); return; } @@ -428,6 +454,7 @@ impl SerialSyncController { match validation_result { Ok(true) => {} Ok(false) => { + // occurs when remote peer has higher block height info!(%self.tx_seq, "Failed to validate chunks response due to no root found"); self.state = SyncState::AwaitingDownload { since: (Instant::now() + NEXT_REQUEST_WAIT_TIME).into(), @@ -490,7 +517,10 @@ impl SerialSyncController { .finalize_tx_with_hash(self.tx_id.seq, self.tx_id.hash) .await { - Ok(true) => self.state = SyncState::Completed, + Ok(true) => { + info!(%self.tx_seq, "Succeeded to finalize file"); + self.state = SyncState::Completed; + } Ok(false) => { warn!(?self.tx_id, %self.tx_seq, "Transaction reverted during finalize_tx"); self.state = SyncState::Failed { @@ -517,11 +547,6 @@ impl SerialSyncController { fn handle_response_failure(&mut self, peer_id: PeerId, reason: &'static str) { info!(%peer_id, %self.tx_seq, %reason, "Chunks request failed"); - // ban peer on too many failures - // FIXME(zz): If remote removes a file, we will also get failure here. - // self.ctx - // .report_peer(peer_id, PeerAction::HighToleranceError, reason); - self.failures += 1; if self.failures <= MAX_REQUEST_FAILURES { @@ -536,6 +561,7 @@ impl SerialSyncController { } } + /// Randomly select a `Connected` peer to sync chunks. fn select_peer_for_request(&self, request: &GetChunksRequest) -> Option { let segment_index = (request.index_start + self.tx_start_chunk_in_flow) / PORA_CHUNK_SIZE as u64; @@ -612,8 +638,8 @@ impl SerialSyncController { self.state = SyncState::AwaitingDownload { since: Instant::now().into(), }; - } else if self.peers.count(&[Connecting]) == 0 { - debug!(%self.tx_seq, "Connecting to peers timeout and try to find other peers to dial"); + } else if !self.peers.all_shards_available(vec![Connecting, Connected]) { + debug!(%self.tx_seq, "Connecting to peers timeout or remote peers disconnected, try to find more peers"); self.state = SyncState::Idle; } else { // peers.transition() will handle the case that peer connecting timeout @@ -632,6 +658,7 @@ impl SerialSyncController { SyncState::AwaitingDownload { since } => { if Instant::now() < since.0 { + // retry seconds later completed = true; } else { self.try_request_next();