diff --git a/node/sync/src/auto_sync/batcher.rs b/node/sync/src/auto_sync/batcher.rs
new file mode 100644
index 0000000..b4ae9da
--- /dev/null
+++ b/node/sync/src/auto_sync/batcher.rs
@@ -0,0 +1,148 @@
+use crate::{controllers::SyncState, Config, SyncRequest, SyncResponse, SyncSender};
+use anyhow::{bail, Ok, Result};
+use std::fmt::Debug;
+use storage_async::Store;
+
+#[derive(Debug)]
+pub enum SyncResult {
+    Completed,
+    Failed,
+    Timeout,
+}
+
+/// Syncs multiple files concurrently.
+pub struct Batcher {
+    config: Config,
+    capacity: usize,
+    tasks: Vec<u64>, // files to sync
+    store: Store,
+    sync_send: SyncSender,
+}
+
+impl Debug for Batcher {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self.tasks)
+    }
+}
+
+impl Batcher {
+    pub fn new(config: Config, capacity: usize, store: Store, sync_send: SyncSender) -> Self {
+        Self {
+            config,
+            capacity,
+            tasks: Default::default(),
+            store,
+            sync_send,
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.tasks.len()
+    }
+
+    pub async fn add(&mut self, tx_seq: u64) -> Result<bool> {
+        // limits the number of threads
+        if self.tasks.len() >= self.capacity {
+            return Ok(false);
+        }
+
+        // requires log entry available before file sync
+        if self.store.get_tx_by_seq_number(tx_seq).await?.is_none() {
+            return Ok(false);
+        }
+
+        self.tasks.push(tx_seq);
+
+        Ok(true)
+    }
+
+    pub fn reorg(&mut self, reverted_tx_seq: u64) {
+        self.tasks.retain(|&x| x < reverted_tx_seq);
+    }
+
+    /// Poll the sync result of any completed file sync.
+    pub async fn poll(&mut self) -> Result<Option<(u64, SyncResult)>> {
+        let mut result = None;
+        let mut index = self.tasks.len();
+
+        for (i, tx_seq) in self.tasks.iter().enumerate() {
+            if let Some(ret) = self.poll_tx(*tx_seq).await? {
+                result = Some((*tx_seq, ret));
+                index = i;
+                break;
+            }
+        }
+
+        if index < self.tasks.len() {
+            self.tasks.swap_remove(index);
+        }
+
+        Ok(result)
+    }
+
+    async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
+        // file already exists
+        if self.store.check_tx_completed(tx_seq).await? {
+            return Ok(Some(SyncResult::Completed));
+        }
+
+        // get sync state to handle in advance
+        let state = match self
+            .sync_send
+            .request(SyncRequest::SyncStatus { tx_seq })
+            .await?
+        {
+            SyncResponse::SyncStatus { status } => status,
+            _ => bail!("Invalid sync response type"),
+        };
+        trace!(?tx_seq, ?state, "File sync status retrieved");
+
+        match state {
+            // start file sync if not launched yet
+            None => match self
+                .sync_send
+                .request(SyncRequest::SyncFile { tx_seq })
+                .await?
+            {
+                SyncResponse::SyncFile { err } if err.is_empty() => Ok(None),
+                SyncResponse::SyncFile { err } => bail!("Failed to sync file: {:?}", err),
+                _ => bail!("Invalid sync response type"),
+            },
+
+            // file sync completed
+            Some(SyncState::Completed) => Ok(Some(SyncResult::Completed)),
+
+            // file sync failed
+            Some(SyncState::Failed { reason }) => {
+                debug!(?reason, "Failed to sync file");
+                Ok(Some(SyncResult::Failed))
+            }
+
+            // file sync timeout
+            Some(SyncState::FindingPeers { origin, .. })
+                if origin.elapsed() > self.config.find_peer_timeout =>
+            {
+                debug!(%tx_seq, "Terminate file sync due to finding peers timeout");
+                self.terminate_file_sync(tx_seq, false).await;
+                Ok(Some(SyncResult::Timeout))
+            }
+
+            // others
+            _ => Ok(None),
+        }
+    }
+
+    pub async fn terminate_file_sync(&self, tx_seq: u64, is_reverted: bool) {
+        if let Err(err) = self
+            .sync_send
+            .request(SyncRequest::TerminateFileSync {
+                tx_seq,
+                is_reverted,
+            })
+            .await
+        {
+            // just log and go ahead for any error, e.g. timeout
+            error!(%err, %tx_seq, %is_reverted, "Failed to terminate file sync");
+        }
+    }
+}
diff --git a/node/sync/src/auto_sync/batcher_random.rs b/node/sync/src/auto_sync/batcher_random.rs
new file mode 100644
index 0000000..1967c4f
--- /dev/null
+++ b/node/sync/src/auto_sync/batcher_random.rs
@@ -0,0 +1,84 @@
+use super::{batcher::Batcher, sync_store::SyncStore};
+use crate::{
+    auto_sync::{batcher::SyncResult, INTERVAL_ERROR, INTERVAL_IDLE},
+    Config, SyncSender,
+};
+use anyhow::Result;
+use storage_async::Store;
+use tokio::time::sleep;
+
+#[derive(Debug)]
+pub struct RandomBatcher {
+    batcher: Batcher,
+    sync_store: SyncStore,
+}
+
+impl RandomBatcher {
+    pub fn new(config: Config, store: Store, sync_send: SyncSender) -> Self {
+        let sync_store = SyncStore::new(store.clone());
+
+        Self {
+            // now, only 1 thread to sync file randomly
+            batcher: Batcher::new(config, 1, store, sync_send),
+            sync_store,
+        }
+    }
+
+    pub async fn start(mut self) {
+        info!("Start to sync files");
+
+        loop {
+            match self.sync_once().await {
+                Ok(true) => {}
+                Ok(false) => {
+                    trace!(?self, "File sync still in progress or idle");
+                    sleep(INTERVAL_IDLE).await;
+                }
+                Err(err) => {
+                    warn!(%err, ?self, "Failed to sync file once");
+                    sleep(INTERVAL_ERROR).await;
+                }
+            }
+        }
+    }
+
+    async fn sync_once(&mut self) -> Result<bool> {
+        if self.schedule().await? {
+            return Ok(true);
+        }
+
+        // poll any completed file sync
+        let (tx_seq, sync_result) = match self.batcher.poll().await? {
+            Some(v) => v,
+            None => return Ok(false),
+        };
+
+        debug!(%tx_seq, ?sync_result, ?self, "Completed to sync file");
+
+        match sync_result {
+            SyncResult::Completed => self.sync_store.remove_tx(tx_seq).await?,
+            _ => self.sync_store.downgrade_tx_to_pending(tx_seq).await?,
+        };
+
+        Ok(true)
+    }
+
+    async fn schedule(&mut self) -> Result<bool> {
+        if self.batcher.len() > 0 {
+            return Ok(false);
+        }
+
+        let tx_seq = match self.sync_store.random_tx().await? {
+            Some(v) => v,
+            None => return Ok(false),
+        };
+
+        if !self.batcher.add(tx_seq).await? {
+            return Ok(false);
+        }
+
+        debug!(?self, "Pick a file to sync");
+
+        Ok(true)
+    }
+}
diff --git a/node/sync/src/auto_sync/batcher_serial.rs b/node/sync/src/auto_sync/batcher_serial.rs
new file mode 100644
index 0000000..a76fb69
--- /dev/null
+++ b/node/sync/src/auto_sync/batcher_serial.rs
@@ -0,0 +1,248 @@
+use super::{
+    batcher::{Batcher, SyncResult},
+    sync_store::SyncStore,
+};
+use crate::{
+    auto_sync::{INTERVAL_ERROR, INTERVAL_IDLE},
+    Config, SyncSender,
+};
+use anyhow::Result;
+use log_entry_sync::LogSyncEvent;
+use std::{collections::HashMap, fmt::Debug};
+use storage_async::Store;
+use tokio::{
+    sync::{broadcast::Receiver, mpsc::UnboundedReceiver},
+    time::sleep,
+};
+
+/// Syncs files in sequence, with multiple files in flight concurrently.
+pub struct SerialBatcher {
+    batcher: Batcher,
+
+    /// Next tx seq to sync.
+    next_tx_seq: u64,
+    /// Maximum tx seq to sync.
+    max_tx_seq: u64,
+
+    /// Completed txs that are pending to update the sync db in sequence.
+    pending_completed_txs: HashMap<u64, SyncResult>,
+    /// Next tx seq to sync in db, so that file sync can resume from the
+    /// break point when the program restarts.
+    next_tx_seq_in_db: u64,
+
+    sync_store: SyncStore,
+}
+
+impl Debug for SerialBatcher {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let max_tx_seq_desc = if self.max_tx_seq == u64::MAX {
+            "N/A".into()
+        } else {
+            format!("{}", self.max_tx_seq)
+        };
+
+        let pendings_desc = if self.pending_completed_txs.len() <= 5 {
+            format!("{:?}", self.pending_completed_txs)
+        } else {
+            format!("{}", self.pending_completed_txs.len())
+        };
+
+        f.debug_struct("SerialBatcher")
+            .field("batcher", &self.batcher)
+            .field("next", &self.next_tx_seq)
+            .field("max", &max_tx_seq_desc)
+            .field("pendings", &pendings_desc)
+            .field("next_in_db", &self.next_tx_seq_in_db)
+            .finish()
+    }
+}
+
+impl SerialBatcher {
+    pub async fn new(config: Config, store: Store, sync_send: SyncSender) -> Result<Self> {
+        let capacity = config.max_sequential_workers;
+        let sync_store = SyncStore::new(store.clone());
+
+        // continue file sync from break point in db
+        let (next_tx_seq, max_tx_seq) = sync_store.get_tx_seq_range().await?;
+
+        Ok(Self {
+            batcher: Batcher::new(config, capacity, store, sync_send),
+            next_tx_seq: next_tx_seq.unwrap_or(0),
+            max_tx_seq: max_tx_seq.unwrap_or(u64::MAX),
+            pending_completed_txs: Default::default(),
+            next_tx_seq_in_db: next_tx_seq.unwrap_or(0),
+            sync_store,
+        })
+    }
+
+    pub async fn start(
+        mut self,
+        mut file_announcement_recv: UnboundedReceiver<u64>,
+        mut log_sync_recv: Receiver<LogSyncEvent>,
+    ) {
+        info!(?self, "Start to sync files");
+
+        loop {
+            // handle all pending file announcements
+            if self
+                .update_file_announcement(&mut file_announcement_recv)
+                .await
+            {
+                continue;
+            }
+
+            // handle all reorg events
+            if self.handle_reorg(&mut log_sync_recv).await {
+                continue;
+            }
+
+            // sync files
+            match self.sync_once().await {
+                Ok(true) => {}
+                Ok(false) => {
+                    trace!(?self, "File sync still in progress or idle");
+                    sleep(INTERVAL_IDLE).await;
+                }
+                Err(err) => {
+                    warn!(%err, ?self, "Failed to sync file once");
+                    sleep(INTERVAL_ERROR).await;
+                }
+            }
+        }
+    }
+
+    async fn update_file_announcement(&mut self, recv: &mut UnboundedReceiver<u64>) -> bool {
+        let announced_tx_seq = match recv.try_recv() {
+            Ok(v) => v,
+            Err(_) => return false,
+        };
+
+        debug!(%announced_tx_seq, "Received file announcement");
+
+        // new file announced
+        if self.max_tx_seq == u64::MAX || announced_tx_seq > self.max_tx_seq {
+            debug!(%announced_tx_seq, ?self, "Update for new file announcement");
+
+            if let Err(err) = self.sync_store.set_max_tx_seq(announced_tx_seq).await {
+                error!(%err, %announced_tx_seq, ?self, "Failed to set max_tx_seq in store");
+            }
+
+            self.max_tx_seq = announced_tx_seq;
+
+            return true;
+        }
+
+        // already wait for sequential sync
+        if announced_tx_seq >= self.next_tx_seq {
+            return true;
+        }
+
+        // otherwise, mark tx as ready for sync
+        if let Err(err) = self.sync_store.upgrade_tx_to_ready(announced_tx_seq).await {
+            error!(%err, %announced_tx_seq, ?self, "Failed to promote announced tx to ready");
+        }
+
+        true
+    }
+
+    async fn handle_reorg(&mut self, recv: &mut Receiver<LogSyncEvent>) -> bool {
+        let reverted_tx_seq = match recv.try_recv() {
+            Ok(LogSyncEvent::Reverted { tx_seq }) => tx_seq,
+            _ => return false,
+        };
+
+        debug!(%reverted_tx_seq, "Reorg detected");
+
+        // reorg happened, but no impact on file sync
+        if reverted_tx_seq >= self.next_tx_seq {
+            return true;
+        }
+
+        info!(%reverted_tx_seq, ?self, "Handle reorg");
+
+        // terminate all files in progress
+        self.batcher
+            .terminate_file_sync(reverted_tx_seq, true)
+            .await;
+
+        // update states
+        self.batcher.reorg(reverted_tx_seq);
+        self.next_tx_seq = reverted_tx_seq;
+        self.pending_completed_txs
+            .retain(|&k, _| k < reverted_tx_seq);
+        if self.next_tx_seq_in_db > reverted_tx_seq {
+            self.next_tx_seq_in_db = reverted_tx_seq;
+
+            if let Err(err) = self
+                .sync_store
+                .set_next_tx_seq(self.next_tx_seq_in_db)
+                .await
+            {
+                error!(%err, %reverted_tx_seq, ?self, "Failed to set next tx seq due to tx reverted");
+            }
+        }
+
+        true
+    }
+
+    async fn sync_once(&mut self) -> Result<bool> {
+        // try to trigger more file sync
+        if self.schedule_next().await? {
+            return Ok(true);
+        }
+
+        // poll any completed file sync
+        let (tx_seq, sync_result) = match self.batcher.poll().await? {
+            Some(v) => v,
+            None => return Ok(false),
+        };
+
+        info!(%tx_seq, ?sync_result, ?self, "Completed to sync file");
+        self.pending_completed_txs.insert(tx_seq, sync_result);
+
+        // update sync db
+        self.update_completed_txs_in_db().await?;
+
+        Ok(true)
+    }
+
+    /// Schedule file sync in sequence.
+    async fn schedule_next(&mut self) -> Result<bool> {
+        if self.next_tx_seq > self.max_tx_seq {
+            return Ok(false);
+        }
+
+        if !self.batcher.add(self.next_tx_seq).await? {
+            return Ok(false);
+        }
+
+        self.next_tx_seq += 1;
+
+        info!(?self, "Move forward");
+
+        Ok(true)
+    }
+
+    /// Update file sync index in db.
+    async fn update_completed_txs_in_db(&mut self) -> Result<()> {
+        while let Some(sync_result) = self.pending_completed_txs.get(&self.next_tx_seq_in_db) {
+            // downgrade to random sync if file sync failed or timeout
+            if matches!(sync_result, SyncResult::Failed | SyncResult::Timeout) {
+                self.sync_store
+                    .add_pending_tx(self.next_tx_seq_in_db)
+                    .await?;
+            }
+
+            // always move forward in db
+            self.sync_store
+                .set_next_tx_seq(self.next_tx_seq_in_db + 1)
+                .await?;
+
+            // update in memory after db updated
+            self.pending_completed_txs.remove(&self.next_tx_seq_in_db);
+            self.next_tx_seq_in_db += 1;
+        }
+
+        Ok(())
+    }
+}
diff --git a/node/sync/src/auto_sync/manager.rs b/node/sync/src/auto_sync/manager.rs
deleted file mode 100644
index 9dd76c6..0000000
--- a/node/sync/src/auto_sync/manager.rs
+++ /dev/null
@@ -1,677 +0,0 @@
-use super::sync_store::SyncStore;
-use crate::{controllers::SyncState, Config, SyncRequest, SyncResponse, SyncSender};
-use anyhow::{bail, Result};
-use log_entry_sync::LogSyncEvent;
-use std::sync::{
-    atomic::{AtomicU64, Ordering},
-    Arc,
-};
-use std::time::Duration;
-use storage_async::Store;
-use task_executor::TaskExecutor;
-use tokio::sync::broadcast::{error::RecvError, Receiver};
-use tokio::time::sleep;
-
-const INTERVAL_CATCHUP: Duration = Duration::from_millis(1);
-const INTERVAL: Duration = Duration::from_secs(1);
-const INTERVAL_ERROR: Duration = Duration::from_secs(10);
-
-/// Manager to synchronize files among storage nodes automatically.
-///
-/// Generally, most files could be synchronized among storage nodes. However, a few
-/// files may be unavailable on all storage nodes, e.g.
-///
-/// 1. File not uploaded by user in time.
-/// 2. File removed due to blockchain reorg, and user do not upload again.
-///
-/// So, there are 2 workers to synchronize files in parallel:
-///
-/// 1. Synchronize announced files in sequence. If any file unavailable, store it into db.
-/// 2. Synchronize the missed files in sequential synchronization.
-#[derive(Clone)]
-pub struct Manager {
-    config: Config,
-
-    /// The next `tx_seq` to sync in sequence.
-    next_tx_seq: Arc<AtomicU64>,
-
-    /// The maximum `tx_seq` to sync in sequence, `u64::MAX` means unlimited.
-    /// Generally, it is updated when file announcement received.
- max_tx_seq: Arc, - - /// The last reverted transaction seq, `u64::MAX` means no tx reverted. - /// Generally, it is updated when transaction reverted. - reverted_tx_seq: Arc, - - store: Store, - sync_store: SyncStore, - - /// Used to interact with sync service for the current file in sync. - sync_send: SyncSender, -} - -impl Manager { - pub async fn new(store: Store, sync_send: SyncSender, config: Config) -> Result { - let sync_store = SyncStore::new(store.clone()); - - let (next_tx_seq, max_tx_seq) = sync_store.get_tx_seq_range().await?; - let next_tx_seq = next_tx_seq.unwrap_or(0); - let max_tx_seq = max_tx_seq.unwrap_or(u64::MAX); - - Ok(Self { - config, - next_tx_seq: Arc::new(AtomicU64::new(next_tx_seq)), - max_tx_seq: Arc::new(AtomicU64::new(max_tx_seq)), - reverted_tx_seq: Arc::new(AtomicU64::new(u64::MAX)), - store, - sync_store, - sync_send, - }) - } - - pub fn spwn(&self, executor: &TaskExecutor, receiver: Receiver) { - executor.spawn( - self.clone().monitor_reorg(receiver), - "sync_manager_reorg_monitor", - ); - - executor.spawn(self.clone().start_sync(), "sync_manager_sequential_syncer"); - - executor.spawn( - self.clone().start_sync_pending_txs(), - "sync_manager_pending_syncer", - ); - } - - fn set_reverted(&self, tx_seq: u64) -> bool { - if tx_seq >= self.reverted_tx_seq.load(Ordering::Relaxed) { - return false; - } - - self.reverted_tx_seq.store(tx_seq, Ordering::Relaxed); - - true - } - - fn handle_on_reorg(&self) -> Option { - let reverted_tx_seq = self.reverted_tx_seq.load(Ordering::Relaxed); - - // no reorg happened - if reverted_tx_seq == u64::MAX { - return None; - } - - self.reverted_tx_seq.store(u64::MAX, Ordering::Relaxed); - - // reorg happened, but no impact on file sync - let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed); - if reverted_tx_seq > next_tx_seq { - return None; - } - - // handles on reorg - info!(%reverted_tx_seq, %next_tx_seq, "Transaction reverted"); - - // re-sync files from the reverted tx seq - self.next_tx_seq.store(reverted_tx_seq, Ordering::Relaxed); - - Some(next_tx_seq) - } - - pub async fn update_on_announcement(&self, announced_tx_seq: u64) { - let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed); - let max_tx_seq = self.max_tx_seq.load(Ordering::Relaxed); - debug!(%next_tx_seq, %max_tx_seq, %announced_tx_seq, "Update for new announcement"); - - // new file announced - if max_tx_seq == u64::MAX || announced_tx_seq > max_tx_seq { - match self.sync_store.set_max_tx_seq(announced_tx_seq).await { - Ok(()) => self.max_tx_seq.store(announced_tx_seq, Ordering::Relaxed), - Err(e) => error!(%e, "Failed to set max_tx_seq in store"), - }; - return; - } - - // already wait for sequential sync - if announced_tx_seq >= next_tx_seq { - return; - } - - // otherwise, mark tx as ready for sync - if let Err(e) = self.sync_store.upgrade_tx_to_ready(announced_tx_seq).await { - error!(%e, "Failed to promote announced tx to ready"); - } - } - - async fn move_forward(&self, pending: bool) -> Result { - let tx_seq = self.next_tx_seq.load(Ordering::Relaxed); - let max_tx_seq = self.max_tx_seq.load(Ordering::Relaxed); - if tx_seq > max_tx_seq { - return Ok(false); - } - - // put the tx into pending list - if pending && self.sync_store.add_pending_tx(tx_seq).await? 
{ - debug!(%tx_seq, "Pending tx added"); - } - - let next_tx_seq = tx_seq + 1; - self.sync_store.set_next_tx_seq(next_tx_seq).await?; - self.next_tx_seq.store(next_tx_seq, Ordering::Relaxed); - - debug!(%next_tx_seq, %max_tx_seq, "Move forward"); - - Ok(true) - } - - /// Returns whether file sync in progress but no peers found - async fn sync_tx(&self, tx_seq: u64) -> Result { - // tx not available yet - if self.store.get_tx_by_seq_number(tx_seq).await?.is_none() { - return Ok(false); - } - - // get sync state to handle in advance - let state = match self - .sync_send - .request(SyncRequest::SyncStatus { tx_seq }) - .await? - { - SyncResponse::SyncStatus { status } => status, - _ => bail!("Invalid sync response type"), - }; - trace!(?tx_seq, ?state, "sync_tx tx status"); - - // notify service to sync file if not started or failed - if matches!(state, None | Some(SyncState::Failed { .. })) { - match self - .sync_send - .request(SyncRequest::SyncFile { tx_seq }) - .await? - { - SyncResponse::SyncFile { err } if err.is_empty() => return Ok(false), - SyncResponse::SyncFile { err } => bail!("Failed to sync file: {:?}", err), - _ => bail!("Invalid sync response type"), - } - } - - if matches!(state, Some(SyncState::FindingPeers { origin, .. }) if origin.elapsed() > self.config.find_peer_timeout) - { - // no peers found for a long time - self.terminate_file_sync(tx_seq, false).await; - info!(%tx_seq, "Terminate file sync due to finding peers timeout"); - Ok(true) - } else { - // otherwise, continue to wait for file sync that already in progress - Ok(false) - } - } - - async fn terminate_file_sync(&self, tx_seq: u64, is_reverted: bool) { - if let Err(err) = self - .sync_send - .request(SyncRequest::TerminateFileSync { - tx_seq, - is_reverted, - }) - .await - { - // just log and go ahead for any error, e.g. timeout - error!(%err, %tx_seq, "Failed to terminate file sync"); - } - } - - /// Starts to monitor reorg and handle on transaction reverted. - async fn monitor_reorg(self, mut receiver: Receiver) { - info!("Start to monitor reorg"); - - loop { - match receiver.recv().await { - Ok(LogSyncEvent::ReorgDetected { .. }) => {} - Ok(LogSyncEvent::Reverted { tx_seq }) => { - // requires to re-sync files since transaction and files removed in storage - self.set_reverted(tx_seq); - } - Ok(LogSyncEvent::TxSynced { .. }) => {} //No need to handle synced tx in reorg - Err(RecvError::Closed) => { - // program terminated - info!("Completed to monitor reorg"); - return; - } - Err(RecvError::Lagged(lagged)) => { - // Generally, such error should not happen since confirmed block - // reorg rarely happen, and the buffer size of broadcast channel - // is big enough. - error!(%lagged, "Failed to receive reverted tx (Lagged)"); - } - } - } - } - - /// Starts to synchronize files in sequence. 
- async fn start_sync(self) { - info!( - "Start to sync files periodically, next = {}, max = {}", - self.next_tx_seq.load(Ordering::Relaxed), - self.max_tx_seq.load(Ordering::Relaxed) - ); - - loop { - // handles reorg before file sync - if let Some(tx_seq) = self.handle_on_reorg() { - // request sync service to terminate the file sync immediately - self.terminate_file_sync(tx_seq, true).await; - } - - // sync file - let sync_result = self.sync_once().await; - let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed); - match sync_result { - Ok(true) => { - debug!(%next_tx_seq, "Completed to sync file"); - sleep(INTERVAL_CATCHUP).await; - } - Ok(false) => { - trace!(%next_tx_seq, "File in sync or log entry unavailable"); - sleep(INTERVAL).await; - } - Err(err) => { - warn!(%err, %next_tx_seq, "Failed to sync file"); - sleep(INTERVAL_ERROR).await; - } - } - } - } - - async fn sync_once(&self) -> Result { - // already sync to the latest file - let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed); - if next_tx_seq > self.max_tx_seq.load(Ordering::Relaxed) { - return Ok(false); - } - - // already finalized - if self.store.check_tx_completed(next_tx_seq).await? { - self.move_forward(false).await?; - return Ok(true); - } - - // try sync tx - let no_peer_timeout = self.sync_tx(next_tx_seq).await?; - - // put tx to pending list if no peers found for a long time - if no_peer_timeout { - self.move_forward(true).await?; - } - - Ok(no_peer_timeout) - } - - /// Starts to synchronize pending files that unavailable during sequential synchronization. - async fn start_sync_pending_txs(self) { - info!("Start to sync pending files"); - - let mut tx_seq = 0; - let mut next = true; - - loop { - if next { - match self.sync_store.random_tx().await { - Ok(Some(seq)) => { - tx_seq = seq; - debug!(%tx_seq, "Start to sync pending file"); - } - Ok(None) => { - trace!("No pending file to sync"); - sleep(INTERVAL).await; - continue; - } - Err(err) => { - warn!(%err, "Failed to pick pending file to sync"); - sleep(INTERVAL_ERROR).await; - continue; - } - } - } - - match self.sync_pending_tx(tx_seq).await { - Ok(true) => { - debug!(%tx_seq, "Completed to sync pending file"); - sleep(INTERVAL_CATCHUP).await; - next = true; - } - Ok(false) => { - trace!(%tx_seq, "Pending file in sync or tx unavailable"); - sleep(INTERVAL).await; - next = false; - } - Err(err) => { - warn!(%err, %tx_seq, "Failed to sync pending file"); - sleep(INTERVAL_ERROR).await; - next = false; - } - } - } - } - - async fn sync_pending_tx(&self, tx_seq: u64) -> Result { - // already finalized - if self.store.check_tx_completed(tx_seq).await? { - self.sync_store.remove_tx(tx_seq).await?; - return Ok(true); - } - - // try sync tx - let no_peer_timeout = self.sync_tx(tx_seq).await?; - - // downgrade if no peers found for a long time - if no_peer_timeout && self.sync_store.downgrade_tx_to_pending(tx_seq).await? 
{ - debug!(%tx_seq, "No peers found for pending file and downgraded"); - } - - Ok(no_peer_timeout) - } -} - -#[cfg(test)] -mod tests { - use std::{ - ops::Sub, - sync::atomic::Ordering, - time::{Duration, Instant}, - }; - - use channel::{test_util::TestReceiver, Channel}; - use tokio::sync::mpsc::error::TryRecvError; - - use crate::{ - auto_sync::sync_store::SyncStore, - controllers::SyncState, - test_util::{create_2_store, tests::TestStoreRuntime}, - Config, SyncMessage, SyncRequest, SyncResponse, - }; - - use super::Manager; - - async fn new_manager( - runtime: &TestStoreRuntime, - next_tx_seq: u64, - max_tx_seq: u64, - ) -> ( - Manager, - TestReceiver, - ) { - let sync_store = SyncStore::new(runtime.store.clone()); - sync_store.set_next_tx_seq(next_tx_seq).await.unwrap(); - if max_tx_seq < u64::MAX { - sync_store.set_max_tx_seq(max_tx_seq).await.unwrap(); - } - - let (sync_send, sync_recv) = Channel::unbounded(); - let manager = Manager::new(runtime.store.clone(), sync_send, Config::default()) - .await - .unwrap(); - (manager, sync_recv.into()) - } - - #[tokio::test] - async fn test_manager_init_values() { - let runtime = TestStoreRuntime::default(); - let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await; - - assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 4); - assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 12); - assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), u64::MAX); - } - - #[tokio::test] - async fn test_manager_set_reverted() { - let runtime = TestStoreRuntime::default(); - let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await; - - // reverted to tx 5 - assert!(manager.set_reverted(5)); - assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), 5); - - // no effect if tx 6 reverted again - assert!(!manager.set_reverted(6)); - assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), 5); - - // overwrite tx 5 if tx 3 reverted - assert!(manager.set_reverted(3)); - assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), 3); - } - - #[tokio::test] - async fn test_manager_handle_reorg() { - let runtime = TestStoreRuntime::default(); - let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await; - - // no effect if not reverted - assert_eq!(manager.handle_on_reorg(), None); - assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), u64::MAX); - assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 4); - - // tx 5 reverted, but sync in future - assert!(manager.set_reverted(5)); - assert_eq!(manager.handle_on_reorg(), None); - assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), u64::MAX); - assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 4); - - // tx 3 reverted, should terminate tx 4 and re-sync files since tx 3 - assert!(manager.set_reverted(3)); - assert_eq!(manager.handle_on_reorg(), Some(4)); - assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), u64::MAX); - assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 3); - } - - #[tokio::test] - async fn test_manager_update_on_announcement() { - let runtime = TestStoreRuntime::default(); - let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await; - - // no effect if tx 10 announced - manager.update_on_announcement(10).await; - assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 4); - assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 12); - - // `max_tx_seq` enlarged if tx 20 announced - manager.update_on_announcement(20).await; - assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 4); - 
assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 20); - - // no effect if announced for a non-pending tx - manager.update_on_announcement(2).await; - assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 4); - assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 20); - assert_eq!(manager.sync_store.random_tx().await.unwrap(), None); - - // pending tx upgraded if announcement received - assert!(manager.sync_store.add_pending_tx(1).await.unwrap()); - assert!(manager.sync_store.add_pending_tx(2).await.unwrap()); - manager.update_on_announcement(2).await; - assert_eq!(manager.sync_store.random_tx().await.unwrap(), Some(2)); - } - - #[tokio::test] - async fn test_manager_move_forward() { - let runtime = TestStoreRuntime::default(); - let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await; - - // move forward from 4 to 5 - assert!(manager.move_forward(false).await.unwrap()); - assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 5); - assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 12); - assert_eq!( - manager.sync_store.get_tx_seq_range().await.unwrap(), - (Some(5), Some(12)) - ); - - // move forward and add tx 5 to pending list - assert!(manager.move_forward(true).await.unwrap()); - assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 6); - assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 12); - assert_eq!( - manager.sync_store.get_tx_seq_range().await.unwrap(), - (Some(6), Some(12)) - ); - assert_eq!(manager.sync_store.random_tx().await.unwrap(), Some(5)); - } - - #[tokio::test] - async fn test_manager_move_forward_failed() { - let runtime = TestStoreRuntime::default(); - let (manager, _sync_recv) = new_manager(&runtime, 5, 5).await; - - // 5 -> 6 - assert!(manager.move_forward(false).await.unwrap()); - assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 6); - assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 5); - - // cannot move forward anymore - assert!(!manager.move_forward(false).await.unwrap()); - assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 6); - assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 5); - assert_eq!( - manager.sync_store.get_tx_seq_range().await.unwrap(), - (Some(6), Some(5)) - ); - } - - #[tokio::test] - async fn test_manager_sync_tx_unavailable() { - let runtime = TestStoreRuntime::default(); - let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await; - - assert!(!manager.sync_tx(4).await.unwrap()); - } - - #[tokio::test] - async fn test_manager_sync_tx_status_none() { - let (_, store, _, _) = create_2_store(vec![1314, 1324]); - let runtime = TestStoreRuntime::new(store); - let (manager, mut sync_recv) = new_manager(&runtime, 1, 5).await; - - let (_, sync_result) = tokio::join!( - sync_recv.expect_responses(vec![ - SyncResponse::SyncStatus { status: None }, - // cause to file sync started - SyncResponse::SyncFile { err: String::new() }, - ]), - manager.sync_tx(1) - ); - assert!(!sync_result.unwrap()); - assert!(matches!(sync_recv.try_recv(), Err(TryRecvError::Empty))); - } - - #[tokio::test] - async fn test_manager_sync_tx_in_progress() { - let (_, store, _, _) = create_2_store(vec![1314, 1324]); - let runtime = TestStoreRuntime::new(store); - let (manager, mut sync_recv) = new_manager(&runtime, 1, 5).await; - - let (_, sync_result) = tokio::join!( - // unnecessary to start file sync again - sync_recv.expect_response(SyncResponse::SyncStatus { - status: Some(SyncState::ConnectingPeers) - }), - manager.sync_tx(1) - ); - assert!(!sync_result.unwrap()); - assert!(matches!(sync_recv.try_recv(), 
Err(TryRecvError::Empty))); - } - - async fn expect_no_peer_found( - sync_recv: &mut TestReceiver, - ) { - let responses = vec![ - // no peers for file sync for a long time - SyncResponse::SyncStatus { - status: Some(SyncState::FindingPeers { - origin: Instant::now().sub(Duration::from_secs(10000)), - since: Instant::now(), - }), - }, - // required to terminate the file sync - SyncResponse::TerminateFileSync { count: 1 }, - ]; - sync_recv.expect_responses(responses).await - } - - #[tokio::test] - async fn test_manager_sync_tx_no_peer_found() { - let (_, store, _, _) = create_2_store(vec![1314, 1324]); - let runtime = TestStoreRuntime::new(store); - let (manager, mut sync_recv) = new_manager(&runtime, 1, 5).await; - - let (_, sync_result) = - tokio::join!(expect_no_peer_found(&mut sync_recv), manager.sync_tx(1)); - assert!(sync_result.unwrap()); - assert!(matches!(sync_recv.try_recv(), Err(TryRecvError::Empty))); - } - - #[tokio::test] - async fn test_manager_sync_once_already_latest() { - let runtime = TestStoreRuntime::default(); - let (manager, _sync_recv) = new_manager(&runtime, 6, 5).await; - - assert!(!manager.sync_once().await.unwrap()); - } - - #[tokio::test] - async fn test_manager_sync_once_finalized() { - let (_, store, _, _) = create_2_store(vec![1314, 1324]); - let runtime = TestStoreRuntime::new(store); - let (manager, _sync_recv) = new_manager(&runtime, 1, 5).await; - - assert!(manager.sync_once().await.unwrap()); - assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 2); - assert_eq!(manager.sync_store.random_tx().await.unwrap(), None); - } - - #[tokio::test] - async fn test_manager_sync_once_no_peer_found() { - let (store, _, _, _) = create_2_store(vec![1314]); - let runtime = TestStoreRuntime::new(store); - let (manager, mut sync_recv) = new_manager(&runtime, 0, 5).await; - - let (_, sync_result) = - tokio::join!(expect_no_peer_found(&mut sync_recv), manager.sync_once(),); - assert!(sync_result.unwrap()); - assert!(matches!(sync_recv.try_recv(), Err(TryRecvError::Empty))); - assert_eq!(manager.sync_store.random_tx().await.unwrap(), Some(0)); - } - - #[tokio::test] - async fn test_manager_sync_pending_tx_finalized() { - let (_, store, _, _) = create_2_store(vec![1314, 1324]); - let runtime = TestStoreRuntime::new(store); - let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await; - - assert!(manager.sync_store.add_pending_tx(0).await.unwrap()); - assert!(manager.sync_store.add_pending_tx(1).await.unwrap()); - - assert!(manager.sync_pending_tx(1).await.unwrap()); - assert_eq!(manager.sync_store.random_tx().await.unwrap(), Some(0)); - assert!(manager.sync_store.add_pending_tx(1).await.unwrap()); - } - - #[tokio::test] - async fn test_manager_sync_pending_tx_no_peer_found() { - let (store, _, _, _) = create_2_store(vec![1314, 1324]); - let runtime = TestStoreRuntime::new(store); - let (manager, mut sync_recv) = new_manager(&runtime, 4, 12).await; - - assert!(manager.sync_store.add_pending_tx(0).await.unwrap()); - assert!(manager.sync_store.add_pending_tx(1).await.unwrap()); - assert!(manager.sync_store.upgrade_tx_to_ready(1).await.unwrap()); - - let (_, sync_result) = tokio::join!( - expect_no_peer_found(&mut sync_recv), - manager.sync_pending_tx(1), - ); - assert!(sync_result.unwrap()); - assert!(matches!(sync_recv.try_recv(), Err(TryRecvError::Empty))); - assert!(manager.sync_store.upgrade_tx_to_ready(1).await.unwrap()); - } -} diff --git a/node/sync/src/auto_sync/mod.rs b/node/sync/src/auto_sync/mod.rs index abf577c..cb1cfef 100644 --- 
a/node/sync/src/auto_sync/mod.rs +++ b/node/sync/src/auto_sync/mod.rs @@ -1,5 +1,10 @@ -mod manager; +mod batcher; +pub mod batcher_random; +pub mod batcher_serial; mod sync_store; mod tx_store; -pub use manager::Manager as AutoSyncManager; +use std::time::Duration; + +const INTERVAL_IDLE: Duration = Duration::from_secs(3); +const INTERVAL_ERROR: Duration = Duration::from_secs(10); diff --git a/node/sync/src/auto_sync/sync_store.rs b/node/sync/src/auto_sync/sync_store.rs index e483d8c..f42f4a3 100644 --- a/node/sync/src/auto_sync/sync_store.rs +++ b/node/sync/src/auto_sync/sync_store.rs @@ -1,6 +1,6 @@ use super::tx_store::TxStore; use anyhow::Result; - +use std::fmt::Debug; use storage::log_store::config::{ConfigTx, ConfigurableExt}; use storage_async::Store; @@ -19,6 +19,27 @@ pub struct SyncStore { ready_txs: TxStore, } +impl Debug for SyncStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let store = self.store.get_store(); + + let pendings = match self.pending_txs.count(store) { + Ok(count) => format!("{}", count), + Err(err) => format!("Err: {:?}", err), + }; + + let ready = match self.ready_txs.count(store) { + Ok(count) => format!("{}", count), + Err(err) => format!("Err: {:?}", err), + }; + + f.debug_struct("SyncStore") + .field("pending_txs", &pendings) + .field("ready_txs", &ready) + .finish() + } +} + impl SyncStore { pub fn new(store: Store) -> Self { Self { diff --git a/node/sync/src/lib.rs b/node/sync/src/lib.rs index e6c2d71..a6154df 100644 --- a/node/sync/src/lib.rs +++ b/node/sync/src/lib.rs @@ -1,7 +1,7 @@ #[macro_use] extern crate tracing; -mod auto_sync; +pub mod auto_sync; mod context; mod controllers; mod service; @@ -18,10 +18,13 @@ use std::time::Duration; pub struct Config { pub auto_sync_enabled: bool, pub max_sync_files: usize, - #[serde(deserialize_with = "deserialize_duration")] - pub find_peer_timeout: Duration, pub sync_file_by_rpc_enabled: bool, pub sync_file_on_announcement_enabled: bool, + + // auto_sync config + pub max_sequential_workers: usize, + #[serde(deserialize_with = "deserialize_duration")] + pub find_peer_timeout: Duration, } impl Default for Config { @@ -29,9 +32,11 @@ impl Default for Config { Self { auto_sync_enabled: false, max_sync_files: 8, - find_peer_timeout: Duration::from_secs(10), sync_file_by_rpc_enabled: true, sync_file_on_announcement_enabled: false, + + max_sequential_workers: 8, + find_peer_timeout: Duration::from_secs(10), } } } diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index ec3a98e..3c37d33 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -1,4 +1,5 @@ -use crate::auto_sync::AutoSyncManager; +use crate::auto_sync::batcher_random::RandomBatcher; +use crate::auto_sync::batcher_serial::SerialBatcher; use crate::context::SyncNetworkContext; use crate::controllers::{ FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState, @@ -23,6 +24,7 @@ use storage::config::ShardConfig; use storage::error::Result as StorageResult; use storage::log_store::Store as LogStore; use storage_async::Store; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::{broadcast, mpsc}; const HEARTBEAT_INTERVAL_SEC: u64 = 5; @@ -122,7 +124,7 @@ pub struct SyncService { /// Heartbeat interval for executing periodic tasks. 
heartbeat: tokio::time::Interval, - manager: AutoSyncManager, + file_announcement_send: Option>, } impl SyncService { @@ -159,10 +161,23 @@ impl SyncService { let store = Store::new(store, executor.clone()); - let manager = AutoSyncManager::new(store.clone(), sync_send.clone(), config).await?; - if config.auto_sync_enabled { - manager.spwn(&executor, event_recv); - } + // init auto sync + let file_announcement_send = if config.auto_sync_enabled { + let (send, recv) = unbounded_channel(); + + // sync in sequence + let serial_batcher = + SerialBatcher::new(config, store.clone(), sync_send.clone()).await?; + executor.spawn(serial_batcher.start(recv, event_recv), "auto_sync_serial"); + + // sync randomly + let random_batcher = RandomBatcher::new(config, store.clone(), sync_send.clone()); + executor.spawn(random_batcher.start(), "auto_sync_random"); + + Some(send) + } else { + None + }; let mut sync = SyncService { config, @@ -172,7 +187,7 @@ impl SyncService { file_location_cache, controllers: Default::default(), heartbeat, - manager, + file_announcement_send, }; info!("Starting sync service"); @@ -606,7 +621,9 @@ impl SyncService { let tx_seq = tx_id.seq; debug!(%tx_seq, %peer_id, %addr, "Received AnnounceFile gossip"); - self.manager.update_on_announcement(tx_seq).await; + if let Some(send) = &self.file_announcement_send { + let _ = send.send(tx_seq); + } // File already in sync if let Some(controller) = self.controllers.get_mut(&tx_seq) { @@ -827,12 +844,9 @@ mod tests { create_file_location_cache(init_peer_id, vec![txs[0].id()]); let (network_send, mut network_recv) = mpsc::unbounded_channel::(); - let (sync_send, sync_recv) = channel::Channel::unbounded(); + let (_, sync_recv) = channel::Channel::unbounded(); let heartbeat = tokio::time::interval(Duration::from_secs(HEARTBEAT_INTERVAL_SEC)); - let manager = AutoSyncManager::new(store.clone(), sync_send, Config::default()) - .await - .unwrap(); let mut sync = SyncService { config: Config::default(), @@ -842,7 +856,7 @@ mod tests { file_location_cache, controllers: Default::default(), heartbeat, - manager, + file_announcement_send: None, }; sync.on_peer_connected(init_peer_id); @@ -862,12 +876,9 @@ mod tests { create_file_location_cache(init_peer_id, vec![txs[0].id()]); let (network_send, mut network_recv) = mpsc::unbounded_channel::(); - let (sync_send, sync_recv) = channel::Channel::unbounded(); + let (_, sync_recv) = channel::Channel::unbounded(); let heartbeat = tokio::time::interval(Duration::from_secs(HEARTBEAT_INTERVAL_SEC)); - let manager = AutoSyncManager::new(store.clone(), sync_send, Config::default()) - .await - .unwrap(); let mut sync = SyncService { config: Config::default(), @@ -877,7 +888,7 @@ mod tests { file_location_cache, controllers: Default::default(), heartbeat, - manager, + file_announcement_send: None, }; sync.on_peer_disconnected(init_peer_id); diff --git a/run/config.toml b/run/config.toml index bc15b42..a799577 100644 --- a/run/config.toml +++ b/run/config.toml @@ -237,3 +237,6 @@ miner_key = "" # Enable to start a file sync automatically when a file announcement P2P message received. # sync_file_on_announcement_enabled = false + +# Maximum threads to sync files in sequence. +# max_sequential_workers = 8
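For reference, the auto-sync knobs introduced above can be tuned from run/config.toml. A minimal sketch of an enabled setup, assuming the flat snake_case keys already used in that file and the field names of the sync Config struct in node/sync/src/lib.rs (the auto_sync_enabled key is not shown in the hunk above and is assumed to map the struct field the same way; it defaults to false):

    # Enable automatic file sync among storage nodes.
    auto_sync_enabled = true

    # Maximum threads to sync files in sequence.
    max_sequential_workers = 8

find_peer_timeout now also belongs to the auto-sync group of Config: when a sequential worker exceeds it while finding peers, Batcher::poll_tx terminates that file sync with SyncResult::Timeout and SerialBatcher downgrades the tx to the pending queue, where RandomBatcher retries it later.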