add api of getting available file info by root (#357)

* add api of getting available file info by root
This commit is contained in:
0g-peterzhb 2025-03-24 16:51:53 +08:00 committed by GitHub
parent d43a616b56
commit cfe4b45c41
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
43 changed files with 676 additions and 340 deletions

View File

@ -45,6 +45,11 @@ jobs:
python-version: '3.9'
cache: 'pip'
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.22'
- name: Install dependencies
run: |
python -m pip install --upgrade pip

View File

@ -63,7 +63,11 @@ pub trait Rpc {
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>>;
#[method(name = "getFileInfo")]
async fn get_file_info(&self, data_root: DataRoot) -> RpcResult<Option<FileInfo>>;
async fn get_file_info(
&self,
data_root: DataRoot,
need_available: bool,
) -> RpcResult<Option<FileInfo>>;
#[method(name = "getFileInfoByTxSeq")]
async fn get_file_info_by_tx_seq(&self, tx_seq: u64) -> RpcResult<Option<FileInfo>>;

View File

@ -95,7 +95,7 @@ impl RpcServer for RpcServerImpl {
let tx_seq = try_option!(
self.ctx
.log_store
.get_tx_seq_by_data_root(&data_root)
.get_tx_seq_by_data_root(&data_root, true)
.await?
);
@ -121,7 +121,12 @@ impl RpcServer for RpcServerImpl {
) -> RpcResult<Option<SegmentWithProof>> {
info!(%data_root, %index, "zgs_downloadSegmentWithProof");
let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?);
let tx = try_option!(
self.ctx
.log_store
.get_tx_by_data_root(&data_root, true)
.await?
);
self.get_segment_with_proof_by_tx(tx, index).await
}
@ -144,7 +149,12 @@ impl RpcServer for RpcServerImpl {
let seq = match tx_seq_or_root {
TxSeqOrRoot::TxSeq(v) => v,
TxSeqOrRoot::Root(v) => {
try_option!(self.ctx.log_store.get_tx_seq_by_data_root(&v).await?)
try_option!(
self.ctx
.log_store
.get_tx_seq_by_data_root(&v, false)
.await?
)
}
};
@ -163,10 +173,19 @@ impl RpcServer for RpcServerImpl {
}
}
async fn get_file_info(&self, data_root: DataRoot) -> RpcResult<Option<FileInfo>> {
async fn get_file_info(
&self,
data_root: DataRoot,
need_available: bool,
) -> RpcResult<Option<FileInfo>> {
debug!(%data_root, "zgs_getFileInfo");
let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?);
let tx = try_option!(
self.ctx
.log_store
.get_tx_by_data_root(&data_root, need_available)
.await?
);
Ok(Some(self.get_file_info_by_tx(tx).await?))
}
@ -288,7 +307,7 @@ impl RpcServerImpl {
let maybe_tx = self
.ctx
.log_store
.get_tx_by_data_root(&segment.root)
.get_tx_by_data_root(&segment.root, false)
.await?;
self.put_segment_with_maybe_tx(segment, maybe_tx).await

View File

@ -59,15 +59,23 @@ impl Store {
delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowRangeProof>);
delegate!(fn get_context() -> Result<(DataRoot, u64)>);
pub async fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>> {
pub async fn get_tx_seq_by_data_root(
&self,
data_root: &DataRoot,
need_available: bool,
) -> Result<Option<u64>> {
let root = *data_root;
self.spawn(move |store| store.get_tx_seq_by_data_root(&root))
self.spawn(move |store| store.get_tx_seq_by_data_root(&root, need_available))
.await
}
pub async fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result<Option<Transaction>> {
pub async fn get_tx_by_data_root(
&self,
data_root: &DataRoot,
need_available: bool,
) -> Result<Option<Transaction>> {
let root = *data_root;
self.spawn(move |store| store.get_tx_by_data_root(&root))
self.spawn(move |store| store.get_tx_by_data_root(&root, need_available))
.await
}

View File

@ -511,7 +511,7 @@ impl LogStoreChunkRead for LogManager {
index_start: usize,
index_end: usize,
) -> crate::error::Result<Option<ChunkArray>> {
let tx_seq = try_option!(self.get_tx_seq_by_data_root(data_root)?);
let tx_seq = try_option!(self.get_tx_seq_by_data_root(data_root, true)?);
self.get_chunks_by_tx_and_index_range(tx_seq, index_start, index_end)
}
@ -536,13 +536,27 @@ impl LogStoreRead for LogManager {
self.tx_store.get_tx_by_seq_number(seq)
}
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result<Option<u64>> {
fn get_tx_seq_by_data_root(
&self,
data_root: &DataRoot,
need_available: bool,
) -> crate::error::Result<Option<u64>> {
let seq_list = self.tx_store.get_tx_seq_list_by_data_root(data_root)?;
let mut available_seq = None;
for tx_seq in &seq_list {
if self.tx_store.check_tx_completed(*tx_seq)? {
// Return the first finalized tx if possible.
return Ok(Some(*tx_seq));
}
if need_available
&& available_seq.is_none()
&& !self.tx_store.check_tx_pruned(*tx_seq)?
{
available_seq = Some(*tx_seq);
}
}
if need_available {
return Ok(available_seq);
}
// No tx is finalized, return the first one.
Ok(seq_list.first().cloned())

View File

@ -31,14 +31,22 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn get_tx_by_seq_number(&self, seq: u64) -> Result<Option<Transaction>>;
/// Get a transaction by the data root of its data.
/// If all txs are not finalized, return the first one.
/// If all txs are not finalized, return the first one if need available is false.
/// Otherwise, return the first finalized tx.
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>>;
fn get_tx_seq_by_data_root(
&self,
data_root: &DataRoot,
need_available: bool,
) -> Result<Option<u64>>;
/// If all txs are not finalized, return the first one.
/// If all txs are not finalized, return the first one if need available is false.
/// Otherwise, return the first finalized tx.
fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result<Option<Transaction>> {
match self.get_tx_seq_by_data_root(data_root)? {
fn get_tx_by_data_root(
&self,
data_root: &DataRoot,
need_available: bool,
) -> Result<Option<Transaction>> {
match self.get_tx_seq_by_data_root(data_root, need_available)? {
Some(seq) => self.get_tx_by_seq_number(seq),
None => Ok(None),
}

View File

@ -17,7 +17,10 @@ class ExampleTest(TestFramework):
self.contract.submit(submissions)
wait_until(lambda: self.contract.num_submissions() == 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
wait_until(lambda: not client.zgs_get_file_info(data_root)["isCached"] and client.zgs_get_file_info(data_root)["uploadedSegNum"] == 1)
wait_until(
lambda: not client.zgs_get_file_info(data_root)["isCached"]
and client.zgs_get_file_info(data_root)["uploadedSegNum"] == 1
)
client.zgs_upload_segment(segments[1])
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])

View File

@ -7,9 +7,7 @@ ZGS_CONFIG = {
"log_config_file": "log_config",
"confirmation_block_count": 1,
"discv5_disable_ip_limit": True,
"network_peer_manager": {
"heartbeat_interval": "1s"
},
"network_peer_manager": {"heartbeat_interval": "1s"},
"router": {
"private_ip_enabled": True,
},
@ -22,7 +20,7 @@ ZGS_CONFIG = {
"auto_sync_idle_interval": "1s",
"sequential_find_peer_timeout": "10s",
"random_find_peer_timeout": "10s",
}
},
}
CONFIG_DIR = os.path.dirname(__file__)
@ -75,11 +73,12 @@ TX_PARAMS1 = {
NO_SEAL_FLAG = 0x1
NO_MERKLE_PROOF_FLAG = 0x2
def update_config(default: dict, custom: dict):
"""
Supports to update configurations with dict value.
"""
for (key, value) in custom.items():
for key, value in custom.items():
if default.get(key) is None or type(value) != dict:
default[key] = value
else:

View File

@ -20,17 +20,15 @@ class CrashTest(TestFramework):
segment = submit_data(self.nodes[0], chunk_data)
self.log.info("segment: %s", segment)
wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"] is True)
wait_until(
lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"] is True
)
for i in range(1, self.num_nodes):
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
self.nodes[i].admin_start_sync_file(0)
self.log.info("wait for node: %s", i)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
# 2: first node runnging, other nodes killed
self.log.info("kill node")
@ -56,22 +54,16 @@ class CrashTest(TestFramework):
for i in range(2, self.num_nodes):
self.start_storage_node(i)
self.nodes[i].wait_for_rpc_connection()
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
self.nodes[i].admin_start_sync_file(1)
self.nodes[i].stop(kill=True)
self.start_storage_node(i)
self.nodes[i].wait_for_rpc_connection()
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
self.nodes[i].admin_start_sync_file(1)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
# 4: node[1..] synced contract entries and killed
self.log.info("kill node 0")
@ -96,13 +88,9 @@ class CrashTest(TestFramework):
self.log.info("wait for node: %s", i)
self.start_storage_node(i)
self.nodes[i].wait_for_rpc_connection()
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
self.nodes[i].admin_start_sync_file(2)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
# 5: node[1..] synced contract entries and killed, sync disorder
self.nodes[0].stop(kill=True)
@ -137,21 +125,13 @@ class CrashTest(TestFramework):
self.log.info("wait for node: %s", i)
self.start_storage_node(i)
self.nodes[i].wait_for_rpc_connection()
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root1) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root1) is not None)
self.nodes[i].admin_start_sync_file(4)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root1)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root1)["finalized"])
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
self.nodes[i].admin_start_sync_file(3)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
if __name__ == "__main__":

View File

@ -45,13 +45,9 @@ class FuzzTest(TestFramework):
lock.release()
log.info("submit data via client %s", idx)
wait_until(
lambda: nodes[idx].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: nodes[idx].zgs_get_file_info(data_root) is not None)
segment = submit_data(nodes[idx], chunk_data)
wait_until(
lambda: nodes[idx].zgs_get_file_info(data_root)["finalized"]
)
wait_until(lambda: nodes[idx].zgs_get_file_info(data_root)["finalized"])
lock.acquire()
nodes_index.append(idx)
@ -65,17 +61,15 @@ class FuzzTest(TestFramework):
lambda: nodes[idx].zgs_get_file_info(data_root) is not None
)
def wait_finalized():
def wait_finalized():
ret = nodes[idx].zgs_get_file_info(data_root)
if ret["finalized"]:
return True
else:
nodes[idx].admin_start_sync_file(ret['tx']['seq'])
nodes[idx].admin_start_sync_file(ret["tx"]["seq"])
return False
wait_until(
lambda: wait_finalized(), timeout = 180
)
wait_until(lambda: wait_finalized(), timeout=180)
def run_small_chunk_size(nodes, contract, log):
sizes = [i for i in range(1, SAMLL_SIZE + 1)]
@ -84,7 +78,7 @@ class FuzzTest(TestFramework):
run_chunk_size(sizes, nodes, contract, log)
def run_large_chunk_size(nodes, contract, log):
sizes = [i for i in range(256 * 1024 * 256 - LARGE_SIZE, 256 * 1024 * 256 )]
sizes = [i for i in range(256 * 1024 * 256 - LARGE_SIZE, 256 * 1024 * 256)]
random.shuffle(sizes)
run_chunk_size(sizes, nodes, contract, log)

View File

@ -18,7 +18,6 @@ class LongTimeMineTest(TestFramework):
self.mine_period = 15
self.launch_wait_seconds = 15
def submit_data(self, item, size):
submissions_before = self.contract.num_submissions()
client = self.nodes[0]
@ -44,7 +43,10 @@ class LongTimeMineTest(TestFramework):
self.submit_data(b"\x11", 2000)
self.log.info("Start mine")
wait_until(lambda: int(blockchain.eth_blockNumber(), 16) > self.mine_period, timeout=180)
wait_until(
lambda: int(blockchain.eth_blockNumber(), 16) > self.mine_period,
timeout=180,
)
self.log.info("Wait for the first mine answer")
wait_until(lambda: self.mine_contract.last_mined_epoch() == 1)

View File

@ -15,7 +15,11 @@ class MineTest(TestFramework):
}
self.mine_period = int(45 / self.block_time)
self.launch_wait_seconds = 15
self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period)
self.log.info(
"Contract Info: Est. block time %.2f, Mine period %d",
self.block_time,
self.mine_period,
)
def submit_data(self, item, size):
submissions_before = self.contract.num_submissions()
@ -37,7 +41,11 @@ class MineTest(TestFramework):
first_block = self.contract.first_block()
self.log.info("Current block number %d", int(blockchain.eth_blockNumber(), 16))
self.log.info("Flow deployment block number %d, epoch 1 start %d", first_block, first_block + self.mine_period)
self.log.info(
"Flow deployment block number %d, epoch 1 start %d",
first_block,
first_block + self.mine_period,
)
wait_until(lambda: self.contract.epoch() >= 1, timeout=180)
quality = int(2**256 / 100 / estimate_st_performance())
@ -54,27 +62,39 @@ class MineTest(TestFramework):
self.contract.update_context()
self.log.info("Wait for the first mine answer")
wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1 and not self.mine_contract.can_submit(), timeout=180)
wait_until(
lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1
and not self.mine_contract.can_submit(),
timeout=180,
)
self.log.info("Wait for the second mine context release")
wait_until(lambda: self.contract.epoch() >= start_epoch + 2, timeout=180)
self.contract.update_context()
self.log.info("Wait for the second mine answer")
wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2 and not self.mine_contract.can_submit(), timeout=180)
wait_until(
lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2
and not self.mine_contract.can_submit(),
timeout=180,
)
self.nodes[0].miner_stop()
self.log.info("Wait for the third mine context release")
wait_until(lambda: self.contract.epoch() >= start_epoch + 3, timeout=180)
self.contract.update_context()
self.log.info("Submit the second data chunk")
self.submit_data(b"\x22", 2000)
# Now the storage node should have the latest flow, but the mining context is using an old one.
self.nodes[0].miner_start()
self.log.info("Wait for the third mine answer")
wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 3 and not self.mine_contract.can_submit(), timeout=180)
wait_until(
lambda: self.mine_contract.last_mined_epoch() == start_epoch + 3
and not self.mine_contract.can_submit(),
timeout=180,
)
self.log.info("Current block number %d", int(blockchain.eth_blockNumber(), 16))

View File

@ -2,13 +2,18 @@
from test_framework.test_framework import TestFramework
from config.node_config import MINER_ID, GENESIS_PRIV_KEY
from utility.submission import create_submission, submit_data
from utility.utils import wait_until, assert_equal, assert_greater_than, estimate_st_performance
from utility.utils import (
wait_until,
assert_equal,
assert_greater_than,
estimate_st_performance,
)
from test_framework.blockchain_node import BlockChainNodeType
import time
import math
PRICE_PER_SECTOR = math.ceil(10 * (10 ** 18) / (2 ** 30) * 256 / 12)
PRICE_PER_SECTOR = math.ceil(10 * (10**18) / (2**30) * 256 / 12)
class MineTest(TestFramework):
@ -23,17 +28,21 @@ class MineTest(TestFramework):
self.enable_market = True
self.mine_period = int(50 / self.block_time)
self.launch_wait_seconds = 15
self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period)
self.log.info(
"Contract Info: Est. block time %.2f, Mine period %d",
self.block_time,
self.mine_period,
)
def submit_data(self, item, size, no_submit = False):
def submit_data(self, item, size, no_submit=False):
submissions_before = self.contract.num_submissions()
client = self.nodes[0]
chunk_data = item * 256 * size
submissions, data_root = create_submission(chunk_data)
value = int(size * PRICE_PER_SECTOR * 1.1)
self.contract.submit(submissions, tx_prarams = {"value": value})
self.contract.submit(submissions, tx_prarams={"value": value})
wait_until(lambda: self.contract.num_submissions() == submissions_before + 1)
if not no_submit:
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
segment = submit_data(client, chunk_data)
@ -48,11 +57,15 @@ class MineTest(TestFramework):
difficulty = int(2**256 / 5 / estimate_st_performance())
self.mine_contract.set_quality(difficulty)
SECTORS_PER_PRICING = int(8 * ( 2 ** 30 ) / 256)
SECTORS_PER_PRICING = int(8 * (2**30) / 256)
first_block = self.contract.first_block()
self.log.info("Current block number %d", int(blockchain.eth_blockNumber(), 16))
self.log.info("Flow deployment block number %d, epoch 1 start %d, wait for epoch 1 start", first_block, first_block + self.mine_period)
self.log.info(
"Flow deployment block number %d, epoch 1 start %d, wait for epoch 1 start",
first_block,
first_block + self.mine_period,
)
wait_until(lambda: self.contract.epoch() >= 1, timeout=180)
self.log.info("Submit the actual data chunk (256 MB)")
@ -63,8 +76,12 @@ class MineTest(TestFramework):
# wait_until(lambda: self.contract.epoch() >= 1, timeout=180)
start_epoch = self.contract.epoch()
self.log.info("Submission Done, epoch is %d, current block number %d", start_epoch, int(blockchain.eth_blockNumber(), 16))
self.log.info(
"Submission Done, epoch is %d, current block number %d",
start_epoch,
int(blockchain.eth_blockNumber(), 16),
)
self.log.info("Wait for mine context release")
wait_until(lambda: self.contract.epoch() >= start_epoch + 1, timeout=180)
@ -72,27 +89,38 @@ class MineTest(TestFramework):
self.contract.update_context()
self.log.info("Wait for mine answer")
wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1 and not self.mine_contract.can_submit(), timeout=120)
wait_until(
lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1
and not self.mine_contract.can_submit(),
timeout=120,
)
rewards = self.reward_contract.reward_distributes()
assert_equal(len(rewards), 4)
firstReward = rewards[0].args.amount
self.log.info("Received reward %d Gwei", firstReward / (10**9))
self.reward_contract.donate(10000 * 10 ** 18)
self.reward_contract.donate(10000 * 10**18)
self.log.info("Donation Done")
self.log.info("Submit the data hash only (8 GB)")
self.submit_data(b"\x11", int(SECTORS_PER_PRICING), no_submit=True)
current_epoch = self.contract.epoch()
assert_equal(current_epoch, start_epoch + 1);
self.log.info("Sumission Done, epoch is %d, current block number %d", self.contract.epoch(), int(blockchain.eth_blockNumber(), 16))
assert_equal(current_epoch, start_epoch + 1)
self.log.info(
"Sumission Done, epoch is %d, current block number %d",
self.contract.epoch(),
int(blockchain.eth_blockNumber(), 16),
)
self.log.info("Wait for mine context release")
wait_until(lambda: self.contract.epoch() >= start_epoch + 2, timeout=180)
self.contract.update_context()
self.log.info("Wait for mine answer")
wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2 and not self.mine_contract.can_submit())
wait_until(
lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2
and not self.mine_contract.can_submit()
)
assert_equal(self.contract.epoch(), start_epoch + 2)
rewards = self.reward_contract.reward_distributes()

View File

@ -7,6 +7,7 @@ from config.node_config import ZGS_KEY_FILE, ZGS_NODEID
from test_framework.test_framework import TestFramework
from utility.utils import p2p_port
class NetworkDiscoveryTest(TestFramework):
"""
This is to test whether community nodes could connect to each other via UDP discovery.
@ -24,7 +25,6 @@ class NetworkDiscoveryTest(TestFramework):
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": bootnode_port,
"network_enr_udp_port": bootnode_port,
# disable trusted nodes
"network_libp2p_nodes": [],
}
@ -37,7 +37,6 @@ class NetworkDiscoveryTest(TestFramework):
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": p2p_port(i),
"network_enr_udp_port": p2p_port(i),
# disable trusted nodes and enable bootnodes
"network_libp2p_nodes": [],
"network_boot_nodes": bootnodes,
@ -57,7 +56,13 @@ class NetworkDiscoveryTest(TestFramework):
total_connected += info["connectedPeers"]
self.log.info(
"Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)",
i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"],
i,
info["totalPeers"],
info["bannedPeers"],
info["disconnectedPeers"],
info["connectedPeers"],
info["connectedIncomingPeers"],
info["connectedOutgoingPeers"],
)
if total_connected >= self.num_nodes * (self.num_nodes - 1):
@ -66,5 +71,6 @@ class NetworkDiscoveryTest(TestFramework):
self.log.info("====================================")
self.log.info("All nodes connected to each other successfully")
if __name__ == "__main__":
NetworkDiscoveryTest().main()

View File

@ -7,6 +7,7 @@ from config.node_config import ZGS_KEY_FILE, ZGS_NODEID
from test_framework.test_framework import TestFramework
from utility.utils import p2p_port
class NetworkDiscoveryUpgradeTest(TestFramework):
"""
This is to test that low version community nodes could not connect to bootnodes.
@ -24,7 +25,6 @@ class NetworkDiscoveryUpgradeTest(TestFramework):
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": bootnode_port,
"network_enr_udp_port": bootnode_port,
# disable trusted nodes
"network_libp2p_nodes": [],
}
@ -37,11 +37,9 @@ class NetworkDiscoveryUpgradeTest(TestFramework):
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": p2p_port(i),
"network_enr_udp_port": p2p_port(i),
# disable trusted nodes and enable bootnodes
"network_libp2p_nodes": [],
"network_boot_nodes": bootnodes,
# disable network identity in ENR
"discv5_disable_enr_network_id": True,
}
@ -57,7 +55,13 @@ class NetworkDiscoveryUpgradeTest(TestFramework):
total_connected += info["connectedPeers"]
self.log.info(
"Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)",
i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"],
i,
info["totalPeers"],
info["bannedPeers"],
info["disconnectedPeers"],
info["connectedPeers"],
info["connectedIncomingPeers"],
info["connectedOutgoingPeers"],
)
# ENR incompatible and should not discover each other for TCP connection
@ -66,5 +70,6 @@ class NetworkDiscoveryUpgradeTest(TestFramework):
self.log.info("====================================")
self.log.info("ENR incompatible nodes do not connect to each other")
if __name__ == "__main__":
NetworkDiscoveryUpgradeTest().main()

View File

@ -7,6 +7,7 @@ from config.node_config import ZGS_KEY_FILE, ZGS_NODEID
from test_framework.test_framework import TestFramework
from utility.utils import p2p_port
class NetworkTcpShardTest(TestFramework):
"""
This is to test TCP connection for shard config mismatched peers of UDP discovery.
@ -24,12 +25,10 @@ class NetworkTcpShardTest(TestFramework):
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": bootnode_port,
"network_enr_udp_port": bootnode_port,
# disable trusted nodes
"network_libp2p_nodes": [],
# custom shard config
"shard_position": "0/4"
"shard_position": "0/4",
}
# setup node 1 & 2 as community nodes
@ -40,13 +39,11 @@ class NetworkTcpShardTest(TestFramework):
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": p2p_port(i),
"network_enr_udp_port": p2p_port(i),
# disable trusted nodes and enable bootnodes
"network_libp2p_nodes": [],
"network_boot_nodes": bootnodes,
# custom shard config
"shard_position": f"{i}/4"
"shard_position": f"{i}/4",
}
def run_test(self):
@ -60,7 +57,13 @@ class NetworkTcpShardTest(TestFramework):
info = self.nodes[i].rpc.admin_getNetworkInfo()
self.log.info(
"Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)",
i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"],
i,
info["totalPeers"],
info["bannedPeers"],
info["disconnectedPeers"],
info["connectedPeers"],
info["connectedIncomingPeers"],
info["connectedOutgoingPeers"],
)
if i == timeout_secs - 1:
@ -72,5 +75,6 @@ class NetworkTcpShardTest(TestFramework):
self.log.info("====================================")
self.log.info("All nodes discovered but not connected for each other")
if __name__ == "__main__":
NetworkTcpShardTest().main()

View File

@ -23,7 +23,11 @@ class PrunerTest(TestFramework):
self.mine_period = int(45 / self.block_time)
self.lifetime_seconds = 240
self.launch_wait_seconds = 15
self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period)
self.log.info(
"Contract Info: Est. block time %.2f, Mine period %d",
self.block_time,
self.mine_period,
)
def run_test(self):
client = self.nodes[0]
@ -31,7 +35,10 @@ class PrunerTest(TestFramework):
chunk_data = b"\x02" * 16 * 256 * 1024
# chunk_data = b"\x02" * 5 * 1024 * 1024 * 1024
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)})
self.contract.submit(
submissions,
tx_prarams={"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)},
)
wait_until(lambda: self.contract.num_submissions() == 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)

View File

@ -7,6 +7,7 @@ from utility.submission import create_submission
from utility.submission import submit_data
from utility.utils import wait_until
class RandomTest(TestFramework):
def setup_params(self):
self.num_blockchain_nodes = 1
@ -32,14 +33,18 @@ class RandomTest(TestFramework):
else:
size = random.randint(0, max_size)
no_data = random.random() <= no_data_ratio
self.log.info(f"choose {chosen_node}, seq={i}, size={size}, no_data={no_data}")
self.log.info(
f"choose {chosen_node}, seq={i}, size={size}, no_data={no_data}"
)
client = self.nodes[chosen_node]
chunk_data = random.randbytes(size)
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions)
wait_until(lambda: self.contract.num_submissions() == i + 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None, timeout=120)
wait_until(
lambda: client.zgs_get_file_info(data_root) is not None, timeout=120
)
if not no_data:
submit_data(client, chunk_data)
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
@ -47,8 +52,17 @@ class RandomTest(TestFramework):
for node_index in range(len(self.nodes)):
if node_index != chosen_node:
self.log.debug(f"check {node_index}")
wait_until(lambda: self.nodes[node_index].zgs_get_file_info(data_root) is not None, timeout=300)
wait_until(lambda: self.nodes[node_index].zgs_get_file_info(data_root)["finalized"], timeout=300)
wait_until(
lambda: self.nodes[node_index].zgs_get_file_info(data_root)
is not None,
timeout=300,
)
wait_until(
lambda: self.nodes[node_index].zgs_get_file_info(data_root)[
"finalized"
],
timeout=300,
)
# TODO(zz): This is a temp solution to trigger auto sync after all nodes started.
if i >= tx_count - 2:
continue
@ -72,7 +86,10 @@ class RandomTest(TestFramework):
if not no_data:
for node in self.nodes:
self.log.debug(f"check {data_root}, {node.index}")
wait_until(lambda: node.zgs_get_file_info(data_root)["finalized"], timeout=300)
wait_until(
lambda: node.zgs_get_file_info(data_root)["finalized"],
timeout=300,
)
if __name__ == "__main__":

View File

@ -32,8 +32,6 @@ class RootConsistencyTest(TestFramework):
assert_equal(contract_length, expected_length)
assert_equal(contract_root, node_root[2:])
def run_test(self):
self.assert_flow_status(1)
@ -48,7 +46,6 @@ class RootConsistencyTest(TestFramework):
self.submit_data(b"\x13", 512 + 256)
self.assert_flow_status(1024 + 512 + 256)
if __name__ == "__main__":

View File

@ -29,6 +29,7 @@ class RpcTest(TestFramework):
wait_until(lambda: client1.zgs_get_file_info(data_root) is not None)
assert_equal(client1.zgs_get_file_info(data_root)["finalized"], False)
assert_equal(client1.zgs_get_file_info(data_root)["pruned"], False)
wait_until(lambda: client2.zgs_get_file_info(data_root) is not None)
assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False)
@ -37,17 +38,13 @@ class RpcTest(TestFramework):
self.log.info("segment: %s", segment)
wait_until(lambda: client1.zgs_get_file_info(data_root)["finalized"])
assert_equal(
client1.zgs_download_segment(data_root, 0, 1), segment[0]["data"]
)
assert_equal(client1.zgs_download_segment(data_root, 0, 1), segment[0]["data"])
client2.admin_start_sync_file(0)
wait_until(lambda: client2.sync_status_is_completed_or_unknown(0))
wait_until(lambda: client2.zgs_get_file_info(data_root)["finalized"])
assert_equal(
client2.zgs_download_segment(data_root, 0, 1), segment[0]["data"]
)
assert_equal(client2.zgs_download_segment(data_root, 0, 1), segment[0]["data"])
self.__test_upload_file_with_cli(client1)
@ -89,9 +86,7 @@ class RpcTest(TestFramework):
)
)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(root)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(root)["finalized"])
assert_equal(
client1.zgs_download_segment(root, 0, 2),

View File

@ -41,7 +41,11 @@ class SubmissionTest(TestFramework):
for tx_offset in range(same_root_tx_count + 1):
tx_seq = next_tx_seq - 1 - tx_offset
# old txs are finalized after finalizing the new tx, so we may need to wait here.
wait_until(lambda: self.nodes[0].zgs_get_file_info_by_tx_seq(tx_seq)["finalized"])
wait_until(
lambda: self.nodes[0].zgs_get_file_info_by_tx_seq(tx_seq)[
"finalized"
]
)
# Send tx after uploading data
for _ in range(same_root_tx_count):
@ -57,7 +61,10 @@ class SubmissionTest(TestFramework):
client = self.nodes[node_idx]
wait_until(lambda: client.zgs_get_file_info_by_tx_seq(tx_seq) is not None)
wait_until(lambda: client.zgs_get_file_info_by_tx_seq(tx_seq)["finalized"] == data_finalized)
wait_until(
lambda: client.zgs_get_file_info_by_tx_seq(tx_seq)["finalized"]
== data_finalized
)
def submit_data(self, chunk_data, node_idx=0):
_, data_root = create_submission(chunk_data)

View File

@ -18,30 +18,30 @@ class ShardSubmitTest(TestFramework):
self.num_blockchain_nodes = 1
self.num_nodes = 4
self.zgs_node_configs[0] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "0/4"
"db_max_num_sectors": 2**30,
"shard_position": "0/4",
}
self.zgs_node_configs[1] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "1/4"
"db_max_num_sectors": 2**30,
"shard_position": "1/4",
}
self.zgs_node_configs[2] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "2/4"
"db_max_num_sectors": 2**30,
"shard_position": "2/4",
}
self.zgs_node_configs[3] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "3/4"
"db_max_num_sectors": 2**30,
"shard_position": "3/4",
}
def run_test(self):
data_size = [
256*960,
256*1024,
256 * 960,
256 * 1024,
2,
255,
256*960,
256*120,
256 * 960,
256 * 120,
256,
257,
1023,
@ -77,5 +77,6 @@ class ShardSubmitTest(TestFramework):
submit_data(client, chunk_data)
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
if __name__ == "__main__":
ShardSubmitTest().main()

View File

@ -13,16 +13,16 @@ class PrunerTest(TestFramework):
self.num_blockchain_nodes = 1
self.num_nodes = 4
self.zgs_node_configs[0] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "0/2"
"db_max_num_sectors": 2**30,
"shard_position": "0/2",
}
self.zgs_node_configs[1] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "1/2"
"db_max_num_sectors": 2**30,
"shard_position": "1/2",
}
self.zgs_node_configs[3] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "1/4"
"db_max_num_sectors": 2**30,
"shard_position": "1/4",
}
self.enable_market = True
@ -31,7 +31,10 @@ class PrunerTest(TestFramework):
chunk_data = b"\x02" * 8 * 256 * 1024
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)})
self.contract.submit(
submissions,
tx_prarams={"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)},
)
wait_until(lambda: self.contract.num_submissions() == 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
@ -57,10 +60,18 @@ class PrunerTest(TestFramework):
for i in range(len(segments)):
index_store = i % 2
index_empty = 1 - i % 2
seg0 = self.nodes[index_store].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
seg1 = self.nodes[index_empty].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
seg2 = self.nodes[2].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
seg3 = self.nodes[3].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
seg0 = self.nodes[index_store].zgs_download_segment(
data_root, i * 1024, (i + 1) * 1024
)
seg1 = self.nodes[index_empty].zgs_download_segment(
data_root, i * 1024, (i + 1) * 1024
)
seg2 = self.nodes[2].zgs_download_segment(
data_root, i * 1024, (i + 1) * 1024
)
seg3 = self.nodes[3].zgs_download_segment(
data_root, i * 1024, (i + 1) * 1024
)
# base64 encoding size
assert_equal(len(seg0), 349528)
assert_equal(seg1, None)

View File

@ -5,6 +5,7 @@ import shutil
from test_framework.test_framework import TestFramework
from utility.utils import wait_until
class SnapshotTask(TestFramework):
def setup_params(self):
self.num_nodes = 2
@ -28,11 +29,11 @@ class SnapshotTask(TestFramework):
# Start the last node to verify historical file sync
self.nodes[1].shutdown()
shutil.rmtree(os.path.join(self.nodes[1].data_dir, 'db/data_db'))
shutil.rmtree(os.path.join(self.nodes[1].data_dir, "db/data_db"))
self.start_storage_node(1)
self.nodes[1].wait_for_rpc_connection()
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"])

View File

@ -77,9 +77,7 @@ class SubmissionTest(TestFramework):
continue
# Wait for log entry before file sync, otherwise, admin_startSyncFile will be failed.
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
self.nodes[i].admin_start_sync_file(submission_index - 1)
@ -89,15 +87,11 @@ class SubmissionTest(TestFramework):
)
)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
assert_equal(
base64.b64decode(
self.nodes[i]
.zgs_download_segment(data_root, 0, 1)
.encode("utf-8")
self.nodes[i].zgs_download_segment(data_root, 0, 1).encode("utf-8")
),
first_entry,
)

View File

@ -3,17 +3,14 @@
from test_framework.test_framework import TestFramework
from utility.utils import wait_until
class AutoSyncHistoricalTest(TestFramework):
def setup_params(self):
self.num_nodes = 4
# Enable auto sync
for i in range(self.num_nodes):
self.zgs_node_configs[i] = {
"sync": {
"auto_sync_enabled": True
}
}
self.zgs_node_configs[i] = {"sync": {"auto_sync_enabled": True}}
def run_test(self):
# Stop the last node to verify historical file sync
@ -26,17 +23,36 @@ class AutoSyncHistoricalTest(TestFramework):
# Files should be available on other nodes via auto sync
for i in range(1, self.num_nodes - 1):
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"])
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"]
)
# Start the last node to verify historical file sync
self.start_storage_node(self.num_nodes - 1)
self.nodes[self.num_nodes - 1].wait_for_rpc_connection()
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1)["finalized"])
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2)["finalized"])
wait_until(
lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1)
is not None
)
wait_until(
lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1)[
"finalized"
]
)
wait_until(
lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2)
is not None
)
wait_until(
lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2)[
"finalized"
]
)
if __name__ == "__main__":
AutoSyncHistoricalTest().main()

View File

@ -3,17 +3,14 @@
from test_framework.test_framework import TestFramework
from utility.utils import wait_until
class AutoSyncTest(TestFramework):
def setup_params(self):
self.num_nodes = 2
# Enable auto sync
for i in range(self.num_nodes):
self.zgs_node_configs[i] = {
"sync": {
"auto_sync_enabled": True
}
}
self.zgs_node_configs[i] = {"sync": {"auto_sync_enabled": True}}
def run_test(self):
# Submit and upload files on node 0
@ -26,5 +23,6 @@ class AutoSyncTest(TestFramework):
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2)["finalized"])
if __name__ == "__main__":
AutoSyncTest().main()

View File

@ -6,6 +6,7 @@ from test_framework.test_framework import TestFramework
from utility.submission import data_to_segments
from utility.utils import assert_equal, wait_until
class SyncChunksTest(TestFramework):
"""
By default, auto_sync_enabled and sync_file_on_announcement_enabled are both false,
@ -17,9 +18,7 @@ class SyncChunksTest(TestFramework):
# enable find chunks topic
for i in range(self.num_nodes):
self.zgs_node_configs[i] = {
"network_find_chunks_enabled": True
}
self.zgs_node_configs[i] = {"network_find_chunks_enabled": True}
def run_test(self):
client1 = self.nodes[0]
@ -35,20 +34,25 @@ class SyncChunksTest(TestFramework):
# Upload only 2nd segment to storage node
segments = data_to_segments(chunk_data)
self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
assert(client1.zgs_upload_segment(segments[1]) is None)
self.log.info(
"segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments]
)
assert client1.zgs_upload_segment(segments[1]) is None
# segment 0 is not able to download
assert(client1.zgs_download_segment_decoded(data_root, 0, 1024) is None)
assert client1.zgs_download_segment_decoded(data_root, 0, 1024) is None
# segment 1 is available to download
assert_equal(client1.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256])
assert_equal(
client1.zgs_download_segment_decoded(data_root, 1024, 2048),
chunk_data[1024 * 256 : 2048 * 256],
)
# segment 2 is not able to download
assert(client1.zgs_download_segment_decoded(data_root, 2048, 3072) is None)
assert client1.zgs_download_segment_decoded(data_root, 2048, 3072) is None
# Segment 1 should not be able to download on node 2
wait_until(lambda: client2.zgs_get_file_info(data_root) is not None)
assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False)
assert(client2.zgs_download_segment_decoded(data_root, 1024, 2048) is None)
assert client2.zgs_download_segment_decoded(data_root, 1024, 2048) is None
# Restart node 1 to check if the proof nodes are persisted.
self.stop_storage_node(0)
@ -56,12 +60,19 @@ class SyncChunksTest(TestFramework):
self.nodes[0].wait_for_rpc_connection()
# Trigger chunks sync by rpc
assert(client2.admin_start_sync_chunks(0, 1024, 2048) is None)
assert client2.admin_start_sync_chunks(0, 1024, 2048) is None
wait_until(lambda: client2.sync_status_is_completed_or_unknown(0))
wait_until(lambda: client2.zgs_download_segment_decoded(data_root, 1024, 2048) is not None)
wait_until(
lambda: client2.zgs_download_segment_decoded(data_root, 1024, 2048)
is not None
)
# Validate data
assert_equal(client2.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256])
assert_equal(
client2.zgs_download_segment_decoded(data_root, 1024, 2048),
chunk_data[1024 * 256 : 2048 * 256],
)
if __name__ == "__main__":
SyncChunksTest().main()

View File

@ -5,6 +5,7 @@ import time
from test_framework.test_framework import TestFramework
from utility.utils import assert_equal, wait_until
class SyncFileTest(TestFramework):
"""
By default, auto_sync_enabled and sync_file_on_announcement_enabled are both false,
@ -26,7 +27,7 @@ class SyncFileTest(TestFramework):
# restart client2
client2.start()
client2.wait_for_rpc_connection()
# File should not be auto sync on node 2 and there is no cached file locations
wait_until(lambda: client2.zgs_get_file_info(data_root) is not None)
time.sleep(3)
@ -35,7 +36,7 @@ class SyncFileTest(TestFramework):
# assert(client2.admin_get_file_location(0) is None)
# Trigger file sync by rpc
assert(client2.admin_start_sync_file(0) is None)
assert client2.admin_start_sync_file(0) is None
wait_until(lambda: client2.sync_status_is_completed_or_unknown(0))
wait_until(lambda: client2.zgs_get_file_info(data_root)["finalized"])
# file sync use ASK_FILE & ANSWER FILE protocol, and do not cache file announcement anymore.
@ -47,5 +48,6 @@ class SyncFileTest(TestFramework):
client1.zgs_download_segment(data_root, 0, 1024),
)
if __name__ == "__main__":
SyncFileTest().main()

View File

@ -6,8 +6,8 @@ from utility.run_all import run_all
if __name__ == "__main__":
run_all(
test_dir = os.path.dirname(__file__),
test_dir=os.path.dirname(__file__),
slow_tests={"mine_test.py", "random_test.py", "same_root_test.py"},
long_manual_tests={"fuzz_test.py"},
single_run_tests={"mine_with_market_test.py"},
)
)

View File

@ -12,11 +12,7 @@ from config.node_config import (
TX_PARAMS,
)
from utility.simple_rpc_proxy import SimpleRpcProxy
from utility.utils import (
initialize_config,
wait_until,
estimate_st_performance
)
from utility.utils import initialize_config, wait_until, estimate_st_performance
from test_framework.contracts import load_contract_metadata
@ -36,6 +32,7 @@ class BlockChainNodeType(Enum):
else:
raise AssertionError("Unsupported blockchain type")
@unique
class NodeType(Enum):
BlockChain = 0
@ -50,7 +47,7 @@ class TestNode:
def __init__(
self, node_type, index, data_dir, rpc_url, binary, config, log, rpc_timeout=10
):
assert os.path.exists(binary), ("Binary not found: %s" % binary)
assert os.path.exists(binary), "Binary not found: %s" % binary
self.node_type = node_type
self.index = index
self.data_dir = data_dir
@ -270,12 +267,14 @@ class BlockchainNode(TestNode):
def deploy_contract(name, args=None):
if args is None:
args = []
contract_interface = load_contract_metadata(path=self.contract_path, name=name)
contract_interface = load_contract_metadata(
path=self.contract_path, name=name
)
contract = w3.eth.contract(
abi=contract_interface["abi"],
bytecode=contract_interface["bytecode"],
)
tx_params = TX_PARAMS.copy()
del tx_params["gas"]
tx_hash = contract.constructor(*args).transact(tx_params)
@ -285,7 +284,7 @@ class BlockchainNode(TestNode):
abi=contract_interface["abi"],
)
return contract, tx_hash
def deploy_no_market():
self.log.debug("Start deploy contracts")
@ -301,12 +300,16 @@ class BlockchainNode(TestNode):
mine_contract, _ = deploy_contract("PoraMineTest", [0])
self.log.debug("Mine deployed")
mine_contract.functions.initialize(1, flow_contract.address, dummy_reward_contract.address).transact(TX_PARAMS)
mine_contract.functions.initialize(
1, flow_contract.address, dummy_reward_contract.address
).transact(TX_PARAMS)
mine_contract.functions.setDifficultyAdjustRatio(1).transact(TX_PARAMS)
mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS)
self.log.debug("Mine Initialized")
flow_initialize_hash = flow_contract.functions.initialize(dummy_market_contract.address, mine_period).transact(TX_PARAMS)
flow_initialize_hash = flow_contract.functions.initialize(
dummy_market_contract.address, mine_period
).transact(TX_PARAMS)
self.log.debug("Flow Initialized")
self.wait_for_transaction_receipt(w3, flow_initialize_hash)
@ -315,45 +318,60 @@ class BlockchainNode(TestNode):
# tx_hash = mine_contract.functions.setMiner(decode_hex(MINER_ID)).transact(TX_PARAMS)
# self.wait_for_transaction_receipt(w3, tx_hash)
return flow_contract, flow_initialize_hash, mine_contract, dummy_reward_contract
return (
flow_contract,
flow_initialize_hash,
mine_contract,
dummy_reward_contract,
)
def deploy_with_market(lifetime_seconds):
self.log.debug("Start deploy contracts")
mine_contract, _ = deploy_contract("PoraMineTest", [0])
self.log.debug("Mine deployed")
market_contract, _ = deploy_contract("FixedPrice", [])
self.log.debug("Market deployed")
reward_contract, _ = deploy_contract("ChunkLinearReward", [lifetime_seconds])
reward_contract, _ = deploy_contract(
"ChunkLinearReward", [lifetime_seconds]
)
self.log.debug("Reward deployed")
flow_contract, _ = deploy_contract("FixedPriceFlow", [0])
self.log.debug("Flow deployed")
mine_contract.functions.initialize(1, flow_contract.address, reward_contract.address).transact(TX_PARAMS)
mine_contract.functions.initialize(
1, flow_contract.address, reward_contract.address
).transact(TX_PARAMS)
mine_contract.functions.setDifficultyAdjustRatio(1).transact(TX_PARAMS)
mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS)
self.log.debug("Mine Initialized")
market_contract.functions.initialize(int(lifetime_seconds * 256 * 10 * 10 ** 18 /
2 ** 30 / 12 / 31 / 86400),
flow_contract.address, reward_contract.address).transact(TX_PARAMS)
market_contract.functions.initialize(
int(lifetime_seconds * 256 * 10 * 10**18 / 2**30 / 12 / 31 / 86400),
flow_contract.address,
reward_contract.address,
).transact(TX_PARAMS)
self.log.debug("Market Initialized")
reward_contract.functions.initialize(market_contract.address, mine_contract.address).transact(TX_PARAMS)
reward_contract.functions.setBaseReward(10 ** 18).transact(TX_PARAMS)
reward_contract.functions.initialize(
market_contract.address, mine_contract.address
).transact(TX_PARAMS)
reward_contract.functions.setBaseReward(10**18).transact(TX_PARAMS)
self.log.debug("Reward Initialized")
flow_initialize_hash = flow_contract.functions.initialize(market_contract.address, mine_period).transact(TX_PARAMS)
flow_initialize_hash = flow_contract.functions.initialize(
market_contract.address, mine_period
).transact(TX_PARAMS)
self.log.debug("Flow Initialized")
self.wait_for_transaction_receipt(w3, flow_initialize_hash)
self.log.info("All contracts deployed")
return flow_contract, flow_initialize_hash, mine_contract, reward_contract
if enable_market:
return deploy_with_market(lifetime_seconds)
else:
@ -376,4 +394,7 @@ class BlockchainNode(TestNode):
w3.eth.wait_for_transaction_receipt(tx_hash)
def start(self):
super().start(self.blockchain_node_type == BlockChainNodeType.BSC or self.blockchain_node_type == BlockChainNodeType.ZG)
super().start(
self.blockchain_node_type == BlockChainNodeType.BSC
or self.blockchain_node_type == BlockChainNodeType.ZG
)

View File

@ -36,28 +36,36 @@ class ContractProxy:
tx_params = copy(TX_PARAMS)
tx_params["value"] = value
return getattr(contract.functions, fn_name)(**args).transact(tx_params)
def _logs(self, event_name, node_idx, **args):
assert node_idx < len(self.blockchain_nodes)
contract = self._get_contract(node_idx)
return getattr(contract.events, event_name).create_filter(from_block=0, to_block="latest").get_all_entries()
def transfer(self, value, node_idx = 0):
return (
getattr(contract.events, event_name)
.create_filter(from_block=0, to_block="latest")
.get_all_entries()
)
def transfer(self, value, node_idx=0):
tx_params = copy(TX_PARAMS)
tx_params["value"] = value
contract = self._get_contract(node_idx)
contract.receive.transact(tx_params)
def address(self):
return self.contract_address
class FlowContractProxy(ContractProxy):
def submit(
self, submission_nodes, node_idx=0, tx_prarams=None, parent_hash=None,
self,
submission_nodes,
node_idx=0,
tx_prarams=None,
parent_hash=None,
):
assert node_idx < len(self.blockchain_nodes)
@ -65,11 +73,12 @@ class FlowContractProxy(ContractProxy):
if tx_prarams is not None:
combined_tx_prarams.update(tx_prarams)
contract = self._get_contract(node_idx)
# print(contract.functions.submit(submission_nodes).estimate_gas(combined_tx_prarams))
tx_hash = contract.functions.submit(submission_nodes).transact(combined_tx_prarams)
tx_hash = contract.functions.submit(submission_nodes).transact(
combined_tx_prarams
)
receipt = self.blockchain_nodes[node_idx].wait_for_transaction_receipt(
contract.w3, tx_hash, parent_hash=parent_hash
)
@ -85,46 +94,46 @@ class FlowContractProxy(ContractProxy):
def epoch(self, node_idx=0):
return self.get_mine_context(node_idx)[0]
def update_context(self, node_idx=0):
return self._send("makeContext", node_idx)
def get_mine_context(self, node_idx=0):
return self._call("makeContextWithResult", node_idx)
def get_flow_root(self, node_idx=0):
return self._call("computeFlowRoot", node_idx)
def get_flow_length(self, node_idx=0):
return self._call("tree", node_idx)[0]
class MineContractProxy(ContractProxy):
def last_mined_epoch(self, node_idx=0):
return self._call("lastMinedEpoch", node_idx)
def can_submit(self, node_idx=0):
return self._call("canSubmit", node_idx)
def current_submissions(self, node_idx=0):
return self._call("currentSubmissions", node_idx)
def set_quality(self, quality, node_idx=0):
return self._send("setQuality", node_idx, _targetQuality=quality)
class RewardContractProxy(ContractProxy):
def reward_distributes(self, node_idx=0):
return self._logs("DistributeReward", node_idx)
def donate(self, value, node_idx = 0):
def donate(self, value, node_idx=0):
return self._send_payable("donate", node_idx, value)
def base_reward(self, node_idx = 0):
def base_reward(self, node_idx=0):
return self._call("baseReward", node_idx)
def first_rewardable_chunk(self, node_idx = 0):
def first_rewardable_chunk(self, node_idx=0):
return self._call("firstRewardableChunk", node_idx)
def reward_deadline(self, node_idx = 0):
def reward_deadline(self, node_idx=0):
return self._call("rewardDeadline", node_idx, 0)

View File

@ -1,6 +1,7 @@
from pathlib import Path
import json
def load_contract_metadata(path: str, name: str):
path = Path(path)
try:
@ -8,4 +9,3 @@ def load_contract_metadata(path: str, name: str):
return json.loads(open(found_file, "r").read())
except StopIteration:
raise Exception(f"Cannot found contract {name}'s metadata")

View File

@ -15,7 +15,11 @@ from pathlib import Path
from eth_utils import encode_hex
from test_framework.bsc_node import BSCNode
from test_framework.contract_proxy import FlowContractProxy, MineContractProxy, RewardContractProxy
from test_framework.contract_proxy import (
FlowContractProxy,
MineContractProxy,
RewardContractProxy,
)
from test_framework.zgs_node import ZgsNode
from test_framework.blockchain_node import BlockChainNodeType
from test_framework.conflux_node import ConfluxNode, connect_sample_nodes
@ -74,13 +78,17 @@ class TestFramework:
root_dir, "target", "release", "zgs_node" + binary_ext
)
self.__default_zgs_cli_binary__ = os.path.join(
tests_dir, "tmp", "0g-storage-client" + binary_ext
tests_dir, "tmp", "0g-storage-client" + binary_ext
)
def __setup_blockchain_node(self):
if self.blockchain_node_type == BlockChainNodeType.ZG:
zg_node_init_genesis(self.blockchain_binary, self.root_dir, self.num_blockchain_nodes)
self.log.info("0gchain genesis initialized for %s nodes" % self.num_blockchain_nodes)
zg_node_init_genesis(
self.blockchain_binary, self.root_dir, self.num_blockchain_nodes
)
self.log.info(
"0gchain genesis initialized for %s nodes" % self.num_blockchain_nodes
)
for i in range(self.num_blockchain_nodes):
if i in self.blockchain_node_configs:
@ -171,15 +179,20 @@ class TestFramework:
self.log.debug("Wait for 0gchain node to generate first block")
time.sleep(0.5)
for node in self.blockchain_nodes:
wait_until(lambda: node.net_peerCount() == self.num_blockchain_nodes - 1)
wait_until(
lambda: node.net_peerCount() == self.num_blockchain_nodes - 1
)
wait_until(lambda: node.eth_blockNumber() is not None)
wait_until(lambda: int(node.eth_blockNumber(), 16) > 0)
contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[0].setup_contract(self.enable_market, self.mine_period, self.lifetime_seconds)
contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[
0
].setup_contract(self.enable_market, self.mine_period, self.lifetime_seconds)
self.contract = FlowContractProxy(contract, self.blockchain_nodes)
self.mine_contract = MineContractProxy(mine_contract, self.blockchain_nodes)
self.reward_contract = RewardContractProxy(reward_contract, self.blockchain_nodes)
self.reward_contract = RewardContractProxy(
reward_contract, self.blockchain_nodes
)
for node in self.blockchain_nodes[1:]:
node.wait_for_transaction(tx_hash)
@ -216,12 +229,14 @@ class TestFramework:
time.sleep(1)
node.start()
self.log.info("Wait the zgs_node launch for %d seconds", self.launch_wait_seconds)
self.log.info(
"Wait the zgs_node launch for %d seconds", self.launch_wait_seconds
)
time.sleep(self.launch_wait_seconds)
for node in self.nodes:
node.wait_for_rpc_connection()
def add_arguments(self, parser: argparse.ArgumentParser):
parser.add_argument(
"--conflux-binary",
@ -336,10 +351,12 @@ class TestFramework:
self.log.addHandler(ch)
def _check_cli_binary(self):
if Path(self.cli_binary).absolute() == Path(self.__default_zgs_cli_binary__).absolute() and not os.path.exists(self.cli_binary):
if Path(self.cli_binary).absolute() == Path(
self.__default_zgs_cli_binary__
).absolute() and not os.path.exists(self.cli_binary):
dir = Path(self.cli_binary).parent.absolute()
build_cli(dir)
assert os.path.exists(self.cli_binary), (
"zgs CLI binary not found: %s" % self.cli_binary
)
@ -353,14 +370,12 @@ class TestFramework:
file_to_upload,
):
self._check_cli_binary()
upload_args = [
self.cli_binary,
"upload",
"--url",
blockchain_node_rpc_url,
"--contract",
contract_address,
"--key",
encode_hex(key),
"--node",
@ -372,7 +387,9 @@ class TestFramework:
"--file",
]
output = tempfile.NamedTemporaryFile(dir=self.root_dir, delete=False, prefix="zgs_client_output_")
output = tempfile.NamedTemporaryFile(
dir=self.root_dir, delete=False, prefix="zgs_client_output_"
)
output_name = output.name
output_fileno = output.fileno()
@ -383,7 +400,7 @@ class TestFramework:
stdout=output_fileno,
stderr=output_fileno,
)
return_code = proc.wait(timeout=60)
output.seek(0)
@ -392,17 +409,27 @@ class TestFramework:
line = line.decode("utf-8")
self.log.debug("line: %s", line)
if "root" in line:
filtered_line = re.sub(r'\x1b\[([0-9,A-Z]{1,2}(;[0-9]{1,2})?(;[0-9]{3})?)?[m|K]?', '', line)
filtered_line = re.sub(
r"\x1b\[([0-9,A-Z]{1,2}(;[0-9]{1,2})?(;[0-9]{3})?)?[m|K]?",
"",
line,
)
index = filtered_line.find("root=")
if index > 0:
root = filtered_line[index + 5 : index + 5 + 66]
except Exception as ex:
self.log.error("Failed to upload file via CLI tool, output: %s", output_name)
self.log.error(
"Failed to upload file via CLI tool, output: %s", output_name
)
raise ex
finally:
output.close()
assert return_code == 0, "%s upload file failed, output: %s, log: %s" % (self.cli_binary, output_name, lines)
assert return_code == 0, "%s upload file failed, output: %s, log: %s" % (
self.cli_binary,
output_name,
lines,
)
return root
@ -410,8 +437,15 @@ class TestFramework:
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions)
self.num_deployed_contracts += 1
wait_until(lambda: self.contract.num_submissions() == self.num_deployed_contracts)
self.log.info("Submission completed, data root: %s, submissions(%s) = %s", data_root, len(submissions), submissions)
wait_until(
lambda: self.contract.num_submissions() == self.num_deployed_contracts
)
self.log.info(
"Submission completed, data root: %s, submissions(%s) = %s",
data_root,
len(submissions),
submissions,
)
return data_root
def __upload_file__(self, node_index: int, random_data_size: int) -> str:
@ -426,7 +460,9 @@ class TestFramework:
# Upload file to storage node
segments = submit_data(client, chunk_data)
self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
self.log.info(
"segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments]
)
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
return data_root
@ -452,7 +488,6 @@ class TestFramework:
if clean:
self.nodes[index].clean_data()
def start_storage_node(self, index):
self.nodes[index].start()
@ -484,7 +519,7 @@ class TestFramework:
if os.path.islink(dst):
os.remove(dst)
elif os.path.isdir(dst):
elif os.path.isdir(dst):
shutil.rmtree(dst)
elif os.path.exists(dst):
os.remove(dst)

View File

@ -3,24 +3,35 @@ import subprocess
import tempfile
from test_framework.blockchain_node import BlockChainNodeType, BlockchainNode
from utility.utils import blockchain_p2p_port, blockchain_rpc_port, blockchain_ws_port, blockchain_rpc_port_tendermint, pprof_port
from utility.utils import (
blockchain_p2p_port,
blockchain_rpc_port,
blockchain_ws_port,
blockchain_rpc_port_tendermint,
pprof_port,
)
from utility.build_binary import build_zg
def zg_node_init_genesis(binary: str, root_dir: str, num_nodes: int):
assert num_nodes > 0, "Invalid number of blockchain nodes: %s" % num_nodes
if not os.path.exists(binary):
build_zg(os.path.dirname(binary))
build_zg(os.path.dirname(binary))
shell_script = os.path.join(
os.path.dirname(os.path.realpath(__file__)), # test_framework folder
"..", "config", "0gchain-init-genesis.sh"
os.path.dirname(os.path.realpath(__file__)), # test_framework folder
"..",
"config",
"0gchain-init-genesis.sh",
)
zgchaind_dir = os.path.join(root_dir, "0gchaind")
os.mkdir(zgchaind_dir)
log_file = tempfile.NamedTemporaryFile(dir=zgchaind_dir, delete=False, prefix="init_genesis_", suffix=".log")
log_file = tempfile.NamedTemporaryFile(
dir=zgchaind_dir, delete=False, prefix="init_genesis_", suffix=".log"
)
p2p_port_start = blockchain_p2p_port(0)
ret = subprocess.run(
@ -31,7 +42,11 @@ def zg_node_init_genesis(binary: str, root_dir: str, num_nodes: int):
log_file.close()
assert ret.returncode == 0, "Failed to init 0gchain genesis, see more details in log file: %s" % log_file.name
assert ret.returncode == 0, (
"Failed to init 0gchain genesis, see more details in log file: %s"
% log_file.name
)
class ZGNode(BlockchainNode):
def __init__(
@ -61,19 +76,27 @@ class ZGNode(BlockchainNode):
self.config_file = None
self.args = [
binary, "start",
"--home", data_dir,
binary,
"start",
"--home",
data_dir,
# overwrite json rpc http port: 8545
"--json-rpc.address", "127.0.0.1:%s" % blockchain_rpc_port(index),
"--json-rpc.address",
"127.0.0.1:%s" % blockchain_rpc_port(index),
# overwrite json rpc ws port: 8546
"--json-rpc.ws-address", "127.0.0.1:%s" % blockchain_ws_port(index),
"--json-rpc.ws-address",
"127.0.0.1:%s" % blockchain_ws_port(index),
# overwrite p2p port: 26656
"--p2p.laddr", "tcp://127.0.0.1:%s" % blockchain_p2p_port(index),
"--p2p.laddr",
"tcp://127.0.0.1:%s" % blockchain_p2p_port(index),
# overwrite rpc port: 26657
"--rpc.laddr", "tcp://127.0.0.1:%s" % blockchain_rpc_port_tendermint(index),
"--rpc.laddr",
"tcp://127.0.0.1:%s" % blockchain_rpc_port_tendermint(index),
# overwrite pprof port: 6060
"--rpc.pprof_laddr", "127.0.0.1:%s" % pprof_port(index),
"--log_level", "debug"
"--rpc.pprof_laddr",
"127.0.0.1:%s" % pprof_port(index),
"--log_level",
"debug",
]
for k, v in updated_config.items():
@ -82,4 +105,4 @@ class ZGNode(BlockchainNode):
self.args.append(str(v))
def setup_config(self):
""" Already batch initialized by shell script in framework """
"""Already batch initialized by shell script in framework"""

View File

@ -11,6 +11,7 @@ from utility.utils import (
blockchain_rpc_port,
)
class ZgsNode(TestNode):
def __init__(
self,
@ -98,17 +99,21 @@ class ZgsNode(TestNode):
def zgs_download_segment(self, data_root, start_index, end_index):
return self.rpc.zgs_downloadSegment([data_root, start_index, end_index])
def zgs_download_segment_decoded(self, data_root: str, start_chunk_index: int, end_chunk_index: int) -> bytes:
encodedSegment = self.rpc.zgs_downloadSegment([data_root, start_chunk_index, end_chunk_index])
def zgs_download_segment_decoded(
self, data_root: str, start_chunk_index: int, end_chunk_index: int
) -> bytes:
encodedSegment = self.rpc.zgs_downloadSegment(
[data_root, start_chunk_index, end_chunk_index]
)
return None if encodedSegment is None else base64.b64decode(encodedSegment)
def zgs_get_file_info(self, data_root):
return self.rpc.zgs_getFileInfo([data_root])
return self.rpc.zgs_getFileInfo([data_root, True])
def zgs_get_file_info_by_tx_seq(self, tx_seq):
return self.rpc.zgs_getFileInfoByTxSeq([tx_seq])
def zgs_get_flow_context(self, tx_seq):
return self.rpc.zgs_getFlowContext([tx_seq])
@ -118,9 +123,13 @@ class ZgsNode(TestNode):
def admin_start_sync_file(self, tx_seq):
return self.rpc.admin_startSyncFile([tx_seq])
def admin_start_sync_chunks(self, tx_seq: int, start_chunk_index: int, end_chunk_index: int):
return self.rpc.admin_startSyncChunks([tx_seq, start_chunk_index, end_chunk_index])
def admin_start_sync_chunks(
self, tx_seq: int, start_chunk_index: int, end_chunk_index: int
):
return self.rpc.admin_startSyncChunks(
[tx_seq, start_chunk_index, end_chunk_index]
)
def admin_get_sync_status(self, tx_seq):
return self.rpc.admin_getSyncStatus([tx_seq])
@ -128,8 +137,8 @@ class ZgsNode(TestNode):
def sync_status_is_completed_or_unknown(self, tx_seq):
status = self.rpc.admin_getSyncStatus([tx_seq])
return status == "Completed" or status == "unknown"
def admin_get_file_location(self, tx_seq, all_shards = True):
def admin_get_file_location(self, tx_seq, all_shards=True):
return self.rpc.admin_getFileLocation([tx_seq, all_shards])
def clean_data(self):

View File

@ -9,31 +9,40 @@ from enum import Enum, unique
from utility.utils import is_windows_platform, wait_until
# v1.0.0-ci release
GITHUB_DOWNLOAD_URL="https://api.github.com/repos/0glabs/0g-storage-node/releases/152560136"
GITHUB_DOWNLOAD_URL = (
"https://api.github.com/repos/0glabs/0g-storage-node/releases/152560136"
)
CONFLUX_BINARY = "conflux.exe" if is_windows_platform() else "conflux"
BSC_BINARY = "geth.exe" if is_windows_platform() else "geth"
ZG_BINARY = "0gchaind.exe" if is_windows_platform() else "0gchaind"
CLIENT_BINARY = "0g-storage-client.exe" if is_windows_platform() else "0g-storage-client"
CLIENT_BINARY = (
"0g-storage-client.exe" if is_windows_platform() else "0g-storage-client"
)
CLI_GIT_REV = "98d74b7e7e6084fc986cb43ce2c66692dac094a6"
@unique
class BuildBinaryResult(Enum):
AlreadyExists = 0
Installed = 1
NotInstalled = 2
def build_conflux(dir: str) -> BuildBinaryResult:
# Download or build conflux binary if absent
result = __download_from_github(
dir=dir,
binary_name=CONFLUX_BINARY,
github_url=GITHUB_DOWNLOAD_URL,
github_url=GITHUB_DOWNLOAD_URL,
asset_name=__asset_name(CONFLUX_BINARY, zip=True),
)
if result == BuildBinaryResult.AlreadyExists or result == BuildBinaryResult.Installed:
if (
result == BuildBinaryResult.AlreadyExists
or result == BuildBinaryResult.Installed
):
return result
return __build_from_github(
@ -44,6 +53,7 @@ def build_conflux(dir: str) -> BuildBinaryResult:
compiled_relative_path=["target", "release"],
)
def build_bsc(dir: str) -> BuildBinaryResult:
# Download bsc binary if absent
result = __download_from_github(
@ -55,10 +65,13 @@ def build_bsc(dir: str) -> BuildBinaryResult:
# Requires to download binary successfully, since it is not ready to build
# binary from source code.
assert result != BuildBinaryResult.NotInstalled, "Cannot download binary from github [%s]" % BSC_BINARY
assert result != BuildBinaryResult.NotInstalled, (
"Cannot download binary from github [%s]" % BSC_BINARY
)
return result
def build_zg(dir: str) -> BuildBinaryResult:
# Download or build 0gchain binary if absent
result = __download_from_github(
@ -68,7 +81,10 @@ def build_zg(dir: str) -> BuildBinaryResult:
asset_name=__asset_name(ZG_BINARY, zip=True),
)
if result == BuildBinaryResult.AlreadyExists or result == BuildBinaryResult.Installed:
if (
result == BuildBinaryResult.AlreadyExists
or result == BuildBinaryResult.Installed
):
return result
return __build_from_github(
@ -79,17 +95,18 @@ def build_zg(dir: str) -> BuildBinaryResult:
compiled_relative_path=[],
)
def build_cli(dir: str) -> BuildBinaryResult:
# Build 0g-storage-client binary if absent
return __build_from_github(
dir=dir,
binary_name=CLIENT_BINARY,
github_url="https://github.com/0glabs/0g-storage-client.git",
git_rev=CLI_GIT_REV,
build_cmd="go build",
compiled_relative_path=[],
)
def __asset_name(binary_name: str, zip: bool = False) -> str:
sys = platform.system().lower()
if sys == "linux":
@ -102,24 +119,34 @@ def __asset_name(binary_name: str, zip: bool = False) -> str:
else:
raise RuntimeError("Unable to recognize platform")
def __build_from_github(dir: str, binary_name: str, github_url: str, build_cmd: str, compiled_relative_path: list[str], git_rev = None) -> BuildBinaryResult:
def __build_from_github(
dir: str,
binary_name: str,
github_url: str,
build_cmd: str,
compiled_relative_path: list[str],
git_rev=None,
) -> BuildBinaryResult:
if git_rev is None:
versioned_binary_name = binary_name
elif binary_name.endswith(".exe"):
versioned_binary_name = binary_name.removesuffix(".exe") + f"_{git_rev}.exe"
else:
versioned_binary_name = f"{binary_name}_{git_rev}"
binary_path = os.path.join(dir, binary_name)
versioned_binary_path = os.path.join(dir, versioned_binary_name)
if os.path.exists(versioned_binary_path):
__create_sym_link(versioned_binary_name, binary_name, dir)
return BuildBinaryResult.AlreadyExists
start_time = time.time()
# clone code from github to a temp folder
code_tmp_dir_name = (binary_name[:-4] if is_windows_platform() else binary_name) + "_tmp"
code_tmp_dir_name = (
binary_name[:-4] if is_windows_platform() else binary_name
) + "_tmp"
code_tmp_dir = os.path.join(dir, code_tmp_dir_name)
if os.path.exists(code_tmp_dir):
shutil.rmtree(code_tmp_dir)
@ -145,14 +172,22 @@ def __build_from_github(dir: str, binary_name: str, github_url: str, build_cmd:
shutil.rmtree(code_tmp_dir, ignore_errors=True)
print("Completed to build binary " + binary_name + ", Elapsed: " + str(int(time.time() - start_time)) + " seconds", flush=True)
print(
"Completed to build binary "
+ binary_name
+ ", Elapsed: "
+ str(int(time.time() - start_time))
+ " seconds",
flush=True,
)
return BuildBinaryResult.Installed
def __create_sym_link(src, dst, path = None):
def __create_sym_link(src, dst, path=None):
if src == dst:
return
origin_path = os.getcwd()
if path is not None:
os.chdir(path)
@ -171,16 +206,19 @@ def __create_sym_link(src, dst, path = None):
os.chdir(origin_path)
def __download_from_github(dir: str, binary_name: str, github_url: str, asset_name: str) -> BuildBinaryResult:
def __download_from_github(
dir: str, binary_name: str, github_url: str, asset_name: str
) -> BuildBinaryResult:
if not os.path.exists(dir):
os.makedirs(dir, exist_ok=True)
binary_path = os.path.join(dir, binary_name)
if os.path.exists(binary_path):
return BuildBinaryResult.AlreadyExists
print("Begin to download binary from github: %s" % binary_name, flush=True)
start_time = time.time()
req = requests.get(github_url)
@ -194,7 +232,7 @@ def __download_from_github(dir: str, binary_name: str, github_url: str, asset_na
if download_url is None:
print(f"Cannot find asset by name {asset_name}", flush=True)
return BuildBinaryResult.NotInstalled
content = requests.get(download_url).content
# Supports to read from zipped binary
@ -203,17 +241,24 @@ def __download_from_github(dir: str, binary_name: str, github_url: str, asset_na
with open(asset_path, "xb") as f:
f.write(content)
shutil.unpack_archive(asset_path, dir)
assert os.path.exists(binary_path), f"Cannot find binary after unzip, binary = {binary_name}, asset = {asset_name}"
assert os.path.exists(
binary_path
), f"Cannot find binary after unzip, binary = {binary_name}, asset = {asset_name}"
else:
with open(binary_path, "xb") as f:
f.write(content)
f.write(content)
if not is_windows_platform():
st = os.stat(binary_path)
os.chmod(binary_path, st.st_mode | stat.S_IEXEC)
wait_until(lambda: os.access(binary_path, os.X_OK), timeout=120)
print("Completed to download binary, Elapsed: " + str(int(time.time() - start_time)) + " seconds", flush=True)
print(
"Completed to download binary, Elapsed: "
+ str(int(time.time() - start_time))
+ " seconds",
flush=True,
)
return BuildBinaryResult.Installed

View File

@ -162,15 +162,20 @@ class MerkleTree:
n = len(data)
if n < ENTRY_SIZE or (n & (n - 1)) != 0:
raise Exception("Input length is not power of 2")
leaves = [Leaf.from_data(data[i:i + ENTRY_SIZE], tree.hasher) for i in range(0, n, ENTRY_SIZE)]
leaves = [
Leaf.from_data(data[i : i + ENTRY_SIZE], tree.hasher)
for i in range(0, n, ENTRY_SIZE)
]
tree.__leaves = leaves
nodes = leaves
while len(nodes) > 1:
next_nodes = []
for i in range(0, len(nodes), 2):
next_nodes.append(Node.from_children(nodes[i], nodes[i+1], tree.hasher))
next_nodes.append(
Node.from_children(nodes[i], nodes[i + 1], tree.hasher)
)
nodes = next_nodes

View File

@ -12,8 +12,20 @@ DEFAULT_PORT_MIN = 11000
DEFAULT_PORT_MAX = 65535
DEFAULT_PORT_RANGE = 500
def print_testcase_result(color, glyph, script, start_time):
print(color[1] + glyph + " Testcase " + script + "\telapsed: " + str(int(time.time() - start_time)) + " seconds" + color[0], flush=True)
print(
color[1]
+ glyph
+ " Testcase "
+ script
+ "\telapsed: "
+ str(int(time.time() - start_time))
+ " seconds"
+ color[0],
flush=True,
)
def run_single_test(py, script, test_dir, index, port_min, port_max):
try:
@ -58,7 +70,14 @@ def run_single_test(py, script, test_dir, index, port_min, port_max):
raise err
print_testcase_result(BLUE, TICK, script, start_time)
def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={}, long_manual_tests: set[str]={}, single_run_tests: set[str]={}):
def run_all(
test_dir: str,
test_subdirs: list[str] = [],
slow_tests: set[str] = {},
long_manual_tests: set[str] = {},
single_run_tests: set[str] = {},
):
tmp_dir = os.path.join(test_dir, "tmp")
if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir, exist_ok=True)
@ -102,7 +121,11 @@ def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={},
for file in os.listdir(subdir_path):
if file.endswith("_test.py"):
rel_path = os.path.join(subdir, file)
if rel_path not in slow_tests and rel_path not in long_manual_tests and rel_path not in single_run_tests:
if (
rel_path not in slow_tests
and rel_path not in long_manual_tests
and rel_path not in single_run_tests
):
TEST_SCRIPTS.append(rel_path)
executor = ProcessPoolExecutor(max_workers=options.max_workers)
@ -154,4 +177,3 @@ def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={},
for c in failed:
print(c)
sys.exit(1)

View File

@ -25,7 +25,13 @@ class RpcCaller:
if isinstance(parsed, Ok):
return parsed.result
else:
print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args)[-1500:], parsed))
print(
"Failed to call RPC, method = %s(%s), error = %s"
% (self.method, str(*args)[-1500:], parsed)
)
except Exception as ex:
print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args)[-1500:], ex))
print(
"Failed to call RPC, method = %s(%s), exception = %s"
% (self.method, str(*args)[-1500:], ex)
)
return None

View File

@ -1,2 +1,2 @@
ENTRY_SIZE = 256
PORA_CHUNK_SIZE = 1024
PORA_CHUNK_SIZE = 1024

View File

@ -5,6 +5,7 @@ from math import log2
from utility.merkle_tree import add_0x_prefix, Leaf, MerkleTree
from utility.spec import ENTRY_SIZE, PORA_CHUNK_SIZE
def log2_pow2(n):
return int(log2(((n ^ (n - 1)) >> 1) + 1))
@ -106,29 +107,27 @@ def create_segment_node(data, offset, batch, size):
else:
tree.add_leaf(Leaf(segment_root(data[start:end])))
return tree.get_root_hash()
segment_root_cached_chunks = None
segment_root_cached_output = None
def segment_root(chunks):
global segment_root_cached_chunks, segment_root_cached_output
if segment_root_cached_chunks == chunks:
return segment_root_cached_output
data_len = len(chunks)
if data_len == 0:
return b"\x00" * 32
tree = MerkleTree()
for i in range(0, data_len, ENTRY_SIZE):
tree.encrypt(chunks[i : i + ENTRY_SIZE])
digest = tree.get_root_hash()
segment_root_cached_chunks = chunks
@ -241,4 +240,4 @@ def data_to_segments(data):
segments.append(segment)
idx += 1
return segments
return segments

View File

@ -5,6 +5,7 @@ import rtoml
import time
import sha3
class PortMin:
# Must be initialized with a unique integer for each process
n = 11000
@ -35,15 +36,19 @@ def blockchain_rpc_port(n):
def blockchain_rpc_port_core(n):
return PortMin.n + MAX_NODES + 3 * MAX_BLOCKCHAIN_NODES + n
def blockchain_ws_port(n):
return PortMin.n + MAX_NODES + 4 * MAX_BLOCKCHAIN_NODES + n
def blockchain_rpc_port_tendermint(n):
return PortMin.n + MAX_NODES + 5 * MAX_BLOCKCHAIN_NODES + n
def pprof_port(n):
return PortMin.n + MAX_NODES + 6 * MAX_BLOCKCHAIN_NODES + n
def wait_until(predicate, *, attempts=float("inf"), timeout=float("inf"), lock=None):
if attempts == float("inf") and timeout == float("inf"):
timeout = 60
@ -135,11 +140,12 @@ def assert_greater_than_or_equal(thing1, thing2):
if thing1 < thing2:
raise AssertionError("%s < %s" % (str(thing1), str(thing2)))
# 14900K has the performance point 100
# 14900K has the performance point 100
def estimate_st_performance():
hasher = sha3.keccak_256()
input = b"\xcc" * (1<<26)
input = b"\xcc" * (1 << 26)
start_time = time.perf_counter()
hasher.update(input)
digest = hasher.hexdigest()
return 10 / (time.perf_counter() - start_time)
return 10 / (time.perf_counter() - start_time)