diff --git a/node/sync/src/auto_sync/manager.rs b/node/sync/src/auto_sync/manager.rs index c880ffb..6a87cdf 100644 --- a/node/sync/src/auto_sync/manager.rs +++ b/node/sync/src/auto_sync/manager.rs @@ -15,12 +15,17 @@ use tokio::sync::{ use crate::{Config, SyncSender}; -use super::{batcher_random::RandomBatcher, batcher_serial::SerialBatcher, sync_store::SyncStore}; +use super::{ + batcher_random::RandomBatcher, + batcher_serial::SerialBatcher, + sync_store::{Queue, SyncStore}, +}; pub struct AutoSyncManager { pub serial: SerialBatcher, pub random: RandomBatcher, pub file_announcement_send: UnboundedSender, + pub new_file_send: UnboundedSender, pub catched_up: Arc, } @@ -33,10 +38,24 @@ impl AutoSyncManager { log_sync_recv: broadcast::Receiver, catch_up_end_recv: oneshot::Receiver<()>, ) -> Result { - let (send, recv) = unbounded_channel(); + let (file_announcement_send, file_announcement_recv) = unbounded_channel(); + let (new_file_send, mut new_file_recv) = unbounded_channel(); let sync_store = Arc::new(SyncStore::new(store.clone())); let catched_up = Arc::new(AtomicBool::new(false)); + // handle new file + let sync_store_cloned = sync_store.clone(); + executor.spawn( + async move { + while let Some(tx_seq) = new_file_recv.recv().await { + if let Err(err) = sync_store_cloned.insert(tx_seq, Queue::Ready).await { + warn!(?err, %tx_seq, "Failed to insert new file to ready queue"); + } + } + }, + "auto_sync_handle_new_file", + ); + // sync in sequence let serial = SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) @@ -44,7 +63,7 @@ impl AutoSyncManager { executor.spawn( serial .clone() - .start(recv, log_sync_recv, catched_up.clone()), + .start(file_announcement_recv, log_sync_recv, catched_up.clone()), "auto_sync_serial", ); @@ -67,7 +86,8 @@ impl AutoSyncManager { Ok(Self { serial, random, - file_announcement_send: send, + file_announcement_send, + new_file_send, catched_up, }) } diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 56fb896..66c4a48 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -269,7 +269,7 @@ impl SyncService { SyncMessage::AnnounceShardConfig { .. } => { // FIXME: Check if controllers need to be reset? } - SyncMessage::NewFile { from, msg } => todo!(), + SyncMessage::NewFile { from, msg } => self.on_new_file_gossip(from, msg).await, } } @@ -756,6 +756,19 @@ impl SyncService { } } + async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) { + debug!(%from, ?msg, "Received NewFile gossip"); + + if let Some(controller) = self.controllers.get_mut(&msg.tx_id.seq) { + // Notify new peer found if file already in sync + // TODO qbit: do not require remote address since already TCP connected + // controller.on_peer_found(from, addr); + controller.transition(); + } else if let Some(manager) = &self.auto_sync_manager { + let _ = manager.new_file_send.send(msg.tx_id.seq); + } + } + /// Terminate file sync of `min_tx_seq`. /// If `is_reverted` is `true` (means confirmed transactions reverted), /// also terminate `tx_seq` greater than `min_tx_seq`