From b6972b97af06afcb2d1aea8ba7bd7f40861654b2 Mon Sep 17 00:00:00 2001 From: Bo QIU <35757521+boqiu@users.noreply.github.com> Date: Thu, 5 Sep 2024 10:09:29 +0800 Subject: [PATCH] Adjust default value for testnet configs (#180) * Adjust default value for testnet configs * Supports to disable sequential auto sync * Add py tests for auto sync * fix py file name * rm dummy ;py code * change default block confirm count --- node/file_location_cache/src/lib.rs | 4 +- node/src/config/mod.rs | 2 +- node/sync/src/auto_sync/batcher_random.rs | 13 +- node/sync/src/auto_sync/batcher_serial.rs | 30 +++- node/sync/src/auto_sync/sync_store.rs | 206 ++++++++++++---------- node/sync/src/lib.rs | 4 +- run/config-testnet-standard.toml | 10 +- run/config-testnet-turbo.toml | 10 +- run/config.toml | 10 +- tests/config/node_config.py | 10 ++ tests/random_test.py | 8 +- tests/sync_auto_random_test.py | 32 ++++ tests/sync_auto_sequential_test.py | 32 ++++ tests/sync_auto_test.py | 32 ++++ tests/sync_test.py | 28 +-- tests/test_framework/blockchain_node.py | 3 - tests/test_framework/contracts.py | 2 - tests/test_framework/test_framework.py | 33 +++- tests/test_framework/zgs_node.py | 8 +- tests/utility/submission.py | 2 +- tests/utility/utils.py | 5 - 21 files changed, 305 insertions(+), 179 deletions(-) create mode 100644 tests/sync_auto_random_test.py create mode 100644 tests/sync_auto_sequential_test.py create mode 100644 tests/sync_auto_test.py diff --git a/node/file_location_cache/src/lib.rs b/node/file_location_cache/src/lib.rs index ff12c5e..d0e3248 100644 --- a/node/file_location_cache/src/lib.rs +++ b/node/file_location_cache/src/lib.rs @@ -16,9 +16,9 @@ pub struct Config { impl Default for Config { fn default() -> Self { Config { - max_entries_total: 256000, + max_entries_total: 1000000, max_entries_per_file: 4, - entry_expiration_time_secs: 3600, + entry_expiration_time_secs: 86400, } } } diff --git a/node/src/config/mod.rs b/node/src/config/mod.rs index ff6ab1c..0896a0a 100644 --- a/node/src/config/mod.rs +++ b/node/src/config/mod.rs @@ -33,7 +33,7 @@ build_config! { (blockchain_rpc_endpoint, (String), "http://127.0.0.1:8545".to_string()) (log_contract_address, (String), "".to_string()) (log_sync_start_block_number, (u64), 0) - (confirmation_block_count, (u64), 12) + (confirmation_block_count, (u64), 3) (log_page_size, (u64), 999) (max_cache_data_size, (usize), 100 * 1024 * 1024) // 100 MB (cache_tx_seq_ttl, (usize), 500) diff --git a/node/sync/src/auto_sync/batcher_random.rs b/node/sync/src/auto_sync/batcher_random.rs index a070eb1..16cd5f2 100644 --- a/node/sync/src/auto_sync/batcher_random.rs +++ b/node/sync/src/auto_sync/batcher_random.rs @@ -1,6 +1,6 @@ use super::{batcher::Batcher, sync_store::SyncStore}; use crate::{ - auto_sync::{batcher::SyncResult, metrics}, + auto_sync::{batcher::SyncResult, metrics, sync_store::Queue}, Config, SyncSender, }; use anyhow::Result; @@ -108,16 +108,17 @@ impl RandomBatcher { SyncResult::Timeout => metrics::RANDOM_SYNC_RESULT_TIMEOUT.inc(1), } - match sync_result { - SyncResult::Completed => self.sync_store.remove_tx(tx_seq).await?, - _ => self.sync_store.downgrade_tx_to_pending(tx_seq).await?, - }; + if matches!(sync_result, SyncResult::Completed) { + self.sync_store.remove(tx_seq).await?; + } else { + self.sync_store.insert(tx_seq, Queue::Pending).await?; + } Ok(true) } async fn schedule(&mut self) -> Result { - let tx_seq = match self.sync_store.random_tx().await? { + let tx_seq = match self.sync_store.random().await? { Some(v) => v, None => return Ok(false), }; diff --git a/node/sync/src/auto_sync/batcher_serial.rs b/node/sync/src/auto_sync/batcher_serial.rs index e4a3986..25f74e0 100644 --- a/node/sync/src/auto_sync/batcher_serial.rs +++ b/node/sync/src/auto_sync/batcher_serial.rs @@ -2,7 +2,10 @@ use super::{ batcher::{Batcher, SyncResult}, sync_store::SyncStore, }; -use crate::{auto_sync::metrics, Config, SyncSender}; +use crate::{ + auto_sync::{metrics, sync_store::Queue}, + Config, SyncSender, +}; use anyhow::Result; use log_entry_sync::LogSyncEvent; use serde::{Deserialize, Serialize}; @@ -200,7 +203,7 @@ impl SerialBatcher { } // otherwise, mark tx as ready for sync - if let Err(err) = self.sync_store.upgrade_tx_to_ready(announced_tx_seq).await { + if let Err(err) = self.sync_store.upgrade(announced_tx_seq).await { error!(%err, %announced_tx_seq, "Failed to promote announced tx to ready, state = {:?}", self.get_state().await); } @@ -280,11 +283,28 @@ impl SerialBatcher { /// Schedule file sync in sequence. async fn schedule_next(&mut self) -> Result { - let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed); - if next_tx_seq > self.max_tx_seq.load(Ordering::Relaxed) { + let max_tx_seq = self.max_tx_seq.load(Ordering::Relaxed); + if max_tx_seq == u64::MAX { return Ok(false); } + let mut next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed); + if next_tx_seq > max_tx_seq { + return Ok(false); + } + + // If sequential sync disabled, delegate to random sync. + if self.config.max_sequential_workers == 0 { + self.sync_store.insert(next_tx_seq, Queue::Ready).await?; + + next_tx_seq += 1; + self.sync_store.set_next_tx_seq(next_tx_seq).await?; + self.next_tx_seq.store(next_tx_seq, Ordering::Relaxed); + self.next_tx_seq_in_db.store(next_tx_seq, Ordering::Relaxed); + + return Ok(true); + } + if !self.batcher.add(next_tx_seq).await? { return Ok(false); } @@ -309,7 +329,7 @@ impl SerialBatcher { // downgrade to random sync if file sync failed or timeout if matches!(sync_result, SyncResult::Failed | SyncResult::Timeout) { - self.sync_store.add_pending_tx(current).await?; + self.sync_store.insert(current, Queue::Pending).await?; } // always move forward in db diff --git a/node/sync/src/auto_sync/sync_store.rs b/node/sync/src/auto_sync/sync_store.rs index 8c59448..796dc1a 100644 --- a/node/sync/src/auto_sync/sync_store.rs +++ b/node/sync/src/auto_sync/sync_store.rs @@ -8,6 +8,20 @@ use tokio::sync::RwLock; const KEY_NEXT_TX_SEQ: &str = "sync.manager.next_tx_seq"; const KEY_MAX_TX_SEQ: &str = "sync.manager.max_tx_seq"; +#[derive(Debug, PartialEq, Eq)] +pub enum Queue { + Ready, + Pending, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum InsertResult { + NewAdded, // new added in target queue + AlreadyExists, // already exists in target queue + Upgraded, // upgraded from pending queue to ready queue + Downgraded, // downgraged from ready queue to pending queue +} + pub struct SyncStore { store: Arc>, @@ -64,31 +78,69 @@ impl SyncStore { store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq) } - pub async fn add_pending_tx(&self, tx_seq: u64) -> Result { - let async_store = self.store.write().await; + pub async fn contains(&self, tx_seq: u64) -> Result> { + let async_store = self.store.read().await; let store = async_store.get_store(); - // already in ready queue if self.ready_txs.has(store, tx_seq)? { - return Ok(false); + return Ok(Some(Queue::Ready)); } - // always add in pending queue - self.pending_txs.add(store, None, tx_seq) + if self.pending_txs.has(store, tx_seq)? { + return Ok(Some(Queue::Pending)); + } + + Ok(None) } - pub async fn upgrade_tx_to_ready(&self, tx_seq: u64) -> Result { + pub async fn insert(&self, tx_seq: u64, queue: Queue) -> Result { + let async_store = self.store.write().await; + let store = async_store.get_store(); + + let mut tx = ConfigTx::default(); + + match queue { + Queue::Ready => { + if !self.ready_txs.add(store, Some(&mut tx), tx_seq)? { + return Ok(InsertResult::AlreadyExists); + } + + let removed = self.pending_txs.remove(store, Some(&mut tx), tx_seq)?; + store.exec_configs(tx)?; + + if removed { + Ok(InsertResult::Upgraded) + } else { + Ok(InsertResult::NewAdded) + } + } + Queue::Pending => { + if !self.pending_txs.add(store, Some(&mut tx), tx_seq)? { + return Ok(InsertResult::AlreadyExists); + } + + let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?; + store.exec_configs(tx)?; + + if removed { + Ok(InsertResult::Downgraded) + } else { + Ok(InsertResult::NewAdded) + } + } + } + } + + pub async fn upgrade(&self, tx_seq: u64) -> Result { let async_store = self.store.write().await; let store = async_store.get_store(); let mut tx = ConfigTx::default(); - // not in pending queue if !self.pending_txs.remove(store, Some(&mut tx), tx_seq)? { return Ok(false); } - // move from pending to ready queue let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?; store.exec_configs(tx)?; @@ -96,26 +148,7 @@ impl SyncStore { Ok(added) } - pub async fn downgrade_tx_to_pending(&self, tx_seq: u64) -> Result { - let async_store = self.store.write().await; - let store = async_store.get_store(); - - let mut tx = ConfigTx::default(); - - // not in ready queue - if !self.ready_txs.remove(store, Some(&mut tx), tx_seq)? { - return Ok(false); - } - - // move from ready to pending queue - let added = self.pending_txs.add(store, Some(&mut tx), tx_seq)?; - - store.exec_configs(tx)?; - - Ok(added) - } - - pub async fn random_tx(&self) -> Result> { + pub async fn random(&self) -> Result> { let async_store = self.store.read().await; let store = async_store.get_store(); @@ -128,17 +161,21 @@ impl SyncStore { self.pending_txs.random(store) } - pub async fn remove_tx(&self, tx_seq: u64) -> Result { + pub async fn remove(&self, tx_seq: u64) -> Result> { let async_store = self.store.write().await; let store = async_store.get_store(); // removed in ready queue if self.ready_txs.remove(store, None, tx_seq)? { - return Ok(true); + return Ok(Some(Queue::Ready)); } - // otherwise, try to remove in pending queue - self.pending_txs.remove(store, None, tx_seq) + // removed in pending queue + if self.pending_txs.remove(store, None, tx_seq)? { + return Ok(Some(Queue::Pending)); + } + + Ok(None) } } @@ -146,7 +183,7 @@ impl SyncStore { mod tests { use crate::test_util::tests::TestStoreRuntime; - use super::SyncStore; + use super::{InsertResult::*, Queue::*, SyncStore}; #[tokio::test] async fn test_tx_seq_range() { @@ -165,106 +202,79 @@ mod tests { } #[tokio::test] - async fn test_add_pending_tx() { + async fn test_insert() { let runtime = TestStoreRuntime::default(); let store = SyncStore::new(runtime.store.clone()); - // add pending tx 3 - assert!(store.add_pending_tx(3).await.unwrap()); + assert_eq!(store.contains(1).await.unwrap(), None); + assert_eq!(store.insert(1, Pending).await.unwrap(), NewAdded); + assert_eq!(store.contains(1).await.unwrap(), Some(Pending)); + assert_eq!(store.insert(1, Pending).await.unwrap(), AlreadyExists); + assert_eq!(store.insert(1, Ready).await.unwrap(), Upgraded); + assert_eq!(store.contains(1).await.unwrap(), Some(Ready)); - // cannot add pending tx 3 again - assert!(!store.add_pending_tx(3).await.unwrap()); + assert_eq!(store.insert(2, Ready).await.unwrap(), NewAdded); + assert_eq!(store.contains(2).await.unwrap(), Some(Ready)); + assert_eq!(store.insert(2, Ready).await.unwrap(), AlreadyExists); + assert_eq!(store.insert(2, Pending).await.unwrap(), Downgraded); + assert_eq!(store.contains(2).await.unwrap(), Some(Pending)); } #[tokio::test] - async fn test_upgrade_tx() { + async fn test_upgrade() { let runtime = TestStoreRuntime::default(); let store = SyncStore::new(runtime.store.clone()); // cannot upgrade by default - assert!(!store.upgrade_tx_to_ready(3).await.unwrap()); + assert!(!store.upgrade(3).await.unwrap()); // add pending tx 3 - assert!(store.add_pending_tx(3).await.unwrap()); + assert_eq!(store.insert(3, Pending).await.unwrap(), NewAdded); // can upgrade to ready - assert!(store.upgrade_tx_to_ready(3).await.unwrap()); - - // cannot add pending tx 3 again event upgraded to ready - assert!(!store.add_pending_tx(3).await.unwrap()); + assert!(store.upgrade(3).await.unwrap()); + assert_eq!(store.contains(3).await.unwrap(), Some(Ready)); // cannot upgrade again - assert!(!store.upgrade_tx_to_ready(3).await.unwrap()); + assert!(!store.upgrade(3).await.unwrap()); } #[tokio::test] - async fn test_downgrade_tx() { - let runtime = TestStoreRuntime::default(); - let store = SyncStore::new(runtime.store.clone()); - - // cannot downgrade by default - assert!(!store.downgrade_tx_to_pending(3).await.unwrap()); - - // add pending tx 3 - assert!(store.add_pending_tx(3).await.unwrap()); - - // cannot downgrade for non-ready tx - assert!(!store.downgrade_tx_to_pending(3).await.unwrap()); - - // upgrade tx 3 to ready - assert!(store.upgrade_tx_to_ready(3).await.unwrap()); - - // can downgrade now - assert!(store.downgrade_tx_to_pending(3).await.unwrap()); - - // cannot downgrade now - assert!(!store.downgrade_tx_to_pending(3).await.unwrap()); - } - - #[tokio::test] - async fn test_random_tx() { + async fn test_random() { let runtime = TestStoreRuntime::default(); let store = SyncStore::new(runtime.store.clone()); // no tx by default - assert_eq!(store.random_tx().await.unwrap(), None); + assert_eq!(store.random().await.unwrap(), None); // add pending txs 1, 2, 3 - assert!(store.add_pending_tx(1).await.unwrap()); - assert!(store.add_pending_tx(2).await.unwrap()); - assert!(store.add_pending_tx(3).await.unwrap()); - let tx = store.random_tx().await.unwrap().unwrap(); + assert_eq!(store.insert(1, Pending).await.unwrap(), NewAdded); + assert_eq!(store.insert(2, Pending).await.unwrap(), NewAdded); + assert_eq!(store.insert(3, Pending).await.unwrap(), NewAdded); + let tx = store.random().await.unwrap().unwrap(); assert!((1..=3).contains(&tx)); - // upgrade tx 1 to ready - assert!(store.upgrade_tx_to_ready(2).await.unwrap()); - assert_eq!(store.random_tx().await.unwrap(), Some(2)); + // upgrade tx 2 to ready + assert_eq!(store.insert(2, Ready).await.unwrap(), Upgraded); + assert_eq!(store.random().await.unwrap(), Some(2)); } #[tokio::test] - async fn test_remove_tx() { + async fn test_remove() { let runtime = TestStoreRuntime::default(); let store = SyncStore::new(runtime.store.clone()); // cannot remove by default - assert!(!store.remove_tx(1).await.unwrap()); + assert_eq!(store.remove(1).await.unwrap(), None); - // add pending tx 1, 2 - assert!(store.add_pending_tx(1).await.unwrap()); - assert!(store.add_pending_tx(2).await.unwrap()); + // add tx 1, 2 + assert_eq!(store.insert(1, Pending).await.unwrap(), NewAdded); + assert_eq!(store.insert(2, Ready).await.unwrap(), NewAdded); - // upgrade tx 1 to ready - assert!(store.upgrade_tx_to_ready(1).await.unwrap()); - assert_eq!(store.random_tx().await.unwrap(), Some(1)); - - // remove tx 1 - assert!(store.remove_tx(1).await.unwrap()); - assert_eq!(store.random_tx().await.unwrap(), Some(2)); - assert!(!store.remove_tx(1).await.unwrap()); - - // remove tx 2 - assert!(store.remove_tx(2).await.unwrap()); - assert_eq!(store.random_tx().await.unwrap(), None); - assert!(!store.remove_tx(2).await.unwrap()); + // remove txs + assert_eq!(store.remove(1).await.unwrap(), Some(Pending)); + assert_eq!(store.remove(1).await.unwrap(), None); + assert_eq!(store.remove(2).await.unwrap(), Some(Ready)); + assert_eq!(store.remove(2).await.unwrap(), None); } } diff --git a/node/sync/src/lib.rs b/node/sync/src/lib.rs index 5c2feb0..9d0863e 100644 --- a/node/sync/src/lib.rs +++ b/node/sync/src/lib.rs @@ -80,8 +80,8 @@ impl Default for Config { // auto sync config auto_sync_idle_interval: Duration::from_secs(3), auto_sync_error_interval: Duration::from_secs(10), - max_sequential_workers: 24, - max_random_workers: 8, + max_sequential_workers: 0, + max_random_workers: 30, sequential_find_peer_timeout: Duration::from_secs(60), random_find_peer_timeout: Duration::from_secs(500), } diff --git a/run/config-testnet-standard.toml b/run/config-testnet-standard.toml index c92e212..aac7455 100644 --- a/run/config-testnet-standard.toml +++ b/run/config-testnet-standard.toml @@ -87,7 +87,7 @@ log_contract_address = "0x0460aA47b41a66694c0a73f667a1b795A5ED3556" log_sync_start_block_number = 595059 # Number of blocks to confirm a transaction. -confirmation_block_count = 6 +# confirmation_block_count = 3 # Maximum number of event logs to poll at a time. # log_page_size = 999 @@ -264,10 +264,10 @@ auto_sync_enabled = true # peer_chunks_download_timeout = "15s" # Maximum threads to sync files in sequence. -# max_sequential_workers = 24 +# max_sequential_workers = 0 # Maximum threads to sync files randomly. -# max_random_workers = 8 +# max_random_workers = 30 # Timeout to terminate a file sync in sequence. # sequential_find_peer_timeout = "60s" @@ -287,14 +287,14 @@ auto_sync_enabled = true # When the cache is full, the storage position information with oldest timestamp will be replaced. # Global cache capacity. -# max_entries_total = 256000 +# max_entries_total = 1000000 # Location information capacity for each file. # max_entries_per_file = 4 # Validity period of location information. # If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache. -# entry_expiration_time_secs = 3600 +# entry_expiration_time_secs = 86400 ####################################################################### ### Metrics Options ### diff --git a/run/config-testnet-turbo.toml b/run/config-testnet-turbo.toml index 6d011cd..4ba6fbf 100644 --- a/run/config-testnet-turbo.toml +++ b/run/config-testnet-turbo.toml @@ -87,7 +87,7 @@ log_contract_address = "0xbD2C3F0E65eDF5582141C35969d66e34629cC768" log_sync_start_block_number = 595059 # Number of blocks to confirm a transaction. -confirmation_block_count = 6 +# confirmation_block_count = 3 # Maximum number of event logs to poll at a time. # log_page_size = 999 @@ -276,10 +276,10 @@ auto_sync_enabled = true # peer_chunks_download_timeout = "15s" # Maximum threads to sync files in sequence. -# max_sequential_workers = 24 +# max_sequential_workers = 0 # Maximum threads to sync files randomly. -# max_random_workers = 8 +# max_random_workers = 30 # Timeout to terminate a file sync in sequence. # sequential_find_peer_timeout = "60s" @@ -299,14 +299,14 @@ auto_sync_enabled = true # When the cache is full, the storage position information with oldest timestamp will be replaced. # Global cache capacity. -# max_entries_total = 256000 +# max_entries_total = 1000000 # Location information capacity for each file. # max_entries_per_file = 4 # Validity period of location information. # If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache. -# entry_expiration_time_secs = 3600 +# entry_expiration_time_secs = 86400 ####################################################################### ### Metrics Options ### diff --git a/run/config.toml b/run/config.toml index ff6a472..48b93b5 100644 --- a/run/config.toml +++ b/run/config.toml @@ -87,7 +87,7 @@ # log_sync_start_block_number = 0 # Number of blocks to confirm a transaction. -# confirmation_block_count = 12 +# confirmation_block_count = 3 # Maximum number of event logs to poll at a time. # log_page_size = 999 @@ -278,10 +278,10 @@ # peer_chunks_download_timeout = "15s" # Maximum threads to sync files in sequence. -# max_sequential_workers = 24 +# max_sequential_workers = 0 # Maximum threads to sync files randomly. -# max_random_workers = 8 +# max_random_workers = 30 # Timeout to terminate a file sync in sequence. # sequential_find_peer_timeout = "60s" @@ -301,14 +301,14 @@ # When the cache is full, the storage position information with oldest timestamp will be replaced. # Global cache capacity. -# max_entries_total = 256000 +# max_entries_total = 1000000 # Location information capacity for each file. # max_entries_per_file = 4 # Validity period of location information. # If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache. -# entry_expiration_time_secs = 3600 +# entry_expiration_time_secs = 86400 ####################################################################### ### Metrics Options ### diff --git a/tests/config/node_config.py b/tests/config/node_config.py index ed53377..35f0754 100644 --- a/tests/config/node_config.py +++ b/tests/config/node_config.py @@ -63,3 +63,13 @@ TX_PARAMS1 = { NO_SEAL_FLAG = 0x1 NO_MERKLE_PROOF_FLAG = 0x2 + +def update_config(default: dict, custom: dict): + """ + Supports to update configurations with dict value. + """ + for (key, value) in custom.items(): + if default.get(key) is None or type(value) != dict: + default[key] = value + else: + update_config(default[key], value) diff --git a/tests/random_test.py b/tests/random_test.py index fc1f9b1..2edb882 100755 --- a/tests/random_test.py +++ b/tests/random_test.py @@ -5,18 +5,14 @@ import random from test_framework.test_framework import TestFramework from utility.submission import create_submission from utility.submission import submit_data -from utility.utils import ( - assert_equal, - wait_until, -) - +from utility.utils import wait_until class RandomTest(TestFramework): def setup_params(self): self.num_blockchain_nodes = 1 self.num_nodes = 4 for i in range(self.num_nodes): - self.zgs_node_configs[i] = {"find_peer_timeout_secs": 1, "confirmation_block_count": 1, "sync": {"auto_sync_enabled": True}} + self.zgs_node_configs[i] = {"sync": {"auto_sync_enabled": True}} def run_test(self): max_size = 256 * 1024 * 64 diff --git a/tests/sync_auto_random_test.py b/tests/sync_auto_random_test.py new file mode 100644 index 0000000..5b572f2 --- /dev/null +++ b/tests/sync_auto_random_test.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 + +from test_framework.test_framework import TestFramework +from utility.utils import wait_until + +class AutoRandomSyncTest(TestFramework): + def setup_params(self): + self.num_nodes = 2 + + # Enable random auto sync only + for i in range(self.num_nodes): + self.zgs_node_configs[i] = { + "sync": { + "auto_sync_enabled": True, + "max_sequential_workers": 0, + "max_random_workers": 3, + } + } + + def run_test(self): + # Submit and upload files on node 0 + data_root_1 = self.__upload_file__(0, 256 * 1024) + data_root_2 = self.__upload_file__(0, 256 * 1024) + + # Files should be available on node 1 via auto sync + wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None) + wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"]) + wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2) is not None) + wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2)["finalized"]) + +if __name__ == "__main__": + AutoRandomSyncTest().main() diff --git a/tests/sync_auto_sequential_test.py b/tests/sync_auto_sequential_test.py new file mode 100644 index 0000000..c2103bd --- /dev/null +++ b/tests/sync_auto_sequential_test.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 + +from test_framework.test_framework import TestFramework +from utility.utils import wait_until + +class AutoSequentialSyncTest(TestFramework): + def setup_params(self): + self.num_nodes = 2 + + # Enable sequential auto sync only + for i in range(self.num_nodes): + self.zgs_node_configs[i] = { + "sync": { + "auto_sync_enabled": True, + "max_sequential_workers": 3, + "max_random_workers": 0, + } + } + + def run_test(self): + # Submit and upload files on node 0 + data_root_1 = self.__upload_file__(0, 256 * 1024) + data_root_2 = self.__upload_file__(0, 256 * 1024) + + # Files should be available on node 1 via auto sync + wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None) + wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"]) + wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2) is not None) + wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2)["finalized"]) + +if __name__ == "__main__": + AutoSequentialSyncTest().main() diff --git a/tests/sync_auto_test.py b/tests/sync_auto_test.py new file mode 100644 index 0000000..cc7dd30 --- /dev/null +++ b/tests/sync_auto_test.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 + +from test_framework.test_framework import TestFramework +from utility.utils import wait_until + +class AutoSyncTest(TestFramework): + def setup_params(self): + self.num_nodes = 2 + + # Enable auto sync + for i in range(self.num_nodes): + self.zgs_node_configs[i] = { + "sync": { + "auto_sync_enabled": True, + "max_sequential_workers": 3, + "max_random_workers": 3, + } + } + + def run_test(self): + # Submit and upload files on node 0 + data_root_1 = self.__upload_file__(0, 256 * 1024) + data_root_2 = self.__upload_file__(0, 256 * 1024) + + # Files should be available on node 1 via auto sync + wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None) + wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"]) + wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2) is not None) + wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2)["finalized"]) + +if __name__ == "__main__": + AutoSyncTest().main() diff --git a/tests/sync_test.py b/tests/sync_test.py index 074b01f..a07f923 100755 --- a/tests/sync_test.py +++ b/tests/sync_test.py @@ -4,8 +4,7 @@ import random import time from test_framework.test_framework import TestFramework -from utility.submission import create_submission -from utility.submission import submit_data, data_to_segments +from utility.submission import data_to_segments from utility.utils import ( assert_equal, wait_until, @@ -13,9 +12,7 @@ from utility.utils import ( class SyncTest(TestFramework): def setup_params(self): - self.num_blockchain_nodes = 2 self.num_nodes = 2 - self.__deployed_contracts = 0 def run_test(self): # By default, auto_sync_enabled and sync_file_on_announcement_enabled are both false, @@ -32,19 +29,8 @@ class SyncTest(TestFramework): # stop client2, preventing it from receiving AnnounceFile client2.shutdown() - # Create submission - chunk_data = random.randbytes(256 * 1024) - data_root = self.__create_submission(chunk_data) + data_root = self.__upload_file__(0, 256 * 1024) - # Ensure log entry sync from blockchain node - wait_until(lambda: client1.zgs_get_file_info(data_root) is not None) - assert_equal(client1.zgs_get_file_info(data_root)["finalized"], False) - - # Upload file to storage node - segments = submit_data(client1, chunk_data) - self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments]) - wait_until(lambda: client1.zgs_get_file_info(data_root)["finalized"]) - # restart client2 client2.start() client2.wait_for_rpc_connection() @@ -75,7 +61,7 @@ class SyncTest(TestFramework): # Prepare 3 segments to upload chunk_data = random.randbytes(256 * 1024 * 3) - data_root = self.__create_submission(chunk_data) + data_root = self.__submit_file__(chunk_data) # Ensure log entry sync from blockchain node wait_until(lambda: client1.zgs_get_file_info(data_root) is not None) @@ -111,13 +97,5 @@ class SyncTest(TestFramework): # Validate data assert_equal(client2.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256]) - def __create_submission(self, chunk_data: bytes) -> str: - submissions, data_root = create_submission(chunk_data) - self.contract.submit(submissions) - self.__deployed_contracts += 1 - wait_until(lambda: self.contract.num_submissions() == self.__deployed_contracts) - self.log.info("Submission created, data root: %s, submissions(%s) = %s", data_root, len(submissions), submissions) - return data_root - if __name__ == "__main__": SyncTest().main() diff --git a/tests/test_framework/blockchain_node.py b/tests/test_framework/blockchain_node.py index 0fbe1a5..54b2c26 100644 --- a/tests/test_framework/blockchain_node.py +++ b/tests/test_framework/blockchain_node.py @@ -2,9 +2,7 @@ import os import subprocess import tempfile import time -import rlp -from eth_utils import decode_hex, keccak from web3 import Web3, HTTPProvider from web3.middleware import construct_sign_and_send_raw_middleware from enum import Enum, unique @@ -12,7 +10,6 @@ from config.node_config import ( GENESIS_PRIV_KEY, GENESIS_PRIV_KEY1, TX_PARAMS, - MINER_ID, ) from utility.simple_rpc_proxy import SimpleRpcProxy from utility.utils import ( diff --git a/tests/test_framework/contracts.py b/tests/test_framework/contracts.py index 8df3811..cea6f12 100644 --- a/tests/test_framework/contracts.py +++ b/tests/test_framework/contracts.py @@ -1,7 +1,5 @@ from pathlib import Path import json -from web3 import Web3 - def load_contract_metadata(path: str, name: str): path = Path(path) diff --git a/tests/test_framework/test_framework.py b/tests/test_framework/test_framework.py index c7c4619..b693eea 100644 --- a/tests/test_framework/test_framework.py +++ b/tests/test_framework/test_framework.py @@ -20,8 +20,9 @@ from test_framework.zgs_node import ZgsNode from test_framework.blockchain_node import BlockChainNodeType from test_framework.conflux_node import ConfluxNode, connect_sample_nodes from test_framework.zg_node import ZGNode, zg_node_init_genesis -from utility.utils import PortMin, is_windows_platform, wait_until +from utility.utils import PortMin, is_windows_platform, wait_until, assert_equal from utility.build_binary import build_cli +from utility.submission import create_submission, submit_data __file_path__ = os.path.dirname(os.path.realpath(__file__)) @@ -40,8 +41,8 @@ class TestFramework: if "http_proxy" in os.environ: del os.environ["http_proxy"] - self.num_blockchain_nodes = None - self.num_nodes = None + self.num_blockchain_nodes = 1 + self.num_nodes = 1 self.blockchain_nodes = [] self.nodes = [] self.contract = None @@ -53,6 +54,7 @@ class TestFramework: self.mine_period = 100 self.lifetime_seconds = 3600 self.launch_wait_seconds = 1 + self.num_deployed_contracts = 0 # Set default binary path binary_ext = ".exe" if is_windows_platform() else "" @@ -398,6 +400,31 @@ class TestFramework: return root + def __submit_file__(self, chunk_data: bytes) -> str: + submissions, data_root = create_submission(chunk_data) + self.contract.submit(submissions) + self.num_deployed_contracts += 1 + wait_until(lambda: self.contract.num_submissions() == self.num_deployed_contracts) + self.log.info("Submission completed, data root: %s, submissions(%s) = %s", data_root, len(submissions), submissions) + return data_root + + def __upload_file__(self, node_index: int, random_data_size: int) -> str: + # Create submission + chunk_data = random.randbytes(random_data_size) + data_root = self.__submit_file__(chunk_data) + + # Ensure log entry sync from blockchain node + client = self.nodes[node_index] + wait_until(lambda: client.zgs_get_file_info(data_root) is not None) + assert_equal(client.zgs_get_file_info(data_root)["finalized"], False) + + # Upload file to storage node + segments = submit_data(client, chunk_data) + self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments]) + wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) + + return data_root + def setup_params(self): self.num_blockchain_nodes = 1 self.num_nodes = 1 diff --git a/tests/test_framework/zgs_node.py b/tests/test_framework/zgs_node.py index 8ea9122..dbeea11 100644 --- a/tests/test_framework/zgs_node.py +++ b/tests/test_framework/zgs_node.py @@ -2,9 +2,8 @@ import os import shutil import base64 -from config.node_config import ZGS_CONFIG +from config.node_config import ZGS_CONFIG, update_config from test_framework.blockchain_node import NodeType, TestNode -from config.node_config import MINER_ID from utility.utils import ( initialize_toml_config, p2p_port, @@ -12,7 +11,6 @@ from utility.utils import ( blockchain_rpc_port, ) - class ZgsNode(TestNode): def __init__( self, @@ -48,9 +46,9 @@ class ZgsNode(TestNode): "blockchain_rpc_endpoint": f"http://127.0.0.1:{blockchain_rpc_port(0)}", } # Set configs for this specific node. - local_conf.update(indexed_config) + update_config(local_conf, indexed_config) # Overwrite with personalized configs. - local_conf.update(updated_config) + update_config(local_conf, updated_config) data_dir = os.path.join(root_dir, "zgs_node" + str(index)) rpc_url = "http://" + local_conf["rpc_listen_address"] super().__init__( diff --git a/tests/utility/submission.py b/tests/utility/submission.py index 5a7a87e..1421811 100644 --- a/tests/utility/submission.py +++ b/tests/utility/submission.py @@ -1,6 +1,6 @@ import base64 -from eth_utils import encode_hex, decode_hex +from eth_utils import decode_hex from math import log2 from utility.merkle_tree import add_0x_prefix, Leaf, MerkleTree from utility.spec import ENTRY_SIZE, PORA_CHUNK_SIZE diff --git a/tests/utility/utils.py b/tests/utility/utils.py index 163f53e..8165dae 100644 --- a/tests/utility/utils.py +++ b/tests/utility/utils.py @@ -1,15 +1,10 @@ import base64 import inspect -import os import platform import rtoml import time import sha3 -from config.node_config import ZGS_CONFIG -from eth_utils import encode_hex - - class PortMin: # Must be initialized with a unique integer for each process n = 11000