From 98c62b314e0140941cc02810ad6efa10a241031a Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 25 Oct 2024 12:23:08 +0800 Subject: [PATCH] fix random test failure --- node/sync/src/auto_sync/manager.rs | 46 ++++++++++++++++++------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/node/sync/src/auto_sync/manager.rs b/node/sync/src/auto_sync/manager.rs index 187f6af..68e3da9 100644 --- a/node/sync/src/auto_sync/manager.rs +++ b/node/sync/src/auto_sync/manager.rs @@ -35,17 +35,21 @@ impl AutoSyncManager { executor: &TaskExecutor, store: Store, sync_send: SyncSender, - _log_sync_recv: broadcast::Receiver, + log_sync_recv: broadcast::Receiver, catch_up_end_recv: oneshot::Receiver<()>, ) -> Result { - let (file_announcement_send, _file_announcement_recv) = unbounded_channel(); + let (file_announcement_send, file_announcement_recv) = unbounded_channel(); let (new_file_send, new_file_recv) = unbounded_channel(); - // use v2 db to avoid reading v1 files that announced from the whole network instead of neighbors - let sync_store = Arc::new(SyncStore::new_with_name( - store.clone(), - "pendingv2", - "readyv2", - )); + let sync_store = if config.neighbors_only { + // use v2 db to avoid reading v1 files that announced from the whole network instead of neighbors + Arc::new(SyncStore::new_with_name( + store.clone(), + "pendingv2", + "readyv2", + )) + } else { + Arc::new(SyncStore::new(store.clone())) + }; let catched_up = Arc::new(AtomicBool::new(false)); // handle new file @@ -55,15 +59,21 @@ impl AutoSyncManager { ); // sync in sequence - // let serial = - // SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) - // .await?; - // executor.spawn( - // serial - // .clone() - // .start(file_announcement_recv, log_sync_recv, catched_up.clone()), - // "auto_sync_serial", - // ); + let serial = if config.neighbors_only { + None + } else { + let serial = + SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) + .await?; + executor.spawn( + serial + .clone() + .start(file_announcement_recv, log_sync_recv, catched_up.clone()), + "auto_sync_serial", + ); + + Some(serial) + }; // sync randomly let random = RandomBatcher::new(config, store, sync_send, sync_store); @@ -76,7 +86,7 @@ impl AutoSyncManager { ); Ok(Self { - serial: None, + serial, random, file_announcement_send, new_file_send,