mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-11-20 15:05:19 +00:00
Merge branch 'main' of https://github.com/0glabs/0g-storage-node into forcestart
This commit is contained in:
commit
93f99e3537
@ -16,9 +16,9 @@ pub struct Config {
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Config {
|
||||
max_entries_total: 256000,
|
||||
max_entries_total: 1000000,
|
||||
max_entries_per_file: 4,
|
||||
entry_expiration_time_secs: 3600,
|
||||
entry_expiration_time_secs: 86400,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,9 +17,12 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::sync::{
|
||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
RwLock,
|
||||
use tokio::{
|
||||
sync::{
|
||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
RwLock,
|
||||
},
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
pub struct LogEntryFetcher {
|
||||
@ -249,7 +252,10 @@ impl LogEntryFetcher {
|
||||
}) {
|
||||
Ok(event) => {
|
||||
if let Err(e) = recover_tx
|
||||
.send(submission_event_to_transaction(event))
|
||||
.send(submission_event_to_transaction(
|
||||
event,
|
||||
log.block_number.expect("block number exist").as_u64(),
|
||||
))
|
||||
.and_then(|_| match sync_progress {
|
||||
Some(b) => recover_tx.send(b),
|
||||
None => Ok(()),
|
||||
@ -285,12 +291,14 @@ impl LogEntryFetcher {
|
||||
executor: &TaskExecutor,
|
||||
block_hash_cache: Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
|
||||
watch_loop_wait_time_ms: u64,
|
||||
mut watch_progress_rx: UnboundedReceiver<u64>,
|
||||
) -> UnboundedReceiver<LogFetchProgress> {
|
||||
let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
|
||||
let provider = self.provider.clone();
|
||||
let confirmation_delay = self.confirmation_delay;
|
||||
let log_page_size = self.log_page_size;
|
||||
let mut progress_reset_history = BTreeMap::new();
|
||||
executor.spawn(
|
||||
async move {
|
||||
debug!("start_watch starts, start={}", start_block_number);
|
||||
@ -298,6 +306,13 @@ impl LogEntryFetcher {
|
||||
let mut parent_block_hash = parent_block_hash;
|
||||
|
||||
loop {
|
||||
check_watch_process(
|
||||
&mut watch_progress_rx,
|
||||
&mut progress,
|
||||
&mut progress_reset_history,
|
||||
watch_loop_wait_time_ms,
|
||||
);
|
||||
|
||||
match Self::watch_loop(
|
||||
provider.as_ref(),
|
||||
progress,
|
||||
@ -489,7 +504,8 @@ impl LogEntryFetcher {
|
||||
first_submission_index = Some(submit_filter.submission_index.as_u64());
|
||||
}
|
||||
|
||||
log_events.push(submission_event_to_transaction(submit_filter));
|
||||
log_events
|
||||
.push(submission_event_to_transaction(submit_filter, block_number));
|
||||
}
|
||||
|
||||
info!("synced {} events", log_events.len());
|
||||
@ -510,12 +526,21 @@ impl LogEntryFetcher {
|
||||
return Ok(progress);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(p) = &new_progress {
|
||||
if let Err(e) = watch_tx.send(LogFetchProgress::SyncedBlock(*p)) {
|
||||
warn!("send LogFetchProgress::SyncedBlock failed: {:?}", e);
|
||||
return Ok(progress);
|
||||
} else {
|
||||
block_hash_cache.write().await.insert(p.0, None);
|
||||
let mut cache = block_hash_cache.write().await;
|
||||
match cache.get(&p.0) {
|
||||
Some(Some(v))
|
||||
if v.block_hash == p.1
|
||||
&& v.first_submission_index == p.2.unwrap() => {}
|
||||
_ => {
|
||||
cache.insert(p.0, None);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
progress = new_progress;
|
||||
@ -530,6 +555,59 @@ impl LogEntryFetcher {
|
||||
}
|
||||
}
|
||||
|
||||
fn check_watch_process(
|
||||
watch_progress_rx: &mut UnboundedReceiver<u64>,
|
||||
progress: &mut u64,
|
||||
progress_reset_history: &mut BTreeMap<u64, (Instant, usize)>,
|
||||
watch_loop_wait_time_ms: u64,
|
||||
) {
|
||||
let mut min_received_progress = None;
|
||||
while let Ok(v) = watch_progress_rx.try_recv() {
|
||||
min_received_progress = match min_received_progress {
|
||||
Some(min) if min > v => Some(v),
|
||||
None => Some(v),
|
||||
_ => min_received_progress,
|
||||
};
|
||||
}
|
||||
|
||||
if let Some(v) = min_received_progress {
|
||||
if *progress <= v {
|
||||
error!(
|
||||
"received unexpected progress, current {}, received {}",
|
||||
*progress, v
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let now = Instant::now();
|
||||
match progress_reset_history.get_mut(&v) {
|
||||
Some((last_update, counter)) => {
|
||||
if *counter >= 3 {
|
||||
error!("maximum reset attempts have been reached.");
|
||||
watch_progress_rx.close();
|
||||
return;
|
||||
}
|
||||
|
||||
if now.duration_since(*last_update)
|
||||
>= Duration::from_millis(watch_loop_wait_time_ms * 30)
|
||||
{
|
||||
info!("reset to progress from {} to {}", *progress, v);
|
||||
*progress = v;
|
||||
*last_update = now;
|
||||
*counter += 1;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
info!("reset to progress from {} to {}", *progress, v);
|
||||
*progress = v;
|
||||
progress_reset_history.insert(v, (now, 1usize));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
progress_reset_history.retain(|k, _| k + 1000 >= *progress);
|
||||
}
|
||||
|
||||
async fn revert_one_block(
|
||||
block_hash: H256,
|
||||
block_number: u64,
|
||||
@ -583,26 +661,29 @@ async fn revert_one_block(
|
||||
#[derive(Debug)]
|
||||
pub enum LogFetchProgress {
|
||||
SyncedBlock((u64, H256, Option<Option<u64>>)),
|
||||
Transaction(Transaction),
|
||||
Transaction((Transaction, u64)),
|
||||
Reverted(u64),
|
||||
}
|
||||
|
||||
fn submission_event_to_transaction(e: SubmitFilter) -> LogFetchProgress {
|
||||
LogFetchProgress::Transaction(Transaction {
|
||||
stream_ids: vec![],
|
||||
data: vec![],
|
||||
data_merkle_root: nodes_to_root(&e.submission.nodes),
|
||||
merkle_nodes: e
|
||||
.submission
|
||||
.nodes
|
||||
.iter()
|
||||
// the submission height is the height of the root node starting from height 0.
|
||||
.map(|SubmissionNode { root, height }| (height.as_usize() + 1, root.into()))
|
||||
.collect(),
|
||||
start_entry_index: e.start_pos.as_u64(),
|
||||
size: e.submission.length.as_u64(),
|
||||
seq: e.submission_index.as_u64(),
|
||||
})
|
||||
fn submission_event_to_transaction(e: SubmitFilter, block_number: u64) -> LogFetchProgress {
|
||||
LogFetchProgress::Transaction((
|
||||
Transaction {
|
||||
stream_ids: vec![],
|
||||
data: vec![],
|
||||
data_merkle_root: nodes_to_root(&e.submission.nodes),
|
||||
merkle_nodes: e
|
||||
.submission
|
||||
.nodes
|
||||
.iter()
|
||||
// the submission height is the height of the root node starting from height 0.
|
||||
.map(|SubmissionNode { root, height }| (height.as_usize() + 1, root.into()))
|
||||
.collect(),
|
||||
start_entry_index: e.start_pos.as_u64(),
|
||||
size: e.submission.length.as_u64(),
|
||||
seq: e.submission_index.as_u64(),
|
||||
},
|
||||
block_number,
|
||||
))
|
||||
}
|
||||
|
||||
fn nodes_to_root(node_list: &[SubmissionNode]) -> DataRoot {
|
||||
|
@ -15,7 +15,7 @@ use std::time::{Duration, Instant};
|
||||
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
|
||||
use task_executor::{ShutdownReason, TaskExecutor};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc::UnboundedReceiver;
|
||||
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
||||
use tokio::sync::{oneshot, RwLock};
|
||||
|
||||
const RETRY_WAIT_MS: u64 = 500;
|
||||
@ -102,25 +102,8 @@ impl LogSyncManager {
|
||||
block_hash_cache,
|
||||
};
|
||||
|
||||
let (mut start_block_number, mut start_block_hash) = if log_sync_manager
|
||||
.config
|
||||
.force_log_sync_from_start_block_number
|
||||
{
|
||||
let block_number = log_sync_manager.config.start_block_number;
|
||||
let block_hash = log_sync_manager.get_block(block_number.into()).await?.1;
|
||||
(block_number, block_hash)
|
||||
} else {
|
||||
match log_sync_manager.store.get_sync_progress()? {
|
||||
// No previous progress, so just use config.
|
||||
None => {
|
||||
let block_number = log_sync_manager.config.start_block_number;
|
||||
let block_hash =
|
||||
log_sync_manager.get_block(block_number.into()).await?.1;
|
||||
(block_number, block_hash)
|
||||
}
|
||||
Some((block_number, block_hash)) => (block_number, block_hash),
|
||||
}
|
||||
};
|
||||
let (mut start_block_number, mut start_block_hash) =
|
||||
get_start_block_number_with_hash(&log_sync_manager).await?;
|
||||
|
||||
let (mut finalized_block_number, mut finalized_block_hash) =
|
||||
match log_sync_manager.get_block(BlockNumber::Finalized).await {
|
||||
@ -154,7 +137,7 @@ impl LogSyncManager {
|
||||
&executor_clone,
|
||||
log_sync_manager.block_hash_cache.clone(),
|
||||
);
|
||||
log_sync_manager.handle_data(reorg_rx).await?;
|
||||
log_sync_manager.handle_data(reorg_rx, &None).await?;
|
||||
if let Some((block_number, block_hash)) =
|
||||
log_sync_manager.store.get_sync_progress()?
|
||||
{
|
||||
@ -231,15 +214,20 @@ impl LogSyncManager {
|
||||
warn!("catch_up_end send fails, possibly auto_sync is not enabled");
|
||||
}
|
||||
|
||||
let (watch_progress_tx, watch_progress_rx) =
|
||||
tokio::sync::mpsc::unbounded_channel();
|
||||
let watch_rx = log_sync_manager.log_fetcher.start_watch(
|
||||
start_block_number,
|
||||
parent_block_hash,
|
||||
&executor_clone,
|
||||
log_sync_manager.block_hash_cache.clone(),
|
||||
log_sync_manager.config.watch_loop_wait_time_ms,
|
||||
watch_progress_rx,
|
||||
);
|
||||
// Syncing `watch_rx` is supposed to block forever.
|
||||
log_sync_manager.handle_data(watch_rx).await?;
|
||||
log_sync_manager
|
||||
.handle_data(watch_rx, &Some(watch_progress_tx))
|
||||
.await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
},
|
||||
)
|
||||
@ -249,20 +237,20 @@ impl LogSyncManager {
|
||||
Ok((event_send_cloned, catch_up_end_receiver))
|
||||
}
|
||||
|
||||
async fn put_tx(&mut self, tx: Transaction) -> bool {
|
||||
async fn put_tx(&mut self, tx: Transaction) -> Option<bool> {
|
||||
// We call this after process chain reorg, so the sequence number should match.
|
||||
match tx.seq.cmp(&self.next_tx_seq) {
|
||||
std::cmp::Ordering::Less => true,
|
||||
std::cmp::Ordering::Less => Some(true),
|
||||
std::cmp::Ordering::Equal => {
|
||||
debug!("log entry sync get entry: {:?}", tx);
|
||||
self.put_tx_inner(tx).await
|
||||
Some(self.put_tx_inner(tx).await)
|
||||
}
|
||||
std::cmp::Ordering::Greater => {
|
||||
error!(
|
||||
"Unexpected transaction seq: next={} get={}",
|
||||
self.next_tx_seq, tx.seq
|
||||
);
|
||||
false
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -304,7 +292,18 @@ impl LogSyncManager {
|
||||
let _ = self.event_send.send(LogSyncEvent::Reverted { tx_seq });
|
||||
}
|
||||
|
||||
async fn handle_data(&mut self, mut rx: UnboundedReceiver<LogFetchProgress>) -> Result<()> {
|
||||
async fn handle_data(
|
||||
&mut self,
|
||||
mut rx: UnboundedReceiver<LogFetchProgress>,
|
||||
watch_progress_tx: &Option<UnboundedSender<u64>>,
|
||||
) -> Result<()> {
|
||||
let mut log_latest_block_number =
|
||||
if let Some(block_number) = self.store.get_log_latest_block_number()? {
|
||||
block_number
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
while let Some(data) = rx.recv().await {
|
||||
debug!("handle_data: data={:?}", data);
|
||||
match data {
|
||||
@ -344,8 +343,30 @@ impl LogSyncManager {
|
||||
}
|
||||
}
|
||||
}
|
||||
LogFetchProgress::Transaction(tx) => {
|
||||
if !self.put_tx(tx.clone()).await {
|
||||
LogFetchProgress::Transaction((tx, block_number)) => {
|
||||
let mut stop = false;
|
||||
match self.put_tx(tx.clone()).await {
|
||||
Some(false) => stop = true,
|
||||
Some(true) => {
|
||||
if let Err(e) = self.store.put_log_latest_block_number(block_number) {
|
||||
warn!("failed to put log latest block number, error={:?}", e);
|
||||
}
|
||||
|
||||
log_latest_block_number = block_number;
|
||||
}
|
||||
_ => {
|
||||
stop = true;
|
||||
if let Some(progress_tx) = watch_progress_tx {
|
||||
if let Err(e) = progress_tx.send(log_latest_block_number) {
|
||||
error!("failed to send watch progress, error={:?}", e);
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if stop {
|
||||
// Unexpected error.
|
||||
bail!("log sync write error");
|
||||
}
|
||||
@ -434,7 +455,7 @@ impl LogSyncManager {
|
||||
&executor_clone,
|
||||
Duration::from_millis(self.config.recover_query_delay),
|
||||
);
|
||||
self.handle_data(recover_rx).await?;
|
||||
self.handle_data(recover_rx, &None).await?;
|
||||
}
|
||||
|
||||
self.log_fetcher.start_remove_finalized_block_task(
|
||||
@ -448,6 +469,33 @@ impl LogSyncManager {
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_start_block_number_with_hash(
|
||||
log_sync_manager: &LogSyncManager,
|
||||
) -> Result<(u64, H256), anyhow::Error> {
|
||||
if let Some(block_number) = log_sync_manager.store.get_log_latest_block_number()? {
|
||||
if let Some(Some(val)) = log_sync_manager
|
||||
.block_hash_cache
|
||||
.read()
|
||||
.await
|
||||
.get(&block_number)
|
||||
{
|
||||
return Ok((block_number, val.block_hash));
|
||||
}
|
||||
}
|
||||
|
||||
let (start_block_number, start_block_hash) = match log_sync_manager.store.get_sync_progress()? {
|
||||
// No previous progress, so just use config.
|
||||
None => {
|
||||
let block_number = log_sync_manager.config.start_block_number;
|
||||
let block_hash = log_sync_manager.get_block(block_number.into()).await?.1;
|
||||
(block_number, block_hash)
|
||||
}
|
||||
Some((block_number, block_hash)) => (block_number, block_hash),
|
||||
};
|
||||
|
||||
Ok((start_block_number, start_block_hash))
|
||||
}
|
||||
|
||||
async fn run_and_log<R, E>(
|
||||
mut on_error: impl FnMut(),
|
||||
f: impl Future<Output = std::result::Result<R, E>> + Send,
|
||||
|
@ -7,7 +7,8 @@ use merkle_tree::RawLeafSha3Algorithm;
|
||||
use network::Multiaddr;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use shared_types::{
|
||||
compute_padded_chunk_size, compute_segment_size, DataRoot, FileProof, Transaction, CHUNK_SIZE,
|
||||
compute_padded_chunk_size, compute_segment_size, DataRoot, FileProof, NetworkIdentity,
|
||||
Transaction, CHUNK_SIZE,
|
||||
};
|
||||
use std::collections::HashSet;
|
||||
use std::hash::Hasher;
|
||||
@ -28,6 +29,7 @@ pub struct Status {
|
||||
pub connected_peers: usize,
|
||||
pub log_sync_height: u64,
|
||||
pub log_sync_block: H256,
|
||||
pub network_identity: NetworkIdentity,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
@ -30,6 +30,7 @@ impl RpcServer for RpcServerImpl {
|
||||
connected_peers: self.ctx.network_globals.connected_peers(),
|
||||
log_sync_height: sync_progress.0,
|
||||
log_sync_block: sync_progress.1,
|
||||
network_identity: self.ctx.network_globals.network_id(),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -370,6 +370,7 @@ impl TryFrom<FileProof> for FlowProof {
|
||||
#[derive(
|
||||
DeriveEncode, DeriveDecode, Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize,
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct NetworkIdentity {
|
||||
/// The chain id of the blockchain network.
|
||||
pub chain_id: u64,
|
||||
@ -384,6 +385,7 @@ pub struct NetworkIdentity {
|
||||
#[derive(
|
||||
DeriveEncode, DeriveDecode, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize,
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProtocolVersion {
|
||||
pub major: u8,
|
||||
pub minor: u8,
|
||||
|
@ -345,6 +345,10 @@ impl LogStoreWrite for LogManager {
|
||||
self.tx_store.put_progress(progress)
|
||||
}
|
||||
|
||||
fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> {
|
||||
self.tx_store.put_log_latest_block_number(block_number)
|
||||
}
|
||||
|
||||
/// Return the reverted Transactions in order.
|
||||
/// `tx_seq == u64::MAX` is a special case for reverting all transactions.
|
||||
fn revert_to(&self, tx_seq: u64) -> Result<Vec<Transaction>> {
|
||||
@ -534,6 +538,10 @@ impl LogStoreRead for LogManager {
|
||||
self.tx_store.get_progress()
|
||||
}
|
||||
|
||||
fn get_log_latest_block_number(&self) -> Result<Option<u64>> {
|
||||
self.tx_store.get_log_latest_block_number()
|
||||
}
|
||||
|
||||
fn get_block_hash_by_number(&self, block_number: u64) -> Result<Option<(H256, Option<u64>)>> {
|
||||
self.tx_store.get_block_hash_by_number(block_number)
|
||||
}
|
||||
|
@ -59,6 +59,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
|
||||
|
||||
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
|
||||
|
||||
fn get_log_latest_block_number(&self) -> Result<Option<u64>>;
|
||||
|
||||
fn get_block_hash_by_number(&self, block_number: u64) -> Result<Option<(H256, Option<u64>)>>;
|
||||
|
||||
fn get_block_hashes(&self) -> Result<Vec<(u64, BlockHashAndSubmissionIndex)>>;
|
||||
@ -126,6 +128,9 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
|
||||
/// Store the progress of synced block number and its hash.
|
||||
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>;
|
||||
|
||||
/// Store the latest block number which has log
|
||||
fn put_log_latest_block_number(&self, block_number: u64) -> Result<()>;
|
||||
|
||||
/// Revert the log state to a given tx seq.
|
||||
/// This is needed when transactions are reverted because of chain reorg.
|
||||
///
|
||||
|
@ -19,6 +19,7 @@ use tracing::{error, instrument};
|
||||
|
||||
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
|
||||
const NEXT_TX_KEY: &str = "next_tx_seq";
|
||||
const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key";
|
||||
|
||||
const TX_STATUS_FINALIZED: u8 = 0;
|
||||
const TX_STATUS_PRUNED: u8 = 1;
|
||||
@ -214,6 +215,25 @@ impl TransactionStore {
|
||||
))
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> {
|
||||
Ok(self.kvdb.put(
|
||||
COL_MISC,
|
||||
LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes(),
|
||||
&block_number.as_ssz_bytes(),
|
||||
)?)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub fn get_log_latest_block_number(&self) -> Result<Option<u64>> {
|
||||
Ok(Some(
|
||||
<u64>::from_ssz_bytes(&try_option!(self
|
||||
.kvdb
|
||||
.get(COL_MISC, LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes())?))
|
||||
.map_err(Error::from)?,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn get_block_hash_by_number(
|
||||
&self,
|
||||
block_number: u64,
|
||||
|
@ -1,6 +1,6 @@
|
||||
use super::{batcher::Batcher, sync_store::SyncStore};
|
||||
use crate::{
|
||||
auto_sync::{batcher::SyncResult, metrics},
|
||||
auto_sync::{batcher::SyncResult, metrics, sync_store::Queue},
|
||||
Config, SyncSender,
|
||||
};
|
||||
use anyhow::Result;
|
||||
@ -108,16 +108,17 @@ impl RandomBatcher {
|
||||
SyncResult::Timeout => metrics::RANDOM_SYNC_RESULT_TIMEOUT.inc(1),
|
||||
}
|
||||
|
||||
match sync_result {
|
||||
SyncResult::Completed => self.sync_store.remove_tx(tx_seq).await?,
|
||||
_ => self.sync_store.downgrade_tx_to_pending(tx_seq).await?,
|
||||
};
|
||||
if matches!(sync_result, SyncResult::Completed) {
|
||||
self.sync_store.remove(tx_seq).await?;
|
||||
} else {
|
||||
self.sync_store.insert(tx_seq, Queue::Pending).await?;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn schedule(&mut self) -> Result<bool> {
|
||||
let tx_seq = match self.sync_store.random_tx().await? {
|
||||
let tx_seq = match self.sync_store.random().await? {
|
||||
Some(v) => v,
|
||||
None => return Ok(false),
|
||||
};
|
||||
|
@ -2,7 +2,10 @@ use super::{
|
||||
batcher::{Batcher, SyncResult},
|
||||
sync_store::SyncStore,
|
||||
};
|
||||
use crate::{auto_sync::metrics, Config, SyncSender};
|
||||
use crate::{
|
||||
auto_sync::{metrics, sync_store::Queue},
|
||||
Config, SyncSender,
|
||||
};
|
||||
use anyhow::Result;
|
||||
use log_entry_sync::LogSyncEvent;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -200,7 +203,7 @@ impl SerialBatcher {
|
||||
}
|
||||
|
||||
// otherwise, mark tx as ready for sync
|
||||
if let Err(err) = self.sync_store.upgrade_tx_to_ready(announced_tx_seq).await {
|
||||
if let Err(err) = self.sync_store.upgrade(announced_tx_seq).await {
|
||||
error!(%err, %announced_tx_seq, "Failed to promote announced tx to ready, state = {:?}", self.get_state().await);
|
||||
}
|
||||
|
||||
@ -280,11 +283,28 @@ impl SerialBatcher {
|
||||
|
||||
/// Schedule file sync in sequence.
|
||||
async fn schedule_next(&mut self) -> Result<bool> {
|
||||
let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
|
||||
if next_tx_seq > self.max_tx_seq.load(Ordering::Relaxed) {
|
||||
let max_tx_seq = self.max_tx_seq.load(Ordering::Relaxed);
|
||||
if max_tx_seq == u64::MAX {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let mut next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
|
||||
if next_tx_seq > max_tx_seq {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// If sequential sync disabled, delegate to random sync.
|
||||
if self.config.max_sequential_workers == 0 {
|
||||
self.sync_store.insert(next_tx_seq, Queue::Ready).await?;
|
||||
|
||||
next_tx_seq += 1;
|
||||
self.sync_store.set_next_tx_seq(next_tx_seq).await?;
|
||||
self.next_tx_seq.store(next_tx_seq, Ordering::Relaxed);
|
||||
self.next_tx_seq_in_db.store(next_tx_seq, Ordering::Relaxed);
|
||||
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
if !self.batcher.add(next_tx_seq).await? {
|
||||
return Ok(false);
|
||||
}
|
||||
@ -309,7 +329,7 @@ impl SerialBatcher {
|
||||
|
||||
// downgrade to random sync if file sync failed or timeout
|
||||
if matches!(sync_result, SyncResult::Failed | SyncResult::Timeout) {
|
||||
self.sync_store.add_pending_tx(current).await?;
|
||||
self.sync_store.insert(current, Queue::Pending).await?;
|
||||
}
|
||||
|
||||
// always move forward in db
|
||||
|
@ -8,6 +8,20 @@ use tokio::sync::RwLock;
|
||||
const KEY_NEXT_TX_SEQ: &str = "sync.manager.next_tx_seq";
|
||||
const KEY_MAX_TX_SEQ: &str = "sync.manager.max_tx_seq";
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum Queue {
|
||||
Ready,
|
||||
Pending,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum InsertResult {
|
||||
NewAdded, // new added in target queue
|
||||
AlreadyExists, // already exists in target queue
|
||||
Upgraded, // upgraded from pending queue to ready queue
|
||||
Downgraded, // downgraged from ready queue to pending queue
|
||||
}
|
||||
|
||||
pub struct SyncStore {
|
||||
store: Arc<RwLock<Store>>,
|
||||
|
||||
@ -64,31 +78,69 @@ impl SyncStore {
|
||||
store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq)
|
||||
}
|
||||
|
||||
pub async fn add_pending_tx(&self, tx_seq: u64) -> Result<bool> {
|
||||
let async_store = self.store.write().await;
|
||||
pub async fn contains(&self, tx_seq: u64) -> Result<Option<Queue>> {
|
||||
let async_store = self.store.read().await;
|
||||
let store = async_store.get_store();
|
||||
|
||||
// already in ready queue
|
||||
if self.ready_txs.has(store, tx_seq)? {
|
||||
return Ok(false);
|
||||
return Ok(Some(Queue::Ready));
|
||||
}
|
||||
|
||||
// always add in pending queue
|
||||
self.pending_txs.add(store, None, tx_seq)
|
||||
if self.pending_txs.has(store, tx_seq)? {
|
||||
return Ok(Some(Queue::Pending));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub async fn upgrade_tx_to_ready(&self, tx_seq: u64) -> Result<bool> {
|
||||
pub async fn insert(&self, tx_seq: u64, queue: Queue) -> Result<InsertResult> {
|
||||
let async_store = self.store.write().await;
|
||||
let store = async_store.get_store();
|
||||
|
||||
let mut tx = ConfigTx::default();
|
||||
|
||||
match queue {
|
||||
Queue::Ready => {
|
||||
if !self.ready_txs.add(store, Some(&mut tx), tx_seq)? {
|
||||
return Ok(InsertResult::AlreadyExists);
|
||||
}
|
||||
|
||||
let removed = self.pending_txs.remove(store, Some(&mut tx), tx_seq)?;
|
||||
store.exec_configs(tx)?;
|
||||
|
||||
if removed {
|
||||
Ok(InsertResult::Upgraded)
|
||||
} else {
|
||||
Ok(InsertResult::NewAdded)
|
||||
}
|
||||
}
|
||||
Queue::Pending => {
|
||||
if !self.pending_txs.add(store, Some(&mut tx), tx_seq)? {
|
||||
return Ok(InsertResult::AlreadyExists);
|
||||
}
|
||||
|
||||
let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?;
|
||||
store.exec_configs(tx)?;
|
||||
|
||||
if removed {
|
||||
Ok(InsertResult::Downgraded)
|
||||
} else {
|
||||
Ok(InsertResult::NewAdded)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upgrade(&self, tx_seq: u64) -> Result<bool> {
|
||||
let async_store = self.store.write().await;
|
||||
let store = async_store.get_store();
|
||||
|
||||
let mut tx = ConfigTx::default();
|
||||
|
||||
// not in pending queue
|
||||
if !self.pending_txs.remove(store, Some(&mut tx), tx_seq)? {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// move from pending to ready queue
|
||||
let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?;
|
||||
|
||||
store.exec_configs(tx)?;
|
||||
@ -96,26 +148,7 @@ impl SyncStore {
|
||||
Ok(added)
|
||||
}
|
||||
|
||||
pub async fn downgrade_tx_to_pending(&self, tx_seq: u64) -> Result<bool> {
|
||||
let async_store = self.store.write().await;
|
||||
let store = async_store.get_store();
|
||||
|
||||
let mut tx = ConfigTx::default();
|
||||
|
||||
// not in ready queue
|
||||
if !self.ready_txs.remove(store, Some(&mut tx), tx_seq)? {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// move from ready to pending queue
|
||||
let added = self.pending_txs.add(store, Some(&mut tx), tx_seq)?;
|
||||
|
||||
store.exec_configs(tx)?;
|
||||
|
||||
Ok(added)
|
||||
}
|
||||
|
||||
pub async fn random_tx(&self) -> Result<Option<u64>> {
|
||||
pub async fn random(&self) -> Result<Option<u64>> {
|
||||
let async_store = self.store.read().await;
|
||||
let store = async_store.get_store();
|
||||
|
||||
@ -128,17 +161,21 @@ impl SyncStore {
|
||||
self.pending_txs.random(store)
|
||||
}
|
||||
|
||||
pub async fn remove_tx(&self, tx_seq: u64) -> Result<bool> {
|
||||
pub async fn remove(&self, tx_seq: u64) -> Result<Option<Queue>> {
|
||||
let async_store = self.store.write().await;
|
||||
let store = async_store.get_store();
|
||||
|
||||
// removed in ready queue
|
||||
if self.ready_txs.remove(store, None, tx_seq)? {
|
||||
return Ok(true);
|
||||
return Ok(Some(Queue::Ready));
|
||||
}
|
||||
|
||||
// otherwise, try to remove in pending queue
|
||||
self.pending_txs.remove(store, None, tx_seq)
|
||||
// removed in pending queue
|
||||
if self.pending_txs.remove(store, None, tx_seq)? {
|
||||
return Ok(Some(Queue::Pending));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@ -146,7 +183,7 @@ impl SyncStore {
|
||||
mod tests {
|
||||
use crate::test_util::tests::TestStoreRuntime;
|
||||
|
||||
use super::SyncStore;
|
||||
use super::{InsertResult::*, Queue::*, SyncStore};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tx_seq_range() {
|
||||
@ -165,106 +202,79 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_add_pending_tx() {
|
||||
async fn test_insert() {
|
||||
let runtime = TestStoreRuntime::default();
|
||||
let store = SyncStore::new(runtime.store.clone());
|
||||
|
||||
// add pending tx 3
|
||||
assert!(store.add_pending_tx(3).await.unwrap());
|
||||
assert_eq!(store.contains(1).await.unwrap(), None);
|
||||
assert_eq!(store.insert(1, Pending).await.unwrap(), NewAdded);
|
||||
assert_eq!(store.contains(1).await.unwrap(), Some(Pending));
|
||||
assert_eq!(store.insert(1, Pending).await.unwrap(), AlreadyExists);
|
||||
assert_eq!(store.insert(1, Ready).await.unwrap(), Upgraded);
|
||||
assert_eq!(store.contains(1).await.unwrap(), Some(Ready));
|
||||
|
||||
// cannot add pending tx 3 again
|
||||
assert!(!store.add_pending_tx(3).await.unwrap());
|
||||
assert_eq!(store.insert(2, Ready).await.unwrap(), NewAdded);
|
||||
assert_eq!(store.contains(2).await.unwrap(), Some(Ready));
|
||||
assert_eq!(store.insert(2, Ready).await.unwrap(), AlreadyExists);
|
||||
assert_eq!(store.insert(2, Pending).await.unwrap(), Downgraded);
|
||||
assert_eq!(store.contains(2).await.unwrap(), Some(Pending));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_upgrade_tx() {
|
||||
async fn test_upgrade() {
|
||||
let runtime = TestStoreRuntime::default();
|
||||
let store = SyncStore::new(runtime.store.clone());
|
||||
|
||||
// cannot upgrade by default
|
||||
assert!(!store.upgrade_tx_to_ready(3).await.unwrap());
|
||||
assert!(!store.upgrade(3).await.unwrap());
|
||||
|
||||
// add pending tx 3
|
||||
assert!(store.add_pending_tx(3).await.unwrap());
|
||||
assert_eq!(store.insert(3, Pending).await.unwrap(), NewAdded);
|
||||
|
||||
// can upgrade to ready
|
||||
assert!(store.upgrade_tx_to_ready(3).await.unwrap());
|
||||
|
||||
// cannot add pending tx 3 again event upgraded to ready
|
||||
assert!(!store.add_pending_tx(3).await.unwrap());
|
||||
assert!(store.upgrade(3).await.unwrap());
|
||||
assert_eq!(store.contains(3).await.unwrap(), Some(Ready));
|
||||
|
||||
// cannot upgrade again
|
||||
assert!(!store.upgrade_tx_to_ready(3).await.unwrap());
|
||||
assert!(!store.upgrade(3).await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_downgrade_tx() {
|
||||
let runtime = TestStoreRuntime::default();
|
||||
let store = SyncStore::new(runtime.store.clone());
|
||||
|
||||
// cannot downgrade by default
|
||||
assert!(!store.downgrade_tx_to_pending(3).await.unwrap());
|
||||
|
||||
// add pending tx 3
|
||||
assert!(store.add_pending_tx(3).await.unwrap());
|
||||
|
||||
// cannot downgrade for non-ready tx
|
||||
assert!(!store.downgrade_tx_to_pending(3).await.unwrap());
|
||||
|
||||
// upgrade tx 3 to ready
|
||||
assert!(store.upgrade_tx_to_ready(3).await.unwrap());
|
||||
|
||||
// can downgrade now
|
||||
assert!(store.downgrade_tx_to_pending(3).await.unwrap());
|
||||
|
||||
// cannot downgrade now
|
||||
assert!(!store.downgrade_tx_to_pending(3).await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_random_tx() {
|
||||
async fn test_random() {
|
||||
let runtime = TestStoreRuntime::default();
|
||||
let store = SyncStore::new(runtime.store.clone());
|
||||
|
||||
// no tx by default
|
||||
assert_eq!(store.random_tx().await.unwrap(), None);
|
||||
assert_eq!(store.random().await.unwrap(), None);
|
||||
|
||||
// add pending txs 1, 2, 3
|
||||
assert!(store.add_pending_tx(1).await.unwrap());
|
||||
assert!(store.add_pending_tx(2).await.unwrap());
|
||||
assert!(store.add_pending_tx(3).await.unwrap());
|
||||
let tx = store.random_tx().await.unwrap().unwrap();
|
||||
assert_eq!(store.insert(1, Pending).await.unwrap(), NewAdded);
|
||||
assert_eq!(store.insert(2, Pending).await.unwrap(), NewAdded);
|
||||
assert_eq!(store.insert(3, Pending).await.unwrap(), NewAdded);
|
||||
let tx = store.random().await.unwrap().unwrap();
|
||||
assert!((1..=3).contains(&tx));
|
||||
|
||||
// upgrade tx 1 to ready
|
||||
assert!(store.upgrade_tx_to_ready(2).await.unwrap());
|
||||
assert_eq!(store.random_tx().await.unwrap(), Some(2));
|
||||
// upgrade tx 2 to ready
|
||||
assert_eq!(store.insert(2, Ready).await.unwrap(), Upgraded);
|
||||
assert_eq!(store.random().await.unwrap(), Some(2));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remove_tx() {
|
||||
async fn test_remove() {
|
||||
let runtime = TestStoreRuntime::default();
|
||||
let store = SyncStore::new(runtime.store.clone());
|
||||
|
||||
// cannot remove by default
|
||||
assert!(!store.remove_tx(1).await.unwrap());
|
||||
assert_eq!(store.remove(1).await.unwrap(), None);
|
||||
|
||||
// add pending tx 1, 2
|
||||
assert!(store.add_pending_tx(1).await.unwrap());
|
||||
assert!(store.add_pending_tx(2).await.unwrap());
|
||||
// add tx 1, 2
|
||||
assert_eq!(store.insert(1, Pending).await.unwrap(), NewAdded);
|
||||
assert_eq!(store.insert(2, Ready).await.unwrap(), NewAdded);
|
||||
|
||||
// upgrade tx 1 to ready
|
||||
assert!(store.upgrade_tx_to_ready(1).await.unwrap());
|
||||
assert_eq!(store.random_tx().await.unwrap(), Some(1));
|
||||
|
||||
// remove tx 1
|
||||
assert!(store.remove_tx(1).await.unwrap());
|
||||
assert_eq!(store.random_tx().await.unwrap(), Some(2));
|
||||
assert!(!store.remove_tx(1).await.unwrap());
|
||||
|
||||
// remove tx 2
|
||||
assert!(store.remove_tx(2).await.unwrap());
|
||||
assert_eq!(store.random_tx().await.unwrap(), None);
|
||||
assert!(!store.remove_tx(2).await.unwrap());
|
||||
// remove txs
|
||||
assert_eq!(store.remove(1).await.unwrap(), Some(Pending));
|
||||
assert_eq!(store.remove(1).await.unwrap(), None);
|
||||
assert_eq!(store.remove(2).await.unwrap(), Some(Ready));
|
||||
assert_eq!(store.remove(2).await.unwrap(), None);
|
||||
}
|
||||
}
|
||||
|
@ -80,8 +80,8 @@ impl Default for Config {
|
||||
// auto sync config
|
||||
auto_sync_idle_interval: Duration::from_secs(3),
|
||||
auto_sync_error_interval: Duration::from_secs(10),
|
||||
max_sequential_workers: 24,
|
||||
max_random_workers: 8,
|
||||
max_sequential_workers: 0,
|
||||
max_random_workers: 30,
|
||||
sequential_find_peer_timeout: Duration::from_secs(60),
|
||||
random_find_peer_timeout: Duration::from_secs(500),
|
||||
}
|
||||
|
@ -87,7 +87,7 @@ log_contract_address = "0x0460aA47b41a66694c0a73f667a1b795A5ED3556"
|
||||
log_sync_start_block_number = 595059
|
||||
|
||||
# Number of blocks to confirm a transaction.
|
||||
confirmation_block_count = 6
|
||||
# confirmation_block_count = 3
|
||||
|
||||
# Maximum number of event logs to poll at a time.
|
||||
# log_page_size = 999
|
||||
@ -264,10 +264,10 @@ auto_sync_enabled = true
|
||||
# peer_chunks_download_timeout = "15s"
|
||||
|
||||
# Maximum threads to sync files in sequence.
|
||||
# max_sequential_workers = 24
|
||||
# max_sequential_workers = 0
|
||||
|
||||
# Maximum threads to sync files randomly.
|
||||
# max_random_workers = 8
|
||||
# max_random_workers = 30
|
||||
|
||||
# Timeout to terminate a file sync in sequence.
|
||||
# sequential_find_peer_timeout = "60s"
|
||||
@ -287,14 +287,14 @@ auto_sync_enabled = true
|
||||
# When the cache is full, the storage position information with oldest timestamp will be replaced.
|
||||
|
||||
# Global cache capacity.
|
||||
# max_entries_total = 256000
|
||||
# max_entries_total = 1000000
|
||||
|
||||
# Location information capacity for each file.
|
||||
# max_entries_per_file = 4
|
||||
|
||||
# Validity period of location information.
|
||||
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
|
||||
# entry_expiration_time_secs = 3600
|
||||
# entry_expiration_time_secs = 86400
|
||||
|
||||
#######################################################################
|
||||
### Metrics Options ###
|
||||
|
@ -87,7 +87,7 @@ log_contract_address = "0xbD2C3F0E65eDF5582141C35969d66e34629cC768"
|
||||
log_sync_start_block_number = 595059
|
||||
|
||||
# Number of blocks to confirm a transaction.
|
||||
confirmation_block_count = 6
|
||||
# confirmation_block_count = 3
|
||||
|
||||
# Maximum number of event logs to poll at a time.
|
||||
# log_page_size = 999
|
||||
@ -276,10 +276,10 @@ auto_sync_enabled = true
|
||||
# peer_chunks_download_timeout = "15s"
|
||||
|
||||
# Maximum threads to sync files in sequence.
|
||||
# max_sequential_workers = 24
|
||||
# max_sequential_workers = 0
|
||||
|
||||
# Maximum threads to sync files randomly.
|
||||
# max_random_workers = 8
|
||||
# max_random_workers = 30
|
||||
|
||||
# Timeout to terminate a file sync in sequence.
|
||||
# sequential_find_peer_timeout = "60s"
|
||||
@ -299,14 +299,14 @@ auto_sync_enabled = true
|
||||
# When the cache is full, the storage position information with oldest timestamp will be replaced.
|
||||
|
||||
# Global cache capacity.
|
||||
# max_entries_total = 256000
|
||||
# max_entries_total = 1000000
|
||||
|
||||
# Location information capacity for each file.
|
||||
# max_entries_per_file = 4
|
||||
|
||||
# Validity period of location information.
|
||||
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
|
||||
# entry_expiration_time_secs = 3600
|
||||
# entry_expiration_time_secs = 86400
|
||||
|
||||
#######################################################################
|
||||
### Metrics Options ###
|
||||
|
@ -87,7 +87,7 @@
|
||||
# log_sync_start_block_number = 0
|
||||
|
||||
# Number of blocks to confirm a transaction.
|
||||
# confirmation_block_count = 12
|
||||
# confirmation_block_count = 3
|
||||
|
||||
# Maximum number of event logs to poll at a time.
|
||||
# log_page_size = 999
|
||||
@ -278,10 +278,10 @@
|
||||
# peer_chunks_download_timeout = "15s"
|
||||
|
||||
# Maximum threads to sync files in sequence.
|
||||
# max_sequential_workers = 24
|
||||
# max_sequential_workers = 0
|
||||
|
||||
# Maximum threads to sync files randomly.
|
||||
# max_random_workers = 8
|
||||
# max_random_workers = 30
|
||||
|
||||
# Timeout to terminate a file sync in sequence.
|
||||
# sequential_find_peer_timeout = "60s"
|
||||
@ -301,14 +301,14 @@
|
||||
# When the cache is full, the storage position information with oldest timestamp will be replaced.
|
||||
|
||||
# Global cache capacity.
|
||||
# max_entries_total = 256000
|
||||
# max_entries_total = 1000000
|
||||
|
||||
# Location information capacity for each file.
|
||||
# max_entries_per_file = 4
|
||||
|
||||
# Validity period of location information.
|
||||
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
|
||||
# entry_expiration_time_secs = 3600
|
||||
# entry_expiration_time_secs = 86400
|
||||
|
||||
#######################################################################
|
||||
### Metrics Options ###
|
||||
|
@ -63,3 +63,13 @@ TX_PARAMS1 = {
|
||||
|
||||
NO_SEAL_FLAG = 0x1
|
||||
NO_MERKLE_PROOF_FLAG = 0x2
|
||||
|
||||
def update_config(default: dict, custom: dict):
|
||||
"""
|
||||
Supports to update configurations with dict value.
|
||||
"""
|
||||
for (key, value) in custom.items():
|
||||
if default.get(key) is None or type(value) != dict:
|
||||
default[key] = value
|
||||
else:
|
||||
update_config(default[key], value)
|
||||
|
@ -5,18 +5,14 @@ import random
|
||||
from test_framework.test_framework import TestFramework
|
||||
from utility.submission import create_submission
|
||||
from utility.submission import submit_data
|
||||
from utility.utils import (
|
||||
assert_equal,
|
||||
wait_until,
|
||||
)
|
||||
|
||||
from utility.utils import wait_until
|
||||
|
||||
class RandomTest(TestFramework):
|
||||
def setup_params(self):
|
||||
self.num_blockchain_nodes = 1
|
||||
self.num_nodes = 4
|
||||
for i in range(self.num_nodes):
|
||||
self.zgs_node_configs[i] = {"find_peer_timeout_secs": 1, "confirmation_block_count": 1, "sync": {"auto_sync_enabled": True}}
|
||||
self.zgs_node_configs[i] = {"sync": {"auto_sync_enabled": True}}
|
||||
|
||||
def run_test(self):
|
||||
max_size = 256 * 1024 * 64
|
||||
|
32
tests/sync_auto_random_test.py
Normal file
32
tests/sync_auto_random_test.py
Normal file
@ -0,0 +1,32 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from test_framework.test_framework import TestFramework
|
||||
from utility.utils import wait_until
|
||||
|
||||
class AutoRandomSyncTest(TestFramework):
|
||||
def setup_params(self):
|
||||
self.num_nodes = 2
|
||||
|
||||
# Enable random auto sync only
|
||||
for i in range(self.num_nodes):
|
||||
self.zgs_node_configs[i] = {
|
||||
"sync": {
|
||||
"auto_sync_enabled": True,
|
||||
"max_sequential_workers": 0,
|
||||
"max_random_workers": 3,
|
||||
}
|
||||
}
|
||||
|
||||
def run_test(self):
|
||||
# Submit and upload files on node 0
|
||||
data_root_1 = self.__upload_file__(0, 256 * 1024)
|
||||
data_root_2 = self.__upload_file__(0, 256 * 1024)
|
||||
|
||||
# Files should be available on node 1 via auto sync
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None)
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"])
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2) is not None)
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2)["finalized"])
|
||||
|
||||
if __name__ == "__main__":
|
||||
AutoRandomSyncTest().main()
|
32
tests/sync_auto_sequential_test.py
Normal file
32
tests/sync_auto_sequential_test.py
Normal file
@ -0,0 +1,32 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from test_framework.test_framework import TestFramework
|
||||
from utility.utils import wait_until
|
||||
|
||||
class AutoSequentialSyncTest(TestFramework):
|
||||
def setup_params(self):
|
||||
self.num_nodes = 2
|
||||
|
||||
# Enable sequential auto sync only
|
||||
for i in range(self.num_nodes):
|
||||
self.zgs_node_configs[i] = {
|
||||
"sync": {
|
||||
"auto_sync_enabled": True,
|
||||
"max_sequential_workers": 3,
|
||||
"max_random_workers": 0,
|
||||
}
|
||||
}
|
||||
|
||||
def run_test(self):
|
||||
# Submit and upload files on node 0
|
||||
data_root_1 = self.__upload_file__(0, 256 * 1024)
|
||||
data_root_2 = self.__upload_file__(0, 256 * 1024)
|
||||
|
||||
# Files should be available on node 1 via auto sync
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None)
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"])
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2) is not None)
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2)["finalized"])
|
||||
|
||||
if __name__ == "__main__":
|
||||
AutoSequentialSyncTest().main()
|
32
tests/sync_auto_test.py
Normal file
32
tests/sync_auto_test.py
Normal file
@ -0,0 +1,32 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from test_framework.test_framework import TestFramework
|
||||
from utility.utils import wait_until
|
||||
|
||||
class AutoSyncTest(TestFramework):
|
||||
def setup_params(self):
|
||||
self.num_nodes = 2
|
||||
|
||||
# Enable auto sync
|
||||
for i in range(self.num_nodes):
|
||||
self.zgs_node_configs[i] = {
|
||||
"sync": {
|
||||
"auto_sync_enabled": True,
|
||||
"max_sequential_workers": 3,
|
||||
"max_random_workers": 3,
|
||||
}
|
||||
}
|
||||
|
||||
def run_test(self):
|
||||
# Submit and upload files on node 0
|
||||
data_root_1 = self.__upload_file__(0, 256 * 1024)
|
||||
data_root_2 = self.__upload_file__(0, 256 * 1024)
|
||||
|
||||
# Files should be available on node 1 via auto sync
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None)
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"])
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2) is not None)
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2)["finalized"])
|
||||
|
||||
if __name__ == "__main__":
|
||||
AutoSyncTest().main()
|
@ -4,8 +4,7 @@ import random
|
||||
import time
|
||||
|
||||
from test_framework.test_framework import TestFramework
|
||||
from utility.submission import create_submission
|
||||
from utility.submission import submit_data, data_to_segments
|
||||
from utility.submission import data_to_segments
|
||||
from utility.utils import (
|
||||
assert_equal,
|
||||
wait_until,
|
||||
@ -13,9 +12,7 @@ from utility.utils import (
|
||||
|
||||
class SyncTest(TestFramework):
|
||||
def setup_params(self):
|
||||
self.num_blockchain_nodes = 2
|
||||
self.num_nodes = 2
|
||||
self.__deployed_contracts = 0
|
||||
|
||||
def run_test(self):
|
||||
# By default, auto_sync_enabled and sync_file_on_announcement_enabled are both false,
|
||||
@ -32,18 +29,7 @@ class SyncTest(TestFramework):
|
||||
# stop client2, preventing it from receiving AnnounceFile
|
||||
client2.shutdown()
|
||||
|
||||
# Create submission
|
||||
chunk_data = random.randbytes(256 * 1024)
|
||||
data_root = self.__create_submission(chunk_data)
|
||||
|
||||
# Ensure log entry sync from blockchain node
|
||||
wait_until(lambda: client1.zgs_get_file_info(data_root) is not None)
|
||||
assert_equal(client1.zgs_get_file_info(data_root)["finalized"], False)
|
||||
|
||||
# Upload file to storage node
|
||||
segments = submit_data(client1, chunk_data)
|
||||
self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
|
||||
wait_until(lambda: client1.zgs_get_file_info(data_root)["finalized"])
|
||||
data_root = self.__upload_file__(0, 256 * 1024)
|
||||
|
||||
# restart client2
|
||||
client2.start()
|
||||
@ -75,7 +61,7 @@ class SyncTest(TestFramework):
|
||||
|
||||
# Prepare 3 segments to upload
|
||||
chunk_data = random.randbytes(256 * 1024 * 3)
|
||||
data_root = self.__create_submission(chunk_data)
|
||||
data_root = self.__submit_file__(chunk_data)
|
||||
|
||||
# Ensure log entry sync from blockchain node
|
||||
wait_until(lambda: client1.zgs_get_file_info(data_root) is not None)
|
||||
@ -111,13 +97,5 @@ class SyncTest(TestFramework):
|
||||
# Validate data
|
||||
assert_equal(client2.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256])
|
||||
|
||||
def __create_submission(self, chunk_data: bytes) -> str:
|
||||
submissions, data_root = create_submission(chunk_data)
|
||||
self.contract.submit(submissions)
|
||||
self.__deployed_contracts += 1
|
||||
wait_until(lambda: self.contract.num_submissions() == self.__deployed_contracts)
|
||||
self.log.info("Submission created, data root: %s, submissions(%s) = %s", data_root, len(submissions), submissions)
|
||||
return data_root
|
||||
|
||||
if __name__ == "__main__":
|
||||
SyncTest().main()
|
||||
|
@ -2,9 +2,7 @@ import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
import rlp
|
||||
|
||||
from eth_utils import decode_hex, keccak
|
||||
from web3 import Web3, HTTPProvider
|
||||
from web3.middleware import construct_sign_and_send_raw_middleware
|
||||
from enum import Enum, unique
|
||||
@ -12,7 +10,6 @@ from config.node_config import (
|
||||
GENESIS_PRIV_KEY,
|
||||
GENESIS_PRIV_KEY1,
|
||||
TX_PARAMS,
|
||||
MINER_ID,
|
||||
)
|
||||
from utility.simple_rpc_proxy import SimpleRpcProxy
|
||||
from utility.utils import (
|
||||
|
@ -1,7 +1,5 @@
|
||||
from pathlib import Path
|
||||
import json
|
||||
from web3 import Web3
|
||||
|
||||
|
||||
def load_contract_metadata(path: str, name: str):
|
||||
path = Path(path)
|
||||
|
@ -20,8 +20,9 @@ from test_framework.zgs_node import ZgsNode
|
||||
from test_framework.blockchain_node import BlockChainNodeType
|
||||
from test_framework.conflux_node import ConfluxNode, connect_sample_nodes
|
||||
from test_framework.zg_node import ZGNode, zg_node_init_genesis
|
||||
from utility.utils import PortMin, is_windows_platform, wait_until
|
||||
from utility.utils import PortMin, is_windows_platform, wait_until, assert_equal
|
||||
from utility.build_binary import build_cli
|
||||
from utility.submission import create_submission, submit_data
|
||||
|
||||
__file_path__ = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
@ -40,8 +41,8 @@ class TestFramework:
|
||||
if "http_proxy" in os.environ:
|
||||
del os.environ["http_proxy"]
|
||||
|
||||
self.num_blockchain_nodes = None
|
||||
self.num_nodes = None
|
||||
self.num_blockchain_nodes = 1
|
||||
self.num_nodes = 1
|
||||
self.blockchain_nodes = []
|
||||
self.nodes = []
|
||||
self.contract = None
|
||||
@ -53,6 +54,7 @@ class TestFramework:
|
||||
self.mine_period = 100
|
||||
self.lifetime_seconds = 3600
|
||||
self.launch_wait_seconds = 1
|
||||
self.num_deployed_contracts = 0
|
||||
|
||||
# Set default binary path
|
||||
binary_ext = ".exe" if is_windows_platform() else ""
|
||||
@ -398,6 +400,31 @@ class TestFramework:
|
||||
|
||||
return root
|
||||
|
||||
def __submit_file__(self, chunk_data: bytes) -> str:
|
||||
submissions, data_root = create_submission(chunk_data)
|
||||
self.contract.submit(submissions)
|
||||
self.num_deployed_contracts += 1
|
||||
wait_until(lambda: self.contract.num_submissions() == self.num_deployed_contracts)
|
||||
self.log.info("Submission completed, data root: %s, submissions(%s) = %s", data_root, len(submissions), submissions)
|
||||
return data_root
|
||||
|
||||
def __upload_file__(self, node_index: int, random_data_size: int) -> str:
|
||||
# Create submission
|
||||
chunk_data = random.randbytes(random_data_size)
|
||||
data_root = self.__submit_file__(chunk_data)
|
||||
|
||||
# Ensure log entry sync from blockchain node
|
||||
client = self.nodes[node_index]
|
||||
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
|
||||
assert_equal(client.zgs_get_file_info(data_root)["finalized"], False)
|
||||
|
||||
# Upload file to storage node
|
||||
segments = submit_data(client, chunk_data)
|
||||
self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
|
||||
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
|
||||
|
||||
return data_root
|
||||
|
||||
def setup_params(self):
|
||||
self.num_blockchain_nodes = 1
|
||||
self.num_nodes = 1
|
||||
|
@ -2,9 +2,8 @@ import os
|
||||
import shutil
|
||||
import base64
|
||||
|
||||
from config.node_config import ZGS_CONFIG
|
||||
from config.node_config import ZGS_CONFIG, update_config
|
||||
from test_framework.blockchain_node import NodeType, TestNode
|
||||
from config.node_config import MINER_ID
|
||||
from utility.utils import (
|
||||
initialize_toml_config,
|
||||
p2p_port,
|
||||
@ -12,7 +11,6 @@ from utility.utils import (
|
||||
blockchain_rpc_port,
|
||||
)
|
||||
|
||||
|
||||
class ZgsNode(TestNode):
|
||||
def __init__(
|
||||
self,
|
||||
@ -48,9 +46,9 @@ class ZgsNode(TestNode):
|
||||
"blockchain_rpc_endpoint": f"http://127.0.0.1:{blockchain_rpc_port(0)}",
|
||||
}
|
||||
# Set configs for this specific node.
|
||||
local_conf.update(indexed_config)
|
||||
update_config(local_conf, indexed_config)
|
||||
# Overwrite with personalized configs.
|
||||
local_conf.update(updated_config)
|
||||
update_config(local_conf, updated_config)
|
||||
data_dir = os.path.join(root_dir, "zgs_node" + str(index))
|
||||
rpc_url = "http://" + local_conf["rpc_listen_address"]
|
||||
super().__init__(
|
||||
|
@ -1,6 +1,6 @@
|
||||
import base64
|
||||
|
||||
from eth_utils import encode_hex, decode_hex
|
||||
from eth_utils import decode_hex
|
||||
from math import log2
|
||||
from utility.merkle_tree import add_0x_prefix, Leaf, MerkleTree
|
||||
from utility.spec import ENTRY_SIZE, PORA_CHUNK_SIZE
|
||||
|
@ -1,15 +1,10 @@
|
||||
import base64
|
||||
import inspect
|
||||
import os
|
||||
import platform
|
||||
import rtoml
|
||||
import time
|
||||
import sha3
|
||||
|
||||
from config.node_config import ZGS_CONFIG
|
||||
from eth_utils import encode_hex
|
||||
|
||||
|
||||
class PortMin:
|
||||
# Must be initialized with a unique integer for each process
|
||||
n = 11000
|
||||
|
Loading…
Reference in New Issue
Block a user