Supports batch randomly auto sync files (#154)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run

This commit is contained in:
Bo QIU 2024-08-08 18:36:08 +08:00 committed by GitHub
parent e1df2f0868
commit 9189cabbb2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 18 additions and 22 deletions

View File

@ -1,7 +1,7 @@
use crate::{controllers::SyncState, Config, SyncRequest, SyncResponse, SyncSender};
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Arc};
use std::{collections::HashSet, fmt::Debug, sync::Arc};
use storage_async::Store;
use tokio::sync::RwLock;
@ -17,7 +17,7 @@ pub enum SyncResult {
pub struct Batcher {
pub(crate) config: Config,
capacity: usize,
tasks: Arc<RwLock<Vec<u64>>>, // files to sync
tasks: Arc<RwLock<HashSet<u64>>>, // files to sync
store: Store,
sync_send: SyncSender,
}
@ -33,12 +33,10 @@ impl Batcher {
}
}
pub async fn len(&self) -> usize {
self.tasks.read().await.len()
}
pub async fn tasks(&self) -> Vec<u64> {
self.tasks.read().await.clone()
let mut result: Vec<u64> = self.tasks.read().await.iter().copied().collect();
result.sort();
result
}
pub async fn add(&self, tx_seq: u64) -> Result<bool> {
@ -54,9 +52,7 @@ impl Batcher {
return Ok(false);
}
tasks.push(tx_seq);
Ok(true)
Ok(tasks.insert(tx_seq))
}
pub async fn reorg(&self, reverted_tx_seq: u64) {
@ -67,19 +63,16 @@ impl Batcher {
pub async fn poll(&self) -> Result<Option<(u64, SyncResult)>> {
let mut result = None;
let tasks = self.tasks.read().await.clone();
let mut index = tasks.len();
for (i, tx_seq) in tasks.iter().enumerate() {
for tx_seq in tasks.iter() {
if let Some(ret) = self.poll_tx(*tx_seq).await? {
result = Some((*tx_seq, ret));
index = i;
break;
}
}
let mut tasks = self.tasks.write().await;
if index < tasks.len() {
tasks.swap_remove(index);
if let Some((tx_seq, _)) = &result {
self.tasks.write().await.remove(tx_seq);
}
Ok(result)

View File

@ -31,8 +31,7 @@ impl RandomBatcher {
sync_store: Arc<SyncStore>,
) -> Self {
Self {
// now, only 1 thread to sync file randomly
batcher: Batcher::new(config, 1, store, sync_send),
batcher: Batcher::new(config, config.max_random_workers, store, sync_send),
sync_store,
}
}
@ -97,10 +96,6 @@ impl RandomBatcher {
}
async fn schedule(&mut self) -> Result<bool> {
if self.batcher.len().await > 0 {
return Ok(false);
}
let tx_seq = match self.sync_store.random_tx().await? {
Some(v) => v,
None => return Ok(false),

View File

@ -50,6 +50,7 @@ pub struct Config {
#[serde(deserialize_with = "deserialize_duration")]
pub auto_sync_error_interval: Duration,
pub max_sequential_workers: usize,
pub max_random_workers: usize,
#[serde(deserialize_with = "deserialize_duration")]
pub find_peer_timeout: Duration,
}
@ -78,6 +79,7 @@ impl Default for Config {
auto_sync_idle_interval: Duration::from_secs(3),
auto_sync_error_interval: Duration::from_secs(10),
max_sequential_workers: 8,
max_random_workers: 4,
find_peer_timeout: Duration::from_secs(10),
}
}

View File

@ -241,3 +241,6 @@ auto_sync_enabled = true
# Maximum threads to sync files in sequence.
# max_sequential_workers = 8
# Maximum threads to sync files randomly.
# max_random_workers = 4

View File

@ -242,6 +242,9 @@
# Maximum threads to sync files in sequence.
# max_sequential_workers = 8
# Maximum threads to sync files randomly.
# max_random_workers = 4
#######################################################################
### File Location Cache Options ###
#######################################################################