Refactor auto sync to return sync status (#150)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run

* Refactor auto sync to return sync state

* Add auto sync manager to wrap multiple objects

* use auto sync manager

* fix python tests
This commit is contained in:
Bo QIU 2024-08-05 17:30:26 +08:00 committed by GitHub
parent dbd865fded
commit 891e00fa80
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 242 additions and 141 deletions

View File

@ -1,9 +1,11 @@
use crate::{controllers::SyncState, Config, SyncRequest, SyncResponse, SyncSender}; use crate::{controllers::SyncState, Config, SyncRequest, SyncResponse, SyncSender};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use std::fmt::Debug; use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Arc};
use storage_async::Store; use storage_async::Store;
use tokio::sync::RwLock;
#[derive(Debug)] #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum SyncResult { pub enum SyncResult {
Completed, Completed,
Failed, Failed,
@ -11,20 +13,15 @@ pub enum SyncResult {
} }
/// Supports to sync files concurrently. /// Supports to sync files concurrently.
#[derive(Clone)]
pub struct Batcher { pub struct Batcher {
config: Config, config: Config,
capacity: usize, capacity: usize,
tasks: Vec<u64>, // files to sync tasks: Arc<RwLock<Vec<u64>>>, // files to sync
store: Store, store: Store,
sync_send: SyncSender, sync_send: SyncSender,
} }
impl Debug for Batcher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.tasks)
}
}
impl Batcher { impl Batcher {
pub fn new(config: Config, capacity: usize, store: Store, sync_send: SyncSender) -> Self { pub fn new(config: Config, capacity: usize, store: Store, sync_send: SyncSender) -> Self {
Self { Self {
@ -36,36 +33,43 @@ impl Batcher {
} }
} }
pub fn len(&self) -> usize { pub async fn len(&self) -> usize {
self.tasks.len() self.tasks.read().await.len()
} }
pub async fn add(&mut self, tx_seq: u64) -> Result<bool> { pub async fn tasks(&self) -> Vec<u64> {
// limits the number of threads self.tasks.read().await.clone()
if self.tasks.len() >= self.capacity { }
return Ok(false);
}
pub async fn add(&self, tx_seq: u64) -> Result<bool> {
// requires log entry available before file sync // requires log entry available before file sync
if self.store.get_tx_by_seq_number(tx_seq).await?.is_none() { if self.store.get_tx_by_seq_number(tx_seq).await?.is_none() {
return Ok(false); return Ok(false);
} }
self.tasks.push(tx_seq); let mut tasks = self.tasks.write().await;
// limits the number of threads
if tasks.len() >= self.capacity {
return Ok(false);
}
tasks.push(tx_seq);
Ok(true) Ok(true)
} }
pub fn reorg(&mut self, reverted_tx_seq: u64) { pub async fn reorg(&self, reverted_tx_seq: u64) {
self.tasks.retain(|&x| x < reverted_tx_seq); self.tasks.write().await.retain(|&x| x < reverted_tx_seq);
} }
/// Poll the sync result of any completed file sync. /// Poll the sync result of any completed file sync.
pub async fn poll(&mut self) -> Result<Option<(u64, SyncResult)>> { pub async fn poll(&self) -> Result<Option<(u64, SyncResult)>> {
let mut result = None; let mut result = None;
let mut index = self.tasks.len(); let tasks = self.tasks.read().await.clone();
let mut index = tasks.len();
for (i, tx_seq) in self.tasks.iter().enumerate() { for (i, tx_seq) in tasks.iter().enumerate() {
if let Some(ret) = self.poll_tx(*tx_seq).await? { if let Some(ret) = self.poll_tx(*tx_seq).await? {
result = Some((*tx_seq, ret)); result = Some((*tx_seq, ret));
index = i; index = i;
@ -73,8 +77,9 @@ impl Batcher {
} }
} }
if index < self.tasks.len() { let mut tasks = self.tasks.write().await;
self.tasks.swap_remove(index); if index < tasks.len() {
tasks.swap_remove(index);
} }
Ok(result) Ok(result)

View File

@ -4,6 +4,7 @@ use crate::{
Config, SyncSender, Config, SyncSender,
}; };
use anyhow::Result; use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::sync::{ use std::sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Arc,
@ -11,6 +12,15 @@ use std::sync::{
use storage_async::Store; use storage_async::Store;
use tokio::time::sleep; use tokio::time::sleep;
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RandomBatcherState {
pub tasks: Vec<u64>,
pub pending_txs: usize,
pub ready_txs: usize,
}
#[derive(Clone)]
pub struct RandomBatcher { pub struct RandomBatcher {
batcher: Batcher, batcher: Batcher,
sync_store: Arc<SyncStore>, sync_store: Arc<SyncStore>,
@ -30,6 +40,16 @@ impl RandomBatcher {
} }
} }
pub async fn get_state(&self) -> Result<RandomBatcherState> {
let (pending_txs, ready_txs) = self.sync_store.stat().await?;
Ok(RandomBatcherState {
tasks: self.batcher.tasks().await,
pending_txs,
ready_txs,
})
}
pub async fn start(mut self, catched_up: Arc<AtomicBool>) { pub async fn start(mut self, catched_up: Arc<AtomicBool>) {
info!("Start to sync files"); info!("Start to sync files");
@ -45,13 +65,13 @@ impl RandomBatcher {
Ok(true) => {} Ok(true) => {}
Ok(false) => { Ok(false) => {
trace!( trace!(
"File sync still in progress or idle, {:?}", "File sync still in progress or idle, state = {:?}",
self.stat().await self.get_state().await
); );
sleep(INTERVAL_IDLE).await; sleep(INTERVAL_IDLE).await;
} }
Err(err) => { Err(err) => {
warn!(%err, "Failed to sync file once, {:?}", self.stat().await); warn!(%err, "Failed to sync file once, state = {:?}", self.get_state().await);
sleep(INTERVAL_ERROR).await; sleep(INTERVAL_ERROR).await;
} }
} }
@ -69,7 +89,7 @@ impl RandomBatcher {
None => return Ok(false), None => return Ok(false),
}; };
debug!(%tx_seq, ?sync_result, "Completed to sync file, {:?}", self.stat().await); debug!(%tx_seq, ?sync_result, "Completed to sync file, state = {:?}", self.get_state().await);
match sync_result { match sync_result {
SyncResult::Completed => self.sync_store.remove_tx(tx_seq).await?, SyncResult::Completed => self.sync_store.remove_tx(tx_seq).await?,
@ -80,7 +100,7 @@ impl RandomBatcher {
} }
async fn schedule(&mut self) -> Result<bool> { async fn schedule(&mut self) -> Result<bool> {
if self.batcher.len() > 0 { if self.batcher.len().await > 0 {
return Ok(false); return Ok(false);
} }
@ -93,21 +113,8 @@ impl RandomBatcher {
return Ok(false); return Ok(false);
} }
debug!("Pick a file to sync, {:?}", self.stat().await); debug!("Pick a file to sync, state = {:?}", self.get_state().await);
Ok(true) Ok(true)
} }
async fn stat(&self) -> String {
match self.sync_store.stat().await {
Ok((num_pending_txs, num_ready_txs)) => format!(
"RandomBatcher {{ batcher = {:?}, pending_txs = {}, ready_txs = {}}}",
self.batcher, num_pending_txs, num_ready_txs
),
Err(err) => format!(
"RandomBatcher {{ batcher = {:?}, pending_txs/ready_txs = Error({:?})}}",
self.batcher, err
),
}
}
} }

View File

@ -8,58 +8,70 @@ use crate::{
}; };
use anyhow::Result; use anyhow::Result;
use log_entry_sync::LogSyncEvent; use log_entry_sync::LogSyncEvent;
use serde::{Deserialize, Serialize};
use std::{ use std::{
collections::HashMap, collections::HashMap,
fmt::Debug, fmt::Debug,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Arc,
}, },
}; };
use storage_async::Store; use storage_async::Store;
use tokio::{ use tokio::{
sync::{broadcast::Receiver, mpsc::UnboundedReceiver}, sync::{broadcast::Receiver, mpsc::UnboundedReceiver, RwLock},
time::sleep, time::sleep,
}; };
/// Supports to sync files in sequence concurrently. /// Supports to sync files in sequence concurrently.
#[derive(Clone)]
pub struct SerialBatcher { pub struct SerialBatcher {
batcher: Batcher, batcher: Batcher,
/// Next tx seq to sync. /// Next tx seq to sync.
next_tx_seq: u64, next_tx_seq: Arc<AtomicU64>,
/// Maximum tx seq to sync. /// Maximum tx seq to sync.
max_tx_seq: u64, max_tx_seq: Arc<AtomicU64>,
/// Completed txs that pending to update sync db in sequence. /// Completed txs that pending to update sync db in sequence.
pending_completed_txs: HashMap<u64, SyncResult>, pending_completed_txs: Arc<RwLock<HashMap<u64, SyncResult>>>,
/// Next tx seq to sync in db, so as to continue file sync from /// Next tx seq to sync in db, so as to continue file sync from
/// break point when program restarted. /// break point when program restarted.
next_tx_seq_in_db: u64, next_tx_seq_in_db: Arc<AtomicU64>,
sync_store: Arc<SyncStore>, sync_store: Arc<SyncStore>,
} }
impl Debug for SerialBatcher { #[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SerialBatcherState {
pub tasks: Vec<u64>,
pub next: u64,
pub max: u64,
pub pendings: HashMap<u64, SyncResult>,
pub next_in_db: u64,
}
impl Debug for SerialBatcherState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let max_tx_seq_desc = if self.max_tx_seq == u64::MAX { let max_tx_seq_desc = if self.max == u64::MAX {
"N/A".into() "N/A".into()
} else { } else {
format!("{}", self.max_tx_seq) format!("{}", self.max)
}; };
let pendings_desc = if self.pending_completed_txs.len() <= 5 { let pendings_desc = if self.pendings.len() <= 5 {
format!("{:?}", self.pending_completed_txs) format!("{:?}", self.pendings)
} else { } else {
format!("{}", self.pending_completed_txs.len()) format!("{}", self.pendings.len())
}; };
f.debug_struct("SerialBatcher") f.debug_struct("SerialBatcher")
.field("batcher", &self.batcher) .field("tasks", &self.tasks)
.field("next", &self.next_tx_seq) .field("next", &self.next)
.field("max", &max_tx_seq_desc) .field("max", &max_tx_seq_desc)
.field("pendings", &pendings_desc) .field("pendings", &pendings_desc)
.field("next_in_db", &self.next_tx_seq_in_db) .field("next_in_db", &self.next_in_db)
.finish() .finish()
} }
} }
@ -78,21 +90,37 @@ impl SerialBatcher {
Ok(Self { Ok(Self {
batcher: Batcher::new(config, capacity, store, sync_send), batcher: Batcher::new(config, capacity, store, sync_send),
next_tx_seq: next_tx_seq.unwrap_or(0), next_tx_seq: Arc::new(AtomicU64::new(next_tx_seq.unwrap_or(0))),
max_tx_seq: max_tx_seq.unwrap_or(u64::MAX), max_tx_seq: Arc::new(AtomicU64::new(max_tx_seq.unwrap_or(u64::MAX))),
pending_completed_txs: Default::default(), pending_completed_txs: Default::default(),
next_tx_seq_in_db: next_tx_seq.unwrap_or(0), next_tx_seq_in_db: Arc::new(AtomicU64::new(next_tx_seq.unwrap_or(0))),
sync_store, sync_store,
}) })
} }
pub async fn get_state(&self) -> SerialBatcherState {
SerialBatcherState {
tasks: self.batcher.tasks().await,
next: self.next_tx_seq.load(Ordering::Relaxed),
max: self.max_tx_seq.load(Ordering::Relaxed),
pendings: self
.pending_completed_txs
.read()
.await
.iter()
.map(|(k, v)| (*k, *v))
.collect(),
next_in_db: self.next_tx_seq_in_db.load(Ordering::Relaxed),
}
}
pub async fn start( pub async fn start(
mut self, mut self,
mut file_announcement_recv: UnboundedReceiver<u64>, mut file_announcement_recv: UnboundedReceiver<u64>,
mut log_sync_recv: Receiver<LogSyncEvent>, mut log_sync_recv: Receiver<LogSyncEvent>,
catched_up: Arc<AtomicBool>, catched_up: Arc<AtomicBool>,
) { ) {
info!(?self, "Start to sync files"); info!("Start to sync files, state = {:?}", self.get_state().await);
loop { loop {
// handle all pending file announcements // handle all pending file announcements
@ -119,11 +147,14 @@ impl SerialBatcher {
match self.sync_once().await { match self.sync_once().await {
Ok(true) => {} Ok(true) => {}
Ok(false) => { Ok(false) => {
trace!(?self, "File sync still in progress or idle"); trace!(
"File sync still in progress or idle, state = {:?}",
self.get_state().await
);
sleep(INTERVAL_IDLE).await; sleep(INTERVAL_IDLE).await;
} }
Err(err) => { Err(err) => {
warn!(%err, ?self, "Failed to sync file once"); warn!(%err, "Failed to sync file once, state = {:?}", self.get_state().await);
sleep(INTERVAL_ERROR).await; sleep(INTERVAL_ERROR).await;
} }
} }
@ -139,26 +170,27 @@ impl SerialBatcher {
trace!(%announced_tx_seq, "Received file announcement"); trace!(%announced_tx_seq, "Received file announcement");
// new file announced // new file announced
if self.max_tx_seq == u64::MAX || announced_tx_seq > self.max_tx_seq { let max_tx_seq = self.max_tx_seq.load(Ordering::Relaxed);
debug!(%announced_tx_seq, ?self, "Update for new file announcement"); if max_tx_seq == u64::MAX || announced_tx_seq > max_tx_seq {
debug!(%announced_tx_seq, "Update for new file announcement, state = {:?}", self.get_state().await);
if let Err(err) = self.sync_store.set_max_tx_seq(announced_tx_seq).await { if let Err(err) = self.sync_store.set_max_tx_seq(announced_tx_seq).await {
error!(%err, %announced_tx_seq, ?self, "Failed to set max_tx_seq in store"); error!(%err, %announced_tx_seq, "Failed to set max_tx_seq in store, state = {:?}", self.get_state().await);
} }
self.max_tx_seq = announced_tx_seq; self.max_tx_seq.store(announced_tx_seq, Ordering::Relaxed);
return true; return true;
} }
// already wait for sequential sync // already wait for sequential sync
if announced_tx_seq >= self.next_tx_seq { if announced_tx_seq >= self.next_tx_seq.load(Ordering::Relaxed) {
return true; return true;
} }
// otherwise, mark tx as ready for sync // otherwise, mark tx as ready for sync
if let Err(err) = self.sync_store.upgrade_tx_to_ready(announced_tx_seq).await { if let Err(err) = self.sync_store.upgrade_tx_to_ready(announced_tx_seq).await {
error!(%err, %announced_tx_seq, ?self, "Failed to promote announced tx to ready"); error!(%err, %announced_tx_seq, "Failed to promote announced tx to ready, state = {:?}", self.get_state().await);
} }
true true
@ -173,11 +205,11 @@ impl SerialBatcher {
debug!(%reverted_tx_seq, "Reorg detected"); debug!(%reverted_tx_seq, "Reorg detected");
// reorg happened, but no impact on file sync // reorg happened, but no impact on file sync
if reverted_tx_seq >= self.next_tx_seq { if reverted_tx_seq >= self.next_tx_seq.load(Ordering::Relaxed) {
return true; return true;
} }
info!(%reverted_tx_seq, ?self, "Handle reorg started"); info!(%reverted_tx_seq, "Handle reorg started, state = {:?}", self.get_state().await);
// terminate all files in progress // terminate all files in progress
self.batcher self.batcher
@ -185,23 +217,22 @@ impl SerialBatcher {
.await; .await;
// update states // update states
self.batcher.reorg(reverted_tx_seq); self.batcher.reorg(reverted_tx_seq).await;
self.next_tx_seq = reverted_tx_seq; self.next_tx_seq.store(reverted_tx_seq, Ordering::Relaxed);
self.pending_completed_txs self.pending_completed_txs
.write()
.await
.retain(|&k, _| k < reverted_tx_seq); .retain(|&k, _| k < reverted_tx_seq);
if self.next_tx_seq_in_db > reverted_tx_seq { if self.next_tx_seq_in_db.load(Ordering::Relaxed) > reverted_tx_seq {
self.next_tx_seq_in_db = reverted_tx_seq; self.next_tx_seq_in_db
.store(reverted_tx_seq, Ordering::Relaxed);
if let Err(err) = self if let Err(err) = self.sync_store.set_next_tx_seq(reverted_tx_seq).await {
.sync_store error!(%err, %reverted_tx_seq, "Failed to set next tx seq due to tx reverted, state = {:?}", self.get_state().await);
.set_next_tx_seq(self.next_tx_seq_in_db)
.await
{
error!(%err, %reverted_tx_seq, ?self, "Failed to set next tx seq due to tx reverted");
} }
} }
info!(%reverted_tx_seq, ?self, "Handle reorg ended"); info!(%reverted_tx_seq, "Handle reorg ended, state = {:?}", self.get_state().await);
true true
} }
@ -218,8 +249,11 @@ impl SerialBatcher {
None => return Ok(false), None => return Ok(false),
}; };
info!(%tx_seq, ?sync_result, ?self, "Completed to sync file"); info!(%tx_seq, ?sync_result, "Completed to sync file, state = {:?}", self.get_state().await);
self.pending_completed_txs.insert(tx_seq, sync_result); self.pending_completed_txs
.write()
.await
.insert(tx_seq, sync_result);
// update sync db // update sync db
self.update_completed_txs_in_db().await?; self.update_completed_txs_in_db().await?;
@ -229,45 +263,49 @@ impl SerialBatcher {
/// Schedule file sync in sequence. /// Schedule file sync in sequence.
async fn schedule_next(&mut self) -> Result<bool> { async fn schedule_next(&mut self) -> Result<bool> {
if self.next_tx_seq > self.max_tx_seq { let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
if next_tx_seq > self.max_tx_seq.load(Ordering::Relaxed) {
return Ok(false); return Ok(false);
} }
if !self.batcher.add(self.next_tx_seq).await? { if !self.batcher.add(next_tx_seq).await? {
return Ok(false); return Ok(false);
} }
self.next_tx_seq += 1; self.next_tx_seq.store(next_tx_seq + 1, Ordering::Relaxed);
info!(?self, "Move forward"); info!("Move forward, state = {:?}", self.get_state().await);
Ok(true) Ok(true)
} }
/// Update file sync index in db. /// Update file sync index in db.
async fn update_completed_txs_in_db(&mut self) -> Result<()> { async fn update_completed_txs_in_db(&mut self) -> Result<()> {
let origin = self.next_tx_seq_in_db; let origin = self.next_tx_seq_in_db.load(Ordering::Relaxed);
let mut current = origin;
loop {
let sync_result = match self.pending_completed_txs.read().await.get(&current) {
Some(&v) => v,
None => break,
};
while let Some(sync_result) = self.pending_completed_txs.get(&self.next_tx_seq_in_db) {
// downgrade to random sync if file sync failed or timeout // downgrade to random sync if file sync failed or timeout
if matches!(sync_result, SyncResult::Failed | SyncResult::Timeout) { if matches!(sync_result, SyncResult::Failed | SyncResult::Timeout) {
self.sync_store self.sync_store.add_pending_tx(current).await?;
.add_pending_tx(self.next_tx_seq_in_db)
.await?;
} }
// always move forward in db // always move forward in db
self.sync_store self.sync_store.set_next_tx_seq(current + 1).await?;
.set_next_tx_seq(self.next_tx_seq_in_db + 1)
.await?;
// update in memory after db updated // update in memory after db updated
self.pending_completed_txs.remove(&self.next_tx_seq_in_db); self.pending_completed_txs.write().await.remove(&current);
self.next_tx_seq_in_db += 1; current += 1;
self.next_tx_seq_in_db.store(current, Ordering::Relaxed);
} }
if self.next_tx_seq_in_db > origin { if current > origin {
info!(%origin, %self.next_tx_seq_in_db, "Move forward in db"); info!(%origin, %current, "Move forward in db");
} }
Ok(()) Ok(())

View File

@ -0,0 +1,71 @@
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use anyhow::Result;
use log_entry_sync::LogSyncEvent;
use storage_async::Store;
use task_executor::TaskExecutor;
use tokio::sync::{
broadcast,
mpsc::{unbounded_channel, UnboundedSender},
oneshot,
};
use crate::{Config, SyncSender};
use super::{batcher_random::RandomBatcher, batcher_serial::SerialBatcher, sync_store::SyncStore};
pub struct AutoSyncManager {
pub serial: SerialBatcher,
pub random: RandomBatcher,
pub file_announcement_send: UnboundedSender<u64>,
}
impl AutoSyncManager {
pub async fn spawn(
config: Config,
executor: &TaskExecutor,
store: Store,
sync_send: SyncSender,
log_sync_recv: broadcast::Receiver<LogSyncEvent>,
catch_up_end_recv: oneshot::Receiver<()>,
) -> Result<Self> {
let (send, recv) = unbounded_channel();
let sync_store = Arc::new(SyncStore::new(store.clone()));
let catched_up = Arc::new(AtomicBool::new(false));
// sync in sequence
let serial =
SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
.await?;
executor.spawn(
serial
.clone()
.start(recv, log_sync_recv, catched_up.clone()),
"auto_sync_serial",
);
// sync randomly
let random = RandomBatcher::new(config, store, sync_send, sync_store);
executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random");
// handle on catched up notification
executor.spawn(
async move {
if catch_up_end_recv.await.is_ok() {
info!("log entry catched up");
catched_up.store(true, Ordering::Relaxed);
}
},
"auto_sync_wait_for_catchup",
);
Ok(Self {
serial,
random,
file_announcement_send: send,
})
}
}

View File

@ -1,6 +1,7 @@
mod batcher; mod batcher;
pub mod batcher_random; pub mod batcher_random;
pub mod batcher_serial; pub mod batcher_serial;
pub mod manager;
pub mod sync_store; pub mod sync_store;
mod tx_store; mod tx_store;

View File

@ -1,6 +1,4 @@
use crate::auto_sync::batcher_random::RandomBatcher; use crate::auto_sync::manager::AutoSyncManager;
use crate::auto_sync::batcher_serial::SerialBatcher;
use crate::auto_sync::sync_store::SyncStore;
use crate::context::SyncNetworkContext; use crate::context::SyncNetworkContext;
use crate::controllers::{ use crate::controllers::{
FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState, FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState,
@ -18,7 +16,6 @@ use network::{
PeerRequestId, SyncId as RequestId, PeerRequestId, SyncId as RequestId,
}; };
use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, TxID}; use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, TxID};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{ use std::{
collections::{hash_map::Entry, HashMap}, collections::{hash_map::Entry, HashMap},
sync::Arc, sync::Arc,
@ -27,7 +24,6 @@ use storage::config::ShardConfig;
use storage::error::Result as StorageResult; use storage::error::Result as StorageResult;
use storage::log_store::Store as LogStore; use storage::log_store::Store as LogStore;
use storage_async::Store; use storage_async::Store;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::sync::{broadcast, mpsc, oneshot};
const HEARTBEAT_INTERVAL_SEC: u64 = 5; const HEARTBEAT_INTERVAL_SEC: u64 = 5;
@ -131,7 +127,7 @@ pub struct SyncService {
/// Heartbeat interval for executing periodic tasks. /// Heartbeat interval for executing periodic tasks.
heartbeat: tokio::time::Interval, heartbeat: tokio::time::Interval,
file_announcement_send: Option<UnboundedSender<u64>>, auto_sync_manager: Option<AutoSyncManager>,
} }
impl SyncService { impl SyncService {
@ -172,35 +168,18 @@ impl SyncService {
let store = Store::new(store, executor.clone()); let store = Store::new(store, executor.clone());
// init auto sync // init auto sync
let file_announcement_send = if config.auto_sync_enabled { let auto_sync_manager = if config.auto_sync_enabled {
let (send, recv) = unbounded_channel(); Some(
let sync_store = Arc::new(SyncStore::new(store.clone())); AutoSyncManager::spawn(
let catched_up = Arc::new(AtomicBool::new(false)); config,
&executor,
// sync in sequence store.clone(),
let serial_batcher = sync_send.clone(),
SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) event_recv,
.await?; catch_up_end_recv,
executor.spawn( )
serial_batcher.start(recv, event_recv, catched_up.clone()), .await?,
"auto_sync_serial", )
);
// sync randomly
let random_batcher =
RandomBatcher::new(config, store.clone(), sync_send.clone(), sync_store);
executor.spawn(random_batcher.start(catched_up.clone()), "auto_sync_random");
// handle on catched up notification
executor.spawn(
async move {
catch_up_end_recv.await.expect("Catch up sender dropped");
catched_up.store(true, Ordering::Relaxed);
},
"auto_sync_wait_for_catchup",
);
Some(send)
} else { } else {
None None
}; };
@ -213,7 +192,7 @@ impl SyncService {
file_location_cache, file_location_cache,
controllers: Default::default(), controllers: Default::default(),
heartbeat, heartbeat,
file_announcement_send, auto_sync_manager,
}; };
info!("Starting sync service"); info!("Starting sync service");
@ -676,8 +655,8 @@ impl SyncService {
let tx_seq = tx_id.seq; let tx_seq = tx_id.seq;
trace!(%tx_seq, %peer_id, %addr, "Received AnnounceFile gossip"); trace!(%tx_seq, %peer_id, %addr, "Received AnnounceFile gossip");
if let Some(send) = &self.file_announcement_send { if let Some(manager) = &self.auto_sync_manager {
let _ = send.send(tx_seq); let _ = manager.file_announcement_send.send(tx_seq);
} }
// File already in sync // File already in sync
@ -915,7 +894,7 @@ mod tests {
file_location_cache, file_location_cache,
controllers: Default::default(), controllers: Default::default(),
heartbeat, heartbeat,
file_announcement_send: None, auto_sync_manager: None,
}; };
sync.on_peer_connected(init_peer_id); sync.on_peer_connected(init_peer_id);
@ -947,7 +926,7 @@ mod tests {
file_location_cache, file_location_cache,
controllers: Default::default(), controllers: Default::default(),
heartbeat, heartbeat,
file_announcement_send: None, auto_sync_manager: None,
}; };
sync.on_peer_disconnected(init_peer_id); sync.on_peer_disconnected(init_peer_id);