From 84f455d1d4a6fb46eef153a59796bd20239cc8b8 Mon Sep 17 00:00:00 2001 From: Bo QIU <35757521+boqiu@users.noreply.github.com> Date: Mon, 8 Jul 2024 11:47:59 +0800 Subject: [PATCH] Fix some issues in sync package (#111) * Change default concurrent sync threads * enhance log level in sync store * Add rwlock in sync store to avoid concurrency issue * enhance serial sync state machine log level * wrap Instant for debug purpose * add timestamp for connecting peers state * Select random peer to sync chunks * return sync peers states instead of connected peers only * Use tokio async RwLock for sync store --- node/sync/src/auto_sync/batcher_random.rs | 37 ++++++-- node/sync/src/auto_sync/batcher_serial.rs | 12 ++- node/sync/src/auto_sync/mod.rs | 2 +- node/sync/src/auto_sync/sync_store.rs | 71 +++++++------- node/sync/src/controllers/mod.rs | 5 +- node/sync/src/controllers/peers.rs | 26 +++-- node/sync/src/controllers/serial.rs | 110 ++++++++++++---------- node/sync/src/lib.rs | 28 +++++- node/sync/src/service.rs | 8 +- run/config.toml | 2 +- 10 files changed, 188 insertions(+), 113 deletions(-) diff --git a/node/sync/src/auto_sync/batcher_random.rs b/node/sync/src/auto_sync/batcher_random.rs index 1967c4f..14073c4 100644 --- a/node/sync/src/auto_sync/batcher_random.rs +++ b/node/sync/src/auto_sync/batcher_random.rs @@ -4,19 +4,22 @@ use crate::{ Config, SyncSender, }; use anyhow::Result; +use std::sync::Arc; use storage_async::Store; use tokio::time::sleep; -#[derive(Debug)] pub struct RandomBatcher { batcher: Batcher, - sync_store: SyncStore, + sync_store: Arc, } impl RandomBatcher { - pub fn new(config: Config, store: Store, sync_send: SyncSender) -> Self { - let sync_store = SyncStore::new(store.clone()); - + pub fn new( + config: Config, + store: Store, + sync_send: SyncSender, + sync_store: Arc, + ) -> Self { Self { // now, only 1 thread to sync file randomly batcher: Batcher::new(config, 1, store, sync_send), @@ -31,11 +34,14 @@ impl RandomBatcher { match self.sync_once().await { Ok(true) => {} Ok(false) => { - trace!(?self, "File sync still in progress or idle"); + trace!( + "File sync still in progress or idle, {:?}", + self.stat().await + ); sleep(INTERVAL_IDLE).await; } Err(err) => { - warn!(%err, ?self, "Failed to sync file once"); + warn!(%err, "Failed to sync file once, {:?}", self.stat().await); sleep(INTERVAL_ERROR).await; } } @@ -53,7 +59,7 @@ impl RandomBatcher { None => return Ok(false), }; - debug!(%tx_seq, ?sync_result, ?self, "Completed to sync file"); + debug!(%tx_seq, ?sync_result, "Completed to sync file, {:?}", self.stat().await); match sync_result { SyncResult::Completed => self.sync_store.remove_tx(tx_seq).await?, @@ -77,8 +83,21 @@ impl RandomBatcher { return Ok(false); } - debug!(?self, "Pick a file to sync"); + debug!("Pick a file to sync, {:?}", self.stat().await); Ok(true) } + + async fn stat(&self) -> String { + match self.sync_store.stat().await { + Ok((num_pending_txs, num_ready_txs)) => format!( + "RandomBatcher {{ batcher = {:?}, pending_txs = {}, ready_txs = {}}}", + self.batcher, num_pending_txs, num_ready_txs + ), + Err(err) => format!( + "RandomBatcher {{ batcher = {:?}, pending_txs/ready_txs = Error({:?})}}", + self.batcher, err + ), + } + } } diff --git a/node/sync/src/auto_sync/batcher_serial.rs b/node/sync/src/auto_sync/batcher_serial.rs index 86b491f..64f60fe 100644 --- a/node/sync/src/auto_sync/batcher_serial.rs +++ b/node/sync/src/auto_sync/batcher_serial.rs @@ -8,7 +8,7 @@ use crate::{ }; use anyhow::Result; use log_entry_sync::LogSyncEvent; -use std::{collections::HashMap, fmt::Debug}; +use std::{collections::HashMap, fmt::Debug, sync::Arc}; use storage_async::Store; use tokio::{ sync::{broadcast::Receiver, mpsc::UnboundedReceiver}, @@ -30,7 +30,7 @@ pub struct SerialBatcher { /// break point when program restarted. next_tx_seq_in_db: u64, - sync_store: SyncStore, + sync_store: Arc, } impl Debug for SerialBatcher { @@ -58,9 +58,13 @@ impl Debug for SerialBatcher { } impl SerialBatcher { - pub async fn new(config: Config, store: Store, sync_send: SyncSender) -> Result { + pub async fn new( + config: Config, + store: Store, + sync_send: SyncSender, + sync_store: Arc, + ) -> Result { let capacity = config.max_sequential_workers; - let sync_store = SyncStore::new(store.clone()); // continue file sync from break point in db let (next_tx_seq, max_tx_seq) = sync_store.get_tx_seq_range().await?; diff --git a/node/sync/src/auto_sync/mod.rs b/node/sync/src/auto_sync/mod.rs index cb1cfef..0f1ce41 100644 --- a/node/sync/src/auto_sync/mod.rs +++ b/node/sync/src/auto_sync/mod.rs @@ -1,7 +1,7 @@ mod batcher; pub mod batcher_random; pub mod batcher_serial; -mod sync_store; +pub mod sync_store; mod tx_store; use std::time::Duration; diff --git a/node/sync/src/auto_sync/sync_store.rs b/node/sync/src/auto_sync/sync_store.rs index f42f4a3..8c59448 100644 --- a/node/sync/src/auto_sync/sync_store.rs +++ b/node/sync/src/auto_sync/sync_store.rs @@ -1,15 +1,15 @@ use super::tx_store::TxStore; use anyhow::Result; -use std::fmt::Debug; +use std::sync::Arc; use storage::log_store::config::{ConfigTx, ConfigurableExt}; use storage_async::Store; +use tokio::sync::RwLock; const KEY_NEXT_TX_SEQ: &str = "sync.manager.next_tx_seq"; const KEY_MAX_TX_SEQ: &str = "sync.manager.max_tx_seq"; -#[derive(Clone)] pub struct SyncStore { - store: Store, + store: Arc>, /// Pending transactions to sync with low priority. pending_txs: TxStore, @@ -19,38 +19,29 @@ pub struct SyncStore { ready_txs: TxStore, } -impl Debug for SyncStore { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let store = self.store.get_store(); - - let pendings = match self.pending_txs.count(store) { - Ok(count) => format!("{}", count), - Err(err) => format!("Err: {:?}", err), - }; - - let ready = match self.ready_txs.count(store) { - Ok(count) => format!("{}", count), - Err(err) => format!("Err: {:?}", err), - }; - - f.debug_struct("SyncStore") - .field("pending_txs", &pendings) - .field("ready_txs", &ready) - .finish() - } -} - impl SyncStore { pub fn new(store: Store) -> Self { Self { - store, + store: Arc::new(RwLock::new(store)), pending_txs: TxStore::new("pending"), ready_txs: TxStore::new("ready"), } } + /// Returns the number of pending txs and ready txs. + pub async fn stat(&self) -> Result<(usize, usize)> { + let async_store = self.store.read().await; + let store = async_store.get_store(); + + let num_pending_txs = self.pending_txs.count(store)?; + let num_ready_txs = self.ready_txs.count(store)?; + + Ok((num_pending_txs, num_ready_txs)) + } + pub async fn get_tx_seq_range(&self) -> Result<(Option, Option)> { - let store = self.store.get_store(); + let async_store = self.store.read().await; + let store = async_store.get_store(); // load next_tx_seq let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ)?; @@ -62,20 +53,20 @@ impl SyncStore { } pub async fn set_next_tx_seq(&self, tx_seq: u64) -> Result<()> { - self.store - .get_store() - .set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq) + let async_store = self.store.write().await; + let store = async_store.get_store(); + store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq) } pub async fn set_max_tx_seq(&self, tx_seq: u64) -> Result<()> { - debug!(%tx_seq, "set_max_tx_seq"); - self.store - .get_store() - .set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq) + let async_store = self.store.write().await; + let store = async_store.get_store(); + store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq) } pub async fn add_pending_tx(&self, tx_seq: u64) -> Result { - let store = self.store.get_store(); + let async_store = self.store.write().await; + let store = async_store.get_store(); // already in ready queue if self.ready_txs.has(store, tx_seq)? { @@ -87,7 +78,8 @@ impl SyncStore { } pub async fn upgrade_tx_to_ready(&self, tx_seq: u64) -> Result { - let store = self.store.get_store(); + let async_store = self.store.write().await; + let store = async_store.get_store(); let mut tx = ConfigTx::default(); @@ -105,7 +97,8 @@ impl SyncStore { } pub async fn downgrade_tx_to_pending(&self, tx_seq: u64) -> Result { - let store = self.store.get_store(); + let async_store = self.store.write().await; + let store = async_store.get_store(); let mut tx = ConfigTx::default(); @@ -123,7 +116,8 @@ impl SyncStore { } pub async fn random_tx(&self) -> Result> { - let store = self.store.get_store(); + let async_store = self.store.read().await; + let store = async_store.get_store(); // try to find a tx in ready queue with high priority if let Some(val) = self.ready_txs.random(store)? { @@ -135,7 +129,8 @@ impl SyncStore { } pub async fn remove_tx(&self, tx_seq: u64) -> Result { - let store = self.store.get_store(); + let async_store = self.store.write().await; + let store = async_store.get_store(); // removed in ready queue if self.ready_txs.remove(store, None, tx_seq)? { diff --git a/node/sync/src/controllers/mod.rs b/node/sync/src/controllers/mod.rs index f99b65f..4e816f3 100644 --- a/node/sync/src/controllers/mod.rs +++ b/node/sync/src/controllers/mod.rs @@ -1,6 +1,9 @@ mod peers; mod serial; +use std::collections::HashMap; + +use peers::PeerState; use serde::{Deserialize, Serialize}; pub use serial::{FailureReason, SerialSyncController, SyncState, MAX_CHUNKS_TO_REQUEST}; @@ -42,7 +45,7 @@ impl FileSyncGoal { #[serde(rename_all = "camelCase")] pub struct FileSyncInfo { pub elapsed_secs: u64, - pub peers: usize, + pub peers: HashMap, pub goal: FileSyncGoal, pub next_chunks: u64, pub state: String, diff --git a/node/sync/src/controllers/peers.rs b/node/sync/src/controllers/peers.rs index efc6442..cc166a9 100644 --- a/node/sync/src/controllers/peers.rs +++ b/node/sync/src/controllers/peers.rs @@ -1,20 +1,23 @@ use file_location_cache::FileLocationCache; use network::{Multiaddr, PeerAction, PeerId}; use rand::seq::IteratorRandom; +use serde::{Deserialize, Serialize}; use shared_types::TxID; use std::cmp::Ordering; use std::collections::{BTreeSet, HashMap}; +use std::fmt::Debug; use std::sync::Arc; use std::time::{Duration, Instant}; use std::vec; use storage::config::ShardConfig; use crate::context::SyncNetworkContext; +use crate::InstantWrapper; const PEER_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); const PEER_DISCONNECT_TIMEOUT: Duration = Duration::from_secs(5); -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum PeerState { Found, Connecting, @@ -34,13 +37,13 @@ struct PeerInfo { pub shard_config: ShardConfig, /// Timestamp of the last state change. - pub since: Instant, + pub since: InstantWrapper, } impl PeerInfo { fn update_state(&mut self, new_state: PeerState) { self.state = new_state; - self.since = Instant::now(); + self.since = Instant::now().into(); } } @@ -64,6 +67,17 @@ impl SyncPeers { } } + pub fn states(&self) -> HashMap { + let mut states: HashMap = HashMap::new(); + + for info in self.peers.values() { + let num = states.get(&info.state).map_or(0, |x| *x); + states.insert(info.state, num + 1); + } + + states + } + pub fn add_new_peer_with_config( &mut self, peer_id: PeerId, @@ -82,7 +96,7 @@ impl SyncPeers { addr, state: PeerState::Found, shard_config, - since: Instant::now(), + since: Instant::now().into(), }, ); @@ -364,7 +378,7 @@ mod tests { sync_peers.add_new_peer(peer_id_connecting, addr.clone()); sync_peers.update_state_force(&peer_id_connecting, PeerState::Connecting); sync_peers.peers.get_mut(&peer_id_connecting).unwrap().since = - Instant::now() - PEER_CONNECT_TIMEOUT; + (Instant::now() - PEER_CONNECT_TIMEOUT).into(); let peer_id_disconnecting = identity::Keypair::generate_ed25519().public().to_peer_id(); sync_peers.add_new_peer(peer_id_disconnecting, addr.clone()); @@ -373,7 +387,7 @@ mod tests { .peers .get_mut(&peer_id_disconnecting) .unwrap() - .since = Instant::now() - PEER_DISCONNECT_TIMEOUT; + .since = (Instant::now() - PEER_DISCONNECT_TIMEOUT).into(); let peer_id_disconnected = identity::Keypair::generate_ed25519().public().to_peer_id(); sync_peers.add_new_peer(peer_id_disconnected, addr); diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index c13ec75..2ec58f5 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -1,6 +1,7 @@ use crate::context::SyncNetworkContext; use crate::controllers::peers::{PeerState, SyncPeers}; use crate::controllers::{FileSyncGoal, FileSyncInfo}; +use crate::InstantWrapper; use file_location_cache::FileLocationCache; use libp2p::swarm::DialError; use network::types::FindChunks; @@ -8,6 +9,7 @@ use network::{ multiaddr::Protocol, rpc::GetChunksRequest, types::FindFile, Multiaddr, NetworkMessage, PeerAction, PeerId, PubsubMessage, SyncId as RequestId, }; +use rand::Rng; use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE}; use std::{ sync::Arc, @@ -33,22 +35,24 @@ pub enum FailureReason { pub enum SyncState { Idle, FindingPeers { - origin: Instant, - since: Instant, + origin: InstantWrapper, + since: InstantWrapper, }, FoundPeers, - ConnectingPeers, + ConnectingPeers { + since: InstantWrapper, + }, AwaitingOutgoingConnection { - since: Instant, + since: InstantWrapper, }, AwaitingDownload { - since: Instant, + since: InstantWrapper, }, Downloading { peer_id: PeerId, from_chunk: u64, to_chunk: u64, - since: Instant, + since: InstantWrapper, }, Completed, Failed { @@ -65,7 +69,7 @@ pub struct SerialSyncController { tx_start_chunk_in_flow: u64, - since: Instant, + since: InstantWrapper, /// File sync goal. goal: FileSyncGoal, @@ -105,7 +109,7 @@ impl SerialSyncController { tx_seq: tx_id.seq, tx_id, tx_start_chunk_in_flow, - since: Instant::now(), + since: Instant::now().into(), goal, next_chunk: goal.index_start, failures: 0, @@ -120,7 +124,7 @@ impl SerialSyncController { pub fn get_sync_info(&self) -> FileSyncInfo { FileSyncInfo { elapsed_secs: self.since.elapsed().as_secs(), - peers: self.peers.count(&[PeerState::Connected]), + peers: self.peers.states(), goal: self.goal, next_chunks: self.next_chunk, state: format!("{:?}", self.state), @@ -144,10 +148,7 @@ impl SerialSyncController { self.next_chunk = start; } else if self.goal.is_all_chunks() { // retry the failed file sync at break point - debug!( - "Continue to sync failed file, tx_seq = {}, next_chunk = {}", - self.tx_seq, self.next_chunk - ); + debug!(%self.tx_seq, %self.next_chunk, "Continue to sync failed file"); } else { // Ignore the failed chunks sync, and change to file sync. self.goal = FileSyncGoal::new_file(self.goal.num_chunks); @@ -171,7 +172,7 @@ impl SerialSyncController { self.state = SyncState::FindingPeers { origin: self.since, - since: Instant::now(), + since: Instant::now().into(), }; } @@ -229,13 +230,15 @@ impl SerialSyncController { }; // connect to peer - info!(%peer_id, %address, "Attempting to connect to peer"); + info!(%self.tx_seq, %peer_id, %address, "Attempting to connect to peer"); self.ctx.send(NetworkMessage::DialPeer { address, peer_id }); self.peers .update_state(&peer_id, PeerState::Found, PeerState::Connecting); } - self.state = SyncState::ConnectingPeers; + self.state = SyncState::ConnectingPeers { + since: Instant::now().into(), + }; } fn try_request_next(&mut self) { @@ -268,7 +271,7 @@ impl SerialSyncController { peer_id, from_chunk, to_chunk, - since: Instant::now(), + since: Instant::now().into(), }; } @@ -307,7 +310,7 @@ impl SerialSyncController { { info!(%self.tx_seq, %peer_id, "Failed to dail peer due to outgoing connection limitation"); self.state = SyncState::AwaitingOutgoingConnection { - since: Instant::now(), + since: Instant::now().into(), }; } } @@ -425,14 +428,14 @@ impl SerialSyncController { match validation_result { Ok(true) => {} Ok(false) => { - info!("Failed to validate chunks response due to no root found"); + info!(%self.tx_seq, "Failed to validate chunks response due to no root found"); self.state = SyncState::AwaitingDownload { - since: Instant::now() + NEXT_REQUEST_WAIT_TIME, + since: (Instant::now() + NEXT_REQUEST_WAIT_TIME).into(), }; return; } Err(err) => { - warn!(%err, "Failed to validate chunks response"); + warn!(%err, %self.tx_seq, "Failed to validate chunks response"); self.ban_peer(from_peer_id, "Chunk array validation failed"); self.state = SyncState::Idle; return; @@ -454,14 +457,14 @@ impl SerialSyncController { { Ok(true) => self.next_chunk = next_chunk as u64, Ok(false) => { - warn!(?self.tx_id, "Transaction reverted while storing chunks"); + warn!(%self.tx_seq, ?self.tx_id, "Transaction reverted while storing chunks"); self.state = SyncState::Failed { reason: FailureReason::TxReverted(self.tx_id), }; return; } Err(err) => { - error!(%err, "Unexpected DB error while storing chunks"); + error!(%err, %self.tx_seq, "Unexpected DB error while storing chunks"); self.state = SyncState::Failed { reason: FailureReason::DBError(err.to_string()), }; @@ -489,13 +492,13 @@ impl SerialSyncController { { Ok(true) => self.state = SyncState::Completed, Ok(false) => { - warn!(?self.tx_id, "Transaction reverted during finalize_tx"); + warn!(?self.tx_id, %self.tx_seq, "Transaction reverted during finalize_tx"); self.state = SyncState::Failed { reason: FailureReason::TxReverted(self.tx_id), }; } Err(err) => { - error!(%err, "Unexpected error during finalize_tx"); + error!(%err, %self.tx_seq, "Unexpected error during finalize_tx"); self.state = SyncState::Failed { reason: FailureReason::DBError(err.to_string()), }; @@ -524,7 +527,7 @@ impl SerialSyncController { if self.failures <= MAX_REQUEST_FAILURES { // try again self.state = SyncState::AwaitingDownload { - since: Instant::now() + NEXT_REQUEST_WAIT_TIME, + since: (Instant::now() + NEXT_REQUEST_WAIT_TIME).into(), }; } else { // ban and find new peer to download @@ -536,16 +539,20 @@ impl SerialSyncController { fn select_peer_for_request(&self, request: &GetChunksRequest) -> Option { let segment_index = (request.index_start + self.tx_start_chunk_in_flow) / PORA_CHUNK_SIZE as u64; - let peers = self.peers.filter_peers(vec![PeerState::Connected]); - // TODO: Add randomness - for peer in peers { - if let Some(shard_config) = self.peers.shard_config(&peer) { - if shard_config.in_range(segment_index) { - return Some(peer); - } - } + let mut peers = self.peers.filter_peers(vec![PeerState::Connected]); + + peers.retain(|peer_id| match self.peers.shard_config(peer_id) { + Some(v) => v.in_range(segment_index), + None => false, + }); + + let len = peers.len(); + if len == 0 { + return None; } - None + + let index = rand::thread_rng().gen_range(0..len); + Some(peers[index]) } pub fn transition(&mut self) { @@ -592,16 +599,18 @@ impl SerialSyncController { SyncState::FoundPeers => { if self.peers.all_shards_available(vec![Connecting, Connected]) { - self.state = SyncState::ConnectingPeers; + self.state = SyncState::ConnectingPeers { + since: Instant::now().into(), + }; } else { self.try_connect(); } } - SyncState::ConnectingPeers => { + SyncState::ConnectingPeers { .. } => { if self.peers.all_shards_available(vec![Connected]) { self.state = SyncState::AwaitingDownload { - since: Instant::now(), + since: Instant::now().into(), }; } else if self.peers.count(&[Connecting]) == 0 { debug!(%self.tx_seq, "Connecting to peers timeout and try to find other peers to dial"); @@ -622,7 +631,7 @@ impl SerialSyncController { } SyncState::AwaitingDownload { since } => { - if Instant::now() < since { + if Instant::now() < since.0 { completed = true; } else { self.try_request_next(); @@ -787,7 +796,10 @@ mod tests { } } - assert_eq!(controller.state, SyncState::ConnectingPeers); + assert!(matches!( + controller.state, + SyncState::ConnectingPeers { .. } + )); } #[tokio::test] @@ -797,7 +809,7 @@ mod tests { let (mut controller, mut network_recv) = create_default_controller(task_executor, None); controller.state = SyncState::AwaitingDownload { - since: Instant::now(), + since: Instant::now().into(), }; controller.try_request_next(); assert_eq!(controller.state, SyncState::Idle); @@ -1053,7 +1065,7 @@ mod tests { peer_id, from_chunk: 0, to_chunk: 1, - since: Instant::now(), + since: Instant::now().into(), }; assert!(controller.handle_on_response_mismatch(peer_id_1)); if let Some(msg) = network_recv.recv().await { @@ -1117,7 +1129,7 @@ mod tests { peer_id, from_chunk: 0, to_chunk: 0, - since: Instant::now(), + since: Instant::now().into(), }; controller.on_response(peer_id, chunks).await; } @@ -1149,7 +1161,7 @@ mod tests { peer_id, from_chunk: 0, to_chunk: chunk_count as u64, - since: Instant::now(), + since: Instant::now().into(), }; chunks.chunks.data = Vec::new(); @@ -1216,7 +1228,7 @@ mod tests { peer_id, from_chunk: 1, to_chunk: chunk_count as u64, - since: Instant::now(), + since: Instant::now().into(), }; controller.on_response(peer_id, chunks).await; @@ -1279,7 +1291,7 @@ mod tests { peer_id, from_chunk: 0, to_chunk: chunk_count as u64, - since: Instant::now(), + since: Instant::now().into(), }; controller.tx_seq = 1; @@ -1351,7 +1363,7 @@ mod tests { peer_id, from_chunk: 0, to_chunk: chunk_count as u64, - since: Instant::now(), + since: Instant::now().into(), }; controller.on_response(peer_id, chunks).await; @@ -1394,7 +1406,7 @@ mod tests { peer_id, from_chunk: 0, to_chunk: 1024, - since: Instant::now(), + since: Instant::now().into(), }; controller.goal.num_chunks = 1024; @@ -1440,7 +1452,7 @@ mod tests { peer_id, from_chunk: 0, to_chunk: chunk_count as u64, - since: Instant::now(), + since: Instant::now().into(), }; controller.on_response(peer_id, chunks).await; diff --git a/node/sync/src/lib.rs b/node/sync/src/lib.rs index a6154df..f42b0f6 100644 --- a/node/sync/src/lib.rs +++ b/node/sync/src/lib.rs @@ -11,7 +11,10 @@ pub use controllers::FileSyncInfo; use duration_str::deserialize_duration; use serde::Deserialize; pub use service::{SyncMessage, SyncReceiver, SyncRequest, SyncResponse, SyncSender, SyncService}; -use std::time::Duration; +use std::{ + fmt::Debug, + time::{Duration, Instant}, +}; #[derive(Clone, Copy, Debug, Deserialize)] #[serde(default)] @@ -31,7 +34,7 @@ impl Default for Config { fn default() -> Self { Self { auto_sync_enabled: false, - max_sync_files: 8, + max_sync_files: 16, sync_file_by_rpc_enabled: true, sync_file_on_announcement_enabled: false, @@ -40,3 +43,24 @@ impl Default for Config { } } } + +#[derive(Clone, Copy, PartialEq, Eq)] +pub struct InstantWrapper(Instant); + +impl Debug for InstantWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "\"{} seconds ago\"", self.0.elapsed().as_secs()) + } +} + +impl From for InstantWrapper { + fn from(value: Instant) -> Self { + Self(value) + } +} + +impl InstantWrapper { + pub fn elapsed(&self) -> Duration { + self.0.elapsed() + } +} diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index cb77f63..996cb0a 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -1,5 +1,6 @@ use crate::auto_sync::batcher_random::RandomBatcher; use crate::auto_sync::batcher_serial::SerialBatcher; +use crate::auto_sync::sync_store::SyncStore; use crate::context::SyncNetworkContext; use crate::controllers::{ FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState, @@ -164,14 +165,17 @@ impl SyncService { // init auto sync let file_announcement_send = if config.auto_sync_enabled { let (send, recv) = unbounded_channel(); + let sync_store = Arc::new(SyncStore::new(store.clone())); // sync in sequence let serial_batcher = - SerialBatcher::new(config, store.clone(), sync_send.clone()).await?; + SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone()) + .await?; executor.spawn(serial_batcher.start(recv, event_recv), "auto_sync_serial"); // sync randomly - let random_batcher = RandomBatcher::new(config, store.clone(), sync_send.clone()); + let random_batcher = + RandomBatcher::new(config, store.clone(), sync_send.clone(), sync_store); executor.spawn(random_batcher.start(), "auto_sync_random"); Some(send) diff --git a/run/config.toml b/run/config.toml index a799577..71ea6d4 100644 --- a/run/config.toml +++ b/run/config.toml @@ -226,7 +226,7 @@ miner_key = "" # auto_sync_enabled = false # Maximum number of files in sync from other peers simultaneously. -# max_sync_files = 8 +# max_sync_files = 16 # Timeout to terminate a file sync when automatically sync from other peers. # If timeout, terminated file sync will be triggered later.