2024-07-04 06:04:17 +00:00
|
|
|
use super::{batcher::Batcher, sync_store::SyncStore};
|
|
|
|
use crate::{
|
|
|
|
auto_sync::{batcher::SyncResult, INTERVAL_ERROR, INTERVAL_IDLE},
|
|
|
|
Config, SyncSender,
|
|
|
|
};
|
|
|
|
use anyhow::Result;
|
2024-07-08 03:47:59 +00:00
|
|
|
use std::sync::Arc;
|
2024-07-04 06:04:17 +00:00
|
|
|
use storage_async::Store;
|
|
|
|
use tokio::time::sleep;
|
|
|
|
|
|
|
|
pub struct RandomBatcher {
|
|
|
|
batcher: Batcher,
|
2024-07-08 03:47:59 +00:00
|
|
|
sync_store: Arc<SyncStore>,
|
2024-07-04 06:04:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl RandomBatcher {
|
2024-07-08 03:47:59 +00:00
|
|
|
pub fn new(
|
|
|
|
config: Config,
|
|
|
|
store: Store,
|
|
|
|
sync_send: SyncSender,
|
|
|
|
sync_store: Arc<SyncStore>,
|
|
|
|
) -> Self {
|
2024-07-04 06:04:17 +00:00
|
|
|
Self {
|
|
|
|
// now, only 1 thread to sync file randomly
|
|
|
|
batcher: Batcher::new(config, 1, store, sync_send),
|
|
|
|
sync_store,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn start(mut self) {
|
|
|
|
info!("Start to sync files");
|
|
|
|
|
|
|
|
loop {
|
|
|
|
match self.sync_once().await {
|
|
|
|
Ok(true) => {}
|
|
|
|
Ok(false) => {
|
2024-07-08 03:47:59 +00:00
|
|
|
trace!(
|
|
|
|
"File sync still in progress or idle, {:?}",
|
|
|
|
self.stat().await
|
|
|
|
);
|
2024-07-04 06:04:17 +00:00
|
|
|
sleep(INTERVAL_IDLE).await;
|
|
|
|
}
|
|
|
|
Err(err) => {
|
2024-07-08 03:47:59 +00:00
|
|
|
warn!(%err, "Failed to sync file once, {:?}", self.stat().await);
|
2024-07-04 06:04:17 +00:00
|
|
|
sleep(INTERVAL_ERROR).await;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn sync_once(&mut self) -> Result<bool> {
|
|
|
|
if self.schedule().await? {
|
|
|
|
return Ok(true);
|
|
|
|
}
|
|
|
|
|
|
|
|
// poll any completed file sync
|
|
|
|
let (tx_seq, sync_result) = match self.batcher.poll().await? {
|
|
|
|
Some(v) => v,
|
|
|
|
None => return Ok(false),
|
|
|
|
};
|
|
|
|
|
2024-07-08 03:47:59 +00:00
|
|
|
debug!(%tx_seq, ?sync_result, "Completed to sync file, {:?}", self.stat().await);
|
2024-07-04 06:04:17 +00:00
|
|
|
|
|
|
|
match sync_result {
|
|
|
|
SyncResult::Completed => self.sync_store.remove_tx(tx_seq).await?,
|
|
|
|
_ => self.sync_store.downgrade_tx_to_pending(tx_seq).await?,
|
|
|
|
};
|
|
|
|
|
|
|
|
Ok(true)
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn schedule(&mut self) -> Result<bool> {
|
|
|
|
if self.batcher.len() > 0 {
|
|
|
|
return Ok(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
let tx_seq = match self.sync_store.random_tx().await? {
|
|
|
|
Some(v) => v,
|
|
|
|
None => return Ok(false),
|
|
|
|
};
|
|
|
|
|
|
|
|
if !self.batcher.add(tx_seq).await? {
|
|
|
|
return Ok(false);
|
|
|
|
}
|
|
|
|
|
2024-07-08 03:47:59 +00:00
|
|
|
debug!("Pick a file to sync, {:?}", self.stat().await);
|
2024-07-04 06:04:17 +00:00
|
|
|
|
|
|
|
Ok(true)
|
|
|
|
}
|
2024-07-08 03:47:59 +00:00
|
|
|
|
|
|
|
async fn stat(&self) -> String {
|
|
|
|
match self.sync_store.stat().await {
|
|
|
|
Ok((num_pending_txs, num_ready_txs)) => format!(
|
|
|
|
"RandomBatcher {{ batcher = {:?}, pending_txs = {}, ready_txs = {}}}",
|
|
|
|
self.batcher, num_pending_txs, num_ready_txs
|
|
|
|
),
|
|
|
|
Err(err) => format!(
|
|
|
|
"RandomBatcher {{ batcher = {:?}, pending_txs/ready_txs = Error({:?})}}",
|
|
|
|
self.batcher, err
|
|
|
|
),
|
|
|
|
}
|
|
|
|
}
|
2024-07-04 06:04:17 +00:00
|
|
|
}
|