mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-11-20 15:05:19 +00:00
Compare commits
4 Commits
5c2e3fcfbe
...
b1601f1b1a
Author | SHA1 | Date | |
---|---|---|---|
|
b1601f1b1a | ||
|
052d2d781b | ||
|
52878b6709 | ||
|
43c2d5f788 |
@ -32,6 +32,8 @@ pub struct LogSyncConfig {
|
||||
pub remove_finalized_block_interval_minutes: u64,
|
||||
// watch_loop (eth_getLogs) trigger interval
|
||||
pub watch_loop_wait_time_ms: u64,
|
||||
// force to sync log from start block number
|
||||
pub force_log_sync_from_start_block_number: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@ -58,6 +60,7 @@ impl LogSyncConfig {
|
||||
default_finalized_block_count: u64,
|
||||
remove_finalized_block_interval_minutes: u64,
|
||||
watch_loop_wait_time_ms: u64,
|
||||
force_log_sync_from_start_block_number: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
rpc_endpoint_url,
|
||||
@ -73,6 +76,7 @@ impl LogSyncConfig {
|
||||
default_finalized_block_count,
|
||||
remove_finalized_block_interval_minutes,
|
||||
watch_loop_wait_time_ms,
|
||||
force_log_sync_from_start_block_number,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ use anyhow::{anyhow, bail, Result};
|
||||
use ethereum_types::H256;
|
||||
use ethers::{prelude::Middleware, types::BlockNumber};
|
||||
use futures::FutureExt;
|
||||
use jsonrpsee::tracing::{debug, error, trace, warn};
|
||||
use jsonrpsee::tracing::{debug, error, warn};
|
||||
use shared_types::{ChunkArray, Transaction};
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt::Debug;
|
||||
@ -305,7 +305,7 @@ impl LogSyncManager {
|
||||
};
|
||||
|
||||
while let Some(data) = rx.recv().await {
|
||||
trace!("handle_data: data={:?}", data);
|
||||
debug!("handle_data: data={:?}", data);
|
||||
match data {
|
||||
LogFetchProgress::SyncedBlock((
|
||||
block_number,
|
||||
@ -472,6 +472,15 @@ impl LogSyncManager {
|
||||
async fn get_start_block_number_with_hash(
|
||||
log_sync_manager: &LogSyncManager,
|
||||
) -> Result<(u64, H256), anyhow::Error> {
|
||||
if log_sync_manager
|
||||
.config
|
||||
.force_log_sync_from_start_block_number
|
||||
{
|
||||
let block_number = log_sync_manager.config.start_block_number;
|
||||
let block_hash = log_sync_manager.get_block(block_number.into()).await?.1;
|
||||
return Ok((block_number, block_hash));
|
||||
}
|
||||
|
||||
if let Some(block_number) = log_sync_manager.store.get_log_latest_block_number()? {
|
||||
if let Some(Some(val)) = log_sync_manager
|
||||
.block_hash_cache
|
||||
|
@ -75,7 +75,11 @@ impl MineRangeConfig {
|
||||
let minable_length =
|
||||
(context.flow_length.as_u64() / SECTORS_PER_LOAD as u64) * SECTORS_PER_LOAD as u64;
|
||||
|
||||
let mining_length = std::cmp::min(minable_length, SECTORS_PER_MAX_MINING_RANGE as u64);
|
||||
let num_shards = 1u64 << self.shard_config.miner_shard_mask().count_zeros();
|
||||
let mining_length = std::cmp::min(
|
||||
minable_length,
|
||||
(SECTORS_PER_MAX_MINING_RANGE as u64).saturating_mul(num_shards),
|
||||
);
|
||||
|
||||
let start_position = std::cmp::min(self_start_position, minable_length - mining_length);
|
||||
let start_position =
|
||||
|
@ -156,6 +156,7 @@ impl ZgsConfig {
|
||||
self.default_finalized_block_count,
|
||||
self.remove_finalized_block_interval_minutes,
|
||||
self.watch_loop_wait_time_ms,
|
||||
self.force_log_sync_from_start_block_number,
|
||||
))
|
||||
}
|
||||
|
||||
|
@ -33,6 +33,7 @@ build_config! {
|
||||
(blockchain_rpc_endpoint, (String), "http://127.0.0.1:8545".to_string())
|
||||
(log_contract_address, (String), "".to_string())
|
||||
(log_sync_start_block_number, (u64), 0)
|
||||
(force_log_sync_from_start_block_number, (bool), false)
|
||||
(confirmation_block_count, (u64), 3)
|
||||
(log_page_size, (u64), 999)
|
||||
(max_cache_data_size, (usize), 100 * 1024 * 1024) // 100 MB
|
||||
|
@ -62,6 +62,15 @@ for ((i=0; i<$NUM_NODES; i++)) do
|
||||
CONFIG_TOML=$ROOT_DIR/node$i/config/config.toml
|
||||
sed -i '/seeds = /c\seeds = ""' $CONFIG_TOML
|
||||
sed -i 's/addr_book_strict = true/addr_book_strict = false/' $CONFIG_TOML
|
||||
|
||||
# Change block time to very small
|
||||
sed -i '/timeout_propose = "3s"/c\timeout_propose = "300ms"' $CONFIG_TOML
|
||||
sed -i '/timeout_propose_delta = "500ms"/c\timeout_propose_delta = "50ms"' $CONFIG_TOML
|
||||
sed -i '/timeout_prevote = "1s"/c\timeout_prevote = "100ms"' $CONFIG_TOML
|
||||
sed -i '/timeout_prevote_delta = "500ms"/c\timeout_prevote_delta = "50ms"' $CONFIG_TOML
|
||||
sed -i '/timeout_precommit = "1s"/c\timeout_precommit = "100ms"' $CONFIG_TOML
|
||||
sed -i '/timeout_precommit_delta = "500ms"/c\timeout_precommit_delta = "50ms"' $CONFIG_TOML
|
||||
sed -i '/timeout_commit = "5s"/c\timeout_commit = "500ms"' $CONFIG_TOML
|
||||
done
|
||||
|
||||
# Update persistent_peers in config.toml
|
||||
|
@ -7,6 +7,7 @@ from utility.run_all import run_all
|
||||
if __name__ == "__main__":
|
||||
run_all(
|
||||
test_dir = os.path.dirname(__file__),
|
||||
slow_tests={"random_test.py", "same_root_test.py"},
|
||||
slow_tests={"mine_test.py", "random_test.py", "same_root_test.py"},
|
||||
long_manual_tests={"fuzz_test.py"},
|
||||
single_run_tests={"mine_with_market_test.py"},
|
||||
)
|
@ -167,8 +167,8 @@ class TestFramework:
|
||||
# sync_blocks(self.blockchain_nodes)
|
||||
elif self.blockchain_node_type == BlockChainNodeType.ZG:
|
||||
# wait for the first block
|
||||
self.log.debug("Wait 3 seconds for 0gchain node to generate first block")
|
||||
time.sleep(3)
|
||||
self.log.debug("Wait for 0gchain node to generate first block")
|
||||
time.sleep(0.5)
|
||||
for node in self.blockchain_nodes:
|
||||
wait_until(lambda: node.net_peerCount() == self.num_blockchain_nodes - 1)
|
||||
wait_until(lambda: node.eth_blockNumber() is not None)
|
||||
|
@ -50,11 +50,15 @@ def run_single_test(py, script, test_dir, index, port_min, port_max):
|
||||
)
|
||||
except subprocess.CalledProcessError as err:
|
||||
print_testcase_result(RED, CROSS, script, start_time)
|
||||
print("Output of " + script + "\n" + err.output.decode("utf-8"), flush=True)
|
||||
try:
|
||||
print("Output of " + script + "\n" + err.output.decode("utf-8"), flush=True)
|
||||
except UnicodeDecodeError:
|
||||
print("Output of " + script + "\n", flush=True)
|
||||
print(err.output)
|
||||
raise err
|
||||
print_testcase_result(BLUE, TICK, script, start_time)
|
||||
|
||||
def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={}, long_manual_tests: set[str]={}):
|
||||
def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={}, long_manual_tests: set[str]={}, single_run_tests: set[str]={}):
|
||||
tmp_dir = os.path.join(test_dir, "tmp")
|
||||
if not os.path.exists(tmp_dir):
|
||||
os.makedirs(tmp_dir, exist_ok=True)
|
||||
@ -98,7 +102,7 @@ def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={},
|
||||
for file in os.listdir(subdir_path):
|
||||
if file.endswith("_test.py"):
|
||||
rel_path = os.path.join(subdir, file)
|
||||
if rel_path not in slow_tests and rel_path not in long_manual_tests:
|
||||
if rel_path not in slow_tests and rel_path not in long_manual_tests and rel_path not in single_run_tests:
|
||||
TEST_SCRIPTS.append(rel_path)
|
||||
|
||||
executor = ProcessPoolExecutor(max_workers=options.max_workers)
|
||||
@ -131,6 +135,18 @@ def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={},
|
||||
print("CalledProcessError " + repr(err))
|
||||
failed.add(script)
|
||||
|
||||
# Run single tests one by one
|
||||
for script in single_run_tests:
|
||||
f = executor.submit(
|
||||
run_single_test, py, script, test_dir, i, options.port_min, options.port_max
|
||||
)
|
||||
try:
|
||||
f.result()
|
||||
except subprocess.CalledProcessError as err:
|
||||
print("CalledProcessError " + repr(err))
|
||||
failed.add(script)
|
||||
i += 1
|
||||
|
||||
print("Elapsed: " + str(int(time.time() - start_time)) + " seconds", flush=True)
|
||||
|
||||
if len(failed) > 0:
|
||||
|
@ -194,9 +194,14 @@ def generate_merkle_tree_by_batch(data):
|
||||
|
||||
|
||||
def submit_data(client, data):
|
||||
shard_config = client.rpc.zgs_getShardConfig()
|
||||
shard_id = int(shard_config["shardId"])
|
||||
num_shard = int(shard_config["numShard"])
|
||||
|
||||
segments = data_to_segments(data)
|
||||
for segment in segments:
|
||||
client.zgs_upload_segment(segment)
|
||||
for index, segment in enumerate(segments):
|
||||
if index % num_shard == shard_id:
|
||||
client.zgs_upload_segment(segment)
|
||||
return segments
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user