mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
handle NewFile in sync servic to write in db
This commit is contained in:
parent
364640adc8
commit
245ee0d903
@ -15,12 +15,17 @@ use tokio::sync::{
|
||||
|
||||
use crate::{Config, SyncSender};
|
||||
|
||||
use super::{batcher_random::RandomBatcher, batcher_serial::SerialBatcher, sync_store::SyncStore};
|
||||
use super::{
|
||||
batcher_random::RandomBatcher,
|
||||
batcher_serial::SerialBatcher,
|
||||
sync_store::{Queue, SyncStore},
|
||||
};
|
||||
|
||||
pub struct AutoSyncManager {
|
||||
pub serial: SerialBatcher,
|
||||
pub random: RandomBatcher,
|
||||
pub file_announcement_send: UnboundedSender<u64>,
|
||||
pub new_file_send: UnboundedSender<u64>,
|
||||
pub catched_up: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
@ -33,10 +38,24 @@ impl AutoSyncManager {
|
||||
log_sync_recv: broadcast::Receiver<LogSyncEvent>,
|
||||
catch_up_end_recv: oneshot::Receiver<()>,
|
||||
) -> Result<Self> {
|
||||
let (send, recv) = unbounded_channel();
|
||||
let (file_announcement_send, file_announcement_recv) = unbounded_channel();
|
||||
let (new_file_send, mut new_file_recv) = unbounded_channel();
|
||||
let sync_store = Arc::new(SyncStore::new(store.clone()));
|
||||
let catched_up = Arc::new(AtomicBool::new(false));
|
||||
|
||||
// handle new file
|
||||
let sync_store_cloned = sync_store.clone();
|
||||
executor.spawn(
|
||||
async move {
|
||||
while let Some(tx_seq) = new_file_recv.recv().await {
|
||||
if let Err(err) = sync_store_cloned.insert(tx_seq, Queue::Ready).await {
|
||||
warn!(?err, %tx_seq, "Failed to insert new file to ready queue");
|
||||
}
|
||||
}
|
||||
},
|
||||
"auto_sync_handle_new_file",
|
||||
);
|
||||
|
||||
// sync in sequence
|
||||
let serial =
|
||||
SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
|
||||
@ -44,7 +63,7 @@ impl AutoSyncManager {
|
||||
executor.spawn(
|
||||
serial
|
||||
.clone()
|
||||
.start(recv, log_sync_recv, catched_up.clone()),
|
||||
.start(file_announcement_recv, log_sync_recv, catched_up.clone()),
|
||||
"auto_sync_serial",
|
||||
);
|
||||
|
||||
@ -67,7 +86,8 @@ impl AutoSyncManager {
|
||||
Ok(Self {
|
||||
serial,
|
||||
random,
|
||||
file_announcement_send: send,
|
||||
file_announcement_send,
|
||||
new_file_send,
|
||||
catched_up,
|
||||
})
|
||||
}
|
||||
|
@ -269,7 +269,7 @@ impl SyncService {
|
||||
SyncMessage::AnnounceShardConfig { .. } => {
|
||||
// FIXME: Check if controllers need to be reset?
|
||||
}
|
||||
SyncMessage::NewFile { from, msg } => todo!(),
|
||||
SyncMessage::NewFile { from, msg } => self.on_new_file_gossip(from, msg).await,
|
||||
}
|
||||
}
|
||||
|
||||
@ -756,6 +756,19 @@ impl SyncService {
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) {
|
||||
debug!(%from, ?msg, "Received NewFile gossip");
|
||||
|
||||
if let Some(controller) = self.controllers.get_mut(&msg.tx_id.seq) {
|
||||
// Notify new peer found if file already in sync
|
||||
// TODO qbit: do not require remote address since already TCP connected
|
||||
// controller.on_peer_found(from, addr);
|
||||
controller.transition();
|
||||
} else if let Some(manager) = &self.auto_sync_manager {
|
||||
let _ = manager.new_file_send.send(msg.tx_id.seq);
|
||||
}
|
||||
}
|
||||
|
||||
/// Terminate file sync of `min_tx_seq`.
|
||||
/// If `is_reverted` is `true` (means confirmed transactions reverted),
|
||||
/// also terminate `tx_seq` greater than `min_tx_seq`
|
||||
|
Loading…
Reference in New Issue
Block a user