mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-12-25 07:45:17 +00:00
Compare commits
2 Commits
4b48d25fb4
...
e912522386
Author | SHA1 | Date | |
---|---|---|---|
|
e912522386 | ||
|
4566eadb3e |
@ -128,6 +128,12 @@ impl DataRange for Subtree {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for EntryBatchData {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl EntryBatchData {
|
impl EntryBatchData {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
EntryBatchData::Incomplete(IncompleteData {
|
EntryBatchData::Incomplete(IncompleteData {
|
||||||
|
@ -20,7 +20,7 @@ use zgs_spec::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use super::SealAnswer;
|
use super::SealAnswer;
|
||||||
use chunk_data::EntryBatchData;
|
pub use chunk_data::EntryBatchData;
|
||||||
use seal::SealInfo;
|
use seal::SealInfo;
|
||||||
|
|
||||||
#[derive(Debug, Encode, Decode, Deserialize, Serialize)]
|
#[derive(Debug, Encode, Decode, Deserialize, Serialize)]
|
||||||
|
@ -14,7 +14,7 @@ use self::tx_store::{BlockHashAndSubmissionIndex, TxStatus};
|
|||||||
|
|
||||||
pub mod config;
|
pub mod config;
|
||||||
mod flow_store;
|
mod flow_store;
|
||||||
mod load_chunk;
|
pub mod load_chunk;
|
||||||
pub mod log_manager;
|
pub mod log_manager;
|
||||||
mod metrics;
|
mod metrics;
|
||||||
mod seal_task_manager;
|
mod seal_task_manager;
|
||||||
|
@ -23,6 +23,7 @@ const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
|
|||||||
const NEXT_TX_KEY: &str = "next_tx_seq";
|
const NEXT_TX_KEY: &str = "next_tx_seq";
|
||||||
const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key";
|
const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key";
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub enum TxStatus {
|
pub enum TxStatus {
|
||||||
Finalized,
|
Finalized,
|
||||||
Pruned,
|
Pruned,
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender};
|
use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender};
|
||||||
use anyhow::{bail, Result};
|
use anyhow::{bail, Result};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use shared_types::TxSeqOrRoot;
|
||||||
use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration};
|
use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration};
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
@ -84,14 +85,16 @@ impl Batcher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
|
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
|
||||||
// file already exists
|
// file already finalized or even pruned
|
||||||
if self.store.check_tx_completed(tx_seq).await?
|
if let Some(tx_status) = self
|
||||||
|| self.store.check_tx_pruned(tx_seq).await?
|
.store
|
||||||
|
.get_store()
|
||||||
|
.get_tx_status(TxSeqOrRoot::TxSeq(tx_seq))?
|
||||||
{
|
{
|
||||||
// File may be finalized during file sync, e.g. user uploaded file via RPC.
|
let num_terminated: usize = self.terminate_file_sync(tx_seq, false).await;
|
||||||
// In this case, just terminate the file sync.
|
if num_terminated > 0 {
|
||||||
let num_terminated = self.terminate_file_sync(tx_seq, false).await;
|
info!(%tx_seq, %num_terminated, ?tx_status, "Terminate file sync due to file already completed in db");
|
||||||
info!(%tx_seq, %num_terminated, "Terminate file sync due to file already finalized in db");
|
}
|
||||||
return Ok(Some(SyncResult::Completed));
|
return Ok(Some(SyncResult::Completed));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@ use tokio::time::sleep;
|
|||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct RandomBatcherState {
|
pub struct RandomBatcherState {
|
||||||
|
pub name: String,
|
||||||
pub tasks: Vec<u64>,
|
pub tasks: Vec<u64>,
|
||||||
pub pending_txs: usize,
|
pub pending_txs: usize,
|
||||||
pub ready_txs: usize,
|
pub ready_txs: usize,
|
||||||
@ -22,6 +23,7 @@ pub struct RandomBatcherState {
|
|||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct RandomBatcher {
|
pub struct RandomBatcher {
|
||||||
|
name: String,
|
||||||
config: Config,
|
config: Config,
|
||||||
batcher: Batcher,
|
batcher: Batcher,
|
||||||
sync_store: Arc<SyncStore>,
|
sync_store: Arc<SyncStore>,
|
||||||
@ -29,12 +31,14 @@ pub struct RandomBatcher {
|
|||||||
|
|
||||||
impl RandomBatcher {
|
impl RandomBatcher {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
|
name: String,
|
||||||
config: Config,
|
config: Config,
|
||||||
store: Store,
|
store: Store,
|
||||||
sync_send: SyncSender,
|
sync_send: SyncSender,
|
||||||
sync_store: Arc<SyncStore>,
|
sync_store: Arc<SyncStore>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
name,
|
||||||
config,
|
config,
|
||||||
batcher: Batcher::new(
|
batcher: Batcher::new(
|
||||||
config.max_random_workers,
|
config.max_random_workers,
|
||||||
@ -50,6 +54,7 @@ impl RandomBatcher {
|
|||||||
let (pending_txs, ready_txs) = self.sync_store.stat().await?;
|
let (pending_txs, ready_txs) = self.sync_store.stat().await?;
|
||||||
|
|
||||||
Ok(RandomBatcherState {
|
Ok(RandomBatcherState {
|
||||||
|
name: self.name.clone(),
|
||||||
tasks: self.batcher.tasks().await,
|
tasks: self.batcher.tasks().await,
|
||||||
pending_txs,
|
pending_txs,
|
||||||
ready_txs,
|
ready_txs,
|
||||||
@ -57,7 +62,7 @@ impl RandomBatcher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start(mut self, catched_up: Arc<AtomicBool>) {
|
pub async fn start(mut self, catched_up: Arc<AtomicBool>) {
|
||||||
info!("Start to sync files");
|
info!("Start to sync files, state = {:?}", self.get_state().await);
|
||||||
|
|
||||||
// wait for log entry sync catched up
|
// wait for log entry sync catched up
|
||||||
while !catched_up.load(Ordering::Relaxed) {
|
while !catched_up.load(Ordering::Relaxed) {
|
||||||
@ -66,11 +71,11 @@ impl RandomBatcher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Ok(state) = self.get_state().await {
|
// if let Ok(state) = self.get_state().await {
|
||||||
metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64);
|
// metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64);
|
||||||
metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64);
|
// metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64);
|
||||||
metrics::RANDOM_STATE_TXS_PENDING.update(state.pending_txs as u64);
|
// metrics::RANDOM_STATE_TXS_PENDING.update(state.pending_txs as u64);
|
||||||
}
|
// }
|
||||||
|
|
||||||
match self.sync_once().await {
|
match self.sync_once().await {
|
||||||
Ok(true) => {}
|
Ok(true) => {}
|
||||||
|
108
node/sync/src/auto_sync/historical_tx_writer.rs
Normal file
108
node/sync/src/auto_sync/historical_tx_writer.rs
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
use std::sync::{
|
||||||
|
atomic::{AtomicU64, Ordering},
|
||||||
|
Arc,
|
||||||
|
};
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use storage::log_store::log_manager::DATA_DB_KEY;
|
||||||
|
use storage_async::Store;
|
||||||
|
use tokio::time::sleep;
|
||||||
|
|
||||||
|
use crate::Config;
|
||||||
|
|
||||||
|
use super::sync_store::{Queue, SyncStore};
|
||||||
|
|
||||||
|
const KEY_NEXT_TX_SEQ: &str = "sync.manager.historical.next_tx_seq";
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct HistoricalTxWriterState {
|
||||||
|
pub next_tx_seq: u64,
|
||||||
|
pub pending_txs: usize,
|
||||||
|
pub ready_txs: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct HistoricalTxWriter {
|
||||||
|
config: Config,
|
||||||
|
store: Store,
|
||||||
|
sync_store: Arc<SyncStore>,
|
||||||
|
next_tx_seq: Arc<AtomicU64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HistoricalTxWriter {
|
||||||
|
pub async fn new(config: Config, store: Store, sync_store: Arc<SyncStore>) -> Result<Self> {
|
||||||
|
let next_tx_seq = store
|
||||||
|
.get_config_decoded(&KEY_NEXT_TX_SEQ, DATA_DB_KEY)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
config,
|
||||||
|
store,
|
||||||
|
sync_store,
|
||||||
|
next_tx_seq: Arc::new(AtomicU64::new(next_tx_seq.unwrap_or(0))),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_state(&self) -> Result<HistoricalTxWriterState> {
|
||||||
|
let (pending_txs, ready_txs) = self.sync_store.stat().await?;
|
||||||
|
|
||||||
|
Ok(HistoricalTxWriterState {
|
||||||
|
next_tx_seq: self.next_tx_seq.load(Ordering::Relaxed),
|
||||||
|
pending_txs,
|
||||||
|
ready_txs,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start(mut self) {
|
||||||
|
info!(
|
||||||
|
"Start to write historical files into sync store, state = {:?}",
|
||||||
|
self.get_state().await
|
||||||
|
);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match self.write_once().await {
|
||||||
|
Ok(true) => {}
|
||||||
|
Ok(false) => {
|
||||||
|
trace!(
|
||||||
|
"There is no tx to write in sync store, state = {:?}",
|
||||||
|
self.get_state().await
|
||||||
|
);
|
||||||
|
sleep(self.config.auto_sync_idle_interval).await;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
warn!(%err, "Failed to write tx once, state = {:?}", self.get_state().await);
|
||||||
|
sleep(self.config.auto_sync_error_interval).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write_once(&mut self) -> Result<bool> {
|
||||||
|
let mut next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
|
||||||
|
|
||||||
|
// no tx to write in sync store
|
||||||
|
if next_tx_seq >= self.store.get_store().next_tx_seq() {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// write tx in sync store if not finalized or pruned
|
||||||
|
if self
|
||||||
|
.store
|
||||||
|
.get_store()
|
||||||
|
.get_tx_status(shared_types::TxSeqOrRoot::TxSeq(next_tx_seq))?
|
||||||
|
.is_none()
|
||||||
|
{
|
||||||
|
self.sync_store.insert(next_tx_seq, Queue::Ready).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// move forward
|
||||||
|
next_tx_seq += 1;
|
||||||
|
self.store
|
||||||
|
.set_config_encoded(&KEY_NEXT_TX_SEQ, &next_tx_seq, DATA_DB_KEY)
|
||||||
|
.await?;
|
||||||
|
self.next_tx_seq.store(next_tx_seq, Ordering::Relaxed);
|
||||||
|
|
||||||
|
Ok(true)
|
||||||
|
}
|
||||||
|
}
|
@ -18,6 +18,7 @@ use crate::{Config, SyncSender};
|
|||||||
use super::{
|
use super::{
|
||||||
batcher_random::RandomBatcher,
|
batcher_random::RandomBatcher,
|
||||||
batcher_serial::SerialBatcher,
|
batcher_serial::SerialBatcher,
|
||||||
|
historical_tx_writer::HistoricalTxWriter,
|
||||||
sync_store::{Queue, SyncStore},
|
sync_store::{Queue, SyncStore},
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -76,7 +77,13 @@ impl AutoSyncManager {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// sync randomly
|
// sync randomly
|
||||||
let random = RandomBatcher::new(config, store, sync_send, sync_store);
|
let random = RandomBatcher::new(
|
||||||
|
"random".into(),
|
||||||
|
config,
|
||||||
|
store.clone(),
|
||||||
|
sync_send.clone(),
|
||||||
|
sync_store,
|
||||||
|
);
|
||||||
executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random");
|
executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random");
|
||||||
|
|
||||||
// handle on catched up notification
|
// handle on catched up notification
|
||||||
@ -85,6 +92,32 @@ impl AutoSyncManager {
|
|||||||
"auto_sync_wait_for_catchup",
|
"auto_sync_wait_for_catchup",
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// sync randomly for files without NewFile announcement
|
||||||
|
if config.neighbors_only {
|
||||||
|
let historical_sync_store = Arc::new(SyncStore::new_with_name(
|
||||||
|
store.clone(),
|
||||||
|
"pendingv2_historical",
|
||||||
|
"readyv2_historical",
|
||||||
|
));
|
||||||
|
|
||||||
|
let writer =
|
||||||
|
HistoricalTxWriter::new(config, store.clone(), historical_sync_store.clone())
|
||||||
|
.await?;
|
||||||
|
executor.spawn(writer.start(), "auto_sync_historical_writer");
|
||||||
|
|
||||||
|
let random_historical = RandomBatcher::new(
|
||||||
|
"random_historical".into(),
|
||||||
|
config,
|
||||||
|
store,
|
||||||
|
sync_send,
|
||||||
|
historical_sync_store,
|
||||||
|
);
|
||||||
|
executor.spawn(
|
||||||
|
random_historical.start(catched_up.clone()),
|
||||||
|
"auto_sync_random_historical",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
serial,
|
serial,
|
||||||
random,
|
random,
|
||||||
|
@ -14,9 +14,9 @@ lazy_static::lazy_static! {
|
|||||||
pub static ref SEQUENTIAL_SYNC_RESULT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_timeout");
|
pub static ref SEQUENTIAL_SYNC_RESULT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_timeout");
|
||||||
|
|
||||||
// random auto sync
|
// random auto sync
|
||||||
pub static ref RANDOM_STATE_TXS_SYNCING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_syncing", 1024);
|
// pub static ref RANDOM_STATE_TXS_SYNCING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_syncing", 1024);
|
||||||
pub static ref RANDOM_STATE_TXS_READY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_ready", 1024);
|
// pub static ref RANDOM_STATE_TXS_READY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_ready", 1024);
|
||||||
pub static ref RANDOM_STATE_TXS_PENDING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_pending", 1024);
|
// pub static ref RANDOM_STATE_TXS_PENDING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_pending", 1024);
|
||||||
|
|
||||||
pub static ref RANDOM_SYNC_RESULT_COMPLETED: Arc<dyn Meter> = register_meter("sync_auto_random_sync_result_completed");
|
pub static ref RANDOM_SYNC_RESULT_COMPLETED: Arc<dyn Meter> = register_meter("sync_auto_random_sync_result_completed");
|
||||||
pub static ref RANDOM_SYNC_RESULT_FAILED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_failed");
|
pub static ref RANDOM_SYNC_RESULT_FAILED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_failed");
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
mod batcher;
|
mod batcher;
|
||||||
pub mod batcher_random;
|
pub mod batcher_random;
|
||||||
pub mod batcher_serial;
|
pub mod batcher_serial;
|
||||||
|
mod historical_tx_writer;
|
||||||
pub mod manager;
|
pub mod manager;
|
||||||
mod metrics;
|
mod metrics;
|
||||||
pub mod sync_store;
|
pub mod sync_store;
|
||||||
|
@ -19,16 +19,27 @@ class AutoRandomSyncV2Test(TestFramework):
|
|||||||
}
|
}
|
||||||
|
|
||||||
def run_test(self):
|
def run_test(self):
|
||||||
|
# Stop the last node to verify historical file sync
|
||||||
|
self.stop_storage_node(self.num_nodes - 1)
|
||||||
|
|
||||||
# Submit and upload files on node 0
|
# Submit and upload files on node 0
|
||||||
data_root_1 = self.__upload_file__(0, 256 * 1024)
|
data_root_1 = self.__upload_file__(0, 256 * 1024)
|
||||||
data_root_2 = self.__upload_file__(0, 256 * 1024)
|
data_root_2 = self.__upload_file__(0, 256 * 1024)
|
||||||
|
|
||||||
# Files should be available on other nodes via auto sync
|
# Files should be available on other nodes via auto sync
|
||||||
for i in range(1, self.num_nodes):
|
for i in range(1, self.num_nodes - 1):
|
||||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1) is not None)
|
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1) is not None)
|
||||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"])
|
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"])
|
||||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
|
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
|
||||||
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
|
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
|
||||||
|
|
||||||
|
# Start the last node to verify historical file sync
|
||||||
|
self.start_storage_node(self.num_nodes - 1)
|
||||||
|
self.nodes[self.num_nodes - 1].wait_for_rpc_connection()
|
||||||
|
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1) is not None)
|
||||||
|
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1)["finalized"])
|
||||||
|
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2) is not None)
|
||||||
|
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2)["finalized"])
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
AutoRandomSyncV2Test().main()
|
AutoRandomSyncV2Test().main()
|
||||||
|
Loading…
Reference in New Issue
Block a user