From ceb165d79b33b268812dfbd8dc92142cd2794894 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 24 Oct 2024 15:30:07 +0800 Subject: [PATCH] Disable sequential sync and store new file in v2 sync store --- node/sync/src/auto_sync/batcher_random.rs | 13 ++-- node/sync/src/auto_sync/manager.rs | 75 +++++++++++++---------- node/sync/src/auto_sync/sync_store.rs | 8 +++ node/sync/src/service.rs | 5 +- 4 files changed, 62 insertions(+), 39 deletions(-) diff --git a/node/sync/src/auto_sync/batcher_random.rs b/node/sync/src/auto_sync/batcher_random.rs index 16cd5f2..3f872a6 100644 --- a/node/sync/src/auto_sync/batcher_random.rs +++ b/node/sync/src/auto_sync/batcher_random.rs @@ -59,14 +59,13 @@ impl RandomBatcher { pub async fn start(mut self, catched_up: Arc) { info!("Start to sync files"); - loop { - // disable file sync until catched up - if !catched_up.load(Ordering::Relaxed) { - trace!("Cannot sync file in catch-up phase"); - sleep(self.config.auto_sync_idle_interval).await; - continue; - } + // wait for log entry sync catched up + while !catched_up.load(Ordering::Relaxed) { + trace!("Cannot sync file in catch-up phase"); + sleep(self.config.auto_sync_idle_interval).await; + } + loop { if let Ok(state) = self.get_state().await { metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64); metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64); diff --git a/node/sync/src/auto_sync/manager.rs b/node/sync/src/auto_sync/manager.rs index 6a87cdf..187f6af 100644 --- a/node/sync/src/auto_sync/manager.rs +++ b/node/sync/src/auto_sync/manager.rs @@ -9,7 +9,7 @@ use storage_async::Store; use task_executor::TaskExecutor; use tokio::sync::{ broadcast, - mpsc::{unbounded_channel, UnboundedSender}, + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, oneshot, }; @@ -22,7 +22,7 @@ use super::{ }; pub struct AutoSyncManager { - pub serial: SerialBatcher, + pub serial: Option, pub random: RandomBatcher, pub file_announcement_send: UnboundedSender, pub new_file_send: UnboundedSender, @@ -35,60 +35,73 @@ impl AutoSyncManager { executor: &TaskExecutor, store: Store, sync_send: SyncSender, - log_sync_recv: broadcast::Receiver, + _log_sync_recv: broadcast::Receiver, catch_up_end_recv: oneshot::Receiver<()>, ) -> Result { - let (file_announcement_send, file_announcement_recv) = unbounded_channel(); - let (new_file_send, mut new_file_recv) = unbounded_channel(); - let sync_store = Arc::new(SyncStore::new(store.clone())); + let (file_announcement_send, _file_announcement_recv) = unbounded_channel(); + let (new_file_send, new_file_recv) = unbounded_channel(); + // use v2 db to avoid reading v1 files that announced from the whole network instead of neighbors + let sync_store = Arc::new(SyncStore::new_with_name( + store.clone(), + "pendingv2", + "readyv2", + )); let catched_up = Arc::new(AtomicBool::new(false)); // handle new file - let sync_store_cloned = sync_store.clone(); executor.spawn( - async move { - while let Some(tx_seq) = new_file_recv.recv().await { - if let Err(err) = sync_store_cloned.insert(tx_seq, Queue::Ready).await { - warn!(?err, %tx_seq, "Failed to insert new file to ready queue"); - } - } - }, + Self::handle_new_file(new_file_recv, sync_store.clone()), "auto_sync_handle_new_file", ); // sync in sequence - let serial = - SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) - .await?; - executor.spawn( - serial - .clone() - .start(file_announcement_recv, log_sync_recv, catched_up.clone()), - "auto_sync_serial", - ); + // let serial = + // SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) + // .await?; + // executor.spawn( + // serial + // .clone() + // .start(file_announcement_recv, log_sync_recv, catched_up.clone()), + // "auto_sync_serial", + // ); // sync randomly let random = RandomBatcher::new(config, store, sync_send, sync_store); executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random"); // handle on catched up notification - let catched_up_cloned = catched_up.clone(); executor.spawn( - async move { - if catch_up_end_recv.await.is_ok() { - info!("log entry catched up"); - catched_up_cloned.store(true, Ordering::Relaxed); - } - }, + Self::listen_catch_up(catch_up_end_recv, catched_up.clone()), "auto_sync_wait_for_catchup", ); Ok(Self { - serial, + serial: None, random, file_announcement_send, new_file_send, catched_up, }) } + + async fn handle_new_file( + mut new_file_recv: UnboundedReceiver, + sync_store: Arc, + ) { + while let Some(tx_seq) = new_file_recv.recv().await { + if let Err(err) = sync_store.insert(tx_seq, Queue::Ready).await { + warn!(?err, %tx_seq, "Failed to insert new file to ready queue"); + } + } + } + + async fn listen_catch_up( + catch_up_end_recv: oneshot::Receiver<()>, + catched_up: Arc, + ) { + if catch_up_end_recv.await.is_ok() { + info!("log entry catched up"); + catched_up.store(true, Ordering::Relaxed); + } + } } diff --git a/node/sync/src/auto_sync/sync_store.rs b/node/sync/src/auto_sync/sync_store.rs index 796dc1a..825b858 100644 --- a/node/sync/src/auto_sync/sync_store.rs +++ b/node/sync/src/auto_sync/sync_store.rs @@ -42,6 +42,14 @@ impl SyncStore { } } + pub fn new_with_name(store: Store, pending: &'static str, ready: &'static str) -> Self { + Self { + store: Arc::new(RwLock::new(store)), + pending_txs: TxStore::new(pending), + ready_txs: TxStore::new(ready), + } + } + /// Returns the number of pending txs and ready txs. pub async fn stat(&self) -> Result<(usize, usize)> { let async_store = self.store.read().await; diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 66c4a48..d7de9c0 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -284,7 +284,10 @@ impl SyncService { Some(manager) => SyncServiceState { num_syncing: self.controllers.len(), catched_up: Some(manager.catched_up.load(Ordering::Relaxed)), - auto_sync_serial: Some(manager.serial.get_state().await), + auto_sync_serial: match &manager.serial { + Some(v) => Some(v.get_state().await), + None => None, + }, auto_sync_random: manager.random.get_state().await.ok(), }, None => SyncServiceState {