Compare commits

...

4 Commits

Author SHA1 Message Date
peilun-conflux
b1601f1b1a
Merge deaf224d0f into 052d2d781b 2024-09-08 08:38:56 +08:00
Bo QIU
052d2d781b
Change zg chain block time in python tests (#181)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* Change zg chain block time in python tests
2024-09-08 08:38:53 +08:00
bruno-valante
52878b6709
Fix recall range computation for sharding (#186)
* Fix recall range computation for sharding

* cargo fmt
2024-09-08 08:37:03 +08:00
Joel Liu
43c2d5f788
Add configuration which can force sync from start_block_number (#183)
* queyr logs via LogQuery in wath loop
2024-09-08 08:34:59 +08:00
10 changed files with 61 additions and 11 deletions

View File

@ -32,6 +32,8 @@ pub struct LogSyncConfig {
pub remove_finalized_block_interval_minutes: u64, pub remove_finalized_block_interval_minutes: u64,
// watch_loop (eth_getLogs) trigger interval // watch_loop (eth_getLogs) trigger interval
pub watch_loop_wait_time_ms: u64, pub watch_loop_wait_time_ms: u64,
// force to sync log from start block number
pub force_log_sync_from_start_block_number: bool,
} }
#[derive(Clone)] #[derive(Clone)]
@ -58,6 +60,7 @@ impl LogSyncConfig {
default_finalized_block_count: u64, default_finalized_block_count: u64,
remove_finalized_block_interval_minutes: u64, remove_finalized_block_interval_minutes: u64,
watch_loop_wait_time_ms: u64, watch_loop_wait_time_ms: u64,
force_log_sync_from_start_block_number: bool,
) -> Self { ) -> Self {
Self { Self {
rpc_endpoint_url, rpc_endpoint_url,
@ -73,6 +76,7 @@ impl LogSyncConfig {
default_finalized_block_count, default_finalized_block_count,
remove_finalized_block_interval_minutes, remove_finalized_block_interval_minutes,
watch_loop_wait_time_ms, watch_loop_wait_time_ms,
force_log_sync_from_start_block_number,
} }
} }
} }

View File

@ -5,7 +5,7 @@ use anyhow::{anyhow, bail, Result};
use ethereum_types::H256; use ethereum_types::H256;
use ethers::{prelude::Middleware, types::BlockNumber}; use ethers::{prelude::Middleware, types::BlockNumber};
use futures::FutureExt; use futures::FutureExt;
use jsonrpsee::tracing::{debug, error, trace, warn}; use jsonrpsee::tracing::{debug, error, warn};
use shared_types::{ChunkArray, Transaction}; use shared_types::{ChunkArray, Transaction};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fmt::Debug; use std::fmt::Debug;
@ -305,7 +305,7 @@ impl LogSyncManager {
}; };
while let Some(data) = rx.recv().await { while let Some(data) = rx.recv().await {
trace!("handle_data: data={:?}", data); debug!("handle_data: data={:?}", data);
match data { match data {
LogFetchProgress::SyncedBlock(( LogFetchProgress::SyncedBlock((
block_number, block_number,
@ -472,6 +472,15 @@ impl LogSyncManager {
async fn get_start_block_number_with_hash( async fn get_start_block_number_with_hash(
log_sync_manager: &LogSyncManager, log_sync_manager: &LogSyncManager,
) -> Result<(u64, H256), anyhow::Error> { ) -> Result<(u64, H256), anyhow::Error> {
if log_sync_manager
.config
.force_log_sync_from_start_block_number
{
let block_number = log_sync_manager.config.start_block_number;
let block_hash = log_sync_manager.get_block(block_number.into()).await?.1;
return Ok((block_number, block_hash));
}
if let Some(block_number) = log_sync_manager.store.get_log_latest_block_number()? { if let Some(block_number) = log_sync_manager.store.get_log_latest_block_number()? {
if let Some(Some(val)) = log_sync_manager if let Some(Some(val)) = log_sync_manager
.block_hash_cache .block_hash_cache

View File

@ -75,7 +75,11 @@ impl MineRangeConfig {
let minable_length = let minable_length =
(context.flow_length.as_u64() / SECTORS_PER_LOAD as u64) * SECTORS_PER_LOAD as u64; (context.flow_length.as_u64() / SECTORS_PER_LOAD as u64) * SECTORS_PER_LOAD as u64;
let mining_length = std::cmp::min(minable_length, SECTORS_PER_MAX_MINING_RANGE as u64); let num_shards = 1u64 << self.shard_config.miner_shard_mask().count_zeros();
let mining_length = std::cmp::min(
minable_length,
(SECTORS_PER_MAX_MINING_RANGE as u64).saturating_mul(num_shards),
);
let start_position = std::cmp::min(self_start_position, minable_length - mining_length); let start_position = std::cmp::min(self_start_position, minable_length - mining_length);
let start_position = let start_position =

View File

@ -156,6 +156,7 @@ impl ZgsConfig {
self.default_finalized_block_count, self.default_finalized_block_count,
self.remove_finalized_block_interval_minutes, self.remove_finalized_block_interval_minutes,
self.watch_loop_wait_time_ms, self.watch_loop_wait_time_ms,
self.force_log_sync_from_start_block_number,
)) ))
} }

View File

@ -33,6 +33,7 @@ build_config! {
(blockchain_rpc_endpoint, (String), "http://127.0.0.1:8545".to_string()) (blockchain_rpc_endpoint, (String), "http://127.0.0.1:8545".to_string())
(log_contract_address, (String), "".to_string()) (log_contract_address, (String), "".to_string())
(log_sync_start_block_number, (u64), 0) (log_sync_start_block_number, (u64), 0)
(force_log_sync_from_start_block_number, (bool), false)
(confirmation_block_count, (u64), 3) (confirmation_block_count, (u64), 3)
(log_page_size, (u64), 999) (log_page_size, (u64), 999)
(max_cache_data_size, (usize), 100 * 1024 * 1024) // 100 MB (max_cache_data_size, (usize), 100 * 1024 * 1024) // 100 MB

View File

@ -62,6 +62,15 @@ for ((i=0; i<$NUM_NODES; i++)) do
CONFIG_TOML=$ROOT_DIR/node$i/config/config.toml CONFIG_TOML=$ROOT_DIR/node$i/config/config.toml
sed -i '/seeds = /c\seeds = ""' $CONFIG_TOML sed -i '/seeds = /c\seeds = ""' $CONFIG_TOML
sed -i 's/addr_book_strict = true/addr_book_strict = false/' $CONFIG_TOML sed -i 's/addr_book_strict = true/addr_book_strict = false/' $CONFIG_TOML
# Change block time to very small
sed -i '/timeout_propose = "3s"/c\timeout_propose = "300ms"' $CONFIG_TOML
sed -i '/timeout_propose_delta = "500ms"/c\timeout_propose_delta = "50ms"' $CONFIG_TOML
sed -i '/timeout_prevote = "1s"/c\timeout_prevote = "100ms"' $CONFIG_TOML
sed -i '/timeout_prevote_delta = "500ms"/c\timeout_prevote_delta = "50ms"' $CONFIG_TOML
sed -i '/timeout_precommit = "1s"/c\timeout_precommit = "100ms"' $CONFIG_TOML
sed -i '/timeout_precommit_delta = "500ms"/c\timeout_precommit_delta = "50ms"' $CONFIG_TOML
sed -i '/timeout_commit = "5s"/c\timeout_commit = "500ms"' $CONFIG_TOML
done done
# Update persistent_peers in config.toml # Update persistent_peers in config.toml

View File

@ -7,6 +7,7 @@ from utility.run_all import run_all
if __name__ == "__main__": if __name__ == "__main__":
run_all( run_all(
test_dir = os.path.dirname(__file__), test_dir = os.path.dirname(__file__),
slow_tests={"random_test.py", "same_root_test.py"}, slow_tests={"mine_test.py", "random_test.py", "same_root_test.py"},
long_manual_tests={"fuzz_test.py"}, long_manual_tests={"fuzz_test.py"},
single_run_tests={"mine_with_market_test.py"},
) )

View File

@ -167,8 +167,8 @@ class TestFramework:
# sync_blocks(self.blockchain_nodes) # sync_blocks(self.blockchain_nodes)
elif self.blockchain_node_type == BlockChainNodeType.ZG: elif self.blockchain_node_type == BlockChainNodeType.ZG:
# wait for the first block # wait for the first block
self.log.debug("Wait 3 seconds for 0gchain node to generate first block") self.log.debug("Wait for 0gchain node to generate first block")
time.sleep(3) time.sleep(0.5)
for node in self.blockchain_nodes: for node in self.blockchain_nodes:
wait_until(lambda: node.net_peerCount() == self.num_blockchain_nodes - 1) wait_until(lambda: node.net_peerCount() == self.num_blockchain_nodes - 1)
wait_until(lambda: node.eth_blockNumber() is not None) wait_until(lambda: node.eth_blockNumber() is not None)

View File

@ -50,11 +50,15 @@ def run_single_test(py, script, test_dir, index, port_min, port_max):
) )
except subprocess.CalledProcessError as err: except subprocess.CalledProcessError as err:
print_testcase_result(RED, CROSS, script, start_time) print_testcase_result(RED, CROSS, script, start_time)
print("Output of " + script + "\n" + err.output.decode("utf-8"), flush=True) try:
print("Output of " + script + "\n" + err.output.decode("utf-8"), flush=True)
except UnicodeDecodeError:
print("Output of " + script + "\n", flush=True)
print(err.output)
raise err raise err
print_testcase_result(BLUE, TICK, script, start_time) print_testcase_result(BLUE, TICK, script, start_time)
def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={}, long_manual_tests: set[str]={}): def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={}, long_manual_tests: set[str]={}, single_run_tests: set[str]={}):
tmp_dir = os.path.join(test_dir, "tmp") tmp_dir = os.path.join(test_dir, "tmp")
if not os.path.exists(tmp_dir): if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir, exist_ok=True) os.makedirs(tmp_dir, exist_ok=True)
@ -98,7 +102,7 @@ def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={},
for file in os.listdir(subdir_path): for file in os.listdir(subdir_path):
if file.endswith("_test.py"): if file.endswith("_test.py"):
rel_path = os.path.join(subdir, file) rel_path = os.path.join(subdir, file)
if rel_path not in slow_tests and rel_path not in long_manual_tests: if rel_path not in slow_tests and rel_path not in long_manual_tests and rel_path not in single_run_tests:
TEST_SCRIPTS.append(rel_path) TEST_SCRIPTS.append(rel_path)
executor = ProcessPoolExecutor(max_workers=options.max_workers) executor = ProcessPoolExecutor(max_workers=options.max_workers)
@ -131,6 +135,18 @@ def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={},
print("CalledProcessError " + repr(err)) print("CalledProcessError " + repr(err))
failed.add(script) failed.add(script)
# Run single tests one by one
for script in single_run_tests:
f = executor.submit(
run_single_test, py, script, test_dir, i, options.port_min, options.port_max
)
try:
f.result()
except subprocess.CalledProcessError as err:
print("CalledProcessError " + repr(err))
failed.add(script)
i += 1
print("Elapsed: " + str(int(time.time() - start_time)) + " seconds", flush=True) print("Elapsed: " + str(int(time.time() - start_time)) + " seconds", flush=True)
if len(failed) > 0: if len(failed) > 0:

View File

@ -194,9 +194,14 @@ def generate_merkle_tree_by_batch(data):
def submit_data(client, data): def submit_data(client, data):
shard_config = client.rpc.zgs_getShardConfig()
shard_id = int(shard_config["shardId"])
num_shard = int(shard_config["numShard"])
segments = data_to_segments(data) segments = data_to_segments(data)
for segment in segments: for index, segment in enumerate(segments):
client.zgs_upload_segment(segment) if index % num_shard == shard_id:
client.zgs_upload_segment(segment)
return segments return segments