mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-12-25 07:45:17 +00:00
Compare commits
2 Commits
4b48d25fb4
...
e912522386
Author | SHA1 | Date | |
---|---|---|---|
|
e912522386 | ||
|
4566eadb3e |
@ -128,6 +128,12 @@ impl DataRange for Subtree {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EntryBatchData {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl EntryBatchData {
|
||||
pub fn new() -> Self {
|
||||
EntryBatchData::Incomplete(IncompleteData {
|
||||
|
@ -20,7 +20,7 @@ use zgs_spec::{
|
||||
};
|
||||
|
||||
use super::SealAnswer;
|
||||
use chunk_data::EntryBatchData;
|
||||
pub use chunk_data::EntryBatchData;
|
||||
use seal::SealInfo;
|
||||
|
||||
#[derive(Debug, Encode, Decode, Deserialize, Serialize)]
|
||||
|
@ -14,7 +14,7 @@ use self::tx_store::{BlockHashAndSubmissionIndex, TxStatus};
|
||||
|
||||
pub mod config;
|
||||
mod flow_store;
|
||||
mod load_chunk;
|
||||
pub mod load_chunk;
|
||||
pub mod log_manager;
|
||||
mod metrics;
|
||||
mod seal_task_manager;
|
||||
|
@ -23,6 +23,7 @@ const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
|
||||
const NEXT_TX_KEY: &str = "next_tx_seq";
|
||||
const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum TxStatus {
|
||||
Finalized,
|
||||
Pruned,
|
||||
|
@ -1,6 +1,7 @@
|
||||
use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender};
|
||||
use anyhow::{bail, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use shared_types::TxSeqOrRoot;
|
||||
use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration};
|
||||
use storage_async::Store;
|
||||
use tokio::sync::RwLock;
|
||||
@ -84,14 +85,16 @@ impl Batcher {
|
||||
}
|
||||
|
||||
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
|
||||
// file already exists
|
||||
if self.store.check_tx_completed(tx_seq).await?
|
||||
|| self.store.check_tx_pruned(tx_seq).await?
|
||||
// file already finalized or even pruned
|
||||
if let Some(tx_status) = self
|
||||
.store
|
||||
.get_store()
|
||||
.get_tx_status(TxSeqOrRoot::TxSeq(tx_seq))?
|
||||
{
|
||||
// File may be finalized during file sync, e.g. user uploaded file via RPC.
|
||||
// In this case, just terminate the file sync.
|
||||
let num_terminated = self.terminate_file_sync(tx_seq, false).await;
|
||||
info!(%tx_seq, %num_terminated, "Terminate file sync due to file already finalized in db");
|
||||
let num_terminated: usize = self.terminate_file_sync(tx_seq, false).await;
|
||||
if num_terminated > 0 {
|
||||
info!(%tx_seq, %num_terminated, ?tx_status, "Terminate file sync due to file already completed in db");
|
||||
}
|
||||
return Ok(Some(SyncResult::Completed));
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,7 @@ use tokio::time::sleep;
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct RandomBatcherState {
|
||||
pub name: String,
|
||||
pub tasks: Vec<u64>,
|
||||
pub pending_txs: usize,
|
||||
pub ready_txs: usize,
|
||||
@ -22,6 +23,7 @@ pub struct RandomBatcherState {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RandomBatcher {
|
||||
name: String,
|
||||
config: Config,
|
||||
batcher: Batcher,
|
||||
sync_store: Arc<SyncStore>,
|
||||
@ -29,12 +31,14 @@ pub struct RandomBatcher {
|
||||
|
||||
impl RandomBatcher {
|
||||
pub fn new(
|
||||
name: String,
|
||||
config: Config,
|
||||
store: Store,
|
||||
sync_send: SyncSender,
|
||||
sync_store: Arc<SyncStore>,
|
||||
) -> Self {
|
||||
Self {
|
||||
name,
|
||||
config,
|
||||
batcher: Batcher::new(
|
||||
config.max_random_workers,
|
||||
@ -50,6 +54,7 @@ impl RandomBatcher {
|
||||
let (pending_txs, ready_txs) = self.sync_store.stat().await?;
|
||||
|
||||
Ok(RandomBatcherState {
|
||||
name: self.name.clone(),
|
||||
tasks: self.batcher.tasks().await,
|
||||
pending_txs,
|
||||
ready_txs,
|
||||
@ -57,7 +62,7 @@ impl RandomBatcher {
|
||||
}
|
||||
|
||||
pub async fn start(mut self, catched_up: Arc<AtomicBool>) {
|
||||
info!("Start to sync files");
|
||||
info!("Start to sync files, state = {:?}", self.get_state().await);
|
||||
|
||||
// wait for log entry sync catched up
|
||||
while !catched_up.load(Ordering::Relaxed) {
|
||||
@ -66,11 +71,11 @@ impl RandomBatcher {
|
||||
}
|
||||
|
||||
loop {
|
||||
if let Ok(state) = self.get_state().await {
|
||||
metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64);
|
||||
metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64);
|
||||
metrics::RANDOM_STATE_TXS_PENDING.update(state.pending_txs as u64);
|
||||
}
|
||||
// if let Ok(state) = self.get_state().await {
|
||||
// metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64);
|
||||
// metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64);
|
||||
// metrics::RANDOM_STATE_TXS_PENDING.update(state.pending_txs as u64);
|
||||
// }
|
||||
|
||||
match self.sync_once().await {
|
||||
Ok(true) => {}
|
||||
|
108
node/sync/src/auto_sync/historical_tx_writer.rs
Normal file
108
node/sync/src/auto_sync/historical_tx_writer.rs
Normal file
@ -0,0 +1,108 @@
|
||||
use std::sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use storage::log_store::log_manager::DATA_DB_KEY;
|
||||
use storage_async::Store;
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::Config;
|
||||
|
||||
use super::sync_store::{Queue, SyncStore};
|
||||
|
||||
const KEY_NEXT_TX_SEQ: &str = "sync.manager.historical.next_tx_seq";
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct HistoricalTxWriterState {
|
||||
pub next_tx_seq: u64,
|
||||
pub pending_txs: usize,
|
||||
pub ready_txs: usize,
|
||||
}
|
||||
|
||||
pub struct HistoricalTxWriter {
|
||||
config: Config,
|
||||
store: Store,
|
||||
sync_store: Arc<SyncStore>,
|
||||
next_tx_seq: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
impl HistoricalTxWriter {
|
||||
pub async fn new(config: Config, store: Store, sync_store: Arc<SyncStore>) -> Result<Self> {
|
||||
let next_tx_seq = store
|
||||
.get_config_decoded(&KEY_NEXT_TX_SEQ, DATA_DB_KEY)
|
||||
.await?;
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
store,
|
||||
sync_store,
|
||||
next_tx_seq: Arc::new(AtomicU64::new(next_tx_seq.unwrap_or(0))),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_state(&self) -> Result<HistoricalTxWriterState> {
|
||||
let (pending_txs, ready_txs) = self.sync_store.stat().await?;
|
||||
|
||||
Ok(HistoricalTxWriterState {
|
||||
next_tx_seq: self.next_tx_seq.load(Ordering::Relaxed),
|
||||
pending_txs,
|
||||
ready_txs,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn start(mut self) {
|
||||
info!(
|
||||
"Start to write historical files into sync store, state = {:?}",
|
||||
self.get_state().await
|
||||
);
|
||||
|
||||
loop {
|
||||
match self.write_once().await {
|
||||
Ok(true) => {}
|
||||
Ok(false) => {
|
||||
trace!(
|
||||
"There is no tx to write in sync store, state = {:?}",
|
||||
self.get_state().await
|
||||
);
|
||||
sleep(self.config.auto_sync_idle_interval).await;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(%err, "Failed to write tx once, state = {:?}", self.get_state().await);
|
||||
sleep(self.config.auto_sync_error_interval).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_once(&mut self) -> Result<bool> {
|
||||
let mut next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
|
||||
|
||||
// no tx to write in sync store
|
||||
if next_tx_seq >= self.store.get_store().next_tx_seq() {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// write tx in sync store if not finalized or pruned
|
||||
if self
|
||||
.store
|
||||
.get_store()
|
||||
.get_tx_status(shared_types::TxSeqOrRoot::TxSeq(next_tx_seq))?
|
||||
.is_none()
|
||||
{
|
||||
self.sync_store.insert(next_tx_seq, Queue::Ready).await?;
|
||||
}
|
||||
|
||||
// move forward
|
||||
next_tx_seq += 1;
|
||||
self.store
|
||||
.set_config_encoded(&KEY_NEXT_TX_SEQ, &next_tx_seq, DATA_DB_KEY)
|
||||
.await?;
|
||||
self.next_tx_seq.store(next_tx_seq, Ordering::Relaxed);
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
}
|
@ -18,6 +18,7 @@ use crate::{Config, SyncSender};
|
||||
use super::{
|
||||
batcher_random::RandomBatcher,
|
||||
batcher_serial::SerialBatcher,
|
||||
historical_tx_writer::HistoricalTxWriter,
|
||||
sync_store::{Queue, SyncStore},
|
||||
};
|
||||
|
||||
@ -76,7 +77,13 @@ impl AutoSyncManager {
|
||||
};
|
||||
|
||||
// sync randomly
|
||||
let random = RandomBatcher::new(config, store, sync_send, sync_store);
|
||||
let random = RandomBatcher::new(
|
||||
"random".into(),
|
||||
config,
|
||||
store.clone(),
|
||||
sync_send.clone(),
|
||||
sync_store,
|
||||
);
|
||||
executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random");
|
||||
|
||||
// handle on catched up notification
|
||||
@ -85,6 +92,32 @@ impl AutoSyncManager {
|
||||
"auto_sync_wait_for_catchup",
|
||||
);
|
||||
|
||||
// sync randomly for files without NewFile announcement
|
||||
if config.neighbors_only {
|
||||
let historical_sync_store = Arc::new(SyncStore::new_with_name(
|
||||
store.clone(),
|
||||
"pendingv2_historical",
|
||||
"readyv2_historical",
|
||||
));
|
||||
|
||||
let writer =
|
||||
HistoricalTxWriter::new(config, store.clone(), historical_sync_store.clone())
|
||||
.await?;
|
||||
executor.spawn(writer.start(), "auto_sync_historical_writer");
|
||||
|
||||
let random_historical = RandomBatcher::new(
|
||||
"random_historical".into(),
|
||||
config,
|
||||
store,
|
||||
sync_send,
|
||||
historical_sync_store,
|
||||
);
|
||||
executor.spawn(
|
||||
random_historical.start(catched_up.clone()),
|
||||
"auto_sync_random_historical",
|
||||
);
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
serial,
|
||||
random,
|
||||
|
@ -14,9 +14,9 @@ lazy_static::lazy_static! {
|
||||
pub static ref SEQUENTIAL_SYNC_RESULT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_timeout");
|
||||
|
||||
// random auto sync
|
||||
pub static ref RANDOM_STATE_TXS_SYNCING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_syncing", 1024);
|
||||
pub static ref RANDOM_STATE_TXS_READY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_ready", 1024);
|
||||
pub static ref RANDOM_STATE_TXS_PENDING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_pending", 1024);
|
||||
// pub static ref RANDOM_STATE_TXS_SYNCING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_syncing", 1024);
|
||||
// pub static ref RANDOM_STATE_TXS_READY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_ready", 1024);
|
||||
// pub static ref RANDOM_STATE_TXS_PENDING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_pending", 1024);
|
||||
|
||||
pub static ref RANDOM_SYNC_RESULT_COMPLETED: Arc<dyn Meter> = register_meter("sync_auto_random_sync_result_completed");
|
||||
pub static ref RANDOM_SYNC_RESULT_FAILED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_failed");
|
||||
|
@ -1,6 +1,7 @@
|
||||
mod batcher;
|
||||
pub mod batcher_random;
|
||||
pub mod batcher_serial;
|
||||
mod historical_tx_writer;
|
||||
pub mod manager;
|
||||
mod metrics;
|
||||
pub mod sync_store;
|
||||
|
@ -19,16 +19,27 @@ class AutoRandomSyncV2Test(TestFramework):
|
||||
}
|
||||
|
||||
def run_test(self):
|
||||
# Stop the last node to verify historical file sync
|
||||
self.stop_storage_node(self.num_nodes - 1)
|
||||
|
||||
# Submit and upload files on node 0
|
||||
data_root_1 = self.__upload_file__(0, 256 * 1024)
|
||||
data_root_2 = self.__upload_file__(0, 256 * 1024)
|
||||
|
||||
# Files should be available on other nodes via auto sync
|
||||
for i in range(1, self.num_nodes):
|
||||
for i in range(1, self.num_nodes - 1):
|
||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1) is not None)
|
||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"])
|
||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
|
||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
|
||||
|
||||
# Start the last node to verify historical file sync
|
||||
self.start_storage_node(self.num_nodes - 1)
|
||||
self.nodes[self.num_nodes - 1].wait_for_rpc_connection()
|
||||
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1) is not None)
|
||||
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1)["finalized"])
|
||||
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2) is not None)
|
||||
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2)["finalized"])
|
||||
|
||||
if __name__ == "__main__":
|
||||
AutoRandomSyncV2Test().main()
|
||||
|
Loading…
Reference in New Issue
Block a user