From 9189cabbb251d9379b250eca46d7522ac3efbb68 Mon Sep 17 00:00:00 2001 From: Bo QIU <35757521+boqiu@users.noreply.github.com> Date: Thu, 8 Aug 2024 18:36:08 +0800 Subject: [PATCH] Supports batch randomly auto sync files (#154) --- node/sync/src/auto_sync/batcher.rs | 25 ++++++++--------------- node/sync/src/auto_sync/batcher_random.rs | 7 +------ node/sync/src/lib.rs | 2 ++ run/config-testnet.toml | 3 +++ run/config.toml | 3 +++ 5 files changed, 18 insertions(+), 22 deletions(-) diff --git a/node/sync/src/auto_sync/batcher.rs b/node/sync/src/auto_sync/batcher.rs index 7b301de..0107413 100644 --- a/node/sync/src/auto_sync/batcher.rs +++ b/node/sync/src/auto_sync/batcher.rs @@ -1,7 +1,7 @@ use crate::{controllers::SyncState, Config, SyncRequest, SyncResponse, SyncSender}; use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; -use std::{fmt::Debug, sync::Arc}; +use std::{collections::HashSet, fmt::Debug, sync::Arc}; use storage_async::Store; use tokio::sync::RwLock; @@ -17,7 +17,7 @@ pub enum SyncResult { pub struct Batcher { pub(crate) config: Config, capacity: usize, - tasks: Arc>>, // files to sync + tasks: Arc>>, // files to sync store: Store, sync_send: SyncSender, } @@ -33,12 +33,10 @@ impl Batcher { } } - pub async fn len(&self) -> usize { - self.tasks.read().await.len() - } - pub async fn tasks(&self) -> Vec { - self.tasks.read().await.clone() + let mut result: Vec = self.tasks.read().await.iter().copied().collect(); + result.sort(); + result } pub async fn add(&self, tx_seq: u64) -> Result { @@ -54,9 +52,7 @@ impl Batcher { return Ok(false); } - tasks.push(tx_seq); - - Ok(true) + Ok(tasks.insert(tx_seq)) } pub async fn reorg(&self, reverted_tx_seq: u64) { @@ -67,19 +63,16 @@ impl Batcher { pub async fn poll(&self) -> Result> { let mut result = None; let tasks = self.tasks.read().await.clone(); - let mut index = tasks.len(); - for (i, tx_seq) in tasks.iter().enumerate() { + for tx_seq in tasks.iter() { if let Some(ret) = self.poll_tx(*tx_seq).await? { result = Some((*tx_seq, ret)); - index = i; break; } } - let mut tasks = self.tasks.write().await; - if index < tasks.len() { - tasks.swap_remove(index); + if let Some((tx_seq, _)) = &result { + self.tasks.write().await.remove(tx_seq); } Ok(result) diff --git a/node/sync/src/auto_sync/batcher_random.rs b/node/sync/src/auto_sync/batcher_random.rs index 363b826..8d70fb9 100644 --- a/node/sync/src/auto_sync/batcher_random.rs +++ b/node/sync/src/auto_sync/batcher_random.rs @@ -31,8 +31,7 @@ impl RandomBatcher { sync_store: Arc, ) -> Self { Self { - // now, only 1 thread to sync file randomly - batcher: Batcher::new(config, 1, store, sync_send), + batcher: Batcher::new(config, config.max_random_workers, store, sync_send), sync_store, } } @@ -97,10 +96,6 @@ impl RandomBatcher { } async fn schedule(&mut self) -> Result { - if self.batcher.len().await > 0 { - return Ok(false); - } - let tx_seq = match self.sync_store.random_tx().await? { Some(v) => v, None => return Ok(false), diff --git a/node/sync/src/lib.rs b/node/sync/src/lib.rs index 911b930..d299a3b 100644 --- a/node/sync/src/lib.rs +++ b/node/sync/src/lib.rs @@ -50,6 +50,7 @@ pub struct Config { #[serde(deserialize_with = "deserialize_duration")] pub auto_sync_error_interval: Duration, pub max_sequential_workers: usize, + pub max_random_workers: usize, #[serde(deserialize_with = "deserialize_duration")] pub find_peer_timeout: Duration, } @@ -78,6 +79,7 @@ impl Default for Config { auto_sync_idle_interval: Duration::from_secs(3), auto_sync_error_interval: Duration::from_secs(10), max_sequential_workers: 8, + max_random_workers: 4, find_peer_timeout: Duration::from_secs(10), } } diff --git a/run/config-testnet.toml b/run/config-testnet.toml index 6dbf85b..0809152 100644 --- a/run/config-testnet.toml +++ b/run/config-testnet.toml @@ -241,3 +241,6 @@ auto_sync_enabled = true # Maximum threads to sync files in sequence. # max_sequential_workers = 8 + +# Maximum threads to sync files randomly. +# max_random_workers = 4 diff --git a/run/config.toml b/run/config.toml index 540e47e..e58f7e9 100644 --- a/run/config.toml +++ b/run/config.toml @@ -242,6 +242,9 @@ # Maximum threads to sync files in sequence. # max_sequential_workers = 8 +# Maximum threads to sync files randomly. +# max_random_workers = 4 + ####################################################################### ### File Location Cache Options ### #######################################################################