From cfe4b45c41585c4b450fa7e87da12cdbe9349900 Mon Sep 17 00:00:00 2001 From: 0g-peterzhb <158457852+0g-peterzhb@users.noreply.github.com> Date: Mon, 24 Mar 2025 16:51:53 +0800 Subject: [PATCH] add api of getting available file info by root (#357) * add api of getting available file info by root --- .github/workflows/tests.yml | 5 ++ node/rpc/src/zgs/api.rs | 6 +- node/rpc/src/zgs/impl.rs | 31 ++++++-- node/storage-async/src/lib.rs | 16 +++- node/storage/src/log_store/log_manager.rs | 18 ++++- node/storage/src/log_store/mod.rs | 18 +++-- tests/cache_test.py | 5 +- tests/config/node_config.py | 9 +-- tests/crash_test.py | 50 ++++--------- tests/fuzz_test.py | 18 ++--- tests/long_time_mine_test_local.py | 6 +- tests/mine_test.py | 32 ++++++-- tests/mine_with_market_test.py | 58 +++++++++++---- tests/network_discovery_test.py | 12 ++- tests/network_discovery_upgrade_test.py | 13 +++- tests/network_tcp_shard_test.py | 18 +++-- tests/pruner_test.py | 11 ++- tests/random_test.py | 27 +++++-- tests/root_consistency_test.py | 3 - tests/rpc_test.py | 13 +--- tests/same_root_test.py | 11 ++- tests/shard_submission_test.py | 27 +++---- tests/shard_sync_test.py | 33 +++++--- tests/snapshot_test.py | 7 +- tests/submission_test.py | 12 +-- tests/sync_auto_historical_test.py | 38 +++++++--- tests/sync_auto_test.py | 8 +- tests/sync_chunks_test.py | 35 ++++++--- tests/sync_file_test.py | 6 +- tests/test_all.py | 4 +- tests/test_framework/blockchain_node.py | 85 +++++++++++++-------- tests/test_framework/contract_proxy.py | 43 ++++++----- tests/test_framework/contracts.py | 2 +- tests/test_framework/test_framework.py | 85 ++++++++++++++------- tests/test_framework/zg_node.py | 55 ++++++++++---- tests/test_framework/zgs_node.py | 29 +++++--- tests/utility/build_binary.py | 91 +++++++++++++++++------ tests/utility/merkle_tree.py | 11 ++- tests/utility/run_all.py | 30 +++++++- tests/utility/simple_rpc_proxy.py | 10 ++- tests/utility/spec.py | 2 +- tests/utility/submission.py | 11 ++- tests/utility/utils.py | 12 ++- 43 files changed, 676 insertions(+), 340 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4a937bf..20726c8 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -45,6 +45,11 @@ jobs: python-version: '3.9' cache: 'pip' + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.22' + - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/node/rpc/src/zgs/api.rs b/node/rpc/src/zgs/api.rs index 32dcfb4..a922da5 100644 --- a/node/rpc/src/zgs/api.rs +++ b/node/rpc/src/zgs/api.rs @@ -63,7 +63,11 @@ pub trait Rpc { async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult>; #[method(name = "getFileInfo")] - async fn get_file_info(&self, data_root: DataRoot) -> RpcResult>; + async fn get_file_info( + &self, + data_root: DataRoot, + need_available: bool, + ) -> RpcResult>; #[method(name = "getFileInfoByTxSeq")] async fn get_file_info_by_tx_seq(&self, tx_seq: u64) -> RpcResult>; diff --git a/node/rpc/src/zgs/impl.rs b/node/rpc/src/zgs/impl.rs index 190aa53..f0462b2 100644 --- a/node/rpc/src/zgs/impl.rs +++ b/node/rpc/src/zgs/impl.rs @@ -95,7 +95,7 @@ impl RpcServer for RpcServerImpl { let tx_seq = try_option!( self.ctx .log_store - .get_tx_seq_by_data_root(&data_root) + .get_tx_seq_by_data_root(&data_root, true) .await? ); @@ -121,7 +121,12 @@ impl RpcServer for RpcServerImpl { ) -> RpcResult> { info!(%data_root, %index, "zgs_downloadSegmentWithProof"); - let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?); + let tx = try_option!( + self.ctx + .log_store + .get_tx_by_data_root(&data_root, true) + .await? + ); self.get_segment_with_proof_by_tx(tx, index).await } @@ -144,7 +149,12 @@ impl RpcServer for RpcServerImpl { let seq = match tx_seq_or_root { TxSeqOrRoot::TxSeq(v) => v, TxSeqOrRoot::Root(v) => { - try_option!(self.ctx.log_store.get_tx_seq_by_data_root(&v).await?) + try_option!( + self.ctx + .log_store + .get_tx_seq_by_data_root(&v, false) + .await? + ) } }; @@ -163,10 +173,19 @@ impl RpcServer for RpcServerImpl { } } - async fn get_file_info(&self, data_root: DataRoot) -> RpcResult> { + async fn get_file_info( + &self, + data_root: DataRoot, + need_available: bool, + ) -> RpcResult> { debug!(%data_root, "zgs_getFileInfo"); - let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?); + let tx = try_option!( + self.ctx + .log_store + .get_tx_by_data_root(&data_root, need_available) + .await? + ); Ok(Some(self.get_file_info_by_tx(tx).await?)) } @@ -288,7 +307,7 @@ impl RpcServerImpl { let maybe_tx = self .ctx .log_store - .get_tx_by_data_root(&segment.root) + .get_tx_by_data_root(&segment.root, false) .await?; self.put_segment_with_maybe_tx(segment, maybe_tx).await diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs index b564f42..57de0e2 100644 --- a/node/storage-async/src/lib.rs +++ b/node/storage-async/src/lib.rs @@ -59,15 +59,23 @@ impl Store { delegate!(fn get_proof_at_root(root: Option, index: u64, length: u64) -> Result); delegate!(fn get_context() -> Result<(DataRoot, u64)>); - pub async fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result> { + pub async fn get_tx_seq_by_data_root( + &self, + data_root: &DataRoot, + need_available: bool, + ) -> Result> { let root = *data_root; - self.spawn(move |store| store.get_tx_seq_by_data_root(&root)) + self.spawn(move |store| store.get_tx_seq_by_data_root(&root, need_available)) .await } - pub async fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result> { + pub async fn get_tx_by_data_root( + &self, + data_root: &DataRoot, + need_available: bool, + ) -> Result> { let root = *data_root; - self.spawn(move |store| store.get_tx_by_data_root(&root)) + self.spawn(move |store| store.get_tx_by_data_root(&root, need_available)) .await } diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 6a024e2..6e2f934 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -511,7 +511,7 @@ impl LogStoreChunkRead for LogManager { index_start: usize, index_end: usize, ) -> crate::error::Result> { - let tx_seq = try_option!(self.get_tx_seq_by_data_root(data_root)?); + let tx_seq = try_option!(self.get_tx_seq_by_data_root(data_root, true)?); self.get_chunks_by_tx_and_index_range(tx_seq, index_start, index_end) } @@ -536,13 +536,27 @@ impl LogStoreRead for LogManager { self.tx_store.get_tx_by_seq_number(seq) } - fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result> { + fn get_tx_seq_by_data_root( + &self, + data_root: &DataRoot, + need_available: bool, + ) -> crate::error::Result> { let seq_list = self.tx_store.get_tx_seq_list_by_data_root(data_root)?; + let mut available_seq = None; for tx_seq in &seq_list { if self.tx_store.check_tx_completed(*tx_seq)? { // Return the first finalized tx if possible. return Ok(Some(*tx_seq)); } + if need_available + && available_seq.is_none() + && !self.tx_store.check_tx_pruned(*tx_seq)? + { + available_seq = Some(*tx_seq); + } + } + if need_available { + return Ok(available_seq); } // No tx is finalized, return the first one. Ok(seq_list.first().cloned()) diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index fc2f82f..593f6b3 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -31,14 +31,22 @@ pub trait LogStoreRead: LogStoreChunkRead { fn get_tx_by_seq_number(&self, seq: u64) -> Result>; /// Get a transaction by the data root of its data. - /// If all txs are not finalized, return the first one. + /// If all txs are not finalized, return the first one if need available is false. /// Otherwise, return the first finalized tx. - fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result>; + fn get_tx_seq_by_data_root( + &self, + data_root: &DataRoot, + need_available: bool, + ) -> Result>; - /// If all txs are not finalized, return the first one. + /// If all txs are not finalized, return the first one if need available is false. /// Otherwise, return the first finalized tx. - fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result> { - match self.get_tx_seq_by_data_root(data_root)? { + fn get_tx_by_data_root( + &self, + data_root: &DataRoot, + need_available: bool, + ) -> Result> { + match self.get_tx_seq_by_data_root(data_root, need_available)? { Some(seq) => self.get_tx_by_seq_number(seq), None => Ok(None), } diff --git a/tests/cache_test.py b/tests/cache_test.py index 79df003..5f01015 100755 --- a/tests/cache_test.py +++ b/tests/cache_test.py @@ -17,7 +17,10 @@ class ExampleTest(TestFramework): self.contract.submit(submissions) wait_until(lambda: self.contract.num_submissions() == 1) wait_until(lambda: client.zgs_get_file_info(data_root) is not None) - wait_until(lambda: not client.zgs_get_file_info(data_root)["isCached"] and client.zgs_get_file_info(data_root)["uploadedSegNum"] == 1) + wait_until( + lambda: not client.zgs_get_file_info(data_root)["isCached"] + and client.zgs_get_file_info(data_root)["uploadedSegNum"] == 1 + ) client.zgs_upload_segment(segments[1]) wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) diff --git a/tests/config/node_config.py b/tests/config/node_config.py index a24b213..bf0ad47 100644 --- a/tests/config/node_config.py +++ b/tests/config/node_config.py @@ -7,9 +7,7 @@ ZGS_CONFIG = { "log_config_file": "log_config", "confirmation_block_count": 1, "discv5_disable_ip_limit": True, - "network_peer_manager": { - "heartbeat_interval": "1s" - }, + "network_peer_manager": {"heartbeat_interval": "1s"}, "router": { "private_ip_enabled": True, }, @@ -22,7 +20,7 @@ ZGS_CONFIG = { "auto_sync_idle_interval": "1s", "sequential_find_peer_timeout": "10s", "random_find_peer_timeout": "10s", - } + }, } CONFIG_DIR = os.path.dirname(__file__) @@ -75,11 +73,12 @@ TX_PARAMS1 = { NO_SEAL_FLAG = 0x1 NO_MERKLE_PROOF_FLAG = 0x2 + def update_config(default: dict, custom: dict): """ Supports to update configurations with dict value. """ - for (key, value) in custom.items(): + for key, value in custom.items(): if default.get(key) is None or type(value) != dict: default[key] = value else: diff --git a/tests/crash_test.py b/tests/crash_test.py index 22c09c1..a484645 100755 --- a/tests/crash_test.py +++ b/tests/crash_test.py @@ -20,17 +20,15 @@ class CrashTest(TestFramework): segment = submit_data(self.nodes[0], chunk_data) self.log.info("segment: %s", segment) - wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"] is True) + wait_until( + lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"] is True + ) for i in range(1, self.num_nodes): - wait_until( - lambda: self.nodes[i].zgs_get_file_info(data_root) is not None - ) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None) self.nodes[i].admin_start_sync_file(0) self.log.info("wait for node: %s", i) - wait_until( - lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"] - ) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]) # 2: first node runnging, other nodes killed self.log.info("kill node") @@ -56,22 +54,16 @@ class CrashTest(TestFramework): for i in range(2, self.num_nodes): self.start_storage_node(i) self.nodes[i].wait_for_rpc_connection() - wait_until( - lambda: self.nodes[i].zgs_get_file_info(data_root) is not None - ) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None) self.nodes[i].admin_start_sync_file(1) self.nodes[i].stop(kill=True) self.start_storage_node(i) self.nodes[i].wait_for_rpc_connection() - wait_until( - lambda: self.nodes[i].zgs_get_file_info(data_root) is not None - ) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None) self.nodes[i].admin_start_sync_file(1) - - wait_until( - lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"] - ) + + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]) # 4: node[1..] synced contract entries and killed self.log.info("kill node 0") @@ -96,13 +88,9 @@ class CrashTest(TestFramework): self.log.info("wait for node: %s", i) self.start_storage_node(i) self.nodes[i].wait_for_rpc_connection() - wait_until( - lambda: self.nodes[i].zgs_get_file_info(data_root) is not None - ) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None) self.nodes[i].admin_start_sync_file(2) - wait_until( - lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"] - ) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]) # 5: node[1..] synced contract entries and killed, sync disorder self.nodes[0].stop(kill=True) @@ -137,21 +125,13 @@ class CrashTest(TestFramework): self.log.info("wait for node: %s", i) self.start_storage_node(i) self.nodes[i].wait_for_rpc_connection() - wait_until( - lambda: self.nodes[i].zgs_get_file_info(data_root1) is not None - ) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root1) is not None) self.nodes[i].admin_start_sync_file(4) - wait_until( - lambda: self.nodes[i].zgs_get_file_info(data_root1)["finalized"] - ) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root1)["finalized"]) - wait_until( - lambda: self.nodes[i].zgs_get_file_info(data_root) is not None - ) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None) self.nodes[i].admin_start_sync_file(3) - wait_until( - lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"] - ) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]) if __name__ == "__main__": diff --git a/tests/fuzz_test.py b/tests/fuzz_test.py index 0c17fe7..c1fefa4 100644 --- a/tests/fuzz_test.py +++ b/tests/fuzz_test.py @@ -45,13 +45,9 @@ class FuzzTest(TestFramework): lock.release() log.info("submit data via client %s", idx) - wait_until( - lambda: nodes[idx].zgs_get_file_info(data_root) is not None - ) + wait_until(lambda: nodes[idx].zgs_get_file_info(data_root) is not None) segment = submit_data(nodes[idx], chunk_data) - wait_until( - lambda: nodes[idx].zgs_get_file_info(data_root)["finalized"] - ) + wait_until(lambda: nodes[idx].zgs_get_file_info(data_root)["finalized"]) lock.acquire() nodes_index.append(idx) @@ -65,17 +61,15 @@ class FuzzTest(TestFramework): lambda: nodes[idx].zgs_get_file_info(data_root) is not None ) - def wait_finalized(): + def wait_finalized(): ret = nodes[idx].zgs_get_file_info(data_root) if ret["finalized"]: return True else: - nodes[idx].admin_start_sync_file(ret['tx']['seq']) + nodes[idx].admin_start_sync_file(ret["tx"]["seq"]) return False - wait_until( - lambda: wait_finalized(), timeout = 180 - ) + wait_until(lambda: wait_finalized(), timeout=180) def run_small_chunk_size(nodes, contract, log): sizes = [i for i in range(1, SAMLL_SIZE + 1)] @@ -84,7 +78,7 @@ class FuzzTest(TestFramework): run_chunk_size(sizes, nodes, contract, log) def run_large_chunk_size(nodes, contract, log): - sizes = [i for i in range(256 * 1024 * 256 - LARGE_SIZE, 256 * 1024 * 256 )] + sizes = [i for i in range(256 * 1024 * 256 - LARGE_SIZE, 256 * 1024 * 256)] random.shuffle(sizes) run_chunk_size(sizes, nodes, contract, log) diff --git a/tests/long_time_mine_test_local.py b/tests/long_time_mine_test_local.py index f9cb532..744bd75 100755 --- a/tests/long_time_mine_test_local.py +++ b/tests/long_time_mine_test_local.py @@ -18,7 +18,6 @@ class LongTimeMineTest(TestFramework): self.mine_period = 15 self.launch_wait_seconds = 15 - def submit_data(self, item, size): submissions_before = self.contract.num_submissions() client = self.nodes[0] @@ -44,7 +43,10 @@ class LongTimeMineTest(TestFramework): self.submit_data(b"\x11", 2000) self.log.info("Start mine") - wait_until(lambda: int(blockchain.eth_blockNumber(), 16) > self.mine_period, timeout=180) + wait_until( + lambda: int(blockchain.eth_blockNumber(), 16) > self.mine_period, + timeout=180, + ) self.log.info("Wait for the first mine answer") wait_until(lambda: self.mine_contract.last_mined_epoch() == 1) diff --git a/tests/mine_test.py b/tests/mine_test.py index b4f9536..4ca0a8b 100755 --- a/tests/mine_test.py +++ b/tests/mine_test.py @@ -15,7 +15,11 @@ class MineTest(TestFramework): } self.mine_period = int(45 / self.block_time) self.launch_wait_seconds = 15 - self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period) + self.log.info( + "Contract Info: Est. block time %.2f, Mine period %d", + self.block_time, + self.mine_period, + ) def submit_data(self, item, size): submissions_before = self.contract.num_submissions() @@ -37,7 +41,11 @@ class MineTest(TestFramework): first_block = self.contract.first_block() self.log.info("Current block number %d", int(blockchain.eth_blockNumber(), 16)) - self.log.info("Flow deployment block number %d, epoch 1 start %d", first_block, first_block + self.mine_period) + self.log.info( + "Flow deployment block number %d, epoch 1 start %d", + first_block, + first_block + self.mine_period, + ) wait_until(lambda: self.contract.epoch() >= 1, timeout=180) quality = int(2**256 / 100 / estimate_st_performance()) @@ -54,27 +62,39 @@ class MineTest(TestFramework): self.contract.update_context() self.log.info("Wait for the first mine answer") - wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1 and not self.mine_contract.can_submit(), timeout=180) + wait_until( + lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1 + and not self.mine_contract.can_submit(), + timeout=180, + ) self.log.info("Wait for the second mine context release") wait_until(lambda: self.contract.epoch() >= start_epoch + 2, timeout=180) self.contract.update_context() self.log.info("Wait for the second mine answer") - wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2 and not self.mine_contract.can_submit(), timeout=180) + wait_until( + lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2 + and not self.mine_contract.can_submit(), + timeout=180, + ) self.nodes[0].miner_stop() self.log.info("Wait for the third mine context release") wait_until(lambda: self.contract.epoch() >= start_epoch + 3, timeout=180) self.contract.update_context() - + self.log.info("Submit the second data chunk") self.submit_data(b"\x22", 2000) # Now the storage node should have the latest flow, but the mining context is using an old one. self.nodes[0].miner_start() self.log.info("Wait for the third mine answer") - wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 3 and not self.mine_contract.can_submit(), timeout=180) + wait_until( + lambda: self.mine_contract.last_mined_epoch() == start_epoch + 3 + and not self.mine_contract.can_submit(), + timeout=180, + ) self.log.info("Current block number %d", int(blockchain.eth_blockNumber(), 16)) diff --git a/tests/mine_with_market_test.py b/tests/mine_with_market_test.py index a0e8ced..3ee258a 100755 --- a/tests/mine_with_market_test.py +++ b/tests/mine_with_market_test.py @@ -2,13 +2,18 @@ from test_framework.test_framework import TestFramework from config.node_config import MINER_ID, GENESIS_PRIV_KEY from utility.submission import create_submission, submit_data -from utility.utils import wait_until, assert_equal, assert_greater_than, estimate_st_performance +from utility.utils import ( + wait_until, + assert_equal, + assert_greater_than, + estimate_st_performance, +) from test_framework.blockchain_node import BlockChainNodeType import time import math -PRICE_PER_SECTOR = math.ceil(10 * (10 ** 18) / (2 ** 30) * 256 / 12) +PRICE_PER_SECTOR = math.ceil(10 * (10**18) / (2**30) * 256 / 12) class MineTest(TestFramework): @@ -23,17 +28,21 @@ class MineTest(TestFramework): self.enable_market = True self.mine_period = int(50 / self.block_time) self.launch_wait_seconds = 15 - self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period) + self.log.info( + "Contract Info: Est. block time %.2f, Mine period %d", + self.block_time, + self.mine_period, + ) - def submit_data(self, item, size, no_submit = False): + def submit_data(self, item, size, no_submit=False): submissions_before = self.contract.num_submissions() client = self.nodes[0] chunk_data = item * 256 * size submissions, data_root = create_submission(chunk_data) value = int(size * PRICE_PER_SECTOR * 1.1) - self.contract.submit(submissions, tx_prarams = {"value": value}) + self.contract.submit(submissions, tx_prarams={"value": value}) wait_until(lambda: self.contract.num_submissions() == submissions_before + 1) - + if not no_submit: wait_until(lambda: client.zgs_get_file_info(data_root) is not None) segment = submit_data(client, chunk_data) @@ -48,11 +57,15 @@ class MineTest(TestFramework): difficulty = int(2**256 / 5 / estimate_st_performance()) self.mine_contract.set_quality(difficulty) - SECTORS_PER_PRICING = int(8 * ( 2 ** 30 ) / 256) + SECTORS_PER_PRICING = int(8 * (2**30) / 256) first_block = self.contract.first_block() self.log.info("Current block number %d", int(blockchain.eth_blockNumber(), 16)) - self.log.info("Flow deployment block number %d, epoch 1 start %d, wait for epoch 1 start", first_block, first_block + self.mine_period) + self.log.info( + "Flow deployment block number %d, epoch 1 start %d, wait for epoch 1 start", + first_block, + first_block + self.mine_period, + ) wait_until(lambda: self.contract.epoch() >= 1, timeout=180) self.log.info("Submit the actual data chunk (256 MB)") @@ -63,8 +76,12 @@ class MineTest(TestFramework): # wait_until(lambda: self.contract.epoch() >= 1, timeout=180) start_epoch = self.contract.epoch() - - self.log.info("Submission Done, epoch is %d, current block number %d", start_epoch, int(blockchain.eth_blockNumber(), 16)) + + self.log.info( + "Submission Done, epoch is %d, current block number %d", + start_epoch, + int(blockchain.eth_blockNumber(), 16), + ) self.log.info("Wait for mine context release") wait_until(lambda: self.contract.epoch() >= start_epoch + 1, timeout=180) @@ -72,27 +89,38 @@ class MineTest(TestFramework): self.contract.update_context() self.log.info("Wait for mine answer") - wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1 and not self.mine_contract.can_submit(), timeout=120) + wait_until( + lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1 + and not self.mine_contract.can_submit(), + timeout=120, + ) rewards = self.reward_contract.reward_distributes() assert_equal(len(rewards), 4) firstReward = rewards[0].args.amount self.log.info("Received reward %d Gwei", firstReward / (10**9)) - self.reward_contract.donate(10000 * 10 ** 18) + self.reward_contract.donate(10000 * 10**18) self.log.info("Donation Done") self.log.info("Submit the data hash only (8 GB)") self.submit_data(b"\x11", int(SECTORS_PER_PRICING), no_submit=True) current_epoch = self.contract.epoch() - assert_equal(current_epoch, start_epoch + 1); - self.log.info("Sumission Done, epoch is %d, current block number %d", self.contract.epoch(), int(blockchain.eth_blockNumber(), 16)) + assert_equal(current_epoch, start_epoch + 1) + self.log.info( + "Sumission Done, epoch is %d, current block number %d", + self.contract.epoch(), + int(blockchain.eth_blockNumber(), 16), + ) self.log.info("Wait for mine context release") wait_until(lambda: self.contract.epoch() >= start_epoch + 2, timeout=180) self.contract.update_context() self.log.info("Wait for mine answer") - wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2 and not self.mine_contract.can_submit()) + wait_until( + lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2 + and not self.mine_contract.can_submit() + ) assert_equal(self.contract.epoch(), start_epoch + 2) rewards = self.reward_contract.reward_distributes() diff --git a/tests/network_discovery_test.py b/tests/network_discovery_test.py index 08d0156..3b8920f 100644 --- a/tests/network_discovery_test.py +++ b/tests/network_discovery_test.py @@ -7,6 +7,7 @@ from config.node_config import ZGS_KEY_FILE, ZGS_NODEID from test_framework.test_framework import TestFramework from utility.utils import p2p_port + class NetworkDiscoveryTest(TestFramework): """ This is to test whether community nodes could connect to each other via UDP discovery. @@ -24,7 +25,6 @@ class NetworkDiscoveryTest(TestFramework): "network_enr_address": "127.0.0.1", "network_enr_tcp_port": bootnode_port, "network_enr_udp_port": bootnode_port, - # disable trusted nodes "network_libp2p_nodes": [], } @@ -37,7 +37,6 @@ class NetworkDiscoveryTest(TestFramework): "network_enr_address": "127.0.0.1", "network_enr_tcp_port": p2p_port(i), "network_enr_udp_port": p2p_port(i), - # disable trusted nodes and enable bootnodes "network_libp2p_nodes": [], "network_boot_nodes": bootnodes, @@ -57,7 +56,13 @@ class NetworkDiscoveryTest(TestFramework): total_connected += info["connectedPeers"] self.log.info( "Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)", - i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"], + i, + info["totalPeers"], + info["bannedPeers"], + info["disconnectedPeers"], + info["connectedPeers"], + info["connectedIncomingPeers"], + info["connectedOutgoingPeers"], ) if total_connected >= self.num_nodes * (self.num_nodes - 1): @@ -66,5 +71,6 @@ class NetworkDiscoveryTest(TestFramework): self.log.info("====================================") self.log.info("All nodes connected to each other successfully") + if __name__ == "__main__": NetworkDiscoveryTest().main() diff --git a/tests/network_discovery_upgrade_test.py b/tests/network_discovery_upgrade_test.py index f20067f..4179550 100644 --- a/tests/network_discovery_upgrade_test.py +++ b/tests/network_discovery_upgrade_test.py @@ -7,6 +7,7 @@ from config.node_config import ZGS_KEY_FILE, ZGS_NODEID from test_framework.test_framework import TestFramework from utility.utils import p2p_port + class NetworkDiscoveryUpgradeTest(TestFramework): """ This is to test that low version community nodes could not connect to bootnodes. @@ -24,7 +25,6 @@ class NetworkDiscoveryUpgradeTest(TestFramework): "network_enr_address": "127.0.0.1", "network_enr_tcp_port": bootnode_port, "network_enr_udp_port": bootnode_port, - # disable trusted nodes "network_libp2p_nodes": [], } @@ -37,11 +37,9 @@ class NetworkDiscoveryUpgradeTest(TestFramework): "network_enr_address": "127.0.0.1", "network_enr_tcp_port": p2p_port(i), "network_enr_udp_port": p2p_port(i), - # disable trusted nodes and enable bootnodes "network_libp2p_nodes": [], "network_boot_nodes": bootnodes, - # disable network identity in ENR "discv5_disable_enr_network_id": True, } @@ -57,7 +55,13 @@ class NetworkDiscoveryUpgradeTest(TestFramework): total_connected += info["connectedPeers"] self.log.info( "Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)", - i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"], + i, + info["totalPeers"], + info["bannedPeers"], + info["disconnectedPeers"], + info["connectedPeers"], + info["connectedIncomingPeers"], + info["connectedOutgoingPeers"], ) # ENR incompatible and should not discover each other for TCP connection @@ -66,5 +70,6 @@ class NetworkDiscoveryUpgradeTest(TestFramework): self.log.info("====================================") self.log.info("ENR incompatible nodes do not connect to each other") + if __name__ == "__main__": NetworkDiscoveryUpgradeTest().main() diff --git a/tests/network_tcp_shard_test.py b/tests/network_tcp_shard_test.py index 8c3896b..e3d4fb0 100644 --- a/tests/network_tcp_shard_test.py +++ b/tests/network_tcp_shard_test.py @@ -7,6 +7,7 @@ from config.node_config import ZGS_KEY_FILE, ZGS_NODEID from test_framework.test_framework import TestFramework from utility.utils import p2p_port + class NetworkTcpShardTest(TestFramework): """ This is to test TCP connection for shard config mismatched peers of UDP discovery. @@ -24,12 +25,10 @@ class NetworkTcpShardTest(TestFramework): "network_enr_address": "127.0.0.1", "network_enr_tcp_port": bootnode_port, "network_enr_udp_port": bootnode_port, - # disable trusted nodes "network_libp2p_nodes": [], - # custom shard config - "shard_position": "0/4" + "shard_position": "0/4", } # setup node 1 & 2 as community nodes @@ -40,13 +39,11 @@ class NetworkTcpShardTest(TestFramework): "network_enr_address": "127.0.0.1", "network_enr_tcp_port": p2p_port(i), "network_enr_udp_port": p2p_port(i), - # disable trusted nodes and enable bootnodes "network_libp2p_nodes": [], "network_boot_nodes": bootnodes, - # custom shard config - "shard_position": f"{i}/4" + "shard_position": f"{i}/4", } def run_test(self): @@ -60,7 +57,13 @@ class NetworkTcpShardTest(TestFramework): info = self.nodes[i].rpc.admin_getNetworkInfo() self.log.info( "Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)", - i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"], + i, + info["totalPeers"], + info["bannedPeers"], + info["disconnectedPeers"], + info["connectedPeers"], + info["connectedIncomingPeers"], + info["connectedOutgoingPeers"], ) if i == timeout_secs - 1: @@ -72,5 +75,6 @@ class NetworkTcpShardTest(TestFramework): self.log.info("====================================") self.log.info("All nodes discovered but not connected for each other") + if __name__ == "__main__": NetworkTcpShardTest().main() diff --git a/tests/pruner_test.py b/tests/pruner_test.py index bac2a76..3707caf 100755 --- a/tests/pruner_test.py +++ b/tests/pruner_test.py @@ -23,7 +23,11 @@ class PrunerTest(TestFramework): self.mine_period = int(45 / self.block_time) self.lifetime_seconds = 240 self.launch_wait_seconds = 15 - self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period) + self.log.info( + "Contract Info: Est. block time %.2f, Mine period %d", + self.block_time, + self.mine_period, + ) def run_test(self): client = self.nodes[0] @@ -31,7 +35,10 @@ class PrunerTest(TestFramework): chunk_data = b"\x02" * 16 * 256 * 1024 # chunk_data = b"\x02" * 5 * 1024 * 1024 * 1024 submissions, data_root = create_submission(chunk_data) - self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)}) + self.contract.submit( + submissions, + tx_prarams={"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)}, + ) wait_until(lambda: self.contract.num_submissions() == 1) wait_until(lambda: client.zgs_get_file_info(data_root) is not None) diff --git a/tests/random_test.py b/tests/random_test.py index 2edb882..50f9555 100755 --- a/tests/random_test.py +++ b/tests/random_test.py @@ -7,6 +7,7 @@ from utility.submission import create_submission from utility.submission import submit_data from utility.utils import wait_until + class RandomTest(TestFramework): def setup_params(self): self.num_blockchain_nodes = 1 @@ -32,14 +33,18 @@ class RandomTest(TestFramework): else: size = random.randint(0, max_size) no_data = random.random() <= no_data_ratio - self.log.info(f"choose {chosen_node}, seq={i}, size={size}, no_data={no_data}") + self.log.info( + f"choose {chosen_node}, seq={i}, size={size}, no_data={no_data}" + ) client = self.nodes[chosen_node] chunk_data = random.randbytes(size) submissions, data_root = create_submission(chunk_data) self.contract.submit(submissions) wait_until(lambda: self.contract.num_submissions() == i + 1) - wait_until(lambda: client.zgs_get_file_info(data_root) is not None, timeout=120) + wait_until( + lambda: client.zgs_get_file_info(data_root) is not None, timeout=120 + ) if not no_data: submit_data(client, chunk_data) wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) @@ -47,8 +52,17 @@ class RandomTest(TestFramework): for node_index in range(len(self.nodes)): if node_index != chosen_node: self.log.debug(f"check {node_index}") - wait_until(lambda: self.nodes[node_index].zgs_get_file_info(data_root) is not None, timeout=300) - wait_until(lambda: self.nodes[node_index].zgs_get_file_info(data_root)["finalized"], timeout=300) + wait_until( + lambda: self.nodes[node_index].zgs_get_file_info(data_root) + is not None, + timeout=300, + ) + wait_until( + lambda: self.nodes[node_index].zgs_get_file_info(data_root)[ + "finalized" + ], + timeout=300, + ) # TODO(zz): This is a temp solution to trigger auto sync after all nodes started. if i >= tx_count - 2: continue @@ -72,7 +86,10 @@ class RandomTest(TestFramework): if not no_data: for node in self.nodes: self.log.debug(f"check {data_root}, {node.index}") - wait_until(lambda: node.zgs_get_file_info(data_root)["finalized"], timeout=300) + wait_until( + lambda: node.zgs_get_file_info(data_root)["finalized"], + timeout=300, + ) if __name__ == "__main__": diff --git a/tests/root_consistency_test.py b/tests/root_consistency_test.py index 737088b..039a6a5 100755 --- a/tests/root_consistency_test.py +++ b/tests/root_consistency_test.py @@ -32,8 +32,6 @@ class RootConsistencyTest(TestFramework): assert_equal(contract_length, expected_length) assert_equal(contract_root, node_root[2:]) - - def run_test(self): self.assert_flow_status(1) @@ -48,7 +46,6 @@ class RootConsistencyTest(TestFramework): self.submit_data(b"\x13", 512 + 256) self.assert_flow_status(1024 + 512 + 256) - if __name__ == "__main__": diff --git a/tests/rpc_test.py b/tests/rpc_test.py index 9f95c07..e7b9ee2 100755 --- a/tests/rpc_test.py +++ b/tests/rpc_test.py @@ -29,6 +29,7 @@ class RpcTest(TestFramework): wait_until(lambda: client1.zgs_get_file_info(data_root) is not None) assert_equal(client1.zgs_get_file_info(data_root)["finalized"], False) + assert_equal(client1.zgs_get_file_info(data_root)["pruned"], False) wait_until(lambda: client2.zgs_get_file_info(data_root) is not None) assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False) @@ -37,17 +38,13 @@ class RpcTest(TestFramework): self.log.info("segment: %s", segment) wait_until(lambda: client1.zgs_get_file_info(data_root)["finalized"]) - assert_equal( - client1.zgs_download_segment(data_root, 0, 1), segment[0]["data"] - ) + assert_equal(client1.zgs_download_segment(data_root, 0, 1), segment[0]["data"]) client2.admin_start_sync_file(0) wait_until(lambda: client2.sync_status_is_completed_or_unknown(0)) wait_until(lambda: client2.zgs_get_file_info(data_root)["finalized"]) - assert_equal( - client2.zgs_download_segment(data_root, 0, 1), segment[0]["data"] - ) + assert_equal(client2.zgs_download_segment(data_root, 0, 1), segment[0]["data"]) self.__test_upload_file_with_cli(client1) @@ -89,9 +86,7 @@ class RpcTest(TestFramework): ) ) - wait_until( - lambda: self.nodes[i].zgs_get_file_info(root)["finalized"] - ) + wait_until(lambda: self.nodes[i].zgs_get_file_info(root)["finalized"]) assert_equal( client1.zgs_download_segment(root, 0, 2), diff --git a/tests/same_root_test.py b/tests/same_root_test.py index 7646559..6c98a08 100755 --- a/tests/same_root_test.py +++ b/tests/same_root_test.py @@ -41,7 +41,11 @@ class SubmissionTest(TestFramework): for tx_offset in range(same_root_tx_count + 1): tx_seq = next_tx_seq - 1 - tx_offset # old txs are finalized after finalizing the new tx, so we may need to wait here. - wait_until(lambda: self.nodes[0].zgs_get_file_info_by_tx_seq(tx_seq)["finalized"]) + wait_until( + lambda: self.nodes[0].zgs_get_file_info_by_tx_seq(tx_seq)[ + "finalized" + ] + ) # Send tx after uploading data for _ in range(same_root_tx_count): @@ -57,7 +61,10 @@ class SubmissionTest(TestFramework): client = self.nodes[node_idx] wait_until(lambda: client.zgs_get_file_info_by_tx_seq(tx_seq) is not None) - wait_until(lambda: client.zgs_get_file_info_by_tx_seq(tx_seq)["finalized"] == data_finalized) + wait_until( + lambda: client.zgs_get_file_info_by_tx_seq(tx_seq)["finalized"] + == data_finalized + ) def submit_data(self, chunk_data, node_idx=0): _, data_root = create_submission(chunk_data) diff --git a/tests/shard_submission_test.py b/tests/shard_submission_test.py index 01dd9c9..3f51274 100644 --- a/tests/shard_submission_test.py +++ b/tests/shard_submission_test.py @@ -18,30 +18,30 @@ class ShardSubmitTest(TestFramework): self.num_blockchain_nodes = 1 self.num_nodes = 4 self.zgs_node_configs[0] = { - "db_max_num_sectors": 2 ** 30, - "shard_position": "0/4" + "db_max_num_sectors": 2**30, + "shard_position": "0/4", } self.zgs_node_configs[1] = { - "db_max_num_sectors": 2 ** 30, - "shard_position": "1/4" + "db_max_num_sectors": 2**30, + "shard_position": "1/4", } self.zgs_node_configs[2] = { - "db_max_num_sectors": 2 ** 30, - "shard_position": "2/4" + "db_max_num_sectors": 2**30, + "shard_position": "2/4", } self.zgs_node_configs[3] = { - "db_max_num_sectors": 2 ** 30, - "shard_position": "3/4" + "db_max_num_sectors": 2**30, + "shard_position": "3/4", } - + def run_test(self): data_size = [ - 256*960, - 256*1024, + 256 * 960, + 256 * 1024, 2, 255, - 256*960, - 256*120, + 256 * 960, + 256 * 120, 256, 257, 1023, @@ -77,5 +77,6 @@ class ShardSubmitTest(TestFramework): submit_data(client, chunk_data) wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) + if __name__ == "__main__": ShardSubmitTest().main() diff --git a/tests/shard_sync_test.py b/tests/shard_sync_test.py index 6f1f7e0..caef2af 100755 --- a/tests/shard_sync_test.py +++ b/tests/shard_sync_test.py @@ -13,16 +13,16 @@ class PrunerTest(TestFramework): self.num_blockchain_nodes = 1 self.num_nodes = 4 self.zgs_node_configs[0] = { - "db_max_num_sectors": 2 ** 30, - "shard_position": "0/2" + "db_max_num_sectors": 2**30, + "shard_position": "0/2", } self.zgs_node_configs[1] = { - "db_max_num_sectors": 2 ** 30, - "shard_position": "1/2" + "db_max_num_sectors": 2**30, + "shard_position": "1/2", } self.zgs_node_configs[3] = { - "db_max_num_sectors": 2 ** 30, - "shard_position": "1/4" + "db_max_num_sectors": 2**30, + "shard_position": "1/4", } self.enable_market = True @@ -31,7 +31,10 @@ class PrunerTest(TestFramework): chunk_data = b"\x02" * 8 * 256 * 1024 submissions, data_root = create_submission(chunk_data) - self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)}) + self.contract.submit( + submissions, + tx_prarams={"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)}, + ) wait_until(lambda: self.contract.num_submissions() == 1) wait_until(lambda: client.zgs_get_file_info(data_root) is not None) @@ -57,10 +60,18 @@ class PrunerTest(TestFramework): for i in range(len(segments)): index_store = i % 2 index_empty = 1 - i % 2 - seg0 = self.nodes[index_store].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024) - seg1 = self.nodes[index_empty].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024) - seg2 = self.nodes[2].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024) - seg3 = self.nodes[3].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024) + seg0 = self.nodes[index_store].zgs_download_segment( + data_root, i * 1024, (i + 1) * 1024 + ) + seg1 = self.nodes[index_empty].zgs_download_segment( + data_root, i * 1024, (i + 1) * 1024 + ) + seg2 = self.nodes[2].zgs_download_segment( + data_root, i * 1024, (i + 1) * 1024 + ) + seg3 = self.nodes[3].zgs_download_segment( + data_root, i * 1024, (i + 1) * 1024 + ) # base64 encoding size assert_equal(len(seg0), 349528) assert_equal(seg1, None) diff --git a/tests/snapshot_test.py b/tests/snapshot_test.py index dc31c35..435b5a1 100644 --- a/tests/snapshot_test.py +++ b/tests/snapshot_test.py @@ -5,6 +5,7 @@ import shutil from test_framework.test_framework import TestFramework from utility.utils import wait_until + class SnapshotTask(TestFramework): def setup_params(self): self.num_nodes = 2 @@ -28,11 +29,11 @@ class SnapshotTask(TestFramework): # Start the last node to verify historical file sync self.nodes[1].shutdown() - shutil.rmtree(os.path.join(self.nodes[1].data_dir, 'db/data_db')) - + shutil.rmtree(os.path.join(self.nodes[1].data_dir, "db/data_db")) + self.start_storage_node(1) self.nodes[1].wait_for_rpc_connection() - + wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None) wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"]) diff --git a/tests/submission_test.py b/tests/submission_test.py index 5935fc0..bb4e9d6 100755 --- a/tests/submission_test.py +++ b/tests/submission_test.py @@ -77,9 +77,7 @@ class SubmissionTest(TestFramework): continue # Wait for log entry before file sync, otherwise, admin_startSyncFile will be failed. - wait_until( - lambda: self.nodes[i].zgs_get_file_info(data_root) is not None - ) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None) self.nodes[i].admin_start_sync_file(submission_index - 1) @@ -89,15 +87,11 @@ class SubmissionTest(TestFramework): ) ) - wait_until( - lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"] - ) + wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]) assert_equal( base64.b64decode( - self.nodes[i] - .zgs_download_segment(data_root, 0, 1) - .encode("utf-8") + self.nodes[i].zgs_download_segment(data_root, 0, 1).encode("utf-8") ), first_entry, ) diff --git a/tests/sync_auto_historical_test.py b/tests/sync_auto_historical_test.py index 5b4989c..0319438 100644 --- a/tests/sync_auto_historical_test.py +++ b/tests/sync_auto_historical_test.py @@ -3,17 +3,14 @@ from test_framework.test_framework import TestFramework from utility.utils import wait_until + class AutoSyncHistoricalTest(TestFramework): def setup_params(self): self.num_nodes = 4 # Enable auto sync for i in range(self.num_nodes): - self.zgs_node_configs[i] = { - "sync": { - "auto_sync_enabled": True - } - } + self.zgs_node_configs[i] = {"sync": {"auto_sync_enabled": True}} def run_test(self): # Stop the last node to verify historical file sync @@ -26,17 +23,36 @@ class AutoSyncHistoricalTest(TestFramework): # Files should be available on other nodes via auto sync for i in range(1, self.num_nodes - 1): wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1) is not None) - wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"]) + wait_until( + lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"] + ) wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None) - wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"]) + wait_until( + lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"] + ) # Start the last node to verify historical file sync self.start_storage_node(self.num_nodes - 1) self.nodes[self.num_nodes - 1].wait_for_rpc_connection() - wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1) is not None) - wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1)["finalized"]) - wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2) is not None) - wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2)["finalized"]) + wait_until( + lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1) + is not None + ) + wait_until( + lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1)[ + "finalized" + ] + ) + wait_until( + lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2) + is not None + ) + wait_until( + lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2)[ + "finalized" + ] + ) + if __name__ == "__main__": AutoSyncHistoricalTest().main() diff --git a/tests/sync_auto_test.py b/tests/sync_auto_test.py index 43ec235..d20c5af 100644 --- a/tests/sync_auto_test.py +++ b/tests/sync_auto_test.py @@ -3,17 +3,14 @@ from test_framework.test_framework import TestFramework from utility.utils import wait_until + class AutoSyncTest(TestFramework): def setup_params(self): self.num_nodes = 2 # Enable auto sync for i in range(self.num_nodes): - self.zgs_node_configs[i] = { - "sync": { - "auto_sync_enabled": True - } - } + self.zgs_node_configs[i] = {"sync": {"auto_sync_enabled": True}} def run_test(self): # Submit and upload files on node 0 @@ -26,5 +23,6 @@ class AutoSyncTest(TestFramework): wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2) is not None) wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2)["finalized"]) + if __name__ == "__main__": AutoSyncTest().main() diff --git a/tests/sync_chunks_test.py b/tests/sync_chunks_test.py index f24418f..33fc583 100644 --- a/tests/sync_chunks_test.py +++ b/tests/sync_chunks_test.py @@ -6,6 +6,7 @@ from test_framework.test_framework import TestFramework from utility.submission import data_to_segments from utility.utils import assert_equal, wait_until + class SyncChunksTest(TestFramework): """ By default, auto_sync_enabled and sync_file_on_announcement_enabled are both false, @@ -17,9 +18,7 @@ class SyncChunksTest(TestFramework): # enable find chunks topic for i in range(self.num_nodes): - self.zgs_node_configs[i] = { - "network_find_chunks_enabled": True - } + self.zgs_node_configs[i] = {"network_find_chunks_enabled": True} def run_test(self): client1 = self.nodes[0] @@ -35,20 +34,25 @@ class SyncChunksTest(TestFramework): # Upload only 2nd segment to storage node segments = data_to_segments(chunk_data) - self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments]) - assert(client1.zgs_upload_segment(segments[1]) is None) + self.log.info( + "segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments] + ) + assert client1.zgs_upload_segment(segments[1]) is None # segment 0 is not able to download - assert(client1.zgs_download_segment_decoded(data_root, 0, 1024) is None) + assert client1.zgs_download_segment_decoded(data_root, 0, 1024) is None # segment 1 is available to download - assert_equal(client1.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256]) + assert_equal( + client1.zgs_download_segment_decoded(data_root, 1024, 2048), + chunk_data[1024 * 256 : 2048 * 256], + ) # segment 2 is not able to download - assert(client1.zgs_download_segment_decoded(data_root, 2048, 3072) is None) + assert client1.zgs_download_segment_decoded(data_root, 2048, 3072) is None # Segment 1 should not be able to download on node 2 wait_until(lambda: client2.zgs_get_file_info(data_root) is not None) assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False) - assert(client2.zgs_download_segment_decoded(data_root, 1024, 2048) is None) + assert client2.zgs_download_segment_decoded(data_root, 1024, 2048) is None # Restart node 1 to check if the proof nodes are persisted. self.stop_storage_node(0) @@ -56,12 +60,19 @@ class SyncChunksTest(TestFramework): self.nodes[0].wait_for_rpc_connection() # Trigger chunks sync by rpc - assert(client2.admin_start_sync_chunks(0, 1024, 2048) is None) + assert client2.admin_start_sync_chunks(0, 1024, 2048) is None wait_until(lambda: client2.sync_status_is_completed_or_unknown(0)) - wait_until(lambda: client2.zgs_download_segment_decoded(data_root, 1024, 2048) is not None) + wait_until( + lambda: client2.zgs_download_segment_decoded(data_root, 1024, 2048) + is not None + ) # Validate data - assert_equal(client2.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256]) + assert_equal( + client2.zgs_download_segment_decoded(data_root, 1024, 2048), + chunk_data[1024 * 256 : 2048 * 256], + ) + if __name__ == "__main__": SyncChunksTest().main() diff --git a/tests/sync_file_test.py b/tests/sync_file_test.py index 4a64f0e..265b5e6 100644 --- a/tests/sync_file_test.py +++ b/tests/sync_file_test.py @@ -5,6 +5,7 @@ import time from test_framework.test_framework import TestFramework from utility.utils import assert_equal, wait_until + class SyncFileTest(TestFramework): """ By default, auto_sync_enabled and sync_file_on_announcement_enabled are both false, @@ -26,7 +27,7 @@ class SyncFileTest(TestFramework): # restart client2 client2.start() client2.wait_for_rpc_connection() - + # File should not be auto sync on node 2 and there is no cached file locations wait_until(lambda: client2.zgs_get_file_info(data_root) is not None) time.sleep(3) @@ -35,7 +36,7 @@ class SyncFileTest(TestFramework): # assert(client2.admin_get_file_location(0) is None) # Trigger file sync by rpc - assert(client2.admin_start_sync_file(0) is None) + assert client2.admin_start_sync_file(0) is None wait_until(lambda: client2.sync_status_is_completed_or_unknown(0)) wait_until(lambda: client2.zgs_get_file_info(data_root)["finalized"]) # file sync use ASK_FILE & ANSWER FILE protocol, and do not cache file announcement anymore. @@ -47,5 +48,6 @@ class SyncFileTest(TestFramework): client1.zgs_download_segment(data_root, 0, 1024), ) + if __name__ == "__main__": SyncFileTest().main() diff --git a/tests/test_all.py b/tests/test_all.py index 2dfaafd..0bab35e 100755 --- a/tests/test_all.py +++ b/tests/test_all.py @@ -6,8 +6,8 @@ from utility.run_all import run_all if __name__ == "__main__": run_all( - test_dir = os.path.dirname(__file__), + test_dir=os.path.dirname(__file__), slow_tests={"mine_test.py", "random_test.py", "same_root_test.py"}, long_manual_tests={"fuzz_test.py"}, single_run_tests={"mine_with_market_test.py"}, - ) \ No newline at end of file + ) diff --git a/tests/test_framework/blockchain_node.py b/tests/test_framework/blockchain_node.py index db36548..1dc5e4e 100644 --- a/tests/test_framework/blockchain_node.py +++ b/tests/test_framework/blockchain_node.py @@ -12,11 +12,7 @@ from config.node_config import ( TX_PARAMS, ) from utility.simple_rpc_proxy import SimpleRpcProxy -from utility.utils import ( - initialize_config, - wait_until, - estimate_st_performance -) +from utility.utils import initialize_config, wait_until, estimate_st_performance from test_framework.contracts import load_contract_metadata @@ -36,6 +32,7 @@ class BlockChainNodeType(Enum): else: raise AssertionError("Unsupported blockchain type") + @unique class NodeType(Enum): BlockChain = 0 @@ -50,7 +47,7 @@ class TestNode: def __init__( self, node_type, index, data_dir, rpc_url, binary, config, log, rpc_timeout=10 ): - assert os.path.exists(binary), ("Binary not found: %s" % binary) + assert os.path.exists(binary), "Binary not found: %s" % binary self.node_type = node_type self.index = index self.data_dir = data_dir @@ -270,12 +267,14 @@ class BlockchainNode(TestNode): def deploy_contract(name, args=None): if args is None: args = [] - contract_interface = load_contract_metadata(path=self.contract_path, name=name) + contract_interface = load_contract_metadata( + path=self.contract_path, name=name + ) contract = w3.eth.contract( abi=contract_interface["abi"], bytecode=contract_interface["bytecode"], ) - + tx_params = TX_PARAMS.copy() del tx_params["gas"] tx_hash = contract.constructor(*args).transact(tx_params) @@ -285,7 +284,7 @@ class BlockchainNode(TestNode): abi=contract_interface["abi"], ) return contract, tx_hash - + def deploy_no_market(): self.log.debug("Start deploy contracts") @@ -301,12 +300,16 @@ class BlockchainNode(TestNode): mine_contract, _ = deploy_contract("PoraMineTest", [0]) self.log.debug("Mine deployed") - mine_contract.functions.initialize(1, flow_contract.address, dummy_reward_contract.address).transact(TX_PARAMS) + mine_contract.functions.initialize( + 1, flow_contract.address, dummy_reward_contract.address + ).transact(TX_PARAMS) mine_contract.functions.setDifficultyAdjustRatio(1).transact(TX_PARAMS) mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS) self.log.debug("Mine Initialized") - flow_initialize_hash = flow_contract.functions.initialize(dummy_market_contract.address, mine_period).transact(TX_PARAMS) + flow_initialize_hash = flow_contract.functions.initialize( + dummy_market_contract.address, mine_period + ).transact(TX_PARAMS) self.log.debug("Flow Initialized") self.wait_for_transaction_receipt(w3, flow_initialize_hash) @@ -315,45 +318,60 @@ class BlockchainNode(TestNode): # tx_hash = mine_contract.functions.setMiner(decode_hex(MINER_ID)).transact(TX_PARAMS) # self.wait_for_transaction_receipt(w3, tx_hash) - return flow_contract, flow_initialize_hash, mine_contract, dummy_reward_contract - + return ( + flow_contract, + flow_initialize_hash, + mine_contract, + dummy_reward_contract, + ) + def deploy_with_market(lifetime_seconds): self.log.debug("Start deploy contracts") - + mine_contract, _ = deploy_contract("PoraMineTest", [0]) self.log.debug("Mine deployed") - + market_contract, _ = deploy_contract("FixedPrice", []) self.log.debug("Market deployed") - - reward_contract, _ = deploy_contract("ChunkLinearReward", [lifetime_seconds]) + + reward_contract, _ = deploy_contract( + "ChunkLinearReward", [lifetime_seconds] + ) self.log.debug("Reward deployed") - + flow_contract, _ = deploy_contract("FixedPriceFlow", [0]) self.log.debug("Flow deployed") - - mine_contract.functions.initialize(1, flow_contract.address, reward_contract.address).transact(TX_PARAMS) + + mine_contract.functions.initialize( + 1, flow_contract.address, reward_contract.address + ).transact(TX_PARAMS) mine_contract.functions.setDifficultyAdjustRatio(1).transact(TX_PARAMS) mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS) self.log.debug("Mine Initialized") - - market_contract.functions.initialize(int(lifetime_seconds * 256 * 10 * 10 ** 18 / - 2 ** 30 / 12 / 31 / 86400), - flow_contract.address, reward_contract.address).transact(TX_PARAMS) + + market_contract.functions.initialize( + int(lifetime_seconds * 256 * 10 * 10**18 / 2**30 / 12 / 31 / 86400), + flow_contract.address, + reward_contract.address, + ).transact(TX_PARAMS) self.log.debug("Market Initialized") - - reward_contract.functions.initialize(market_contract.address, mine_contract.address).transact(TX_PARAMS) - reward_contract.functions.setBaseReward(10 ** 18).transact(TX_PARAMS) + + reward_contract.functions.initialize( + market_contract.address, mine_contract.address + ).transact(TX_PARAMS) + reward_contract.functions.setBaseReward(10**18).transact(TX_PARAMS) self.log.debug("Reward Initialized") - - flow_initialize_hash = flow_contract.functions.initialize(market_contract.address, mine_period).transact(TX_PARAMS) + + flow_initialize_hash = flow_contract.functions.initialize( + market_contract.address, mine_period + ).transact(TX_PARAMS) self.log.debug("Flow Initialized") - + self.wait_for_transaction_receipt(w3, flow_initialize_hash) self.log.info("All contracts deployed") return flow_contract, flow_initialize_hash, mine_contract, reward_contract - + if enable_market: return deploy_with_market(lifetime_seconds) else: @@ -376,4 +394,7 @@ class BlockchainNode(TestNode): w3.eth.wait_for_transaction_receipt(tx_hash) def start(self): - super().start(self.blockchain_node_type == BlockChainNodeType.BSC or self.blockchain_node_type == BlockChainNodeType.ZG) + super().start( + self.blockchain_node_type == BlockChainNodeType.BSC + or self.blockchain_node_type == BlockChainNodeType.ZG + ) diff --git a/tests/test_framework/contract_proxy.py b/tests/test_framework/contract_proxy.py index 7f2d156..cfdebd8 100644 --- a/tests/test_framework/contract_proxy.py +++ b/tests/test_framework/contract_proxy.py @@ -36,28 +36,36 @@ class ContractProxy: tx_params = copy(TX_PARAMS) tx_params["value"] = value return getattr(contract.functions, fn_name)(**args).transact(tx_params) - + def _logs(self, event_name, node_idx, **args): assert node_idx < len(self.blockchain_nodes) contract = self._get_contract(node_idx) - - return getattr(contract.events, event_name).create_filter(from_block=0, to_block="latest").get_all_entries() - def transfer(self, value, node_idx = 0): + return ( + getattr(contract.events, event_name) + .create_filter(from_block=0, to_block="latest") + .get_all_entries() + ) + + def transfer(self, value, node_idx=0): tx_params = copy(TX_PARAMS) tx_params["value"] = value contract = self._get_contract(node_idx) contract.receive.transact(tx_params) - + def address(self): return self.contract_address class FlowContractProxy(ContractProxy): def submit( - self, submission_nodes, node_idx=0, tx_prarams=None, parent_hash=None, + self, + submission_nodes, + node_idx=0, + tx_prarams=None, + parent_hash=None, ): assert node_idx < len(self.blockchain_nodes) @@ -65,11 +73,12 @@ class FlowContractProxy(ContractProxy): if tx_prarams is not None: combined_tx_prarams.update(tx_prarams) - contract = self._get_contract(node_idx) # print(contract.functions.submit(submission_nodes).estimate_gas(combined_tx_prarams)) - tx_hash = contract.functions.submit(submission_nodes).transact(combined_tx_prarams) + tx_hash = contract.functions.submit(submission_nodes).transact( + combined_tx_prarams + ) receipt = self.blockchain_nodes[node_idx].wait_for_transaction_receipt( contract.w3, tx_hash, parent_hash=parent_hash ) @@ -85,46 +94,46 @@ class FlowContractProxy(ContractProxy): def epoch(self, node_idx=0): return self.get_mine_context(node_idx)[0] - + def update_context(self, node_idx=0): return self._send("makeContext", node_idx) def get_mine_context(self, node_idx=0): return self._call("makeContextWithResult", node_idx) - + def get_flow_root(self, node_idx=0): return self._call("computeFlowRoot", node_idx) def get_flow_length(self, node_idx=0): return self._call("tree", node_idx)[0] + class MineContractProxy(ContractProxy): def last_mined_epoch(self, node_idx=0): return self._call("lastMinedEpoch", node_idx) def can_submit(self, node_idx=0): return self._call("canSubmit", node_idx) - + def current_submissions(self, node_idx=0): return self._call("currentSubmissions", node_idx) def set_quality(self, quality, node_idx=0): return self._send("setQuality", node_idx, _targetQuality=quality) - class RewardContractProxy(ContractProxy): def reward_distributes(self, node_idx=0): return self._logs("DistributeReward", node_idx) - def donate(self, value, node_idx = 0): + def donate(self, value, node_idx=0): return self._send_payable("donate", node_idx, value) - - def base_reward(self, node_idx = 0): + + def base_reward(self, node_idx=0): return self._call("baseReward", node_idx) - def first_rewardable_chunk(self, node_idx = 0): + def first_rewardable_chunk(self, node_idx=0): return self._call("firstRewardableChunk", node_idx) - def reward_deadline(self, node_idx = 0): + def reward_deadline(self, node_idx=0): return self._call("rewardDeadline", node_idx, 0) diff --git a/tests/test_framework/contracts.py b/tests/test_framework/contracts.py index cea6f12..e3c1e2a 100644 --- a/tests/test_framework/contracts.py +++ b/tests/test_framework/contracts.py @@ -1,6 +1,7 @@ from pathlib import Path import json + def load_contract_metadata(path: str, name: str): path = Path(path) try: @@ -8,4 +9,3 @@ def load_contract_metadata(path: str, name: str): return json.loads(open(found_file, "r").read()) except StopIteration: raise Exception(f"Cannot found contract {name}'s metadata") - diff --git a/tests/test_framework/test_framework.py b/tests/test_framework/test_framework.py index 3f23782..11af013 100644 --- a/tests/test_framework/test_framework.py +++ b/tests/test_framework/test_framework.py @@ -15,7 +15,11 @@ from pathlib import Path from eth_utils import encode_hex from test_framework.bsc_node import BSCNode -from test_framework.contract_proxy import FlowContractProxy, MineContractProxy, RewardContractProxy +from test_framework.contract_proxy import ( + FlowContractProxy, + MineContractProxy, + RewardContractProxy, +) from test_framework.zgs_node import ZgsNode from test_framework.blockchain_node import BlockChainNodeType from test_framework.conflux_node import ConfluxNode, connect_sample_nodes @@ -74,13 +78,17 @@ class TestFramework: root_dir, "target", "release", "zgs_node" + binary_ext ) self.__default_zgs_cli_binary__ = os.path.join( - tests_dir, "tmp", "0g-storage-client" + binary_ext + tests_dir, "tmp", "0g-storage-client" + binary_ext ) def __setup_blockchain_node(self): if self.blockchain_node_type == BlockChainNodeType.ZG: - zg_node_init_genesis(self.blockchain_binary, self.root_dir, self.num_blockchain_nodes) - self.log.info("0gchain genesis initialized for %s nodes" % self.num_blockchain_nodes) + zg_node_init_genesis( + self.blockchain_binary, self.root_dir, self.num_blockchain_nodes + ) + self.log.info( + "0gchain genesis initialized for %s nodes" % self.num_blockchain_nodes + ) for i in range(self.num_blockchain_nodes): if i in self.blockchain_node_configs: @@ -171,15 +179,20 @@ class TestFramework: self.log.debug("Wait for 0gchain node to generate first block") time.sleep(0.5) for node in self.blockchain_nodes: - wait_until(lambda: node.net_peerCount() == self.num_blockchain_nodes - 1) + wait_until( + lambda: node.net_peerCount() == self.num_blockchain_nodes - 1 + ) wait_until(lambda: node.eth_blockNumber() is not None) wait_until(lambda: int(node.eth_blockNumber(), 16) > 0) - contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[0].setup_contract(self.enable_market, self.mine_period, self.lifetime_seconds) + contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[ + 0 + ].setup_contract(self.enable_market, self.mine_period, self.lifetime_seconds) self.contract = FlowContractProxy(contract, self.blockchain_nodes) self.mine_contract = MineContractProxy(mine_contract, self.blockchain_nodes) - self.reward_contract = RewardContractProxy(reward_contract, self.blockchain_nodes) - + self.reward_contract = RewardContractProxy( + reward_contract, self.blockchain_nodes + ) for node in self.blockchain_nodes[1:]: node.wait_for_transaction(tx_hash) @@ -216,12 +229,14 @@ class TestFramework: time.sleep(1) node.start() - self.log.info("Wait the zgs_node launch for %d seconds", self.launch_wait_seconds) + self.log.info( + "Wait the zgs_node launch for %d seconds", self.launch_wait_seconds + ) time.sleep(self.launch_wait_seconds) for node in self.nodes: node.wait_for_rpc_connection() - + def add_arguments(self, parser: argparse.ArgumentParser): parser.add_argument( "--conflux-binary", @@ -336,10 +351,12 @@ class TestFramework: self.log.addHandler(ch) def _check_cli_binary(self): - if Path(self.cli_binary).absolute() == Path(self.__default_zgs_cli_binary__).absolute() and not os.path.exists(self.cli_binary): + if Path(self.cli_binary).absolute() == Path( + self.__default_zgs_cli_binary__ + ).absolute() and not os.path.exists(self.cli_binary): dir = Path(self.cli_binary).parent.absolute() build_cli(dir) - + assert os.path.exists(self.cli_binary), ( "zgs CLI binary not found: %s" % self.cli_binary ) @@ -353,14 +370,12 @@ class TestFramework: file_to_upload, ): self._check_cli_binary() - + upload_args = [ self.cli_binary, "upload", "--url", blockchain_node_rpc_url, - "--contract", - contract_address, "--key", encode_hex(key), "--node", @@ -372,7 +387,9 @@ class TestFramework: "--file", ] - output = tempfile.NamedTemporaryFile(dir=self.root_dir, delete=False, prefix="zgs_client_output_") + output = tempfile.NamedTemporaryFile( + dir=self.root_dir, delete=False, prefix="zgs_client_output_" + ) output_name = output.name output_fileno = output.fileno() @@ -383,7 +400,7 @@ class TestFramework: stdout=output_fileno, stderr=output_fileno, ) - + return_code = proc.wait(timeout=60) output.seek(0) @@ -392,17 +409,27 @@ class TestFramework: line = line.decode("utf-8") self.log.debug("line: %s", line) if "root" in line: - filtered_line = re.sub(r'\x1b\[([0-9,A-Z]{1,2}(;[0-9]{1,2})?(;[0-9]{3})?)?[m|K]?', '', line) + filtered_line = re.sub( + r"\x1b\[([0-9,A-Z]{1,2}(;[0-9]{1,2})?(;[0-9]{3})?)?[m|K]?", + "", + line, + ) index = filtered_line.find("root=") if index > 0: root = filtered_line[index + 5 : index + 5 + 66] except Exception as ex: - self.log.error("Failed to upload file via CLI tool, output: %s", output_name) + self.log.error( + "Failed to upload file via CLI tool, output: %s", output_name + ) raise ex finally: output.close() - assert return_code == 0, "%s upload file failed, output: %s, log: %s" % (self.cli_binary, output_name, lines) + assert return_code == 0, "%s upload file failed, output: %s, log: %s" % ( + self.cli_binary, + output_name, + lines, + ) return root @@ -410,8 +437,15 @@ class TestFramework: submissions, data_root = create_submission(chunk_data) self.contract.submit(submissions) self.num_deployed_contracts += 1 - wait_until(lambda: self.contract.num_submissions() == self.num_deployed_contracts) - self.log.info("Submission completed, data root: %s, submissions(%s) = %s", data_root, len(submissions), submissions) + wait_until( + lambda: self.contract.num_submissions() == self.num_deployed_contracts + ) + self.log.info( + "Submission completed, data root: %s, submissions(%s) = %s", + data_root, + len(submissions), + submissions, + ) return data_root def __upload_file__(self, node_index: int, random_data_size: int) -> str: @@ -426,7 +460,9 @@ class TestFramework: # Upload file to storage node segments = submit_data(client, chunk_data) - self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments]) + self.log.info( + "segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments] + ) wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) return data_root @@ -452,7 +488,6 @@ class TestFramework: if clean: self.nodes[index].clean_data() - def start_storage_node(self, index): self.nodes[index].start() @@ -484,7 +519,7 @@ class TestFramework: if os.path.islink(dst): os.remove(dst) - elif os.path.isdir(dst): + elif os.path.isdir(dst): shutil.rmtree(dst) elif os.path.exists(dst): os.remove(dst) diff --git a/tests/test_framework/zg_node.py b/tests/test_framework/zg_node.py index dbb0c75..b28b438 100644 --- a/tests/test_framework/zg_node.py +++ b/tests/test_framework/zg_node.py @@ -3,24 +3,35 @@ import subprocess import tempfile from test_framework.blockchain_node import BlockChainNodeType, BlockchainNode -from utility.utils import blockchain_p2p_port, blockchain_rpc_port, blockchain_ws_port, blockchain_rpc_port_tendermint, pprof_port +from utility.utils import ( + blockchain_p2p_port, + blockchain_rpc_port, + blockchain_ws_port, + blockchain_rpc_port_tendermint, + pprof_port, +) from utility.build_binary import build_zg + def zg_node_init_genesis(binary: str, root_dir: str, num_nodes: int): assert num_nodes > 0, "Invalid number of blockchain nodes: %s" % num_nodes if not os.path.exists(binary): - build_zg(os.path.dirname(binary)) + build_zg(os.path.dirname(binary)) shell_script = os.path.join( - os.path.dirname(os.path.realpath(__file__)), # test_framework folder - "..", "config", "0gchain-init-genesis.sh" + os.path.dirname(os.path.realpath(__file__)), # test_framework folder + "..", + "config", + "0gchain-init-genesis.sh", ) zgchaind_dir = os.path.join(root_dir, "0gchaind") os.mkdir(zgchaind_dir) - - log_file = tempfile.NamedTemporaryFile(dir=zgchaind_dir, delete=False, prefix="init_genesis_", suffix=".log") + + log_file = tempfile.NamedTemporaryFile( + dir=zgchaind_dir, delete=False, prefix="init_genesis_", suffix=".log" + ) p2p_port_start = blockchain_p2p_port(0) ret = subprocess.run( @@ -31,7 +42,11 @@ def zg_node_init_genesis(binary: str, root_dir: str, num_nodes: int): log_file.close() - assert ret.returncode == 0, "Failed to init 0gchain genesis, see more details in log file: %s" % log_file.name + assert ret.returncode == 0, ( + "Failed to init 0gchain genesis, see more details in log file: %s" + % log_file.name + ) + class ZGNode(BlockchainNode): def __init__( @@ -61,19 +76,27 @@ class ZGNode(BlockchainNode): self.config_file = None self.args = [ - binary, "start", - "--home", data_dir, + binary, + "start", + "--home", + data_dir, # overwrite json rpc http port: 8545 - "--json-rpc.address", "127.0.0.1:%s" % blockchain_rpc_port(index), + "--json-rpc.address", + "127.0.0.1:%s" % blockchain_rpc_port(index), # overwrite json rpc ws port: 8546 - "--json-rpc.ws-address", "127.0.0.1:%s" % blockchain_ws_port(index), + "--json-rpc.ws-address", + "127.0.0.1:%s" % blockchain_ws_port(index), # overwrite p2p port: 26656 - "--p2p.laddr", "tcp://127.0.0.1:%s" % blockchain_p2p_port(index), + "--p2p.laddr", + "tcp://127.0.0.1:%s" % blockchain_p2p_port(index), # overwrite rpc port: 26657 - "--rpc.laddr", "tcp://127.0.0.1:%s" % blockchain_rpc_port_tendermint(index), + "--rpc.laddr", + "tcp://127.0.0.1:%s" % blockchain_rpc_port_tendermint(index), # overwrite pprof port: 6060 - "--rpc.pprof_laddr", "127.0.0.1:%s" % pprof_port(index), - "--log_level", "debug" + "--rpc.pprof_laddr", + "127.0.0.1:%s" % pprof_port(index), + "--log_level", + "debug", ] for k, v in updated_config.items(): @@ -82,4 +105,4 @@ class ZGNode(BlockchainNode): self.args.append(str(v)) def setup_config(self): - """ Already batch initialized by shell script in framework """ + """Already batch initialized by shell script in framework""" diff --git a/tests/test_framework/zgs_node.py b/tests/test_framework/zgs_node.py index fa2bd31..1decae5 100644 --- a/tests/test_framework/zgs_node.py +++ b/tests/test_framework/zgs_node.py @@ -11,6 +11,7 @@ from utility.utils import ( blockchain_rpc_port, ) + class ZgsNode(TestNode): def __init__( self, @@ -98,17 +99,21 @@ class ZgsNode(TestNode): def zgs_download_segment(self, data_root, start_index, end_index): return self.rpc.zgs_downloadSegment([data_root, start_index, end_index]) - - def zgs_download_segment_decoded(self, data_root: str, start_chunk_index: int, end_chunk_index: int) -> bytes: - encodedSegment = self.rpc.zgs_downloadSegment([data_root, start_chunk_index, end_chunk_index]) + + def zgs_download_segment_decoded( + self, data_root: str, start_chunk_index: int, end_chunk_index: int + ) -> bytes: + encodedSegment = self.rpc.zgs_downloadSegment( + [data_root, start_chunk_index, end_chunk_index] + ) return None if encodedSegment is None else base64.b64decode(encodedSegment) def zgs_get_file_info(self, data_root): - return self.rpc.zgs_getFileInfo([data_root]) + return self.rpc.zgs_getFileInfo([data_root, True]) def zgs_get_file_info_by_tx_seq(self, tx_seq): return self.rpc.zgs_getFileInfoByTxSeq([tx_seq]) - + def zgs_get_flow_context(self, tx_seq): return self.rpc.zgs_getFlowContext([tx_seq]) @@ -118,9 +123,13 @@ class ZgsNode(TestNode): def admin_start_sync_file(self, tx_seq): return self.rpc.admin_startSyncFile([tx_seq]) - - def admin_start_sync_chunks(self, tx_seq: int, start_chunk_index: int, end_chunk_index: int): - return self.rpc.admin_startSyncChunks([tx_seq, start_chunk_index, end_chunk_index]) + + def admin_start_sync_chunks( + self, tx_seq: int, start_chunk_index: int, end_chunk_index: int + ): + return self.rpc.admin_startSyncChunks( + [tx_seq, start_chunk_index, end_chunk_index] + ) def admin_get_sync_status(self, tx_seq): return self.rpc.admin_getSyncStatus([tx_seq]) @@ -128,8 +137,8 @@ class ZgsNode(TestNode): def sync_status_is_completed_or_unknown(self, tx_seq): status = self.rpc.admin_getSyncStatus([tx_seq]) return status == "Completed" or status == "unknown" - - def admin_get_file_location(self, tx_seq, all_shards = True): + + def admin_get_file_location(self, tx_seq, all_shards=True): return self.rpc.admin_getFileLocation([tx_seq, all_shards]) def clean_data(self): diff --git a/tests/utility/build_binary.py b/tests/utility/build_binary.py index c095917..ba7a813 100644 --- a/tests/utility/build_binary.py +++ b/tests/utility/build_binary.py @@ -9,31 +9,40 @@ from enum import Enum, unique from utility.utils import is_windows_platform, wait_until # v1.0.0-ci release -GITHUB_DOWNLOAD_URL="https://api.github.com/repos/0glabs/0g-storage-node/releases/152560136" +GITHUB_DOWNLOAD_URL = ( + "https://api.github.com/repos/0glabs/0g-storage-node/releases/152560136" +) CONFLUX_BINARY = "conflux.exe" if is_windows_platform() else "conflux" BSC_BINARY = "geth.exe" if is_windows_platform() else "geth" ZG_BINARY = "0gchaind.exe" if is_windows_platform() else "0gchaind" -CLIENT_BINARY = "0g-storage-client.exe" if is_windows_platform() else "0g-storage-client" +CLIENT_BINARY = ( + "0g-storage-client.exe" if is_windows_platform() else "0g-storage-client" +) CLI_GIT_REV = "98d74b7e7e6084fc986cb43ce2c66692dac094a6" + @unique class BuildBinaryResult(Enum): AlreadyExists = 0 Installed = 1 NotInstalled = 2 + def build_conflux(dir: str) -> BuildBinaryResult: # Download or build conflux binary if absent result = __download_from_github( dir=dir, binary_name=CONFLUX_BINARY, - github_url=GITHUB_DOWNLOAD_URL, + github_url=GITHUB_DOWNLOAD_URL, asset_name=__asset_name(CONFLUX_BINARY, zip=True), ) - if result == BuildBinaryResult.AlreadyExists or result == BuildBinaryResult.Installed: + if ( + result == BuildBinaryResult.AlreadyExists + or result == BuildBinaryResult.Installed + ): return result return __build_from_github( @@ -44,6 +53,7 @@ def build_conflux(dir: str) -> BuildBinaryResult: compiled_relative_path=["target", "release"], ) + def build_bsc(dir: str) -> BuildBinaryResult: # Download bsc binary if absent result = __download_from_github( @@ -55,10 +65,13 @@ def build_bsc(dir: str) -> BuildBinaryResult: # Requires to download binary successfully, since it is not ready to build # binary from source code. - assert result != BuildBinaryResult.NotInstalled, "Cannot download binary from github [%s]" % BSC_BINARY + assert result != BuildBinaryResult.NotInstalled, ( + "Cannot download binary from github [%s]" % BSC_BINARY + ) return result + def build_zg(dir: str) -> BuildBinaryResult: # Download or build 0gchain binary if absent result = __download_from_github( @@ -68,7 +81,10 @@ def build_zg(dir: str) -> BuildBinaryResult: asset_name=__asset_name(ZG_BINARY, zip=True), ) - if result == BuildBinaryResult.AlreadyExists or result == BuildBinaryResult.Installed: + if ( + result == BuildBinaryResult.AlreadyExists + or result == BuildBinaryResult.Installed + ): return result return __build_from_github( @@ -79,17 +95,18 @@ def build_zg(dir: str) -> BuildBinaryResult: compiled_relative_path=[], ) + def build_cli(dir: str) -> BuildBinaryResult: # Build 0g-storage-client binary if absent return __build_from_github( dir=dir, binary_name=CLIENT_BINARY, github_url="https://github.com/0glabs/0g-storage-client.git", - git_rev=CLI_GIT_REV, build_cmd="go build", compiled_relative_path=[], ) + def __asset_name(binary_name: str, zip: bool = False) -> str: sys = platform.system().lower() if sys == "linux": @@ -102,24 +119,34 @@ def __asset_name(binary_name: str, zip: bool = False) -> str: else: raise RuntimeError("Unable to recognize platform") -def __build_from_github(dir: str, binary_name: str, github_url: str, build_cmd: str, compiled_relative_path: list[str], git_rev = None) -> BuildBinaryResult: + +def __build_from_github( + dir: str, + binary_name: str, + github_url: str, + build_cmd: str, + compiled_relative_path: list[str], + git_rev=None, +) -> BuildBinaryResult: if git_rev is None: versioned_binary_name = binary_name elif binary_name.endswith(".exe"): versioned_binary_name = binary_name.removesuffix(".exe") + f"_{git_rev}.exe" else: versioned_binary_name = f"{binary_name}_{git_rev}" - + binary_path = os.path.join(dir, binary_name) versioned_binary_path = os.path.join(dir, versioned_binary_name) if os.path.exists(versioned_binary_path): __create_sym_link(versioned_binary_name, binary_name, dir) return BuildBinaryResult.AlreadyExists - + start_time = time.time() - + # clone code from github to a temp folder - code_tmp_dir_name = (binary_name[:-4] if is_windows_platform() else binary_name) + "_tmp" + code_tmp_dir_name = ( + binary_name[:-4] if is_windows_platform() else binary_name + ) + "_tmp" code_tmp_dir = os.path.join(dir, code_tmp_dir_name) if os.path.exists(code_tmp_dir): shutil.rmtree(code_tmp_dir) @@ -145,14 +172,22 @@ def __build_from_github(dir: str, binary_name: str, github_url: str, build_cmd: shutil.rmtree(code_tmp_dir, ignore_errors=True) - print("Completed to build binary " + binary_name + ", Elapsed: " + str(int(time.time() - start_time)) + " seconds", flush=True) + print( + "Completed to build binary " + + binary_name + + ", Elapsed: " + + str(int(time.time() - start_time)) + + " seconds", + flush=True, + ) return BuildBinaryResult.Installed -def __create_sym_link(src, dst, path = None): + +def __create_sym_link(src, dst, path=None): if src == dst: return - + origin_path = os.getcwd() if path is not None: os.chdir(path) @@ -171,16 +206,19 @@ def __create_sym_link(src, dst, path = None): os.chdir(origin_path) -def __download_from_github(dir: str, binary_name: str, github_url: str, asset_name: str) -> BuildBinaryResult: + +def __download_from_github( + dir: str, binary_name: str, github_url: str, asset_name: str +) -> BuildBinaryResult: if not os.path.exists(dir): os.makedirs(dir, exist_ok=True) binary_path = os.path.join(dir, binary_name) if os.path.exists(binary_path): return BuildBinaryResult.AlreadyExists - + print("Begin to download binary from github: %s" % binary_name, flush=True) - + start_time = time.time() req = requests.get(github_url) @@ -194,7 +232,7 @@ def __download_from_github(dir: str, binary_name: str, github_url: str, asset_na if download_url is None: print(f"Cannot find asset by name {asset_name}", flush=True) return BuildBinaryResult.NotInstalled - + content = requests.get(download_url).content # Supports to read from zipped binary @@ -203,17 +241,24 @@ def __download_from_github(dir: str, binary_name: str, github_url: str, asset_na with open(asset_path, "xb") as f: f.write(content) shutil.unpack_archive(asset_path, dir) - assert os.path.exists(binary_path), f"Cannot find binary after unzip, binary = {binary_name}, asset = {asset_name}" + assert os.path.exists( + binary_path + ), f"Cannot find binary after unzip, binary = {binary_name}, asset = {asset_name}" else: with open(binary_path, "xb") as f: - f.write(content) + f.write(content) if not is_windows_platform(): st = os.stat(binary_path) os.chmod(binary_path, st.st_mode | stat.S_IEXEC) - + wait_until(lambda: os.access(binary_path, os.X_OK), timeout=120) - print("Completed to download binary, Elapsed: " + str(int(time.time() - start_time)) + " seconds", flush=True) + print( + "Completed to download binary, Elapsed: " + + str(int(time.time() - start_time)) + + " seconds", + flush=True, + ) return BuildBinaryResult.Installed diff --git a/tests/utility/merkle_tree.py b/tests/utility/merkle_tree.py index 910ebd1..832d2cf 100644 --- a/tests/utility/merkle_tree.py +++ b/tests/utility/merkle_tree.py @@ -162,15 +162,20 @@ class MerkleTree: n = len(data) if n < ENTRY_SIZE or (n & (n - 1)) != 0: raise Exception("Input length is not power of 2") - - leaves = [Leaf.from_data(data[i:i + ENTRY_SIZE], tree.hasher) for i in range(0, n, ENTRY_SIZE)] + + leaves = [ + Leaf.from_data(data[i : i + ENTRY_SIZE], tree.hasher) + for i in range(0, n, ENTRY_SIZE) + ] tree.__leaves = leaves nodes = leaves while len(nodes) > 1: next_nodes = [] for i in range(0, len(nodes), 2): - next_nodes.append(Node.from_children(nodes[i], nodes[i+1], tree.hasher)) + next_nodes.append( + Node.from_children(nodes[i], nodes[i + 1], tree.hasher) + ) nodes = next_nodes diff --git a/tests/utility/run_all.py b/tests/utility/run_all.py index 9ce3dee..b00305b 100644 --- a/tests/utility/run_all.py +++ b/tests/utility/run_all.py @@ -12,8 +12,20 @@ DEFAULT_PORT_MIN = 11000 DEFAULT_PORT_MAX = 65535 DEFAULT_PORT_RANGE = 500 + def print_testcase_result(color, glyph, script, start_time): - print(color[1] + glyph + " Testcase " + script + "\telapsed: " + str(int(time.time() - start_time)) + " seconds" + color[0], flush=True) + print( + color[1] + + glyph + + " Testcase " + + script + + "\telapsed: " + + str(int(time.time() - start_time)) + + " seconds" + + color[0], + flush=True, + ) + def run_single_test(py, script, test_dir, index, port_min, port_max): try: @@ -58,7 +70,14 @@ def run_single_test(py, script, test_dir, index, port_min, port_max): raise err print_testcase_result(BLUE, TICK, script, start_time) -def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={}, long_manual_tests: set[str]={}, single_run_tests: set[str]={}): + +def run_all( + test_dir: str, + test_subdirs: list[str] = [], + slow_tests: set[str] = {}, + long_manual_tests: set[str] = {}, + single_run_tests: set[str] = {}, +): tmp_dir = os.path.join(test_dir, "tmp") if not os.path.exists(tmp_dir): os.makedirs(tmp_dir, exist_ok=True) @@ -102,7 +121,11 @@ def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={}, for file in os.listdir(subdir_path): if file.endswith("_test.py"): rel_path = os.path.join(subdir, file) - if rel_path not in slow_tests and rel_path not in long_manual_tests and rel_path not in single_run_tests: + if ( + rel_path not in slow_tests + and rel_path not in long_manual_tests + and rel_path not in single_run_tests + ): TEST_SCRIPTS.append(rel_path) executor = ProcessPoolExecutor(max_workers=options.max_workers) @@ -154,4 +177,3 @@ def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={}, for c in failed: print(c) sys.exit(1) - diff --git a/tests/utility/simple_rpc_proxy.py b/tests/utility/simple_rpc_proxy.py index 5b0628b..931ed52 100644 --- a/tests/utility/simple_rpc_proxy.py +++ b/tests/utility/simple_rpc_proxy.py @@ -25,7 +25,13 @@ class RpcCaller: if isinstance(parsed, Ok): return parsed.result else: - print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args)[-1500:], parsed)) + print( + "Failed to call RPC, method = %s(%s), error = %s" + % (self.method, str(*args)[-1500:], parsed) + ) except Exception as ex: - print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args)[-1500:], ex)) + print( + "Failed to call RPC, method = %s(%s), exception = %s" + % (self.method, str(*args)[-1500:], ex) + ) return None diff --git a/tests/utility/spec.py b/tests/utility/spec.py index 40954b8..f86b755 100644 --- a/tests/utility/spec.py +++ b/tests/utility/spec.py @@ -1,2 +1,2 @@ ENTRY_SIZE = 256 -PORA_CHUNK_SIZE = 1024 \ No newline at end of file +PORA_CHUNK_SIZE = 1024 diff --git a/tests/utility/submission.py b/tests/utility/submission.py index 724a24f..4028b6e 100644 --- a/tests/utility/submission.py +++ b/tests/utility/submission.py @@ -5,6 +5,7 @@ from math import log2 from utility.merkle_tree import add_0x_prefix, Leaf, MerkleTree from utility.spec import ENTRY_SIZE, PORA_CHUNK_SIZE + def log2_pow2(n): return int(log2(((n ^ (n - 1)) >> 1) + 1)) @@ -106,29 +107,27 @@ def create_segment_node(data, offset, batch, size): else: tree.add_leaf(Leaf(segment_root(data[start:end]))) - return tree.get_root_hash() - segment_root_cached_chunks = None segment_root_cached_output = None + + def segment_root(chunks): global segment_root_cached_chunks, segment_root_cached_output if segment_root_cached_chunks == chunks: return segment_root_cached_output - data_len = len(chunks) if data_len == 0: return b"\x00" * 32 - tree = MerkleTree() for i in range(0, data_len, ENTRY_SIZE): tree.encrypt(chunks[i : i + ENTRY_SIZE]) - + digest = tree.get_root_hash() segment_root_cached_chunks = chunks @@ -241,4 +240,4 @@ def data_to_segments(data): segments.append(segment) idx += 1 - return segments \ No newline at end of file + return segments diff --git a/tests/utility/utils.py b/tests/utility/utils.py index c5e877f..a2f0a75 100644 --- a/tests/utility/utils.py +++ b/tests/utility/utils.py @@ -5,6 +5,7 @@ import rtoml import time import sha3 + class PortMin: # Must be initialized with a unique integer for each process n = 11000 @@ -35,15 +36,19 @@ def blockchain_rpc_port(n): def blockchain_rpc_port_core(n): return PortMin.n + MAX_NODES + 3 * MAX_BLOCKCHAIN_NODES + n + def blockchain_ws_port(n): return PortMin.n + MAX_NODES + 4 * MAX_BLOCKCHAIN_NODES + n + def blockchain_rpc_port_tendermint(n): return PortMin.n + MAX_NODES + 5 * MAX_BLOCKCHAIN_NODES + n + def pprof_port(n): return PortMin.n + MAX_NODES + 6 * MAX_BLOCKCHAIN_NODES + n + def wait_until(predicate, *, attempts=float("inf"), timeout=float("inf"), lock=None): if attempts == float("inf") and timeout == float("inf"): timeout = 60 @@ -135,11 +140,12 @@ def assert_greater_than_or_equal(thing1, thing2): if thing1 < thing2: raise AssertionError("%s < %s" % (str(thing1), str(thing2))) -# 14900K has the performance point 100 + +# 14900K has the performance point 100 def estimate_st_performance(): hasher = sha3.keccak_256() - input = b"\xcc" * (1<<26) + input = b"\xcc" * (1 << 26) start_time = time.perf_counter() hasher.update(input) digest = hasher.hexdigest() - return 10 / (time.perf_counter() - start_time) \ No newline at end of file + return 10 / (time.perf_counter() - start_time)