use auto sync manager

This commit is contained in:
boqiu 2024-08-02 19:23:42 +08:00
parent 4654f792b2
commit c85d8ff7ed

View File

@ -1,6 +1,4 @@
use crate::auto_sync::batcher_random::RandomBatcher; use crate::auto_sync::manager::AutoSyncManager;
use crate::auto_sync::batcher_serial::SerialBatcher;
use crate::auto_sync::sync_store::SyncStore;
use crate::context::SyncNetworkContext; use crate::context::SyncNetworkContext;
use crate::controllers::{ use crate::controllers::{
FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState, FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState,
@ -18,7 +16,6 @@ use network::{
PeerRequestId, SyncId as RequestId, PeerRequestId, SyncId as RequestId,
}; };
use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, TxID}; use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, TxID};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{ use std::{
collections::{hash_map::Entry, HashMap}, collections::{hash_map::Entry, HashMap},
sync::Arc, sync::Arc,
@ -27,7 +24,6 @@ use storage::config::ShardConfig;
use storage::error::Result as StorageResult; use storage::error::Result as StorageResult;
use storage::log_store::Store as LogStore; use storage::log_store::Store as LogStore;
use storage_async::Store; use storage_async::Store;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::sync::{broadcast, mpsc, oneshot};
const HEARTBEAT_INTERVAL_SEC: u64 = 5; const HEARTBEAT_INTERVAL_SEC: u64 = 5;
@ -131,7 +127,7 @@ pub struct SyncService {
/// Heartbeat interval for executing periodic tasks. /// Heartbeat interval for executing periodic tasks.
heartbeat: tokio::time::Interval, heartbeat: tokio::time::Interval,
file_announcement_send: Option<UnboundedSender<u64>>, auto_sync_manager: Option<AutoSyncManager>,
} }
impl SyncService { impl SyncService {
@ -172,35 +168,18 @@ impl SyncService {
let store = Store::new(store, executor.clone()); let store = Store::new(store, executor.clone());
// init auto sync // init auto sync
let file_announcement_send = if config.auto_sync_enabled { let auto_sync_manager = if config.auto_sync_enabled {
let (send, recv) = unbounded_channel(); Some(
let sync_store = Arc::new(SyncStore::new(store.clone())); AutoSyncManager::spawn(
let catched_up = Arc::new(AtomicBool::new(false)); config,
&executor,
// sync in sequence store.clone(),
let serial_batcher = sync_send.clone(),
SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) event_recv,
.await?; catch_up_end_recv,
executor.spawn( )
serial_batcher.start(recv, event_recv, catched_up.clone()), .await?,
"auto_sync_serial", )
);
// sync randomly
let random_batcher =
RandomBatcher::new(config, store.clone(), sync_send.clone(), sync_store);
executor.spawn(random_batcher.start(catched_up.clone()), "auto_sync_random");
// handle on catched up notification
executor.spawn(
async move {
catch_up_end_recv.await.expect("Catch up sender dropped");
catched_up.store(true, Ordering::Relaxed);
},
"auto_sync_wait_for_catchup",
);
Some(send)
} else { } else {
None None
}; };
@ -213,7 +192,7 @@ impl SyncService {
file_location_cache, file_location_cache,
controllers: Default::default(), controllers: Default::default(),
heartbeat, heartbeat,
file_announcement_send, auto_sync_manager,
}; };
info!("Starting sync service"); info!("Starting sync service");
@ -676,8 +655,8 @@ impl SyncService {
let tx_seq = tx_id.seq; let tx_seq = tx_id.seq;
trace!(%tx_seq, %peer_id, %addr, "Received AnnounceFile gossip"); trace!(%tx_seq, %peer_id, %addr, "Received AnnounceFile gossip");
if let Some(send) = &self.file_announcement_send { if let Some(manager) = &self.auto_sync_manager {
let _ = send.send(tx_seq); let _ = manager.file_announcement_send.send(tx_seq);
} }
// File already in sync // File already in sync
@ -915,7 +894,7 @@ mod tests {
file_location_cache, file_location_cache,
controllers: Default::default(), controllers: Default::default(),
heartbeat, heartbeat,
file_announcement_send: None, auto_sync_manager: None,
}; };
sync.on_peer_connected(init_peer_id); sync.on_peer_connected(init_peer_id);
@ -947,7 +926,7 @@ mod tests {
file_location_cache, file_location_cache,
controllers: Default::default(), controllers: Default::default(),
heartbeat, heartbeat,
file_announcement_send: None, auto_sync_manager: None,
}; };
sync.on_peer_disconnected(init_peer_id); sync.on_peer_disconnected(init_peer_id);