Mirror of https://github.com/0glabs/0g-storage-node.git (synced 2024-12-26 16:25:18 +00:00)

Compare commits: 7c3a42aefc ... 8dd229f28a (1 commit)

Commit 8dd229f28a
@@ -1,11 +1,9 @@
 use crate::{controllers::SyncState, Config, SyncRequest, SyncResponse, SyncSender};
 use anyhow::{bail, Result};
-use serde::{Deserialize, Serialize};
-use std::{fmt::Debug, sync::Arc};
+use std::fmt::Debug;
 use storage_async::Store;
-use tokio::sync::RwLock;

-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug)]
 pub enum SyncResult {
     Completed,
     Failed,
@@ -13,15 +11,20 @@ pub enum SyncResult {
 }

 /// Supports to sync files concurrently.
-#[derive(Clone)]
 pub struct Batcher {
     config: Config,
     capacity: usize,
-    tasks: Arc<RwLock<Vec<u64>>>, // files to sync
+    tasks: Vec<u64>, // files to sync
     store: Store,
     sync_send: SyncSender,
 }

+impl Debug for Batcher {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self.tasks)
+    }
+}
+
 impl Batcher {
     pub fn new(config: Config, capacity: usize, store: Store, sync_send: SyncSender) -> Self {
         Self {
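The field change above, from Arc<RwLock<Vec<u64>>> to a plain Vec<u64>, is the core of this hunk: a Batcher no longer shares its task list across cloned handles, so later methods take &mut self instead of locking. A minimal sketch of the two ownership models, using std::sync::RwLock as a stand-in for the async tokio::sync::RwLock removed above; the struct and variable names here are illustrative, not from the repository.

use std::sync::{Arc, RwLock};

// Shared model (before): clones hand out handles to ONE task list.
#[derive(Clone)]
struct SharedBatcher {
    tasks: Arc<RwLock<Vec<u64>>>,
}

// Owned model (after): each value owns its list; mutation needs &mut.
#[derive(Debug)]
struct OwnedBatcher {
    tasks: Vec<u64>,
}

fn main() {
    let shared = SharedBatcher { tasks: Arc::new(RwLock::new(vec![])) };
    let shared_clone = shared.clone();
    shared.tasks.write().unwrap().push(1);
    // Both handles observe the same push.
    assert_eq!(shared_clone.tasks.read().unwrap().len(), 1);

    let mut owned = OwnedBatcher { tasks: vec![] };
    owned.tasks.push(1); // exclusive access, no locking needed
    assert_eq!(owned.tasks.len(), 1);
}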
@@ -33,43 +36,36 @@ impl Batcher {
         }
     }

-    pub async fn len(&self) -> usize {
-        self.tasks.read().await.len()
+    pub fn len(&self) -> usize {
+        self.tasks.len()
     }

-    pub async fn tasks(&self) -> Vec<u64> {
-        self.tasks.read().await.clone()
-    }
-
-    pub async fn add(&self, tx_seq: u64) -> Result<bool> {
+    pub async fn add(&mut self, tx_seq: u64) -> Result<bool> {
+        // limits the number of threads
+        if self.tasks.len() >= self.capacity {
+            return Ok(false);
+        }
+
         // requires log entry available before file sync
         if self.store.get_tx_by_seq_number(tx_seq).await?.is_none() {
             return Ok(false);
         }

-        let mut tasks = self.tasks.write().await;
-
-        // limits the number of threads
-        if tasks.len() >= self.capacity {
-            return Ok(false);
-        }
-
-        tasks.push(tx_seq);
+        self.tasks.push(tx_seq);

         Ok(true)
     }

-    pub async fn reorg(&self, reverted_tx_seq: u64) {
-        self.tasks.write().await.retain(|&x| x < reverted_tx_seq);
+    pub fn reorg(&mut self, reverted_tx_seq: u64) {
+        self.tasks.retain(|&x| x < reverted_tx_seq);
     }

     /// Poll the sync result of any completed file sync.
-    pub async fn poll(&self) -> Result<Option<(u64, SyncResult)>> {
+    pub async fn poll(&mut self) -> Result<Option<(u64, SyncResult)>> {
         let mut result = None;
-        let tasks = self.tasks.read().await.clone();
-        let mut index = tasks.len();
+        let mut index = self.tasks.len();

-        for (i, tx_seq) in tasks.iter().enumerate() {
+        for (i, tx_seq) in self.tasks.iter().enumerate() {
             if let Some(ret) = self.poll_tx(*tx_seq).await? {
                 result = Some((*tx_seq, ret));
                 index = i;
@@ -77,9 +73,8 @@ impl Batcher {
             }
         }

-        let mut tasks = self.tasks.write().await;
-        if index < tasks.len() {
-            tasks.swap_remove(index);
+        if index < self.tasks.len() {
+            self.tasks.swap_remove(index);
         }

         Ok(result)
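The poll bookkeeping above finds the first task whose sync has finished, records its index, and removes it with swap_remove (O(1), order not preserved). A self-contained sketch of that removal pattern, with a closure standing in for the async poll_tx call in the original; the function name is illustrative only.

/// Remove and return the first value for which `is_done` returns Some(result).
/// Mirrors the index + swap_remove bookkeeping in Batcher::poll (sketch only).
fn poll_first_done<R>(tasks: &mut Vec<u64>, is_done: impl Fn(u64) -> Option<R>) -> Option<(u64, R)> {
    let mut result = None;
    let mut index = tasks.len();

    for (i, &tx_seq) in tasks.iter().enumerate() {
        if let Some(ret) = is_done(tx_seq) {
            result = Some((tx_seq, ret));
            index = i;
            break;
        }
    }

    if index < tasks.len() {
        // O(1) removal; the last element takes the removed slot.
        tasks.swap_remove(index);
    }

    result
}

fn main() {
    let mut tasks = vec![3, 5, 7];
    let done = poll_first_done(&mut tasks, |seq| if seq == 5 { Some("completed") } else { None });
    assert_eq!(done, Some((5, "completed")));
    assert_eq!(tasks.len(), 2); // 5 was swap-removed
}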
@@ -4,7 +4,6 @@ use crate::{
     Config, SyncSender,
 };
 use anyhow::Result;
-use serde::{Deserialize, Serialize};
 use std::sync::{
     atomic::{AtomicBool, Ordering},
     Arc,
@@ -12,15 +11,6 @@
 use storage_async::Store;
 use tokio::time::sleep;

-#[derive(Debug, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct RandomBatcherState {
-    pub tasks: Vec<u64>,
-    pub pending_txs: usize,
-    pub ready_txs: usize,
-}
-
-#[derive(Clone)]
 pub struct RandomBatcher {
     batcher: Batcher,
     sync_store: Arc<SyncStore>,
@@ -40,16 +30,6 @@ impl RandomBatcher {
         }
     }

-    pub async fn get_state(&self) -> Result<RandomBatcherState> {
-        let (pending_txs, ready_txs) = self.sync_store.stat().await?;
-
-        Ok(RandomBatcherState {
-            tasks: self.batcher.tasks().await,
-            pending_txs,
-            ready_txs,
-        })
-    }
-
     pub async fn start(mut self, catched_up: Arc<AtomicBool>) {
         info!("Start to sync files");

@@ -65,13 +45,13 @@ impl RandomBatcher {
                 Ok(true) => {}
                 Ok(false) => {
                     trace!(
-                        "File sync still in progress or idle, state = {:?}",
-                        self.get_state().await
+                        "File sync still in progress or idle, {:?}",
+                        self.stat().await
                     );
                     sleep(INTERVAL_IDLE).await;
                 }
                 Err(err) => {
-                    warn!(%err, "Failed to sync file once, state = {:?}", self.get_state().await);
+                    warn!(%err, "Failed to sync file once, {:?}", self.stat().await);
                     sleep(INTERVAL_ERROR).await;
                 }
             }
@@ -89,7 +69,7 @@ impl RandomBatcher {
             None => return Ok(false),
         };

-        debug!(%tx_seq, ?sync_result, "Completed to sync file, state = {:?}", self.get_state().await);
+        debug!(%tx_seq, ?sync_result, "Completed to sync file, {:?}", self.stat().await);

         match sync_result {
             SyncResult::Completed => self.sync_store.remove_tx(tx_seq).await?,
@@ -100,7 +80,7 @@ impl RandomBatcher {
     }

     async fn schedule(&mut self) -> Result<bool> {
-        if self.batcher.len().await > 0 {
+        if self.batcher.len() > 0 {
             return Ok(false);
         }

@@ -113,8 +93,21 @@ impl RandomBatcher {
             return Ok(false);
         }

-        debug!("Pick a file to sync, state = {:?}", self.get_state().await);
+        debug!("Pick a file to sync, {:?}", self.stat().await);

         Ok(true)
     }
+
+    async fn stat(&self) -> String {
+        match self.sync_store.stat().await {
+            Ok((num_pending_txs, num_ready_txs)) => format!(
+                "RandomBatcher {{ batcher = {:?}, pending_txs = {}, ready_txs = {}}}",
+                self.batcher, num_pending_txs, num_ready_txs
+            ),
+            Err(err) => format!(
+                "RandomBatcher {{ batcher = {:?}, pending_txs/ready_txs = Error({:?})}}",
+                self.batcher, err
+            ),
+        }
+    }
 }
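This hunk trades the serde-serializable RandomBatcherState struct for a status String built on demand by stat(), which is enough for log output but no longer machine-readable. A small self-contained sketch of the same formatting pattern — note the doubled braces, which format! treats as literal { and }; the function and variable names below are illustrative only.

// Sketch of the stat-string pattern: `{{` and `}}` are literal braces in
// format!, and the Err branch folds the error into the same string.
fn stat_string(stat: Result<(usize, usize), String>, tasks: &[u64]) -> String {
    match stat {
        Ok((pending, ready)) => format!(
            "RandomBatcher {{ batcher = {:?}, pending_txs = {}, ready_txs = {} }}",
            tasks, pending, ready
        ),
        Err(err) => format!(
            "RandomBatcher {{ batcher = {:?}, pending_txs/ready_txs = Error({:?}) }}",
            tasks, err
        ),
    }
}

fn main() {
    println!("{}", stat_string(Ok((3, 1)), &[7, 9]));
    println!("{}", stat_string(Err("db closed".into()), &[]));
}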
@@ -8,70 +8,58 @@ use crate::{
 };
 use anyhow::Result;
 use log_entry_sync::LogSyncEvent;
-use serde::{Deserialize, Serialize};
 use std::{
     collections::HashMap,
     fmt::Debug,
     sync::{
-        atomic::{AtomicBool, AtomicU64, Ordering},
+        atomic::{AtomicBool, Ordering},
         Arc,
     },
 };
 use storage_async::Store;
 use tokio::{
-    sync::{broadcast::Receiver, mpsc::UnboundedReceiver, RwLock},
+    sync::{broadcast::Receiver, mpsc::UnboundedReceiver},
     time::sleep,
 };

 /// Supports to sync files in sequence concurrently.
-#[derive(Clone)]
 pub struct SerialBatcher {
     batcher: Batcher,

     /// Next tx seq to sync.
-    next_tx_seq: Arc<AtomicU64>,
+    next_tx_seq: u64,
     /// Maximum tx seq to sync.
-    max_tx_seq: Arc<AtomicU64>,
+    max_tx_seq: u64,

     /// Completed txs that pending to update sync db in sequence.
-    pending_completed_txs: Arc<RwLock<HashMap<u64, SyncResult>>>,
+    pending_completed_txs: HashMap<u64, SyncResult>,
     /// Next tx seq to sync in db, so as to continue file sync from
     /// break point when program restarted.
-    next_tx_seq_in_db: Arc<AtomicU64>,
+    next_tx_seq_in_db: u64,

     sync_store: Arc<SyncStore>,
 }

-#[derive(Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct SerialBatcherState {
-    pub tasks: Vec<u64>,
-    pub next: u64,
-    pub max: u64,
-    pub pendings: HashMap<u64, SyncResult>,
-    pub next_in_db: u64,
-}
-
-impl Debug for SerialBatcherState {
+impl Debug for SerialBatcher {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let max_tx_seq_desc = if self.max == u64::MAX {
+        let max_tx_seq_desc = if self.max_tx_seq == u64::MAX {
             "N/A".into()
         } else {
-            format!("{}", self.max)
+            format!("{}", self.max_tx_seq)
         };

-        let pendings_desc = if self.pendings.len() <= 5 {
-            format!("{:?}", self.pendings)
+        let pendings_desc = if self.pending_completed_txs.len() <= 5 {
+            format!("{:?}", self.pending_completed_txs)
         } else {
-            format!("{}", self.pendings.len())
+            format!("{}", self.pending_completed_txs.len())
         };

         f.debug_struct("SerialBatcher")
-            .field("tasks", &self.tasks)
-            .field("next", &self.next)
+            .field("batcher", &self.batcher)
+            .field("next", &self.next_tx_seq)
             .field("max", &max_tx_seq_desc)
             .field("pendings", &pendings_desc)
-            .field("next_in_db", &self.next_in_db)
+            .field("next_in_db", &self.next_tx_seq_in_db)
             .finish()
     }
 }
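The counters change here for the same reason as the batcher's task list: while SerialBatcher was Clone and shared across tasks, its cursors had to live behind Arc<AtomicU64>; with a single owner, plain u64 fields mutated through &mut self are enough. A minimal sketch contrasting the two styles; the variable names are illustrative only.

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

fn main() {
    // Shared style (before): the counter lives behind Arc<AtomicU64> so that
    // cloned handles of a Clone-able batcher all observe the same value.
    let shared_next = Arc::new(AtomicU64::new(0));
    let handle = shared_next.clone();
    shared_next.fetch_add(1, Ordering::Relaxed);
    assert_eq!(handle.load(Ordering::Relaxed), 1);

    // Owned style (after): a plain u64 field on a singly-owned struct,
    // advanced with ordinary assignment.
    let mut next_tx_seq: u64 = 0;
    next_tx_seq += 1;
    assert_eq!(next_tx_seq, 1);
}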
@@ -90,37 +78,21 @@ impl SerialBatcher {

         Ok(Self {
             batcher: Batcher::new(config, capacity, store, sync_send),
-            next_tx_seq: Arc::new(AtomicU64::new(next_tx_seq.unwrap_or(0))),
-            max_tx_seq: Arc::new(AtomicU64::new(max_tx_seq.unwrap_or(u64::MAX))),
+            next_tx_seq: next_tx_seq.unwrap_or(0),
+            max_tx_seq: max_tx_seq.unwrap_or(u64::MAX),
             pending_completed_txs: Default::default(),
-            next_tx_seq_in_db: Arc::new(AtomicU64::new(next_tx_seq.unwrap_or(0))),
+            next_tx_seq_in_db: next_tx_seq.unwrap_or(0),
             sync_store,
         })
     }

-    pub async fn get_state(&self) -> SerialBatcherState {
-        SerialBatcherState {
-            tasks: self.batcher.tasks().await,
-            next: self.next_tx_seq.load(Ordering::Relaxed),
-            max: self.max_tx_seq.load(Ordering::Relaxed),
-            pendings: self
-                .pending_completed_txs
-                .read()
-                .await
-                .iter()
-                .map(|(k, v)| (*k, *v))
-                .collect(),
-            next_in_db: self.next_tx_seq_in_db.load(Ordering::Relaxed),
-        }
-    }
-
     pub async fn start(
         mut self,
         mut file_announcement_recv: UnboundedReceiver<u64>,
         mut log_sync_recv: Receiver<LogSyncEvent>,
         catched_up: Arc<AtomicBool>,
     ) {
-        info!("Start to sync files, state = {:?}", self.get_state().await);
+        info!(?self, "Start to sync files");

         loop {
             // handle all pending file announcements
@@ -147,14 +119,11 @@ impl SerialBatcher {
             match self.sync_once().await {
                 Ok(true) => {}
                 Ok(false) => {
-                    trace!(
-                        "File sync still in progress or idle, state = {:?}",
-                        self.get_state().await
-                    );
+                    trace!(?self, "File sync still in progress or idle");
                     sleep(INTERVAL_IDLE).await;
                 }
                 Err(err) => {
-                    warn!(%err, "Failed to sync file once, state = {:?}", self.get_state().await);
+                    warn!(%err, ?self, "Failed to sync file once");
                     sleep(INTERVAL_ERROR).await;
                 }
             }
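The logging changes in this file lean on tracing's field shorthand: info!(?self, "...") records self as a structured field via its Debug implementation (which is why this commit adds hand-written Debug impls for the batchers), while %value uses Display, including dotted expressions like %self.next_tx_seq_in_db used later in the diff. A minimal, self-contained illustration with hypothetical names; it assumes the tracing and tracing-subscriber crates as dependencies.

use tracing::info;

#[derive(Debug)]
struct BatcherStatus {
    next_tx_seq: u64,
    pending: usize,
}

fn main() {
    // Writes structured logs to stdout; in the real node the subscriber is
    // configured elsewhere.
    tracing_subscriber::fmt::init();

    let status = BatcherStatus { next_tx_seq: 42, pending: 3 };

    // `?status` captures the field via Debug, `%status.next_tx_seq` via Display.
    info!(?status, "Move forward");
    info!(%status.next_tx_seq, "Move forward in db");
}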
@@ -170,27 +139,26 @@ impl SerialBatcher {
         trace!(%announced_tx_seq, "Received file announcement");

         // new file announced
-        let max_tx_seq = self.max_tx_seq.load(Ordering::Relaxed);
-        if max_tx_seq == u64::MAX || announced_tx_seq > max_tx_seq {
-            debug!(%announced_tx_seq, "Update for new file announcement, state = {:?}", self.get_state().await);
+        if self.max_tx_seq == u64::MAX || announced_tx_seq > self.max_tx_seq {
+            debug!(%announced_tx_seq, ?self, "Update for new file announcement");

             if let Err(err) = self.sync_store.set_max_tx_seq(announced_tx_seq).await {
-                error!(%err, %announced_tx_seq, "Failed to set max_tx_seq in store, state = {:?}", self.get_state().await);
+                error!(%err, %announced_tx_seq, ?self, "Failed to set max_tx_seq in store");
             }

-            self.max_tx_seq.store(announced_tx_seq, Ordering::Relaxed);
+            self.max_tx_seq = announced_tx_seq;

             return true;
         }

         // already wait for sequential sync
-        if announced_tx_seq >= self.next_tx_seq.load(Ordering::Relaxed) {
+        if announced_tx_seq >= self.next_tx_seq {
             return true;
         }

         // otherwise, mark tx as ready for sync
         if let Err(err) = self.sync_store.upgrade_tx_to_ready(announced_tx_seq).await {
-            error!(%err, %announced_tx_seq, "Failed to promote announced tx to ready, state = {:?}", self.get_state().await);
+            error!(%err, %announced_tx_seq, ?self, "Failed to promote announced tx to ready");
         }

         true
@@ -205,11 +173,11 @@ impl SerialBatcher {
         debug!(%reverted_tx_seq, "Reorg detected");

         // reorg happened, but no impact on file sync
-        if reverted_tx_seq >= self.next_tx_seq.load(Ordering::Relaxed) {
+        if reverted_tx_seq >= self.next_tx_seq {
             return true;
         }

-        info!(%reverted_tx_seq, "Handle reorg started, state = {:?}", self.get_state().await);
+        info!(%reverted_tx_seq, ?self, "Handle reorg started");

         // terminate all files in progress
         self.batcher
@@ -217,22 +185,23 @@ impl SerialBatcher {
             .await;

         // update states
-        self.batcher.reorg(reverted_tx_seq).await;
-        self.next_tx_seq.store(reverted_tx_seq, Ordering::Relaxed);
+        self.batcher.reorg(reverted_tx_seq);
+        self.next_tx_seq = reverted_tx_seq;
         self.pending_completed_txs
-            .write()
-            .await
             .retain(|&k, _| k < reverted_tx_seq);
-        if self.next_tx_seq_in_db.load(Ordering::Relaxed) > reverted_tx_seq {
-            self.next_tx_seq_in_db
-                .store(reverted_tx_seq, Ordering::Relaxed);
+        if self.next_tx_seq_in_db > reverted_tx_seq {
+            self.next_tx_seq_in_db = reverted_tx_seq;

-            if let Err(err) = self.sync_store.set_next_tx_seq(reverted_tx_seq).await {
-                error!(%err, %reverted_tx_seq, "Failed to set next tx seq due to tx reverted, state = {:?}", self.get_state().await);
+            if let Err(err) = self
+                .sync_store
+                .set_next_tx_seq(self.next_tx_seq_in_db)
+                .await
+            {
+                error!(%err, %reverted_tx_seq, ?self, "Failed to set next tx seq due to tx reverted");
             }
         }

-        info!(%reverted_tx_seq, "Handle reorg ended, state = {:?}", self.get_state().await);
+        info!(%reverted_tx_seq, ?self, "Handle reorg ended");

         true
     }
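On a chain reorg, both batchers drop any state at or above the reverted sequence number with retain, which works the same way on Vec and HashMap. A self-contained sketch of that reset step; the values and names are illustrative only.

use std::collections::HashMap;

fn main() {
    let reverted_tx_seq: u64 = 5;

    // In-progress tasks: keep only those strictly below the reverted seq.
    let mut tasks: Vec<u64> = vec![3, 5, 8];
    tasks.retain(|&x| x < reverted_tx_seq);
    assert_eq!(tasks, vec![3]);

    // Completed-but-not-yet-persisted results: same predicate over the keys.
    let mut pending: HashMap<u64, &str> = HashMap::from([(4, "completed"), (6, "failed")]);
    pending.retain(|&k, _| k < reverted_tx_seq);
    assert_eq!(pending.len(), 1);
}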
@@ -249,11 +218,8 @@ impl SerialBatcher {
             None => return Ok(false),
         };

-        info!(%tx_seq, ?sync_result, "Completed to sync file, state = {:?}", self.get_state().await);
-        self.pending_completed_txs
-            .write()
-            .await
-            .insert(tx_seq, sync_result);
+        info!(%tx_seq, ?sync_result, ?self, "Completed to sync file");
+        self.pending_completed_txs.insert(tx_seq, sync_result);

         // update sync db
         self.update_completed_txs_in_db().await?;
@@ -263,49 +229,45 @@ impl SerialBatcher {

     /// Schedule file sync in sequence.
     async fn schedule_next(&mut self) -> Result<bool> {
-        let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
-        if next_tx_seq > self.max_tx_seq.load(Ordering::Relaxed) {
+        if self.next_tx_seq > self.max_tx_seq {
             return Ok(false);
         }

-        if !self.batcher.add(next_tx_seq).await? {
+        if !self.batcher.add(self.next_tx_seq).await? {
             return Ok(false);
         }

-        self.next_tx_seq.store(next_tx_seq + 1, Ordering::Relaxed);
+        self.next_tx_seq += 1;

-        info!("Move forward, state = {:?}", self.get_state().await);
+        info!(?self, "Move forward");

         Ok(true)
     }

     /// Update file sync index in db.
     async fn update_completed_txs_in_db(&mut self) -> Result<()> {
-        let origin = self.next_tx_seq_in_db.load(Ordering::Relaxed);
-        let mut current = origin;
+        let origin = self.next_tx_seq_in_db;

-        loop {
-            let sync_result = match self.pending_completed_txs.read().await.get(&current) {
-                Some(&v) => v,
-                None => break,
-            };
-
+        while let Some(sync_result) = self.pending_completed_txs.get(&self.next_tx_seq_in_db) {
             // downgrade to random sync if file sync failed or timeout
             if matches!(sync_result, SyncResult::Failed | SyncResult::Timeout) {
-                self.sync_store.add_pending_tx(current).await?;
+                self.sync_store
+                    .add_pending_tx(self.next_tx_seq_in_db)
+                    .await?;
             }

             // always move forward in db
-            self.sync_store.set_next_tx_seq(current + 1).await?;
+            self.sync_store
+                .set_next_tx_seq(self.next_tx_seq_in_db + 1)
+                .await?;

             // update in memory after db updated
-            self.pending_completed_txs.write().await.remove(&current);
-            current += 1;
-            self.next_tx_seq_in_db.store(current, Ordering::Relaxed);
+            self.pending_completed_txs.remove(&self.next_tx_seq_in_db);
+            self.next_tx_seq_in_db += 1;
         }

-        if current > origin {
-            info!(%origin, %current, "Move forward in db");
+        if self.next_tx_seq_in_db > origin {
+            info!(%origin, %self.next_tx_seq_in_db, "Move forward in db");
         }

         Ok(())
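The rewritten update_completed_txs_in_db above replaces an explicit loop/match/break over a locked map with a while let that advances a cursor as long as the next expected sequence number is present, persisting results strictly in order. A self-contained sketch of that drain-in-order pattern; names are illustrative only.

use std::collections::HashMap;

fn main() {
    // Results arrive out of order, keyed by sequence number.
    let mut pending: HashMap<u64, &str> =
        HashMap::from([(0, "completed"), (1, "failed"), (3, "completed")]);
    let mut next_in_db: u64 = 0;

    // Persist results strictly in sequence; stop at the first gap (seq 2).
    while let Some(result) = pending.get(&next_in_db) {
        println!("persist seq {next_in_db}: {result}");
        pending.remove(&next_in_db);
        next_in_db += 1;
    }

    assert_eq!(next_in_db, 2);
    assert!(pending.contains_key(&3)); // still waiting for seq 2
}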
@@ -1,71 +0,0 @@
-use std::sync::{
-    atomic::{AtomicBool, Ordering},
-    Arc,
-};
-
-use anyhow::Result;
-use log_entry_sync::LogSyncEvent;
-use storage_async::Store;
-use task_executor::TaskExecutor;
-use tokio::sync::{
-    broadcast,
-    mpsc::{unbounded_channel, UnboundedSender},
-    oneshot,
-};
-
-use crate::{Config, SyncSender};
-
-use super::{batcher_random::RandomBatcher, batcher_serial::SerialBatcher, sync_store::SyncStore};
-
-pub struct AutoSyncManager {
-    pub serial: SerialBatcher,
-    pub random: RandomBatcher,
-    pub file_announcement_send: UnboundedSender<u64>,
-}
-
-impl AutoSyncManager {
-    pub async fn spawn(
-        config: Config,
-        executor: &TaskExecutor,
-        store: Store,
-        sync_send: SyncSender,
-        log_sync_recv: broadcast::Receiver<LogSyncEvent>,
-        catch_up_end_recv: oneshot::Receiver<()>,
-    ) -> Result<Self> {
-        let (send, recv) = unbounded_channel();
-        let sync_store = Arc::new(SyncStore::new(store.clone()));
-        let catched_up = Arc::new(AtomicBool::new(false));
-
-        // sync in sequence
-        let serial =
-            SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
-                .await?;
-        executor.spawn(
-            serial
-                .clone()
-                .start(recv, log_sync_recv, catched_up.clone()),
-            "auto_sync_serial",
-        );
-
-        // sync randomly
-        let random = RandomBatcher::new(config, store, sync_send, sync_store);
-        executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random");
-
-        // handle on catched up notification
-        executor.spawn(
-            async move {
-                if catch_up_end_recv.await.is_ok() {
-                    info!("log entry catched up");
-                    catched_up.store(true, Ordering::Relaxed);
-                }
-            },
-            "auto_sync_wait_for_catchup",
-        );
-
-        Ok(Self {
-            serial,
-            random,
-            file_announcement_send: send,
-        })
-    }
-}
@@ -1,7 +1,6 @@
 mod batcher;
 pub mod batcher_random;
 pub mod batcher_serial;
-pub mod manager;
 pub mod sync_store;
 mod tx_store;

@@ -1,4 +1,6 @@
-use crate::auto_sync::manager::AutoSyncManager;
+use crate::auto_sync::batcher_random::RandomBatcher;
+use crate::auto_sync::batcher_serial::SerialBatcher;
+use crate::auto_sync::sync_store::SyncStore;
 use crate::context::SyncNetworkContext;
 use crate::controllers::{
     FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState,
@@ -16,6 +18,7 @@ use network::{
     PeerRequestId, SyncId as RequestId,
 };
 use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, TxID};
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::{
     collections::{hash_map::Entry, HashMap},
     sync::Arc,
@@ -24,6 +27,7 @@ use storage::config::ShardConfig;
 use storage::error::Result as StorageResult;
 use storage::log_store::Store as LogStore;
 use storage_async::Store;
+use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
 use tokio::sync::{broadcast, mpsc, oneshot};

 const HEARTBEAT_INTERVAL_SEC: u64 = 5;
@@ -127,7 +131,7 @@ pub struct SyncService {
     /// Heartbeat interval for executing periodic tasks.
    heartbeat: tokio::time::Interval,

-    auto_sync_manager: Option<AutoSyncManager>,
+    file_announcement_send: Option<UnboundedSender<u64>>,
 }

 impl SyncService {
@@ -168,18 +172,35 @@ impl SyncService {
         let store = Store::new(store, executor.clone());

         // init auto sync
-        let auto_sync_manager = if config.auto_sync_enabled {
-            Some(
-                AutoSyncManager::spawn(
-                    config,
-                    &executor,
-                    store.clone(),
-                    sync_send.clone(),
-                    event_recv,
-                    catch_up_end_recv,
-                )
-                .await?,
-            )
+        let file_announcement_send = if config.auto_sync_enabled {
+            let (send, recv) = unbounded_channel();
+            let sync_store = Arc::new(SyncStore::new(store.clone()));
+            let catched_up = Arc::new(AtomicBool::new(false));
+
+            // sync in sequence
+            let serial_batcher =
+                SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
+                    .await?;
+            executor.spawn(
+                serial_batcher.start(recv, event_recv, catched_up.clone()),
+                "auto_sync_serial",
+            );
+
+            // sync randomly
+            let random_batcher =
+                RandomBatcher::new(config, store.clone(), sync_send.clone(), sync_store);
+            executor.spawn(random_batcher.start(catched_up.clone()), "auto_sync_random");
+
+            // handle on catched up notification
+            executor.spawn(
+                async move {
+                    catch_up_end_recv.await.expect("Catch up sender dropped");
+                    catched_up.store(true, Ordering::Relaxed);
+                },
+                "auto_sync_wait_for_catchup",
+            );
+
+            Some(send)
         } else {
             None
         };
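With the manager type removed, SyncService::spawn wires the auto-sync tasks up inline: an unbounded channel carries file announcements to the serial batcher, and a small task flips a shared AtomicBool once the log-entry catch-up oneshot fires, which both batchers check before doing real work. A self-contained sketch of that catch-up signalling pattern, with plain tokio::spawn standing in for the node's TaskExecutor; the names are illustrative only.

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (catch_up_end_send, catch_up_end_recv) = oneshot::channel::<()>();
    let catched_up = Arc::new(AtomicBool::new(false));

    // Background task: wait for the catch-up notification, then flip the flag.
    let flag = catched_up.clone();
    let waiter = tokio::spawn(async move {
        if catch_up_end_recv.await.is_ok() {
            flag.store(true, Ordering::Relaxed);
        }
    });

    assert!(!catched_up.load(Ordering::Relaxed)); // batchers would keep idling

    // Elsewhere, the log sync layer signals that catch-up has finished.
    let _ = catch_up_end_send.send(());
    waiter.await.unwrap();

    assert!(catched_up.load(Ordering::Relaxed)); // batchers may start syncing
}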
@@ -192,7 +213,7 @@ impl SyncService {
            file_location_cache,
            controllers: Default::default(),
            heartbeat,
-           auto_sync_manager,
+           file_announcement_send,
        };

        info!("Starting sync service");
@@ -655,8 +676,8 @@ impl SyncService {
         let tx_seq = tx_id.seq;
         trace!(%tx_seq, %peer_id, %addr, "Received AnnounceFile gossip");

-        if let Some(manager) = &self.auto_sync_manager {
-            let _ = manager.file_announcement_send.send(tx_seq);
+        if let Some(send) = &self.file_announcement_send {
+            let _ = send.send(tx_seq);
         }

         // File already in sync
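The gossip handler above forwards announced sequence numbers through the optional sender and deliberately ignores the send result: when auto sync is disabled the field is None, and if the receiving batcher task is gone the send simply fails. A minimal sketch of that fire-and-forget forwarding; the Service struct and method names here are illustrative only.

use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

struct Service {
    file_announcement_send: Option<UnboundedSender<u64>>,
}

impl Service {
    fn on_announce_file(&self, tx_seq: u64) {
        // Forward to auto sync if enabled; drop the error if the receiver is gone.
        if let Some(send) = &self.file_announcement_send {
            let _ = send.send(tx_seq);
        }
    }
}

#[tokio::main]
async fn main() {
    let (send, mut recv) = unbounded_channel();
    let service = Service { file_announcement_send: Some(send) };

    service.on_announce_file(42);
    assert_eq!(recv.recv().await, Some(42));

    // With auto sync disabled, the announcement is silently ignored.
    let disabled = Service { file_announcement_send: None };
    disabled.on_announce_file(43);
}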
@@ -894,7 +915,7 @@ mod tests {
            file_location_cache,
            controllers: Default::default(),
            heartbeat,
-           auto_sync_manager: None,
+           file_announcement_send: None,
        };

        sync.on_peer_connected(init_peer_id);
@@ -926,7 +947,7 @@ mod tests {
            file_location_cache,
            controllers: Default::default(),
            heartbeat,
-           auto_sync_manager: None,
+           file_announcement_send: None,
        };

        sync.on_peer_disconnected(init_peer_id);