mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-12-24 23:35:18 +00:00
Supports to concurrently sync files in sequence (#108)
* Supports to concurrently sync files in sequence * add more comments * refactor random auto sync
This commit is contained in:
parent
d66fa10fe5
commit
816353dfd4
148
node/sync/src/auto_sync/batcher.rs
Normal file
148
node/sync/src/auto_sync/batcher.rs
Normal file
@ -0,0 +1,148 @@
|
||||
use crate::{controllers::SyncState, Config, SyncRequest, SyncResponse, SyncSender};
|
||||
use anyhow::{bail, Ok, Result};
|
||||
use std::fmt::Debug;
|
||||
use storage_async::Store;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SyncResult {
|
||||
Completed,
|
||||
Failed,
|
||||
Timeout,
|
||||
}
|
||||
|
||||
/// Supports to sync files concurrently.
|
||||
pub struct Batcher {
|
||||
config: Config,
|
||||
capacity: usize,
|
||||
tasks: Vec<u64>, // files to sync
|
||||
store: Store,
|
||||
sync_send: SyncSender,
|
||||
}
|
||||
|
||||
impl Debug for Batcher {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self.tasks)
|
||||
}
|
||||
}
|
||||
|
||||
impl Batcher {
|
||||
pub fn new(config: Config, capacity: usize, store: Store, sync_send: SyncSender) -> Self {
|
||||
Self {
|
||||
config,
|
||||
capacity,
|
||||
tasks: Default::default(),
|
||||
store,
|
||||
sync_send,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.tasks.len()
|
||||
}
|
||||
|
||||
pub async fn add(&mut self, tx_seq: u64) -> Result<bool> {
|
||||
// limits the number of threads
|
||||
if self.tasks.len() >= self.capacity {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// requires log entry available before file sync
|
||||
if self.store.get_tx_by_seq_number(tx_seq).await?.is_none() {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
self.tasks.push(tx_seq);
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub fn reorg(&mut self, reverted_tx_seq: u64) {
|
||||
self.tasks.retain(|&x| x < reverted_tx_seq);
|
||||
}
|
||||
|
||||
/// Poll the sync result of any completed file sync.
|
||||
pub async fn poll(&mut self) -> Result<Option<(u64, SyncResult)>> {
|
||||
let mut result = None;
|
||||
let mut index = self.tasks.len();
|
||||
|
||||
for (i, tx_seq) in self.tasks.iter().enumerate() {
|
||||
if let Some(ret) = self.poll_tx(*tx_seq).await? {
|
||||
result = Some((*tx_seq, ret));
|
||||
index = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if index < self.tasks.len() {
|
||||
self.tasks.swap_remove(index);
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
|
||||
// file already exists
|
||||
if self.store.check_tx_completed(tx_seq).await? {
|
||||
return Ok(Some(SyncResult::Completed));
|
||||
}
|
||||
|
||||
// get sync state to handle in advance
|
||||
let state = match self
|
||||
.sync_send
|
||||
.request(SyncRequest::SyncStatus { tx_seq })
|
||||
.await?
|
||||
{
|
||||
SyncResponse::SyncStatus { status } => status,
|
||||
_ => bail!("Invalid sync response type"),
|
||||
};
|
||||
trace!(?tx_seq, ?state, "File sync status retrieved");
|
||||
|
||||
match state {
|
||||
// start file sync if not launched yet
|
||||
None => match self
|
||||
.sync_send
|
||||
.request(SyncRequest::SyncFile { tx_seq })
|
||||
.await?
|
||||
{
|
||||
SyncResponse::SyncFile { err } if err.is_empty() => Ok(None),
|
||||
SyncResponse::SyncFile { err } => bail!("Failed to sync file: {:?}", err),
|
||||
_ => bail!("Invalid sync response type"),
|
||||
},
|
||||
|
||||
// file sync completed
|
||||
Some(SyncState::Completed) => Ok(Some(SyncResult::Completed)),
|
||||
|
||||
// file sync failed
|
||||
Some(SyncState::Failed { reason }) => {
|
||||
debug!(?reason, "Failed to sync file");
|
||||
Ok(Some(SyncResult::Failed))
|
||||
}
|
||||
|
||||
// file sync timeout
|
||||
Some(SyncState::FindingPeers { origin, .. })
|
||||
if origin.elapsed() > self.config.find_peer_timeout =>
|
||||
{
|
||||
debug!(%tx_seq, "Terminate file sync due to finding peers timeout");
|
||||
self.terminate_file_sync(tx_seq, false).await;
|
||||
Ok(Some(SyncResult::Timeout))
|
||||
}
|
||||
|
||||
// others
|
||||
_ => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn terminate_file_sync(&self, tx_seq: u64, is_reverted: bool) {
|
||||
if let Err(err) = self
|
||||
.sync_send
|
||||
.request(SyncRequest::TerminateFileSync {
|
||||
tx_seq,
|
||||
is_reverted,
|
||||
})
|
||||
.await
|
||||
{
|
||||
// just log and go ahead for any error, e.g. timeout
|
||||
error!(%err, %tx_seq, %is_reverted, "Failed to terminate file sync");
|
||||
}
|
||||
}
|
||||
}
|
84
node/sync/src/auto_sync/batcher_random.rs
Normal file
84
node/sync/src/auto_sync/batcher_random.rs
Normal file
@ -0,0 +1,84 @@
|
||||
use super::{batcher::Batcher, sync_store::SyncStore};
|
||||
use crate::{
|
||||
auto_sync::{batcher::SyncResult, INTERVAL_ERROR, INTERVAL_IDLE},
|
||||
Config, SyncSender,
|
||||
};
|
||||
use anyhow::Result;
|
||||
use storage_async::Store;
|
||||
use tokio::time::sleep;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RandomBatcher {
|
||||
batcher: Batcher,
|
||||
sync_store: SyncStore,
|
||||
}
|
||||
|
||||
impl RandomBatcher {
|
||||
pub fn new(config: Config, store: Store, sync_send: SyncSender) -> Self {
|
||||
let sync_store = SyncStore::new(store.clone());
|
||||
|
||||
Self {
|
||||
// now, only 1 thread to sync file randomly
|
||||
batcher: Batcher::new(config, 1, store, sync_send),
|
||||
sync_store,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(mut self) {
|
||||
info!("Start to sync files");
|
||||
|
||||
loop {
|
||||
match self.sync_once().await {
|
||||
Ok(true) => {}
|
||||
Ok(false) => {
|
||||
trace!(?self, "File sync still in progress or idle");
|
||||
sleep(INTERVAL_IDLE).await;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(%err, ?self, "Failed to sync file once");
|
||||
sleep(INTERVAL_ERROR).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn sync_once(&mut self) -> Result<bool> {
|
||||
if self.schedule().await? {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
// poll any completed file sync
|
||||
let (tx_seq, sync_result) = match self.batcher.poll().await? {
|
||||
Some(v) => v,
|
||||
None => return Ok(false),
|
||||
};
|
||||
|
||||
debug!(%tx_seq, ?sync_result, ?self, "Completed to sync file");
|
||||
|
||||
match sync_result {
|
||||
SyncResult::Completed => self.sync_store.remove_tx(tx_seq).await?,
|
||||
_ => self.sync_store.downgrade_tx_to_pending(tx_seq).await?,
|
||||
};
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn schedule(&mut self) -> Result<bool> {
|
||||
if self.batcher.len() > 0 {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let tx_seq = match self.sync_store.random_tx().await? {
|
||||
Some(v) => v,
|
||||
None => return Ok(false),
|
||||
};
|
||||
|
||||
if !self.batcher.add(tx_seq).await? {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
debug!(?self, "Pick a file to sync");
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
}
|
248
node/sync/src/auto_sync/batcher_serial.rs
Normal file
248
node/sync/src/auto_sync/batcher_serial.rs
Normal file
@ -0,0 +1,248 @@
|
||||
use super::{
|
||||
batcher::{Batcher, SyncResult},
|
||||
sync_store::SyncStore,
|
||||
};
|
||||
use crate::{
|
||||
auto_sync::{INTERVAL_ERROR, INTERVAL_IDLE},
|
||||
Config, SyncSender,
|
||||
};
|
||||
use anyhow::Result;
|
||||
use log_entry_sync::LogSyncEvent;
|
||||
use std::{collections::HashMap, fmt::Debug};
|
||||
use storage_async::Store;
|
||||
use tokio::{
|
||||
sync::{broadcast::Receiver, mpsc::UnboundedReceiver},
|
||||
time::sleep,
|
||||
};
|
||||
|
||||
/// Supports to sync files in sequence concurrently.
|
||||
pub struct SerialBatcher {
|
||||
batcher: Batcher,
|
||||
|
||||
/// Next tx seq to sync.
|
||||
next_tx_seq: u64,
|
||||
/// Maximum tx seq to sync.
|
||||
max_tx_seq: u64,
|
||||
|
||||
/// Completed txs that pending to update sync db in sequence.
|
||||
pending_completed_txs: HashMap<u64, SyncResult>,
|
||||
/// Next tx seq to sync in db, so as to continue file sync from
|
||||
/// break point when program restarted.
|
||||
next_tx_seq_in_db: u64,
|
||||
|
||||
sync_store: SyncStore,
|
||||
}
|
||||
|
||||
impl Debug for SerialBatcher {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let max_tx_seq_desc = if self.max_tx_seq == u64::MAX {
|
||||
"N/A".into()
|
||||
} else {
|
||||
format!("{}", self.max_tx_seq)
|
||||
};
|
||||
|
||||
let pendings_desc = if self.pending_completed_txs.len() <= 5 {
|
||||
format!("{:?}", self.pending_completed_txs)
|
||||
} else {
|
||||
format!("{}", self.pending_completed_txs.len())
|
||||
};
|
||||
|
||||
f.debug_struct("SerialBatcher")
|
||||
.field("batcher", &self.batcher)
|
||||
.field("next", &self.next_tx_seq)
|
||||
.field("max", &max_tx_seq_desc)
|
||||
.field("pendings", &pendings_desc)
|
||||
.field("next_in_db", &self.next_tx_seq_in_db)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl SerialBatcher {
|
||||
pub async fn new(config: Config, store: Store, sync_send: SyncSender) -> Result<Self> {
|
||||
let capacity = config.max_sequential_workers;
|
||||
let sync_store = SyncStore::new(store.clone());
|
||||
|
||||
// continue file sync from break point in db
|
||||
let (next_tx_seq, max_tx_seq) = sync_store.get_tx_seq_range().await?;
|
||||
|
||||
Ok(Self {
|
||||
batcher: Batcher::new(config, capacity, store, sync_send),
|
||||
next_tx_seq: next_tx_seq.unwrap_or(0),
|
||||
max_tx_seq: max_tx_seq.unwrap_or(u64::MAX),
|
||||
pending_completed_txs: Default::default(),
|
||||
next_tx_seq_in_db: next_tx_seq.unwrap_or(0),
|
||||
sync_store,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn start(
|
||||
mut self,
|
||||
mut file_announcement_recv: UnboundedReceiver<u64>,
|
||||
mut log_sync_recv: Receiver<LogSyncEvent>,
|
||||
) {
|
||||
info!(?self, "Start to sync files");
|
||||
|
||||
loop {
|
||||
// handle all pending file announcements
|
||||
if self
|
||||
.update_file_announcement(&mut file_announcement_recv)
|
||||
.await
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// handle all reorg events
|
||||
if self.handle_reorg(&mut log_sync_recv).await {
|
||||
continue;
|
||||
}
|
||||
|
||||
// sync files
|
||||
match self.sync_once().await {
|
||||
Ok(true) => {}
|
||||
Ok(false) => {
|
||||
trace!(?self, "File sync still in progress or idle");
|
||||
sleep(INTERVAL_IDLE).await;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(%err, ?self, "Failed to sync file once");
|
||||
sleep(INTERVAL_ERROR).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_file_announcement(&mut self, recv: &mut UnboundedReceiver<u64>) -> bool {
|
||||
let announced_tx_seq = match recv.try_recv() {
|
||||
Ok(v) => v,
|
||||
Err(_) => return false,
|
||||
};
|
||||
|
||||
debug!(%announced_tx_seq, "Received file announcement");
|
||||
|
||||
// new file announced
|
||||
if self.max_tx_seq == u64::MAX || announced_tx_seq > self.max_tx_seq {
|
||||
debug!(%announced_tx_seq, ?self, "Update for new file announcement");
|
||||
|
||||
if let Err(err) = self.sync_store.set_max_tx_seq(announced_tx_seq).await {
|
||||
error!(%err, %announced_tx_seq, ?self, "Failed to set max_tx_seq in store");
|
||||
}
|
||||
|
||||
self.max_tx_seq = announced_tx_seq;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// already wait for sequential sync
|
||||
if announced_tx_seq >= self.next_tx_seq {
|
||||
return true;
|
||||
}
|
||||
|
||||
// otherwise, mark tx as ready for sync
|
||||
if let Err(err) = self.sync_store.upgrade_tx_to_ready(announced_tx_seq).await {
|
||||
error!(%err, %announced_tx_seq, ?self, "Failed to promote announced tx to ready");
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
async fn handle_reorg(&mut self, recv: &mut Receiver<LogSyncEvent>) -> bool {
|
||||
let reverted_tx_seq = match recv.try_recv() {
|
||||
Ok(LogSyncEvent::Reverted { tx_seq }) => tx_seq,
|
||||
_ => return false,
|
||||
};
|
||||
|
||||
debug!(%reverted_tx_seq, "Reorg detected");
|
||||
|
||||
// reorg happened, but no impact on file sync
|
||||
if reverted_tx_seq >= self.next_tx_seq {
|
||||
return true;
|
||||
}
|
||||
|
||||
info!(%reverted_tx_seq, ?self, "Handle reorg");
|
||||
|
||||
// terminate all files in progress
|
||||
self.batcher
|
||||
.terminate_file_sync(reverted_tx_seq, true)
|
||||
.await;
|
||||
|
||||
// update states
|
||||
self.batcher.reorg(reverted_tx_seq);
|
||||
self.next_tx_seq = reverted_tx_seq;
|
||||
self.pending_completed_txs
|
||||
.retain(|&k, _| k < reverted_tx_seq);
|
||||
if self.next_tx_seq_in_db > reverted_tx_seq {
|
||||
self.next_tx_seq_in_db = reverted_tx_seq;
|
||||
|
||||
if let Err(err) = self
|
||||
.sync_store
|
||||
.set_next_tx_seq(self.next_tx_seq_in_db)
|
||||
.await
|
||||
{
|
||||
error!(%err, %reverted_tx_seq, ?self, "Failed to set next tx seq due to tx reverted");
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
async fn sync_once(&mut self) -> Result<bool> {
|
||||
// try to trigger more file sync
|
||||
if self.schedule_next().await? {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
// poll any completed file sync
|
||||
let (tx_seq, sync_result) = match self.batcher.poll().await? {
|
||||
Some(v) => v,
|
||||
None => return Ok(false),
|
||||
};
|
||||
|
||||
info!(%tx_seq, ?sync_result, ?self, "Completed to sync file");
|
||||
self.pending_completed_txs.insert(tx_seq, sync_result);
|
||||
|
||||
// update sync db
|
||||
self.update_completed_txs_in_db().await?;
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Schedule file sync in sequence.
|
||||
async fn schedule_next(&mut self) -> Result<bool> {
|
||||
if self.next_tx_seq > self.max_tx_seq {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
if !self.batcher.add(self.next_tx_seq).await? {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
self.next_tx_seq += 1;
|
||||
|
||||
info!(?self, "Move forward");
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Update file sync index in db.
|
||||
async fn update_completed_txs_in_db(&mut self) -> Result<()> {
|
||||
while let Some(sync_result) = self.pending_completed_txs.get(&self.next_tx_seq_in_db) {
|
||||
// downgrade to random sync if file sync failed or timeout
|
||||
if matches!(sync_result, SyncResult::Failed | SyncResult::Timeout) {
|
||||
self.sync_store
|
||||
.add_pending_tx(self.next_tx_seq_in_db)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// always move forward in db
|
||||
self.sync_store
|
||||
.set_next_tx_seq(self.next_tx_seq_in_db + 1)
|
||||
.await?;
|
||||
|
||||
// update in memory after db updated
|
||||
self.pending_completed_txs.remove(&self.next_tx_seq_in_db);
|
||||
self.next_tx_seq_in_db += 1;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,677 +0,0 @@
|
||||
use super::sync_store::SyncStore;
|
||||
use crate::{controllers::SyncState, Config, SyncRequest, SyncResponse, SyncSender};
|
||||
use anyhow::{bail, Result};
|
||||
use log_entry_sync::LogSyncEvent;
|
||||
use std::sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
};
|
||||
use std::time::Duration;
|
||||
use storage_async::Store;
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::sync::broadcast::{error::RecvError, Receiver};
|
||||
use tokio::time::sleep;
|
||||
|
||||
const INTERVAL_CATCHUP: Duration = Duration::from_millis(1);
|
||||
const INTERVAL: Duration = Duration::from_secs(1);
|
||||
const INTERVAL_ERROR: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Manager to synchronize files among storage nodes automatically.
|
||||
///
|
||||
/// Generally, most files could be synchronized among storage nodes. However, a few
|
||||
/// files may be unavailable on all storage nodes, e.g.
|
||||
///
|
||||
/// 1. File not uploaded by user in time.
|
||||
/// 2. File removed due to blockchain reorg, and user do not upload again.
|
||||
///
|
||||
/// So, there are 2 workers to synchronize files in parallel:
|
||||
///
|
||||
/// 1. Synchronize announced files in sequence. If any file unavailable, store it into db.
|
||||
/// 2. Synchronize the missed files in sequential synchronization.
|
||||
#[derive(Clone)]
|
||||
pub struct Manager {
|
||||
config: Config,
|
||||
|
||||
/// The next `tx_seq` to sync in sequence.
|
||||
next_tx_seq: Arc<AtomicU64>,
|
||||
|
||||
/// The maximum `tx_seq` to sync in sequence, `u64::MAX` means unlimited.
|
||||
/// Generally, it is updated when file announcement received.
|
||||
max_tx_seq: Arc<AtomicU64>,
|
||||
|
||||
/// The last reverted transaction seq, `u64::MAX` means no tx reverted.
|
||||
/// Generally, it is updated when transaction reverted.
|
||||
reverted_tx_seq: Arc<AtomicU64>,
|
||||
|
||||
store: Store,
|
||||
sync_store: SyncStore,
|
||||
|
||||
/// Used to interact with sync service for the current file in sync.
|
||||
sync_send: SyncSender,
|
||||
}
|
||||
|
||||
impl Manager {
|
||||
pub async fn new(store: Store, sync_send: SyncSender, config: Config) -> Result<Self> {
|
||||
let sync_store = SyncStore::new(store.clone());
|
||||
|
||||
let (next_tx_seq, max_tx_seq) = sync_store.get_tx_seq_range().await?;
|
||||
let next_tx_seq = next_tx_seq.unwrap_or(0);
|
||||
let max_tx_seq = max_tx_seq.unwrap_or(u64::MAX);
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
next_tx_seq: Arc::new(AtomicU64::new(next_tx_seq)),
|
||||
max_tx_seq: Arc::new(AtomicU64::new(max_tx_seq)),
|
||||
reverted_tx_seq: Arc::new(AtomicU64::new(u64::MAX)),
|
||||
store,
|
||||
sync_store,
|
||||
sync_send,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn spwn(&self, executor: &TaskExecutor, receiver: Receiver<LogSyncEvent>) {
|
||||
executor.spawn(
|
||||
self.clone().monitor_reorg(receiver),
|
||||
"sync_manager_reorg_monitor",
|
||||
);
|
||||
|
||||
executor.spawn(self.clone().start_sync(), "sync_manager_sequential_syncer");
|
||||
|
||||
executor.spawn(
|
||||
self.clone().start_sync_pending_txs(),
|
||||
"sync_manager_pending_syncer",
|
||||
);
|
||||
}
|
||||
|
||||
fn set_reverted(&self, tx_seq: u64) -> bool {
|
||||
if tx_seq >= self.reverted_tx_seq.load(Ordering::Relaxed) {
|
||||
return false;
|
||||
}
|
||||
|
||||
self.reverted_tx_seq.store(tx_seq, Ordering::Relaxed);
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
fn handle_on_reorg(&self) -> Option<u64> {
|
||||
let reverted_tx_seq = self.reverted_tx_seq.load(Ordering::Relaxed);
|
||||
|
||||
// no reorg happened
|
||||
if reverted_tx_seq == u64::MAX {
|
||||
return None;
|
||||
}
|
||||
|
||||
self.reverted_tx_seq.store(u64::MAX, Ordering::Relaxed);
|
||||
|
||||
// reorg happened, but no impact on file sync
|
||||
let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
|
||||
if reverted_tx_seq > next_tx_seq {
|
||||
return None;
|
||||
}
|
||||
|
||||
// handles on reorg
|
||||
info!(%reverted_tx_seq, %next_tx_seq, "Transaction reverted");
|
||||
|
||||
// re-sync files from the reverted tx seq
|
||||
self.next_tx_seq.store(reverted_tx_seq, Ordering::Relaxed);
|
||||
|
||||
Some(next_tx_seq)
|
||||
}
|
||||
|
||||
pub async fn update_on_announcement(&self, announced_tx_seq: u64) {
|
||||
let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
|
||||
let max_tx_seq = self.max_tx_seq.load(Ordering::Relaxed);
|
||||
debug!(%next_tx_seq, %max_tx_seq, %announced_tx_seq, "Update for new announcement");
|
||||
|
||||
// new file announced
|
||||
if max_tx_seq == u64::MAX || announced_tx_seq > max_tx_seq {
|
||||
match self.sync_store.set_max_tx_seq(announced_tx_seq).await {
|
||||
Ok(()) => self.max_tx_seq.store(announced_tx_seq, Ordering::Relaxed),
|
||||
Err(e) => error!(%e, "Failed to set max_tx_seq in store"),
|
||||
};
|
||||
return;
|
||||
}
|
||||
|
||||
// already wait for sequential sync
|
||||
if announced_tx_seq >= next_tx_seq {
|
||||
return;
|
||||
}
|
||||
|
||||
// otherwise, mark tx as ready for sync
|
||||
if let Err(e) = self.sync_store.upgrade_tx_to_ready(announced_tx_seq).await {
|
||||
error!(%e, "Failed to promote announced tx to ready");
|
||||
}
|
||||
}
|
||||
|
||||
async fn move_forward(&self, pending: bool) -> Result<bool> {
|
||||
let tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
|
||||
let max_tx_seq = self.max_tx_seq.load(Ordering::Relaxed);
|
||||
if tx_seq > max_tx_seq {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// put the tx into pending list
|
||||
if pending && self.sync_store.add_pending_tx(tx_seq).await? {
|
||||
debug!(%tx_seq, "Pending tx added");
|
||||
}
|
||||
|
||||
let next_tx_seq = tx_seq + 1;
|
||||
self.sync_store.set_next_tx_seq(next_tx_seq).await?;
|
||||
self.next_tx_seq.store(next_tx_seq, Ordering::Relaxed);
|
||||
|
||||
debug!(%next_tx_seq, %max_tx_seq, "Move forward");
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Returns whether file sync in progress but no peers found
|
||||
async fn sync_tx(&self, tx_seq: u64) -> Result<bool> {
|
||||
// tx not available yet
|
||||
if self.store.get_tx_by_seq_number(tx_seq).await?.is_none() {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// get sync state to handle in advance
|
||||
let state = match self
|
||||
.sync_send
|
||||
.request(SyncRequest::SyncStatus { tx_seq })
|
||||
.await?
|
||||
{
|
||||
SyncResponse::SyncStatus { status } => status,
|
||||
_ => bail!("Invalid sync response type"),
|
||||
};
|
||||
trace!(?tx_seq, ?state, "sync_tx tx status");
|
||||
|
||||
// notify service to sync file if not started or failed
|
||||
if matches!(state, None | Some(SyncState::Failed { .. })) {
|
||||
match self
|
||||
.sync_send
|
||||
.request(SyncRequest::SyncFile { tx_seq })
|
||||
.await?
|
||||
{
|
||||
SyncResponse::SyncFile { err } if err.is_empty() => return Ok(false),
|
||||
SyncResponse::SyncFile { err } => bail!("Failed to sync file: {:?}", err),
|
||||
_ => bail!("Invalid sync response type"),
|
||||
}
|
||||
}
|
||||
|
||||
if matches!(state, Some(SyncState::FindingPeers { origin, .. }) if origin.elapsed() > self.config.find_peer_timeout)
|
||||
{
|
||||
// no peers found for a long time
|
||||
self.terminate_file_sync(tx_seq, false).await;
|
||||
info!(%tx_seq, "Terminate file sync due to finding peers timeout");
|
||||
Ok(true)
|
||||
} else {
|
||||
// otherwise, continue to wait for file sync that already in progress
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
async fn terminate_file_sync(&self, tx_seq: u64, is_reverted: bool) {
|
||||
if let Err(err) = self
|
||||
.sync_send
|
||||
.request(SyncRequest::TerminateFileSync {
|
||||
tx_seq,
|
||||
is_reverted,
|
||||
})
|
||||
.await
|
||||
{
|
||||
// just log and go ahead for any error, e.g. timeout
|
||||
error!(%err, %tx_seq, "Failed to terminate file sync");
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts to monitor reorg and handle on transaction reverted.
|
||||
async fn monitor_reorg(self, mut receiver: Receiver<LogSyncEvent>) {
|
||||
info!("Start to monitor reorg");
|
||||
|
||||
loop {
|
||||
match receiver.recv().await {
|
||||
Ok(LogSyncEvent::ReorgDetected { .. }) => {}
|
||||
Ok(LogSyncEvent::Reverted { tx_seq }) => {
|
||||
// requires to re-sync files since transaction and files removed in storage
|
||||
self.set_reverted(tx_seq);
|
||||
}
|
||||
Ok(LogSyncEvent::TxSynced { .. }) => {} //No need to handle synced tx in reorg
|
||||
Err(RecvError::Closed) => {
|
||||
// program terminated
|
||||
info!("Completed to monitor reorg");
|
||||
return;
|
||||
}
|
||||
Err(RecvError::Lagged(lagged)) => {
|
||||
// Generally, such error should not happen since confirmed block
|
||||
// reorg rarely happen, and the buffer size of broadcast channel
|
||||
// is big enough.
|
||||
error!(%lagged, "Failed to receive reverted tx (Lagged)");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts to synchronize files in sequence.
|
||||
async fn start_sync(self) {
|
||||
info!(
|
||||
"Start to sync files periodically, next = {}, max = {}",
|
||||
self.next_tx_seq.load(Ordering::Relaxed),
|
||||
self.max_tx_seq.load(Ordering::Relaxed)
|
||||
);
|
||||
|
||||
loop {
|
||||
// handles reorg before file sync
|
||||
if let Some(tx_seq) = self.handle_on_reorg() {
|
||||
// request sync service to terminate the file sync immediately
|
||||
self.terminate_file_sync(tx_seq, true).await;
|
||||
}
|
||||
|
||||
// sync file
|
||||
let sync_result = self.sync_once().await;
|
||||
let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
|
||||
match sync_result {
|
||||
Ok(true) => {
|
||||
debug!(%next_tx_seq, "Completed to sync file");
|
||||
sleep(INTERVAL_CATCHUP).await;
|
||||
}
|
||||
Ok(false) => {
|
||||
trace!(%next_tx_seq, "File in sync or log entry unavailable");
|
||||
sleep(INTERVAL).await;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(%err, %next_tx_seq, "Failed to sync file");
|
||||
sleep(INTERVAL_ERROR).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn sync_once(&self) -> Result<bool> {
|
||||
// already sync to the latest file
|
||||
let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
|
||||
if next_tx_seq > self.max_tx_seq.load(Ordering::Relaxed) {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// already finalized
|
||||
if self.store.check_tx_completed(next_tx_seq).await? {
|
||||
self.move_forward(false).await?;
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
// try sync tx
|
||||
let no_peer_timeout = self.sync_tx(next_tx_seq).await?;
|
||||
|
||||
// put tx to pending list if no peers found for a long time
|
||||
if no_peer_timeout {
|
||||
self.move_forward(true).await?;
|
||||
}
|
||||
|
||||
Ok(no_peer_timeout)
|
||||
}
|
||||
|
||||
/// Starts to synchronize pending files that unavailable during sequential synchronization.
|
||||
async fn start_sync_pending_txs(self) {
|
||||
info!("Start to sync pending files");
|
||||
|
||||
let mut tx_seq = 0;
|
||||
let mut next = true;
|
||||
|
||||
loop {
|
||||
if next {
|
||||
match self.sync_store.random_tx().await {
|
||||
Ok(Some(seq)) => {
|
||||
tx_seq = seq;
|
||||
debug!(%tx_seq, "Start to sync pending file");
|
||||
}
|
||||
Ok(None) => {
|
||||
trace!("No pending file to sync");
|
||||
sleep(INTERVAL).await;
|
||||
continue;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(%err, "Failed to pick pending file to sync");
|
||||
sleep(INTERVAL_ERROR).await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match self.sync_pending_tx(tx_seq).await {
|
||||
Ok(true) => {
|
||||
debug!(%tx_seq, "Completed to sync pending file");
|
||||
sleep(INTERVAL_CATCHUP).await;
|
||||
next = true;
|
||||
}
|
||||
Ok(false) => {
|
||||
trace!(%tx_seq, "Pending file in sync or tx unavailable");
|
||||
sleep(INTERVAL).await;
|
||||
next = false;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(%err, %tx_seq, "Failed to sync pending file");
|
||||
sleep(INTERVAL_ERROR).await;
|
||||
next = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn sync_pending_tx(&self, tx_seq: u64) -> Result<bool> {
|
||||
// already finalized
|
||||
if self.store.check_tx_completed(tx_seq).await? {
|
||||
self.sync_store.remove_tx(tx_seq).await?;
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
// try sync tx
|
||||
let no_peer_timeout = self.sync_tx(tx_seq).await?;
|
||||
|
||||
// downgrade if no peers found for a long time
|
||||
if no_peer_timeout && self.sync_store.downgrade_tx_to_pending(tx_seq).await? {
|
||||
debug!(%tx_seq, "No peers found for pending file and downgraded");
|
||||
}
|
||||
|
||||
Ok(no_peer_timeout)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{
|
||||
ops::Sub,
|
||||
sync::atomic::Ordering,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use channel::{test_util::TestReceiver, Channel};
|
||||
use tokio::sync::mpsc::error::TryRecvError;
|
||||
|
||||
use crate::{
|
||||
auto_sync::sync_store::SyncStore,
|
||||
controllers::SyncState,
|
||||
test_util::{create_2_store, tests::TestStoreRuntime},
|
||||
Config, SyncMessage, SyncRequest, SyncResponse,
|
||||
};
|
||||
|
||||
use super::Manager;
|
||||
|
||||
async fn new_manager(
|
||||
runtime: &TestStoreRuntime,
|
||||
next_tx_seq: u64,
|
||||
max_tx_seq: u64,
|
||||
) -> (
|
||||
Manager,
|
||||
TestReceiver<SyncMessage, SyncRequest, SyncResponse>,
|
||||
) {
|
||||
let sync_store = SyncStore::new(runtime.store.clone());
|
||||
sync_store.set_next_tx_seq(next_tx_seq).await.unwrap();
|
||||
if max_tx_seq < u64::MAX {
|
||||
sync_store.set_max_tx_seq(max_tx_seq).await.unwrap();
|
||||
}
|
||||
|
||||
let (sync_send, sync_recv) = Channel::unbounded();
|
||||
let manager = Manager::new(runtime.store.clone(), sync_send, Config::default())
|
||||
.await
|
||||
.unwrap();
|
||||
(manager, sync_recv.into())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_init_values() {
|
||||
let runtime = TestStoreRuntime::default();
|
||||
let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await;
|
||||
|
||||
assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 4);
|
||||
assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 12);
|
||||
assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), u64::MAX);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_set_reverted() {
|
||||
let runtime = TestStoreRuntime::default();
|
||||
let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await;
|
||||
|
||||
// reverted to tx 5
|
||||
assert!(manager.set_reverted(5));
|
||||
assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), 5);
|
||||
|
||||
// no effect if tx 6 reverted again
|
||||
assert!(!manager.set_reverted(6));
|
||||
assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), 5);
|
||||
|
||||
// overwrite tx 5 if tx 3 reverted
|
||||
assert!(manager.set_reverted(3));
|
||||
assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_handle_reorg() {
|
||||
let runtime = TestStoreRuntime::default();
|
||||
let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await;
|
||||
|
||||
// no effect if not reverted
|
||||
assert_eq!(manager.handle_on_reorg(), None);
|
||||
assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), u64::MAX);
|
||||
assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 4);
|
||||
|
||||
// tx 5 reverted, but sync in future
|
||||
assert!(manager.set_reverted(5));
|
||||
assert_eq!(manager.handle_on_reorg(), None);
|
||||
assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), u64::MAX);
|
||||
assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 4);
|
||||
|
||||
// tx 3 reverted, should terminate tx 4 and re-sync files since tx 3
|
||||
assert!(manager.set_reverted(3));
|
||||
assert_eq!(manager.handle_on_reorg(), Some(4));
|
||||
assert_eq!(manager.reverted_tx_seq.load(Ordering::Relaxed), u64::MAX);
|
||||
assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_update_on_announcement() {
|
||||
let runtime = TestStoreRuntime::default();
|
||||
let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await;
|
||||
|
||||
// no effect if tx 10 announced
|
||||
manager.update_on_announcement(10).await;
|
||||
assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 4);
|
||||
assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 12);
|
||||
|
||||
// `max_tx_seq` enlarged if tx 20 announced
|
||||
manager.update_on_announcement(20).await;
|
||||
assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 4);
|
||||
assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 20);
|
||||
|
||||
// no effect if announced for a non-pending tx
|
||||
manager.update_on_announcement(2).await;
|
||||
assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 4);
|
||||
assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 20);
|
||||
assert_eq!(manager.sync_store.random_tx().await.unwrap(), None);
|
||||
|
||||
// pending tx upgraded if announcement received
|
||||
assert!(manager.sync_store.add_pending_tx(1).await.unwrap());
|
||||
assert!(manager.sync_store.add_pending_tx(2).await.unwrap());
|
||||
manager.update_on_announcement(2).await;
|
||||
assert_eq!(manager.sync_store.random_tx().await.unwrap(), Some(2));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_move_forward() {
|
||||
let runtime = TestStoreRuntime::default();
|
||||
let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await;
|
||||
|
||||
// move forward from 4 to 5
|
||||
assert!(manager.move_forward(false).await.unwrap());
|
||||
assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 5);
|
||||
assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 12);
|
||||
assert_eq!(
|
||||
manager.sync_store.get_tx_seq_range().await.unwrap(),
|
||||
(Some(5), Some(12))
|
||||
);
|
||||
|
||||
// move forward and add tx 5 to pending list
|
||||
assert!(manager.move_forward(true).await.unwrap());
|
||||
assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 6);
|
||||
assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 12);
|
||||
assert_eq!(
|
||||
manager.sync_store.get_tx_seq_range().await.unwrap(),
|
||||
(Some(6), Some(12))
|
||||
);
|
||||
assert_eq!(manager.sync_store.random_tx().await.unwrap(), Some(5));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_move_forward_failed() {
|
||||
let runtime = TestStoreRuntime::default();
|
||||
let (manager, _sync_recv) = new_manager(&runtime, 5, 5).await;
|
||||
|
||||
// 5 -> 6
|
||||
assert!(manager.move_forward(false).await.unwrap());
|
||||
assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 6);
|
||||
assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 5);
|
||||
|
||||
// cannot move forward anymore
|
||||
assert!(!manager.move_forward(false).await.unwrap());
|
||||
assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 6);
|
||||
assert_eq!(manager.max_tx_seq.load(Ordering::Relaxed), 5);
|
||||
assert_eq!(
|
||||
manager.sync_store.get_tx_seq_range().await.unwrap(),
|
||||
(Some(6), Some(5))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_sync_tx_unavailable() {
|
||||
let runtime = TestStoreRuntime::default();
|
||||
let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await;
|
||||
|
||||
assert!(!manager.sync_tx(4).await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_sync_tx_status_none() {
|
||||
let (_, store, _, _) = create_2_store(vec![1314, 1324]);
|
||||
let runtime = TestStoreRuntime::new(store);
|
||||
let (manager, mut sync_recv) = new_manager(&runtime, 1, 5).await;
|
||||
|
||||
let (_, sync_result) = tokio::join!(
|
||||
sync_recv.expect_responses(vec![
|
||||
SyncResponse::SyncStatus { status: None },
|
||||
// cause to file sync started
|
||||
SyncResponse::SyncFile { err: String::new() },
|
||||
]),
|
||||
manager.sync_tx(1)
|
||||
);
|
||||
assert!(!sync_result.unwrap());
|
||||
assert!(matches!(sync_recv.try_recv(), Err(TryRecvError::Empty)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_sync_tx_in_progress() {
|
||||
let (_, store, _, _) = create_2_store(vec![1314, 1324]);
|
||||
let runtime = TestStoreRuntime::new(store);
|
||||
let (manager, mut sync_recv) = new_manager(&runtime, 1, 5).await;
|
||||
|
||||
let (_, sync_result) = tokio::join!(
|
||||
// unnecessary to start file sync again
|
||||
sync_recv.expect_response(SyncResponse::SyncStatus {
|
||||
status: Some(SyncState::ConnectingPeers)
|
||||
}),
|
||||
manager.sync_tx(1)
|
||||
);
|
||||
assert!(!sync_result.unwrap());
|
||||
assert!(matches!(sync_recv.try_recv(), Err(TryRecvError::Empty)));
|
||||
}
|
||||
|
||||
async fn expect_no_peer_found(
|
||||
sync_recv: &mut TestReceiver<SyncMessage, SyncRequest, SyncResponse>,
|
||||
) {
|
||||
let responses = vec![
|
||||
// no peers for file sync for a long time
|
||||
SyncResponse::SyncStatus {
|
||||
status: Some(SyncState::FindingPeers {
|
||||
origin: Instant::now().sub(Duration::from_secs(10000)),
|
||||
since: Instant::now(),
|
||||
}),
|
||||
},
|
||||
// required to terminate the file sync
|
||||
SyncResponse::TerminateFileSync { count: 1 },
|
||||
];
|
||||
sync_recv.expect_responses(responses).await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_sync_tx_no_peer_found() {
|
||||
let (_, store, _, _) = create_2_store(vec![1314, 1324]);
|
||||
let runtime = TestStoreRuntime::new(store);
|
||||
let (manager, mut sync_recv) = new_manager(&runtime, 1, 5).await;
|
||||
|
||||
let (_, sync_result) =
|
||||
tokio::join!(expect_no_peer_found(&mut sync_recv), manager.sync_tx(1));
|
||||
assert!(sync_result.unwrap());
|
||||
assert!(matches!(sync_recv.try_recv(), Err(TryRecvError::Empty)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_sync_once_already_latest() {
|
||||
let runtime = TestStoreRuntime::default();
|
||||
let (manager, _sync_recv) = new_manager(&runtime, 6, 5).await;
|
||||
|
||||
assert!(!manager.sync_once().await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_sync_once_finalized() {
|
||||
let (_, store, _, _) = create_2_store(vec![1314, 1324]);
|
||||
let runtime = TestStoreRuntime::new(store);
|
||||
let (manager, _sync_recv) = new_manager(&runtime, 1, 5).await;
|
||||
|
||||
assert!(manager.sync_once().await.unwrap());
|
||||
assert_eq!(manager.next_tx_seq.load(Ordering::Relaxed), 2);
|
||||
assert_eq!(manager.sync_store.random_tx().await.unwrap(), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_sync_once_no_peer_found() {
|
||||
let (store, _, _, _) = create_2_store(vec![1314]);
|
||||
let runtime = TestStoreRuntime::new(store);
|
||||
let (manager, mut sync_recv) = new_manager(&runtime, 0, 5).await;
|
||||
|
||||
let (_, sync_result) =
|
||||
tokio::join!(expect_no_peer_found(&mut sync_recv), manager.sync_once(),);
|
||||
assert!(sync_result.unwrap());
|
||||
assert!(matches!(sync_recv.try_recv(), Err(TryRecvError::Empty)));
|
||||
assert_eq!(manager.sync_store.random_tx().await.unwrap(), Some(0));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_sync_pending_tx_finalized() {
|
||||
let (_, store, _, _) = create_2_store(vec![1314, 1324]);
|
||||
let runtime = TestStoreRuntime::new(store);
|
||||
let (manager, _sync_recv) = new_manager(&runtime, 4, 12).await;
|
||||
|
||||
assert!(manager.sync_store.add_pending_tx(0).await.unwrap());
|
||||
assert!(manager.sync_store.add_pending_tx(1).await.unwrap());
|
||||
|
||||
assert!(manager.sync_pending_tx(1).await.unwrap());
|
||||
assert_eq!(manager.sync_store.random_tx().await.unwrap(), Some(0));
|
||||
assert!(manager.sync_store.add_pending_tx(1).await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manager_sync_pending_tx_no_peer_found() {
|
||||
let (store, _, _, _) = create_2_store(vec![1314, 1324]);
|
||||
let runtime = TestStoreRuntime::new(store);
|
||||
let (manager, mut sync_recv) = new_manager(&runtime, 4, 12).await;
|
||||
|
||||
assert!(manager.sync_store.add_pending_tx(0).await.unwrap());
|
||||
assert!(manager.sync_store.add_pending_tx(1).await.unwrap());
|
||||
assert!(manager.sync_store.upgrade_tx_to_ready(1).await.unwrap());
|
||||
|
||||
let (_, sync_result) = tokio::join!(
|
||||
expect_no_peer_found(&mut sync_recv),
|
||||
manager.sync_pending_tx(1),
|
||||
);
|
||||
assert!(sync_result.unwrap());
|
||||
assert!(matches!(sync_recv.try_recv(), Err(TryRecvError::Empty)));
|
||||
assert!(manager.sync_store.upgrade_tx_to_ready(1).await.unwrap());
|
||||
}
|
||||
}
|
@ -1,5 +1,10 @@
|
||||
mod manager;
|
||||
mod batcher;
|
||||
pub mod batcher_random;
|
||||
pub mod batcher_serial;
|
||||
mod sync_store;
|
||||
mod tx_store;
|
||||
|
||||
pub use manager::Manager as AutoSyncManager;
|
||||
use std::time::Duration;
|
||||
|
||||
const INTERVAL_IDLE: Duration = Duration::from_secs(3);
|
||||
const INTERVAL_ERROR: Duration = Duration::from_secs(10);
|
||||
|
@ -1,6 +1,6 @@
|
||||
use super::tx_store::TxStore;
|
||||
use anyhow::Result;
|
||||
|
||||
use std::fmt::Debug;
|
||||
use storage::log_store::config::{ConfigTx, ConfigurableExt};
|
||||
use storage_async::Store;
|
||||
|
||||
@ -19,6 +19,27 @@ pub struct SyncStore {
|
||||
ready_txs: TxStore,
|
||||
}
|
||||
|
||||
impl Debug for SyncStore {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let store = self.store.get_store();
|
||||
|
||||
let pendings = match self.pending_txs.count(store) {
|
||||
Ok(count) => format!("{}", count),
|
||||
Err(err) => format!("Err: {:?}", err),
|
||||
};
|
||||
|
||||
let ready = match self.ready_txs.count(store) {
|
||||
Ok(count) => format!("{}", count),
|
||||
Err(err) => format!("Err: {:?}", err),
|
||||
};
|
||||
|
||||
f.debug_struct("SyncStore")
|
||||
.field("pending_txs", &pendings)
|
||||
.field("ready_txs", &ready)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl SyncStore {
|
||||
pub fn new(store: Store) -> Self {
|
||||
Self {
|
||||
|
@ -1,7 +1,7 @@
|
||||
#[macro_use]
|
||||
extern crate tracing;
|
||||
|
||||
mod auto_sync;
|
||||
pub mod auto_sync;
|
||||
mod context;
|
||||
mod controllers;
|
||||
mod service;
|
||||
@ -18,10 +18,13 @@ use std::time::Duration;
|
||||
pub struct Config {
|
||||
pub auto_sync_enabled: bool,
|
||||
pub max_sync_files: usize,
|
||||
#[serde(deserialize_with = "deserialize_duration")]
|
||||
pub find_peer_timeout: Duration,
|
||||
pub sync_file_by_rpc_enabled: bool,
|
||||
pub sync_file_on_announcement_enabled: bool,
|
||||
|
||||
// auto_sync config
|
||||
pub max_sequential_workers: usize,
|
||||
#[serde(deserialize_with = "deserialize_duration")]
|
||||
pub find_peer_timeout: Duration,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
@ -29,9 +32,11 @@ impl Default for Config {
|
||||
Self {
|
||||
auto_sync_enabled: false,
|
||||
max_sync_files: 8,
|
||||
find_peer_timeout: Duration::from_secs(10),
|
||||
sync_file_by_rpc_enabled: true,
|
||||
sync_file_on_announcement_enabled: false,
|
||||
|
||||
max_sequential_workers: 8,
|
||||
find_peer_timeout: Duration::from_secs(10),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
use crate::auto_sync::AutoSyncManager;
|
||||
use crate::auto_sync::batcher_random::RandomBatcher;
|
||||
use crate::auto_sync::batcher_serial::SerialBatcher;
|
||||
use crate::context::SyncNetworkContext;
|
||||
use crate::controllers::{
|
||||
FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState,
|
||||
@ -23,6 +24,7 @@ use storage::config::ShardConfig;
|
||||
use storage::error::Result as StorageResult;
|
||||
use storage::log_store::Store as LogStore;
|
||||
use storage_async::Store;
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
|
||||
const HEARTBEAT_INTERVAL_SEC: u64 = 5;
|
||||
@ -122,7 +124,7 @@ pub struct SyncService {
|
||||
/// Heartbeat interval for executing periodic tasks.
|
||||
heartbeat: tokio::time::Interval,
|
||||
|
||||
manager: AutoSyncManager,
|
||||
file_announcement_send: Option<UnboundedSender<u64>>,
|
||||
}
|
||||
|
||||
impl SyncService {
|
||||
@ -159,10 +161,23 @@ impl SyncService {
|
||||
|
||||
let store = Store::new(store, executor.clone());
|
||||
|
||||
let manager = AutoSyncManager::new(store.clone(), sync_send.clone(), config).await?;
|
||||
if config.auto_sync_enabled {
|
||||
manager.spwn(&executor, event_recv);
|
||||
}
|
||||
// init auto sync
|
||||
let file_announcement_send = if config.auto_sync_enabled {
|
||||
let (send, recv) = unbounded_channel();
|
||||
|
||||
// sync in sequence
|
||||
let serial_batcher =
|
||||
SerialBatcher::new(config, store.clone(), sync_send.clone()).await?;
|
||||
executor.spawn(serial_batcher.start(recv, event_recv), "auto_sync_serial");
|
||||
|
||||
// sync randomly
|
||||
let random_batcher = RandomBatcher::new(config, store.clone(), sync_send.clone());
|
||||
executor.spawn(random_batcher.start(), "auto_sync_random");
|
||||
|
||||
Some(send)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut sync = SyncService {
|
||||
config,
|
||||
@ -172,7 +187,7 @@ impl SyncService {
|
||||
file_location_cache,
|
||||
controllers: Default::default(),
|
||||
heartbeat,
|
||||
manager,
|
||||
file_announcement_send,
|
||||
};
|
||||
|
||||
info!("Starting sync service");
|
||||
@ -606,7 +621,9 @@ impl SyncService {
|
||||
let tx_seq = tx_id.seq;
|
||||
debug!(%tx_seq, %peer_id, %addr, "Received AnnounceFile gossip");
|
||||
|
||||
self.manager.update_on_announcement(tx_seq).await;
|
||||
if let Some(send) = &self.file_announcement_send {
|
||||
let _ = send.send(tx_seq);
|
||||
}
|
||||
|
||||
// File already in sync
|
||||
if let Some(controller) = self.controllers.get_mut(&tx_seq) {
|
||||
@ -827,12 +844,9 @@ mod tests {
|
||||
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
|
||||
|
||||
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
||||
let (sync_send, sync_recv) = channel::Channel::unbounded();
|
||||
let (_, sync_recv) = channel::Channel::unbounded();
|
||||
|
||||
let heartbeat = tokio::time::interval(Duration::from_secs(HEARTBEAT_INTERVAL_SEC));
|
||||
let manager = AutoSyncManager::new(store.clone(), sync_send, Config::default())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut sync = SyncService {
|
||||
config: Config::default(),
|
||||
@ -842,7 +856,7 @@ mod tests {
|
||||
file_location_cache,
|
||||
controllers: Default::default(),
|
||||
heartbeat,
|
||||
manager,
|
||||
file_announcement_send: None,
|
||||
};
|
||||
|
||||
sync.on_peer_connected(init_peer_id);
|
||||
@ -862,12 +876,9 @@ mod tests {
|
||||
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
|
||||
|
||||
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
||||
let (sync_send, sync_recv) = channel::Channel::unbounded();
|
||||
let (_, sync_recv) = channel::Channel::unbounded();
|
||||
|
||||
let heartbeat = tokio::time::interval(Duration::from_secs(HEARTBEAT_INTERVAL_SEC));
|
||||
let manager = AutoSyncManager::new(store.clone(), sync_send, Config::default())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut sync = SyncService {
|
||||
config: Config::default(),
|
||||
@ -877,7 +888,7 @@ mod tests {
|
||||
file_location_cache,
|
||||
controllers: Default::default(),
|
||||
heartbeat,
|
||||
manager,
|
||||
file_announcement_send: None,
|
||||
};
|
||||
|
||||
sync.on_peer_disconnected(init_peer_id);
|
||||
|
@ -237,3 +237,6 @@ miner_key = ""
|
||||
|
||||
# Enable to start a file sync automatically when a file announcement P2P message received.
|
||||
# sync_file_on_announcement_enabled = false
|
||||
|
||||
# Maximum threads to sync files in sequence.
|
||||
# max_sequential_workers = 8
|
||||
|
Loading…
Reference in New Issue
Block a user