disable auto sync until catched up (#125)

This commit is contained in:
Bo QIU 2024-07-12 17:45:20 +08:00 committed by GitHub
parent cca14e246e
commit b1c52c7ad6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 41 additions and 9 deletions

View File

@ -4,7 +4,10 @@ use crate::{
Config, SyncSender,
};
use anyhow::Result;
use std::sync::Arc;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use storage_async::Store;
use tokio::time::sleep;
@ -27,10 +30,17 @@ impl RandomBatcher {
}
}
pub async fn start(mut self) {
pub async fn start(mut self, catched_up: Arc<AtomicBool>) {
info!("Start to sync files");
loop {
// disable file sync until catched up
if !catched_up.load(Ordering::Relaxed) {
trace!("Cannot sync file in catch-up phase");
sleep(INTERVAL_IDLE).await;
continue;
}
match self.sync_once().await {
Ok(true) => {}
Ok(false) => {

View File

@ -8,9 +8,15 @@ use crate::{
};
use anyhow::Result;
use log_entry_sync::LogSyncEvent;
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use std::{
collections::HashMap,
fmt::Debug,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use storage_async::Store;
use tokio::sync::oneshot;
use tokio::{
sync::{broadcast::Receiver, mpsc::UnboundedReceiver},
time::sleep,
@ -84,12 +90,10 @@ impl SerialBatcher {
mut self,
mut file_announcement_recv: UnboundedReceiver<u64>,
mut log_sync_recv: Receiver<LogSyncEvent>,
catch_up_end_recv: oneshot::Receiver<()>,
catched_up: Arc<AtomicBool>,
) {
info!(?self, "Start to sync files");
catch_up_end_recv.await.expect("log sync sender dropped");
loop {
// handle all pending file announcements
if self
@ -104,6 +108,13 @@ impl SerialBatcher {
continue;
}
// disable file sync until catched up
if !catched_up.load(Ordering::Relaxed) {
trace!("Cannot sync file in catch-up phase");
sleep(INTERVAL_IDLE).await;
continue;
}
// sync files
match self.sync_once().await {
Ok(true) => {}

View File

@ -17,6 +17,7 @@ use network::{
PeerRequestId, SyncId as RequestId,
};
use shared_types::{bytes_to_chunks, ChunkArrayWithProof, TxID};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
@ -169,20 +170,30 @@ impl SyncService {
let file_announcement_send = if config.auto_sync_enabled {
let (send, recv) = unbounded_channel();
let sync_store = Arc::new(SyncStore::new(store.clone()));
let catched_up = Arc::new(AtomicBool::new(false));
// sync in sequence
let serial_batcher =
SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
.await?;
executor.spawn(
serial_batcher.start(recv, event_recv, catch_up_end_recv),
serial_batcher.start(recv, event_recv, catched_up.clone()),
"auto_sync_serial",
);
// sync randomly
let random_batcher =
RandomBatcher::new(config, store.clone(), sync_send.clone(), sync_store);
executor.spawn(random_batcher.start(), "auto_sync_random");
executor.spawn(random_batcher.start(catched_up.clone()), "auto_sync_random");
// handle on catched up notification
executor.spawn(
async move {
catch_up_end_recv.await.expect("Catch up sender dropped");
catched_up.store(true, Ordering::Relaxed);
},
"auto_sync_wait_for_catchup",
);
Some(send)
} else {