Compare commits

..

2 Commits

Author SHA1 Message Date
Bo QIU
e912522386
Supports to sync historical files without NewFile gossip message (#269)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* Supports to randomly sync historical files

* Add name for random file sync batcher

* Remove sync store metrics since multiple random batcher created

* opt log

* ignore pruned or finalized historical file

* Add python tests for historical file sync
2024-11-15 10:00:58 +08:00
MiniFrenchBread
4566eadb3e
chore: expose chunk data (#270)
* chore: expose chunk data
2024-11-15 08:26:34 +08:00
11 changed files with 188 additions and 20 deletions

View File

@ -128,6 +128,12 @@ impl DataRange for Subtree {
} }
} }
impl Default for EntryBatchData {
fn default() -> Self {
Self::new()
}
}
impl EntryBatchData { impl EntryBatchData {
pub fn new() -> Self { pub fn new() -> Self {
EntryBatchData::Incomplete(IncompleteData { EntryBatchData::Incomplete(IncompleteData {

View File

@ -20,7 +20,7 @@ use zgs_spec::{
}; };
use super::SealAnswer; use super::SealAnswer;
use chunk_data::EntryBatchData; pub use chunk_data::EntryBatchData;
use seal::SealInfo; use seal::SealInfo;
#[derive(Debug, Encode, Decode, Deserialize, Serialize)] #[derive(Debug, Encode, Decode, Deserialize, Serialize)]

View File

@ -14,7 +14,7 @@ use self::tx_store::{BlockHashAndSubmissionIndex, TxStatus};
pub mod config; pub mod config;
mod flow_store; mod flow_store;
mod load_chunk; pub mod load_chunk;
pub mod log_manager; pub mod log_manager;
mod metrics; mod metrics;
mod seal_task_manager; mod seal_task_manager;

View File

@ -23,6 +23,7 @@ const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
const NEXT_TX_KEY: &str = "next_tx_seq"; const NEXT_TX_KEY: &str = "next_tx_seq";
const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key"; const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key";
#[derive(Debug)]
pub enum TxStatus { pub enum TxStatus {
Finalized, Finalized,
Pruned, Pruned,

View File

@ -1,6 +1,7 @@
use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender}; use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use shared_types::TxSeqOrRoot;
use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration}; use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration};
use storage_async::Store; use storage_async::Store;
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -84,14 +85,16 @@ impl Batcher {
} }
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> { async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
// file already exists // file already finalized or even pruned
if self.store.check_tx_completed(tx_seq).await? if let Some(tx_status) = self
|| self.store.check_tx_pruned(tx_seq).await? .store
.get_store()
.get_tx_status(TxSeqOrRoot::TxSeq(tx_seq))?
{ {
// File may be finalized during file sync, e.g. user uploaded file via RPC. let num_terminated: usize = self.terminate_file_sync(tx_seq, false).await;
// In this case, just terminate the file sync. if num_terminated > 0 {
let num_terminated = self.terminate_file_sync(tx_seq, false).await; info!(%tx_seq, %num_terminated, ?tx_status, "Terminate file sync due to file already completed in db");
info!(%tx_seq, %num_terminated, "Terminate file sync due to file already finalized in db"); }
return Ok(Some(SyncResult::Completed)); return Ok(Some(SyncResult::Completed));
} }

View File

@ -15,6 +15,7 @@ use tokio::time::sleep;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct RandomBatcherState { pub struct RandomBatcherState {
pub name: String,
pub tasks: Vec<u64>, pub tasks: Vec<u64>,
pub pending_txs: usize, pub pending_txs: usize,
pub ready_txs: usize, pub ready_txs: usize,
@ -22,6 +23,7 @@ pub struct RandomBatcherState {
#[derive(Clone)] #[derive(Clone)]
pub struct RandomBatcher { pub struct RandomBatcher {
name: String,
config: Config, config: Config,
batcher: Batcher, batcher: Batcher,
sync_store: Arc<SyncStore>, sync_store: Arc<SyncStore>,
@ -29,12 +31,14 @@ pub struct RandomBatcher {
impl RandomBatcher { impl RandomBatcher {
pub fn new( pub fn new(
name: String,
config: Config, config: Config,
store: Store, store: Store,
sync_send: SyncSender, sync_send: SyncSender,
sync_store: Arc<SyncStore>, sync_store: Arc<SyncStore>,
) -> Self { ) -> Self {
Self { Self {
name,
config, config,
batcher: Batcher::new( batcher: Batcher::new(
config.max_random_workers, config.max_random_workers,
@ -50,6 +54,7 @@ impl RandomBatcher {
let (pending_txs, ready_txs) = self.sync_store.stat().await?; let (pending_txs, ready_txs) = self.sync_store.stat().await?;
Ok(RandomBatcherState { Ok(RandomBatcherState {
name: self.name.clone(),
tasks: self.batcher.tasks().await, tasks: self.batcher.tasks().await,
pending_txs, pending_txs,
ready_txs, ready_txs,
@ -57,7 +62,7 @@ impl RandomBatcher {
} }
pub async fn start(mut self, catched_up: Arc<AtomicBool>) { pub async fn start(mut self, catched_up: Arc<AtomicBool>) {
info!("Start to sync files"); info!("Start to sync files, state = {:?}", self.get_state().await);
// wait for log entry sync catched up // wait for log entry sync catched up
while !catched_up.load(Ordering::Relaxed) { while !catched_up.load(Ordering::Relaxed) {
@ -66,11 +71,11 @@ impl RandomBatcher {
} }
loop { loop {
if let Ok(state) = self.get_state().await { // if let Ok(state) = self.get_state().await {
metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64); // metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64);
metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64); // metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64);
metrics::RANDOM_STATE_TXS_PENDING.update(state.pending_txs as u64); // metrics::RANDOM_STATE_TXS_PENDING.update(state.pending_txs as u64);
} // }
match self.sync_once().await { match self.sync_once().await {
Ok(true) => {} Ok(true) => {}

View File

@ -0,0 +1,108 @@
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use storage::log_store::log_manager::DATA_DB_KEY;
use storage_async::Store;
use tokio::time::sleep;
use crate::Config;
use super::sync_store::{Queue, SyncStore};
const KEY_NEXT_TX_SEQ: &str = "sync.manager.historical.next_tx_seq";
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct HistoricalTxWriterState {
pub next_tx_seq: u64,
pub pending_txs: usize,
pub ready_txs: usize,
}
pub struct HistoricalTxWriter {
config: Config,
store: Store,
sync_store: Arc<SyncStore>,
next_tx_seq: Arc<AtomicU64>,
}
impl HistoricalTxWriter {
pub async fn new(config: Config, store: Store, sync_store: Arc<SyncStore>) -> Result<Self> {
let next_tx_seq = store
.get_config_decoded(&KEY_NEXT_TX_SEQ, DATA_DB_KEY)
.await?;
Ok(Self {
config,
store,
sync_store,
next_tx_seq: Arc::new(AtomicU64::new(next_tx_seq.unwrap_or(0))),
})
}
pub async fn get_state(&self) -> Result<HistoricalTxWriterState> {
let (pending_txs, ready_txs) = self.sync_store.stat().await?;
Ok(HistoricalTxWriterState {
next_tx_seq: self.next_tx_seq.load(Ordering::Relaxed),
pending_txs,
ready_txs,
})
}
pub async fn start(mut self) {
info!(
"Start to write historical files into sync store, state = {:?}",
self.get_state().await
);
loop {
match self.write_once().await {
Ok(true) => {}
Ok(false) => {
trace!(
"There is no tx to write in sync store, state = {:?}",
self.get_state().await
);
sleep(self.config.auto_sync_idle_interval).await;
}
Err(err) => {
warn!(%err, "Failed to write tx once, state = {:?}", self.get_state().await);
sleep(self.config.auto_sync_error_interval).await;
}
}
}
}
async fn write_once(&mut self) -> Result<bool> {
let mut next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
// no tx to write in sync store
if next_tx_seq >= self.store.get_store().next_tx_seq() {
return Ok(false);
}
// write tx in sync store if not finalized or pruned
if self
.store
.get_store()
.get_tx_status(shared_types::TxSeqOrRoot::TxSeq(next_tx_seq))?
.is_none()
{
self.sync_store.insert(next_tx_seq, Queue::Ready).await?;
}
// move forward
next_tx_seq += 1;
self.store
.set_config_encoded(&KEY_NEXT_TX_SEQ, &next_tx_seq, DATA_DB_KEY)
.await?;
self.next_tx_seq.store(next_tx_seq, Ordering::Relaxed);
Ok(true)
}
}

View File

@ -18,6 +18,7 @@ use crate::{Config, SyncSender};
use super::{ use super::{
batcher_random::RandomBatcher, batcher_random::RandomBatcher,
batcher_serial::SerialBatcher, batcher_serial::SerialBatcher,
historical_tx_writer::HistoricalTxWriter,
sync_store::{Queue, SyncStore}, sync_store::{Queue, SyncStore},
}; };
@ -76,7 +77,13 @@ impl AutoSyncManager {
}; };
// sync randomly // sync randomly
let random = RandomBatcher::new(config, store, sync_send, sync_store); let random = RandomBatcher::new(
"random".into(),
config,
store.clone(),
sync_send.clone(),
sync_store,
);
executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random"); executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random");
// handle on catched up notification // handle on catched up notification
@ -85,6 +92,32 @@ impl AutoSyncManager {
"auto_sync_wait_for_catchup", "auto_sync_wait_for_catchup",
); );
// sync randomly for files without NewFile announcement
if config.neighbors_only {
let historical_sync_store = Arc::new(SyncStore::new_with_name(
store.clone(),
"pendingv2_historical",
"readyv2_historical",
));
let writer =
HistoricalTxWriter::new(config, store.clone(), historical_sync_store.clone())
.await?;
executor.spawn(writer.start(), "auto_sync_historical_writer");
let random_historical = RandomBatcher::new(
"random_historical".into(),
config,
store,
sync_send,
historical_sync_store,
);
executor.spawn(
random_historical.start(catched_up.clone()),
"auto_sync_random_historical",
);
}
Ok(Self { Ok(Self {
serial, serial,
random, random,

View File

@ -14,9 +14,9 @@ lazy_static::lazy_static! {
pub static ref SEQUENTIAL_SYNC_RESULT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_timeout"); pub static ref SEQUENTIAL_SYNC_RESULT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_timeout");
// random auto sync // random auto sync
pub static ref RANDOM_STATE_TXS_SYNCING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_syncing", 1024); // pub static ref RANDOM_STATE_TXS_SYNCING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_syncing", 1024);
pub static ref RANDOM_STATE_TXS_READY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_ready", 1024); // pub static ref RANDOM_STATE_TXS_READY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_ready", 1024);
pub static ref RANDOM_STATE_TXS_PENDING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_pending", 1024); // pub static ref RANDOM_STATE_TXS_PENDING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_pending", 1024);
pub static ref RANDOM_SYNC_RESULT_COMPLETED: Arc<dyn Meter> = register_meter("sync_auto_random_sync_result_completed"); pub static ref RANDOM_SYNC_RESULT_COMPLETED: Arc<dyn Meter> = register_meter("sync_auto_random_sync_result_completed");
pub static ref RANDOM_SYNC_RESULT_FAILED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_failed"); pub static ref RANDOM_SYNC_RESULT_FAILED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_failed");

View File

@ -1,6 +1,7 @@
mod batcher; mod batcher;
pub mod batcher_random; pub mod batcher_random;
pub mod batcher_serial; pub mod batcher_serial;
mod historical_tx_writer;
pub mod manager; pub mod manager;
mod metrics; mod metrics;
pub mod sync_store; pub mod sync_store;

View File

@ -19,16 +19,27 @@ class AutoRandomSyncV2Test(TestFramework):
} }
def run_test(self): def run_test(self):
# Stop the last node to verify historical file sync
self.stop_storage_node(self.num_nodes - 1)
# Submit and upload files on node 0 # Submit and upload files on node 0
data_root_1 = self.__upload_file__(0, 256 * 1024) data_root_1 = self.__upload_file__(0, 256 * 1024)
data_root_2 = self.__upload_file__(0, 256 * 1024) data_root_2 = self.__upload_file__(0, 256 * 1024)
# Files should be available on other nodes via auto sync # Files should be available on other nodes via auto sync
for i in range(1, self.num_nodes): for i in range(1, self.num_nodes - 1):
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1) is not None) wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"]) wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"])
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None) wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"]) wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
# Start the last node to verify historical file sync
self.start_storage_node(self.num_nodes - 1)
self.nodes[self.num_nodes - 1].wait_for_rpc_connection()
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1)["finalized"])
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2)["finalized"])
if __name__ == "__main__": if __name__ == "__main__":
AutoRandomSyncV2Test().main() AutoRandomSyncV2Test().main()