From 0edd63e715da7642ee04efc4375e4e0c19eb8a83 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Fri, 2 Aug 2024 19:15:17 +0800 Subject: [PATCH] Refactor auto sync to return sync state --- node/sync/src/auto_sync/batcher.rs | 53 ++++---- node/sync/src/auto_sync/batcher_random.rs | 45 ++++--- node/sync/src/auto_sync/batcher_serial.rs | 149 +++++++++++++--------- 3 files changed, 146 insertions(+), 101 deletions(-) diff --git a/node/sync/src/auto_sync/batcher.rs b/node/sync/src/auto_sync/batcher.rs index 847048d..0fcea06 100644 --- a/node/sync/src/auto_sync/batcher.rs +++ b/node/sync/src/auto_sync/batcher.rs @@ -1,9 +1,11 @@ use crate::{controllers::SyncState, Config, SyncRequest, SyncResponse, SyncSender}; use anyhow::{bail, Result}; -use std::fmt::Debug; +use serde::{Deserialize, Serialize}; +use std::{fmt::Debug, sync::Arc}; use storage_async::Store; +use tokio::sync::RwLock; -#[derive(Debug)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub enum SyncResult { Completed, Failed, @@ -11,20 +13,15 @@ pub enum SyncResult { } /// Supports to sync files concurrently. +#[derive(Clone)] pub struct Batcher { config: Config, capacity: usize, - tasks: Vec, // files to sync + tasks: Arc>>, // files to sync store: Store, sync_send: SyncSender, } -impl Debug for Batcher { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.tasks) - } -} - impl Batcher { pub fn new(config: Config, capacity: usize, store: Store, sync_send: SyncSender) -> Self { Self { @@ -36,36 +33,43 @@ impl Batcher { } } - pub fn len(&self) -> usize { - self.tasks.len() + pub async fn len(&self) -> usize { + self.tasks.read().await.len() } - pub async fn add(&mut self, tx_seq: u64) -> Result { - // limits the number of threads - if self.tasks.len() >= self.capacity { - return Ok(false); - } + pub async fn tasks(&self) -> Vec { + self.tasks.read().await.clone() + } + pub async fn add(&self, tx_seq: u64) -> Result { // requires log entry available before file sync if self.store.get_tx_by_seq_number(tx_seq).await?.is_none() { return Ok(false); } - self.tasks.push(tx_seq); + let mut tasks = self.tasks.write().await; + + // limits the number of threads + if tasks.len() >= self.capacity { + return Ok(false); + } + + tasks.push(tx_seq); Ok(true) } - pub fn reorg(&mut self, reverted_tx_seq: u64) { - self.tasks.retain(|&x| x < reverted_tx_seq); + pub async fn reorg(&self, reverted_tx_seq: u64) { + self.tasks.write().await.retain(|&x| x < reverted_tx_seq); } /// Poll the sync result of any completed file sync. - pub async fn poll(&mut self) -> Result> { + pub async fn poll(&self) -> Result> { let mut result = None; - let mut index = self.tasks.len(); + let tasks = self.tasks.read().await.clone(); + let mut index = tasks.len(); - for (i, tx_seq) in self.tasks.iter().enumerate() { + for (i, tx_seq) in tasks.iter().enumerate() { if let Some(ret) = self.poll_tx(*tx_seq).await? { result = Some((*tx_seq, ret)); index = i; @@ -73,8 +77,9 @@ impl Batcher { } } - if index < self.tasks.len() { - self.tasks.swap_remove(index); + let mut tasks = self.tasks.write().await; + if index < tasks.len() { + tasks.swap_remove(index); } Ok(result) diff --git a/node/sync/src/auto_sync/batcher_random.rs b/node/sync/src/auto_sync/batcher_random.rs index ebedc19..e1a8f92 100644 --- a/node/sync/src/auto_sync/batcher_random.rs +++ b/node/sync/src/auto_sync/batcher_random.rs @@ -4,6 +4,7 @@ use crate::{ Config, SyncSender, }; use anyhow::Result; +use serde::{Deserialize, Serialize}; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -11,6 +12,15 @@ use std::sync::{ use storage_async::Store; use tokio::time::sleep; +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RandomBatcherState { + pub tasks: Vec, + pub pending_txs: usize, + pub ready_txs: usize, +} + +#[derive(Clone)] pub struct RandomBatcher { batcher: Batcher, sync_store: Arc, @@ -30,6 +40,16 @@ impl RandomBatcher { } } + pub async fn get_state(&self) -> Result { + let (pending_txs, ready_txs) = self.sync_store.stat().await?; + + Ok(RandomBatcherState { + tasks: self.batcher.tasks().await, + pending_txs, + ready_txs, + }) + } + pub async fn start(mut self, catched_up: Arc) { info!("Start to sync files"); @@ -45,13 +65,13 @@ impl RandomBatcher { Ok(true) => {} Ok(false) => { trace!( - "File sync still in progress or idle, {:?}", - self.stat().await + "File sync still in progress or idle, state = {:?}", + self.get_state().await ); sleep(INTERVAL_IDLE).await; } Err(err) => { - warn!(%err, "Failed to sync file once, {:?}", self.stat().await); + warn!(%err, "Failed to sync file once, state = {:?}", self.get_state().await); sleep(INTERVAL_ERROR).await; } } @@ -69,7 +89,7 @@ impl RandomBatcher { None => return Ok(false), }; - debug!(%tx_seq, ?sync_result, "Completed to sync file, {:?}", self.stat().await); + debug!(%tx_seq, ?sync_result, "Completed to sync file, state = {:?}", self.get_state().await); match sync_result { SyncResult::Completed => self.sync_store.remove_tx(tx_seq).await?, @@ -80,7 +100,7 @@ impl RandomBatcher { } async fn schedule(&mut self) -> Result { - if self.batcher.len() > 0 { + if self.batcher.len().await > 0 { return Ok(false); } @@ -93,21 +113,8 @@ impl RandomBatcher { return Ok(false); } - debug!("Pick a file to sync, {:?}", self.stat().await); + debug!("Pick a file to sync, state = {:?}", self.get_state().await); Ok(true) } - - async fn stat(&self) -> String { - match self.sync_store.stat().await { - Ok((num_pending_txs, num_ready_txs)) => format!( - "RandomBatcher {{ batcher = {:?}, pending_txs = {}, ready_txs = {}}}", - self.batcher, num_pending_txs, num_ready_txs - ), - Err(err) => format!( - "RandomBatcher {{ batcher = {:?}, pending_txs/ready_txs = Error({:?})}}", - self.batcher, err - ), - } - } } diff --git a/node/sync/src/auto_sync/batcher_serial.rs b/node/sync/src/auto_sync/batcher_serial.rs index 6193b0c..a8e4d39 100644 --- a/node/sync/src/auto_sync/batcher_serial.rs +++ b/node/sync/src/auto_sync/batcher_serial.rs @@ -8,58 +8,70 @@ use crate::{ }; use anyhow::Result; use log_entry_sync::LogSyncEvent; +use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, fmt::Debug, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, }; use storage_async::Store; use tokio::{ - sync::{broadcast::Receiver, mpsc::UnboundedReceiver}, + sync::{broadcast::Receiver, mpsc::UnboundedReceiver, RwLock}, time::sleep, }; /// Supports to sync files in sequence concurrently. +#[derive(Clone)] pub struct SerialBatcher { batcher: Batcher, /// Next tx seq to sync. - next_tx_seq: u64, + next_tx_seq: Arc, /// Maximum tx seq to sync. - max_tx_seq: u64, + max_tx_seq: Arc, /// Completed txs that pending to update sync db in sequence. - pending_completed_txs: HashMap, + pending_completed_txs: Arc>>, /// Next tx seq to sync in db, so as to continue file sync from /// break point when program restarted. - next_tx_seq_in_db: u64, + next_tx_seq_in_db: Arc, sync_store: Arc, } -impl Debug for SerialBatcher { +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SerialBatcherState { + pub tasks: Vec, + pub next: u64, + pub max: u64, + pub pendings: HashMap, + pub next_in_db: u64, +} + +impl Debug for SerialBatcherState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let max_tx_seq_desc = if self.max_tx_seq == u64::MAX { + let max_tx_seq_desc = if self.max == u64::MAX { "N/A".into() } else { - format!("{}", self.max_tx_seq) + format!("{}", self.max) }; - let pendings_desc = if self.pending_completed_txs.len() <= 5 { - format!("{:?}", self.pending_completed_txs) + let pendings_desc = if self.pendings.len() <= 5 { + format!("{:?}", self.pendings) } else { - format!("{}", self.pending_completed_txs.len()) + format!("{}", self.pendings.len()) }; f.debug_struct("SerialBatcher") - .field("batcher", &self.batcher) - .field("next", &self.next_tx_seq) + .field("tasks", &self.tasks) + .field("next", &self.next) .field("max", &max_tx_seq_desc) .field("pendings", &pendings_desc) - .field("next_in_db", &self.next_tx_seq_in_db) + .field("next_in_db", &self.next_in_db) .finish() } } @@ -78,21 +90,37 @@ impl SerialBatcher { Ok(Self { batcher: Batcher::new(config, capacity, store, sync_send), - next_tx_seq: next_tx_seq.unwrap_or(0), - max_tx_seq: max_tx_seq.unwrap_or(u64::MAX), + next_tx_seq: Arc::new(AtomicU64::new(next_tx_seq.unwrap_or(0))), + max_tx_seq: Arc::new(AtomicU64::new(max_tx_seq.unwrap_or(u64::MAX))), pending_completed_txs: Default::default(), - next_tx_seq_in_db: next_tx_seq.unwrap_or(0), + next_tx_seq_in_db: Arc::new(AtomicU64::new(next_tx_seq.unwrap_or(0))), sync_store, }) } + pub async fn get_state(&self) -> SerialBatcherState { + SerialBatcherState { + tasks: self.batcher.tasks().await, + next: self.next_tx_seq.load(Ordering::Relaxed), + max: self.max_tx_seq.load(Ordering::Relaxed), + pendings: self + .pending_completed_txs + .read() + .await + .iter() + .map(|(k, v)| (*k, *v)) + .collect(), + next_in_db: self.next_tx_seq_in_db.load(Ordering::Relaxed), + } + } + pub async fn start( mut self, mut file_announcement_recv: UnboundedReceiver, mut log_sync_recv: Receiver, catched_up: Arc, ) { - info!(?self, "Start to sync files"); + info!("Start to sync files, state = {:?}", self.get_state().await); loop { // handle all pending file announcements @@ -119,11 +147,14 @@ impl SerialBatcher { match self.sync_once().await { Ok(true) => {} Ok(false) => { - trace!(?self, "File sync still in progress or idle"); + trace!( + "File sync still in progress or idle, state = {:?}", + self.get_state().await + ); sleep(INTERVAL_IDLE).await; } Err(err) => { - warn!(%err, ?self, "Failed to sync file once"); + warn!(%err, "Failed to sync file once, state = {:?}", self.get_state().await); sleep(INTERVAL_ERROR).await; } } @@ -139,26 +170,27 @@ impl SerialBatcher { trace!(%announced_tx_seq, "Received file announcement"); // new file announced - if self.max_tx_seq == u64::MAX || announced_tx_seq > self.max_tx_seq { - debug!(%announced_tx_seq, ?self, "Update for new file announcement"); + let max_tx_seq = self.max_tx_seq.load(Ordering::Relaxed); + if max_tx_seq == u64::MAX || announced_tx_seq > max_tx_seq { + debug!(%announced_tx_seq, "Update for new file announcement, state = {:?}", self.get_state().await); if let Err(err) = self.sync_store.set_max_tx_seq(announced_tx_seq).await { - error!(%err, %announced_tx_seq, ?self, "Failed to set max_tx_seq in store"); + error!(%err, %announced_tx_seq, "Failed to set max_tx_seq in store, state = {:?}", self.get_state().await); } - self.max_tx_seq = announced_tx_seq; + self.max_tx_seq.store(announced_tx_seq, Ordering::Relaxed); return true; } // already wait for sequential sync - if announced_tx_seq >= self.next_tx_seq { + if announced_tx_seq >= self.next_tx_seq.load(Ordering::Relaxed) { return true; } // otherwise, mark tx as ready for sync if let Err(err) = self.sync_store.upgrade_tx_to_ready(announced_tx_seq).await { - error!(%err, %announced_tx_seq, ?self, "Failed to promote announced tx to ready"); + error!(%err, %announced_tx_seq, "Failed to promote announced tx to ready, state = {:?}", self.get_state().await); } true @@ -173,11 +205,11 @@ impl SerialBatcher { debug!(%reverted_tx_seq, "Reorg detected"); // reorg happened, but no impact on file sync - if reverted_tx_seq >= self.next_tx_seq { + if reverted_tx_seq >= self.next_tx_seq.load(Ordering::Relaxed) { return true; } - info!(%reverted_tx_seq, ?self, "Handle reorg started"); + info!(%reverted_tx_seq, "Handle reorg started, state = {:?}", self.get_state().await); // terminate all files in progress self.batcher @@ -185,23 +217,22 @@ impl SerialBatcher { .await; // update states - self.batcher.reorg(reverted_tx_seq); - self.next_tx_seq = reverted_tx_seq; + self.batcher.reorg(reverted_tx_seq).await; + self.next_tx_seq.store(reverted_tx_seq, Ordering::Relaxed); self.pending_completed_txs + .write() + .await .retain(|&k, _| k < reverted_tx_seq); - if self.next_tx_seq_in_db > reverted_tx_seq { - self.next_tx_seq_in_db = reverted_tx_seq; + if self.next_tx_seq_in_db.load(Ordering::Relaxed) > reverted_tx_seq { + self.next_tx_seq_in_db + .store(reverted_tx_seq, Ordering::Relaxed); - if let Err(err) = self - .sync_store - .set_next_tx_seq(self.next_tx_seq_in_db) - .await - { - error!(%err, %reverted_tx_seq, ?self, "Failed to set next tx seq due to tx reverted"); + if let Err(err) = self.sync_store.set_next_tx_seq(reverted_tx_seq).await { + error!(%err, %reverted_tx_seq, "Failed to set next tx seq due to tx reverted, state = {:?}", self.get_state().await); } } - info!(%reverted_tx_seq, ?self, "Handle reorg ended"); + info!(%reverted_tx_seq, "Handle reorg ended, state = {:?}", self.get_state().await); true } @@ -218,8 +249,11 @@ impl SerialBatcher { None => return Ok(false), }; - info!(%tx_seq, ?sync_result, ?self, "Completed to sync file"); - self.pending_completed_txs.insert(tx_seq, sync_result); + info!(%tx_seq, ?sync_result, "Completed to sync file, state = {:?}", self.get_state().await); + self.pending_completed_txs + .write() + .await + .insert(tx_seq, sync_result); // update sync db self.update_completed_txs_in_db().await?; @@ -229,45 +263,44 @@ impl SerialBatcher { /// Schedule file sync in sequence. async fn schedule_next(&mut self) -> Result { - if self.next_tx_seq > self.max_tx_seq { + let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed); + if next_tx_seq > self.max_tx_seq.load(Ordering::Relaxed) { return Ok(false); } - if !self.batcher.add(self.next_tx_seq).await? { + if !self.batcher.add(next_tx_seq).await? { return Ok(false); } - self.next_tx_seq += 1; + self.next_tx_seq.store(next_tx_seq + 1, Ordering::Relaxed); - info!(?self, "Move forward"); + info!("Move forward, state = {:?}", self.get_state().await); Ok(true) } /// Update file sync index in db. async fn update_completed_txs_in_db(&mut self) -> Result<()> { - let origin = self.next_tx_seq_in_db; + let origin = self.next_tx_seq_in_db.load(Ordering::Relaxed); + let mut current = origin; - while let Some(sync_result) = self.pending_completed_txs.get(&self.next_tx_seq_in_db) { + while let Some(&sync_result) = self.pending_completed_txs.read().await.get(¤t) { // downgrade to random sync if file sync failed or timeout if matches!(sync_result, SyncResult::Failed | SyncResult::Timeout) { - self.sync_store - .add_pending_tx(self.next_tx_seq_in_db) - .await?; + self.sync_store.add_pending_tx(current).await?; } // always move forward in db - self.sync_store - .set_next_tx_seq(self.next_tx_seq_in_db + 1) - .await?; + self.sync_store.set_next_tx_seq(current + 1).await?; // update in memory after db updated - self.pending_completed_txs.remove(&self.next_tx_seq_in_db); - self.next_tx_seq_in_db += 1; + self.pending_completed_txs.write().await.remove(¤t); + current += 1; + self.next_tx_seq_in_db.store(current, Ordering::Relaxed); } - if self.next_tx_seq_in_db > origin { - info!(%origin, %self.next_tx_seq_in_db, "Move forward in db"); + if current > origin { + info!(%origin, %current, "Move forward in db"); } Ok(())