From 4654f792b20961283767c2172d702794ecdc5696 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 2 Aug 2024 19:15:54 +0800 Subject: [PATCH] Add auto sync manager to wrap multiple objects --- node/sync/src/auto_sync/manager.rs | 70 ++++++++++++++++++++++++++++++ node/sync/src/auto_sync/mod.rs | 1 + 2 files changed, 71 insertions(+) create mode 100644 node/sync/src/auto_sync/manager.rs diff --git a/node/sync/src/auto_sync/manager.rs b/node/sync/src/auto_sync/manager.rs new file mode 100644 index 0000000..84e5f62 --- /dev/null +++ b/node/sync/src/auto_sync/manager.rs @@ -0,0 +1,70 @@ +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +use anyhow::Result; +use log_entry_sync::LogSyncEvent; +use storage_async::Store; +use task_executor::TaskExecutor; +use tokio::sync::{ + broadcast, + mpsc::{unbounded_channel, UnboundedSender}, + oneshot, +}; + +use crate::{Config, SyncSender}; + +use super::{batcher_random::RandomBatcher, batcher_serial::SerialBatcher, sync_store::SyncStore}; + +pub struct AutoSyncManager { + pub serial: SerialBatcher, + pub random: RandomBatcher, + pub file_announcement_send: UnboundedSender, +} + +impl AutoSyncManager { + pub async fn spawn( + config: Config, + executor: &TaskExecutor, + store: Store, + sync_send: SyncSender, + log_sync_recv: broadcast::Receiver, + catch_up_end_recv: oneshot::Receiver<()>, + ) -> Result { + let (send, recv) = unbounded_channel(); + let sync_store = Arc::new(SyncStore::new(store.clone())); + let catched_up = Arc::new(AtomicBool::new(false)); + + // sync in sequence + let serial = + SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) + .await?; + executor.spawn( + serial + .clone() + .start(recv, log_sync_recv, catched_up.clone()), + "auto_sync_serial", + ); + + // sync randomly + let random = RandomBatcher::new(config, store.clone(), sync_send.clone(), sync_store); + executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random"); + + // handle on catched up notification + executor.spawn( + async move { + catch_up_end_recv.await.expect("Catch up sender dropped"); + info!("log entry catched up"); + catched_up.store(true, Ordering::Relaxed); + }, + "auto_sync_wait_for_catchup", + ); + + Ok(Self { + serial, + random, + file_announcement_send: send, + }) + } +} diff --git a/node/sync/src/auto_sync/mod.rs b/node/sync/src/auto_sync/mod.rs index 0f1ce41..12b3b62 100644 --- a/node/sync/src/auto_sync/mod.rs +++ b/node/sync/src/auto_sync/mod.rs @@ -1,6 +1,7 @@ mod batcher; pub mod batcher_random; pub mod batcher_serial; +pub mod manager; pub mod sync_store; mod tx_store;