Fix file sync stuck in pending due to peer connection timeout (#95)

* Enhance logs for the sync module

* Report the peer to the network layer if its connection times out

* Use the origin timestamp of the sync state to determine whether a sync has been finding peers for too long and should be terminated (see the sketch below)

* Fix compilation error
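
The core behavioral change is in the `FindingPeers` sync state: it now carries an `origin` timestamp (when peer discovery first started for the file) alongside `since` (refreshed on every retry), and the termination check compares `origin` against the find-peer timeout, so retries no longer reset the clock. Below is a minimal sketch of that idea; the `FIND_PEER_TIMEOUT` value and the `retry_find_peers` / `should_terminate` helpers with their `sync_started_at` argument are illustrative stand-ins, not the actual module API.

```rust
use std::time::{Duration, Instant};

// Illustrative placeholder; the real value comes from `self.config.find_peer_timeout`.
const FIND_PEER_TIMEOUT: Duration = Duration::from_secs(600);

// Simplified mirror of the relevant sync states.
enum SyncState {
    Idle,
    FindingPeers {
        // When this file first entered peer discovery; never reset on retry.
        origin: Instant,
        // When the current discovery attempt started; bumped on every retry.
        since: Instant,
    },
}

// Retry peer discovery without losing the original start time.
fn retry_find_peers(state: &mut SyncState, sync_started_at: Instant) {
    let origin = match state {
        SyncState::FindingPeers { origin, .. } => *origin,
        _ => sync_started_at,
    };
    *state = SyncState::FindingPeers {
        origin,
        since: Instant::now(),
    };
}

// Terminate only when `origin` is too old, so a file that keeps retrying
// peer discovery is eventually given up instead of staying pending forever.
fn should_terminate(state: &SyncState) -> bool {
    matches!(
        state,
        SyncState::FindingPeers { origin, .. } if origin.elapsed() > FIND_PEER_TIMEOUT
    )
}
```

In the diff below, the controller sets `origin` from its own start time (`origin: self.since`) on retry, and the auto-sync manager terminates the file sync once `origin.elapsed() > self.config.find_peer_timeout`.
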
Bo QIU 2024-06-25 17:47:25 +08:00 committed by GitHub
parent d8d8c28c64
commit c6325f7643
4 changed files with 87 additions and 37 deletions

View File

@ -119,8 +119,11 @@ impl Manager {
}
pub async fn update_on_announcement(&self, announced_tx_seq: u64) {
// new file announced
let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
let max_tx_seq = self.max_tx_seq.load(Ordering::Relaxed);
debug!(%next_tx_seq, %max_tx_seq, %announced_tx_seq, "Update for new announcement");
// new file announced
if max_tx_seq == u64::MAX || announced_tx_seq > max_tx_seq {
match self.sync_store.set_max_tx_seq(announced_tx_seq).await {
Ok(()) => self.max_tx_seq.store(announced_tx_seq, Ordering::Relaxed),
@ -130,7 +133,7 @@ impl Manager {
}
// already waiting for sequential sync
if announced_tx_seq >= self.next_tx_seq.load(Ordering::Relaxed) {
if announced_tx_seq >= next_tx_seq {
return;
}
@ -142,7 +145,8 @@ impl Manager {
async fn move_forward(&self, pending: bool) -> Result<bool> {
let tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
if tx_seq > self.max_tx_seq.load(Ordering::Relaxed) {
let max_tx_seq = self.max_tx_seq.load(Ordering::Relaxed);
if tx_seq > max_tx_seq {
return Ok(false);
}
@ -155,6 +159,8 @@ impl Manager {
self.sync_store.set_next_tx_seq(next_tx_seq).await?;
self.next_tx_seq.store(next_tx_seq, Ordering::Relaxed);
debug!(%next_tx_seq, %max_tx_seq, "Move forward");
Ok(true)
}
@ -189,10 +195,11 @@ impl Manager {
}
}
if matches!(state, Some(SyncState::FindingPeers { since, .. }) if since.elapsed() > self.config.find_peer_timeout)
if matches!(state, Some(SyncState::FindingPeers { origin, .. }) if origin.elapsed() > self.config.find_peer_timeout)
{
// no peers found for a long time
self.terminate_file_sync(tx_seq, false).await;
info!(%tx_seq, "Terminate file sync due to finding peers timeout");
Ok(true)
} else {
// otherwise, continue to wait for the file sync that is already in progress
@ -310,7 +317,10 @@ impl Manager {
loop {
if next {
match self.sync_store.random_tx().await {
Ok(Some(seq)) => tx_seq = seq,
Ok(Some(seq)) => {
tx_seq = seq;
debug!(%tx_seq, "Start to sync pending file");
}
Ok(None) => {
trace!("No pending file to sync");
sleep(INTERVAL).await;
@ -578,8 +588,8 @@ mod tests {
// no peers for file sync for a long time
SyncResponse::SyncStatus {
status: Some(SyncState::FindingPeers {
since: Instant::now().sub(Duration::from_secs(10000)),
updated: Instant::now(),
origin: Instant::now().sub(Duration::from_secs(10000)),
since: Instant::now(),
}),
},
// required to terminate the file sync

View File

@ -1,11 +1,14 @@
use network::{Multiaddr, PeerId};
use network::{Multiaddr, PeerAction, PeerId};
use rand::seq::IteratorRandom;
use std::cmp::Ordering;
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::vec;
use storage::config::ShardConfig;
use crate::context::SyncNetworkContext;
const PEER_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const PEER_DISCONNECT_TIMEOUT: Duration = Duration::from_secs(5);
@ -39,12 +42,20 @@ impl PeerInfo {
}
}
#[derive(Default, Debug)]
#[derive(Default)]
pub struct SyncPeers {
peers: HashMap<PeerId, PeerInfo>,
ctx: Option<Arc<SyncNetworkContext>>,
}
impl SyncPeers {
pub fn new(ctx: Arc<SyncNetworkContext>) -> Self {
Self {
peers: Default::default(),
ctx: Some(ctx),
}
}
pub fn add_new_peer_with_config(
&mut self,
peer_id: PeerId,
@ -181,6 +192,13 @@ impl SyncPeers {
if info.since.elapsed() >= PEER_CONNECT_TIMEOUT {
info!(%peer_id, %info.addr, "Peer connection timeout");
bad_peers.push(*peer_id);
if let Some(ctx) = &self.ctx {
ctx.report_peer(
*peer_id,
PeerAction::LowToleranceError,
"Dail timeout",
);
}
}
}

View File

@ -33,8 +33,8 @@ pub enum FailureReason {
pub enum SyncState {
Idle,
FindingPeers {
origin: Instant,
since: Instant,
updated: Instant,
},
FoundPeers,
ConnectingPeers,
@ -110,7 +110,7 @@ impl SerialSyncController {
next_chunk: goal.index_start,
failures: 0,
state: SyncState::Idle,
peers: Default::default(),
peers: SyncPeers::new(ctx.clone()),
ctx,
store,
file_location_cache,
@ -169,14 +169,10 @@ impl SerialSyncController {
self.publish_find_chunks();
}
let now = Instant::now();
let (since, updated) = match self.state {
SyncState::FindingPeers { since, .. } => (since, now),
_ => (now, now),
self.state = SyncState::FindingPeers {
origin: self.since,
since: Instant::now(),
};
self.state = SyncState::FindingPeers { since, updated };
}
fn publish_find_file(&mut self) {
@ -563,10 +559,14 @@ impl SerialSyncController {
pub fn transition(&mut self) {
use PeerState::*;
debug!(%self.tx_seq, ?self.state, "transition started");
// update peer connection states
self.peers.transition();
loop {
let mut completed = false;
while !completed {
match self.state {
SyncState::Idle => {
if self.peers.count(&[Found, Connecting, Connected]) > 0 {
@ -576,19 +576,19 @@ impl SerialSyncController {
}
}
SyncState::FindingPeers { updated, .. } => {
SyncState::FindingPeers { since, .. } => {
if self.peers.count(&[Found, Connecting, Connected]) > 0 {
self.state = SyncState::FoundPeers;
} else {
// The storage node may not have the specific file when the `FindFile`
// gossip message is received. In this case, just broadcast the
// `FindFile` message again.
if updated.elapsed() >= PEER_REQUEST_TIMEOUT {
debug!(%self.tx_seq, "Peer request timeout");
if since.elapsed() >= PEER_REQUEST_TIMEOUT {
debug!(%self.tx_seq, "Finding peer timeout and try to find peers again");
self.try_find_peers();
}
return;
completed = true;
}
}
@ -606,43 +606,48 @@ impl SerialSyncController {
since: Instant::now(),
};
} else if self.peers.count(&[Connecting]) == 0 {
debug!(%self.tx_seq, "Connecting to peers timeout and try to find other peers to dial");
self.state = SyncState::Idle;
} else {
// peers.transition() will handle the case where the peer connection times out
return;
completed = true;
}
}
SyncState::AwaitingOutgoingConnection { since } => {
if since.elapsed() < WAIT_OUTGOING_CONNECTION_TIMEOUT {
return;
completed = true;
} else {
debug!(%self.tx_seq, "Waiting for outgoing connection timeout and try to find other peers to dial");
self.state = SyncState::Idle;
}
self.state = SyncState::Idle;
}
SyncState::AwaitingDownload { since } => {
if Instant::now() < since {
return;
completed = true;
} else {
self.try_request_next();
}
self.try_request_next();
}
SyncState::Downloading { peer_id, since, .. } => {
if !matches!(self.peers.peer_state(&peer_id), Some(PeerState::Connected)) {
// e.g. peer disconnected by remote node
debug!(%self.tx_seq, "No peer to continue downloading and try to find other peers to download");
self.state = SyncState::Idle;
} else if since.elapsed() >= DOWNLOAD_TIMEOUT {
self.handle_response_failure(peer_id, "RPC timeout");
} else {
return;
completed = true;
}
}
SyncState::Completed | SyncState::Failed { .. } => return,
SyncState::Completed | SyncState::Failed { .. } => completed = true,
}
}
debug!(%self.tx_seq, ?self.state, "transition ended");
}
}

View File

@ -175,7 +175,7 @@ impl SyncService {
manager,
};
debug!("Starting sync service");
info!("Starting sync service");
executor.spawn(async move { Box::pin(sync.main()).await }, "sync");
Ok(sync_send)
@ -306,7 +306,6 @@ impl SyncService {
tx_seq,
is_reverted,
} => {
debug!(?tx_seq, "terminate file sync");
let count = self.on_terminate_file_sync(tx_seq, is_reverted);
let _ = sender.send(SyncResponse::TerminateFileSync { count });
}
@ -314,7 +313,7 @@ impl SyncService {
}
fn on_dail_failed(&mut self, peer_id: PeerId, err: DialError) {
info!(%peer_id, "Dail to peer failed");
info!(%peer_id, ?err, "Dail to peer failed");
for controller in self.controllers.values_mut() {
controller.on_dail_failed(peer_id, &err);
@ -525,7 +524,7 @@ impl SyncService {
maybe_range: Option<(u64, u64)>,
maybe_peer: Option<(PeerId, Multiaddr)>,
) -> Result<()> {
info!(%tx_seq, "Start to sync file");
info!(%tx_seq, ?maybe_range, ?maybe_peer, "Start to sync file");
// remove the failed entry if it was caused by a reverted tx, so that the
// file is re-synced with the latest tx_id.
@ -541,10 +540,14 @@ impl SyncService {
if tx_reverted {
self.controllers.remove(&tx_seq);
info!(%tx_seq, "Terminate file sync due to tx reverted");
}
let controller = match self.controllers.entry(tx_seq) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Occupied(entry) => {
debug!(%tx_seq, "File already in sync");
entry.into_mut()
}
Entry::Vacant(entry) => {
let tx = match self.store.get_tx_by_seq_number(tx_seq).await? {
Some(tx) => tx,
@ -587,6 +590,7 @@ impl SyncService {
// Trigger file or chunks sync again if completed or failed.
if controller.is_completed_or_failed() {
controller.reset(maybe_range);
debug!(%tx_seq, "Reset completed or failed file sync");
}
if let Some((peer_id, addr)) = maybe_peer {
@ -661,6 +665,7 @@ impl SyncService {
/// Note: this function should be as fast as possible to avoid
/// messages lagging in the channel.
fn on_terminate_file_sync(&mut self, min_tx_seq: u64, is_reverted: bool) -> usize {
info!(%min_tx_seq, %is_reverted, "Terminate file sync");
let mut to_terminate = vec![];
if is_reverted {
@ -677,20 +682,32 @@ impl SyncService {
self.controllers.remove(tx_seq);
}
debug!(?to_terminate, "File sync terminated");
to_terminate.len()
}
fn on_heartbeat(&mut self) {
let mut completed = vec![];
let mut incomplete = vec![];
for (&tx_seq, controller) in self.controllers.iter_mut() {
controller.transition();
if let SyncState::Completed = controller.get_status() {
completed.push(tx_seq);
} else {
incomplete.push(tx_seq);
}
}
if !completed.is_empty() || !incomplete.is_empty() {
debug!(
"Sync stats: incomplete = {:?}, completed = {:?}",
incomplete, completed
);
}
for tx_seq in completed {
self.controllers.remove(&tx_seq);
}