Compare commits

..

5 Commits
v0.8.6 ... main

Author SHA1 Message Date
Donny
74074dfa2f
Update update_abis.shFixing bug with incorrect argument handling in copy_abis function (#342)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
2025-03-24 17:03:58 +08:00
Maximilian Hubert
12538e4b6c
Typo Fixes in Documentation (#340)
* Update contributing.md

* Update mining-reward.md

* Update proof-of-random-access.md

* Update run.md
2025-03-24 17:02:54 +08:00
Alex Pikme
2fd8ffc2ea
fix spelling error and correct minor error (#343)
* Update network_behaviour.rs

* Update client.rs

---------

Co-authored-by: XxAlex74xX <30472093+XxAlex74xX@users.noreply.github.com>
2025-03-24 16:56:12 +08:00
PixelPilot
4cf45149cb
Update file_location_cache.rs (#351) 2025-03-24 16:55:28 +08:00
0g-peterzhb
cfe4b45c41
add api of getting available file info by root (#357)
* add api of getting available file info by root
2025-03-24 16:51:53 +08:00
51 changed files with 692 additions and 356 deletions

View File

@ -45,6 +45,11 @@ jobs:
python-version: '3.9'
cache: 'pip'
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.22'
- name: Install dependencies
run: |
python -m pip install --upgrade pip

View File

@ -4,7 +4,7 @@
### Checks
* [ ] I've made sure the lint is passing in this PR.
* [ ] I've made sure the linter is passing in this PR.
* [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, in that case, please comment that they are not relevant.
* [ ] Testing Strategy
* [ ] Unit tests

View File

@ -1,6 +1,6 @@
# Mining Reward
0G Storage creates pricing segments every 8 GB of data chunks over the data flow. Each pricing segment is associated with an Endowment Pool and a Reward Pool. The Endowment Pool collects the storage endowments of all the data chunks belongs to this pricing segment and releases a fixed ratio of balance to the Reward Pool every second. The rate of reward release is set to 4% per year.
0G Storage creates pricing segments every 8 GB of data chunks over the data flow. Each pricing segment is associated with an Endowment Pool and a Reward Pool. The Endowment Pool collects the storage endowments of all the data chunks belong to this pricing segment and releases a fixed ratio of balance to the Reward Pool every second. The rate of reward release is set to 4% per year.
The mining reward is paid to miners for providing data service. Miners receive mining reward when submit the first legitimate PoRA for a mining epoch to 0G Storage contract.

View File

@ -2,7 +2,7 @@
The ZeroGravity network adopts a Proof of Random Access (PoRA) mechanism to incentivize miners to store data. By requiring miners to answer randomly produced queries to archived data chunks, the PoRA mechanism establishes the relation between mining proof generation power and data storage. Miners answer the queries repeatedly and computes an output digest for each loaded chunk until find a digest that satisfies the mining difficulty (i.e., has enough leading zeros). PoRA will stress the miners' disk I/O and reduce their capability to respond user queries. So 0G Storage adopts intermittent mining, in which a mining epoch starts with a block generation at a specific block height on the host chain and stops when a valid PoRA is submitted to the 0G Storage contract.
In a strawman design, a PoRA iteration consists of a computing stage and a loading stage. In the computing stage, a miner computes a random recall position (the universal offset in the flow) based on an arbitrary picked random nonce and a mining status read from the host chain. In the loading stage, a miner loads the archived data chunks at the given recall position, and computes output digest by hashing the tuple of mining status and the data chunks. If the output digest satisfies the target difficulty, the miner can construct a legitimate PoRA consists of the chosen random nonce, the loaded data chunk and the proof for the correctness of data chunk to the mining contract.
In a strawman design, a PoRA iteration consists of a computing stage and a loading stage. In the computing stage, a miner computes a random recall position (the universal offset in the flow) based on an arbitrary picked random nonce and a mining status read from the host chain. In the loading stage, a miner loads the archived data chunks at the given recall position, and computes output digest by hashing the tuple of mining status and the data chunks. If the output digest satisfies the target difficulty, the miner can construct a legitimate PoRA, which consists of the chosen random nonce, the loaded data chunk and the proof for the correctness of data chunk to the mining contract.
## Fairness

View File

@ -4,7 +4,7 @@
### Setup Environment
Install the dependencies Node.js, yarn, hardhat.
Install the dependencies: Node.js, yarn, hardhat.
- Linux

View File

@ -358,7 +358,7 @@ mod tests {
}
#[test]
fn test_annoucement_cache_peek_priority() {
fn test_announcement_cache_peek_priority() {
let mut cache = AnnouncementCache::new(100, 3600);
let now = timestamp_now();
@ -382,7 +382,7 @@ mod tests {
}
#[test]
fn test_annoucement_cache_pop_len() {
fn test_announcement_cache_pop_len() {
let mut cache = AnnouncementCache::new(100, 3600);
let now = timestamp_now();
@ -404,7 +404,7 @@ mod tests {
}
#[test]
fn test_annoucement_cache_garbage_collect() {
fn test_announcement_cache_garbage_collect() {
let mut cache = AnnouncementCache::new(100, 3600);
let now = timestamp_now();
@ -422,7 +422,7 @@ mod tests {
}
#[test]
fn test_annoucement_cache_insert_gc() {
fn test_announcement_cache_insert_gc() {
let mut cache = AnnouncementCache::new(100, 3600);
let now = timestamp_now();
@ -438,7 +438,7 @@ mod tests {
}
#[test]
fn test_annoucement_cache_insert_ignore_older() {
fn test_announcement_cache_insert_ignore_older() {
let mut cache = AnnouncementCache::new(100, 3600);
let now = timestamp_now();
@ -461,7 +461,7 @@ mod tests {
}
#[test]
fn test_annoucement_cache_insert_overwrite() {
fn test_announcement_cache_insert_overwrite() {
let mut cache = AnnouncementCache::new(100, 3600);
let now = timestamp_now();
@ -479,7 +479,7 @@ mod tests {
}
#[test]
fn test_annoucement_cache_insert_cap_exceeded() {
fn test_announcement_cache_insert_cap_exceeded() {
let mut cache = AnnouncementCache::new(3, 3600);
let now = timestamp_now();
@ -499,7 +499,7 @@ mod tests {
}
#[test]
fn test_annoucement_cache_random() {
fn test_announcement_cache_random() {
let mut cache = AnnouncementCache::new(100, 3600);
let now = timestamp_now();
@ -515,7 +515,7 @@ mod tests {
}
#[test]
fn test_annoucement_cache_all() {
fn test_announcement_cache_all() {
let mut cache = AnnouncementCache::new(100, 3600);
let now = timestamp_now();

View File

@ -134,7 +134,7 @@ impl NetworkBehaviour for PeerManager {
BanResult::NotBanned => {}
}
// Count dialing peers in the limit if the peer dialied us.
// Count dialing peers in the limit if the peer dialed us.
let count_dialing = endpoint.is_listener();
// Check the connection limits
if self.peer_limit_reached(count_dialing)

View File

@ -19,7 +19,7 @@ pub struct Client {
#[derive(Clone, Copy, Debug, Serialize, PartialEq, Eq, AsRefStr, IntoStaticStr, EnumIter)]
pub enum ClientKind {
/// An Zgs node.
/// A Zgs node.
Zgs,
/// An unknown client.
Unknown,

View File

@ -63,7 +63,11 @@ pub trait Rpc {
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>>;
#[method(name = "getFileInfo")]
async fn get_file_info(&self, data_root: DataRoot) -> RpcResult<Option<FileInfo>>;
async fn get_file_info(
&self,
data_root: DataRoot,
need_available: bool,
) -> RpcResult<Option<FileInfo>>;
#[method(name = "getFileInfoByTxSeq")]
async fn get_file_info_by_tx_seq(&self, tx_seq: u64) -> RpcResult<Option<FileInfo>>;

View File

@ -95,7 +95,7 @@ impl RpcServer for RpcServerImpl {
let tx_seq = try_option!(
self.ctx
.log_store
.get_tx_seq_by_data_root(&data_root)
.get_tx_seq_by_data_root(&data_root, true)
.await?
);
@ -121,7 +121,12 @@ impl RpcServer for RpcServerImpl {
) -> RpcResult<Option<SegmentWithProof>> {
info!(%data_root, %index, "zgs_downloadSegmentWithProof");
let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?);
let tx = try_option!(
self.ctx
.log_store
.get_tx_by_data_root(&data_root, true)
.await?
);
self.get_segment_with_proof_by_tx(tx, index).await
}
@ -144,7 +149,12 @@ impl RpcServer for RpcServerImpl {
let seq = match tx_seq_or_root {
TxSeqOrRoot::TxSeq(v) => v,
TxSeqOrRoot::Root(v) => {
try_option!(self.ctx.log_store.get_tx_seq_by_data_root(&v).await?)
try_option!(
self.ctx
.log_store
.get_tx_seq_by_data_root(&v, false)
.await?
)
}
};
@ -163,10 +173,19 @@ impl RpcServer for RpcServerImpl {
}
}
async fn get_file_info(&self, data_root: DataRoot) -> RpcResult<Option<FileInfo>> {
async fn get_file_info(
&self,
data_root: DataRoot,
need_available: bool,
) -> RpcResult<Option<FileInfo>> {
debug!(%data_root, "zgs_getFileInfo");
let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?);
let tx = try_option!(
self.ctx
.log_store
.get_tx_by_data_root(&data_root, need_available)
.await?
);
Ok(Some(self.get_file_info_by_tx(tx).await?))
}
@ -288,7 +307,7 @@ impl RpcServerImpl {
let maybe_tx = self
.ctx
.log_store
.get_tx_by_data_root(&segment.root)
.get_tx_by_data_root(&segment.root, false)
.await?;
self.put_segment_with_maybe_tx(segment, maybe_tx).await

View File

@ -59,15 +59,23 @@ impl Store {
delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowRangeProof>);
delegate!(fn get_context() -> Result<(DataRoot, u64)>);
pub async fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>> {
pub async fn get_tx_seq_by_data_root(
&self,
data_root: &DataRoot,
need_available: bool,
) -> Result<Option<u64>> {
let root = *data_root;
self.spawn(move |store| store.get_tx_seq_by_data_root(&root))
self.spawn(move |store| store.get_tx_seq_by_data_root(&root, need_available))
.await
}
pub async fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result<Option<Transaction>> {
pub async fn get_tx_by_data_root(
&self,
data_root: &DataRoot,
need_available: bool,
) -> Result<Option<Transaction>> {
let root = *data_root;
self.spawn(move |store| store.get_tx_by_data_root(&root))
self.spawn(move |store| store.get_tx_by_data_root(&root, need_available))
.await
}

View File

@ -511,7 +511,7 @@ impl LogStoreChunkRead for LogManager {
index_start: usize,
index_end: usize,
) -> crate::error::Result<Option<ChunkArray>> {
let tx_seq = try_option!(self.get_tx_seq_by_data_root(data_root)?);
let tx_seq = try_option!(self.get_tx_seq_by_data_root(data_root, true)?);
self.get_chunks_by_tx_and_index_range(tx_seq, index_start, index_end)
}
@ -536,13 +536,27 @@ impl LogStoreRead for LogManager {
self.tx_store.get_tx_by_seq_number(seq)
}
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result<Option<u64>> {
fn get_tx_seq_by_data_root(
&self,
data_root: &DataRoot,
need_available: bool,
) -> crate::error::Result<Option<u64>> {
let seq_list = self.tx_store.get_tx_seq_list_by_data_root(data_root)?;
let mut available_seq = None;
for tx_seq in &seq_list {
if self.tx_store.check_tx_completed(*tx_seq)? {
// Return the first finalized tx if possible.
return Ok(Some(*tx_seq));
}
if need_available
&& available_seq.is_none()
&& !self.tx_store.check_tx_pruned(*tx_seq)?
{
available_seq = Some(*tx_seq);
}
}
if need_available {
return Ok(available_seq);
}
// No tx is finalized, return the first one.
Ok(seq_list.first().cloned())

View File

@ -31,14 +31,22 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn get_tx_by_seq_number(&self, seq: u64) -> Result<Option<Transaction>>;
/// Get a transaction by the data root of its data.
/// If all txs are not finalized, return the first one.
/// If all txs are not finalized, return the first one if need available is false.
/// Otherwise, return the first finalized tx.
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>>;
fn get_tx_seq_by_data_root(
&self,
data_root: &DataRoot,
need_available: bool,
) -> Result<Option<u64>>;
/// If all txs are not finalized, return the first one.
/// If all txs are not finalized, return the first one if need available is false.
/// Otherwise, return the first finalized tx.
fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result<Option<Transaction>> {
match self.get_tx_seq_by_data_root(data_root)? {
fn get_tx_by_data_root(
&self,
data_root: &DataRoot,
need_available: bool,
) -> Result<Option<Transaction>> {
match self.get_tx_seq_by_data_root(data_root, need_available)? {
Some(seq) => self.get_tx_by_seq_number(seq),
None => Ok(None),
}

View File

@ -50,7 +50,7 @@ copy_file() {
copy_abis() {
for contract_name in "$@"; do
copy_file $(./scripts/search_abi.sh "$path/artifacts" "$contract_name.json") "storage-contracts-abis/$contract_name.json"
copy_file "$(./scripts/search_abi.sh "$path/artifacts" "$contract_name.json")" "storage-contracts-abis/$contract_name.json"
done
}

View File

@ -17,7 +17,10 @@ class ExampleTest(TestFramework):
self.contract.submit(submissions)
wait_until(lambda: self.contract.num_submissions() == 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
wait_until(lambda: not client.zgs_get_file_info(data_root)["isCached"] and client.zgs_get_file_info(data_root)["uploadedSegNum"] == 1)
wait_until(
lambda: not client.zgs_get_file_info(data_root)["isCached"]
and client.zgs_get_file_info(data_root)["uploadedSegNum"] == 1
)
client.zgs_upload_segment(segments[1])
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])

View File

@ -7,9 +7,7 @@ ZGS_CONFIG = {
"log_config_file": "log_config",
"confirmation_block_count": 1,
"discv5_disable_ip_limit": True,
"network_peer_manager": {
"heartbeat_interval": "1s"
},
"network_peer_manager": {"heartbeat_interval": "1s"},
"router": {
"private_ip_enabled": True,
},
@ -22,7 +20,7 @@ ZGS_CONFIG = {
"auto_sync_idle_interval": "1s",
"sequential_find_peer_timeout": "10s",
"random_find_peer_timeout": "10s",
}
},
}
CONFIG_DIR = os.path.dirname(__file__)
@ -75,11 +73,12 @@ TX_PARAMS1 = {
NO_SEAL_FLAG = 0x1
NO_MERKLE_PROOF_FLAG = 0x2
def update_config(default: dict, custom: dict):
"""
Supports to update configurations with dict value.
"""
for (key, value) in custom.items():
for key, value in custom.items():
if default.get(key) is None or type(value) != dict:
default[key] = value
else:

View File

@ -20,17 +20,15 @@ class CrashTest(TestFramework):
segment = submit_data(self.nodes[0], chunk_data)
self.log.info("segment: %s", segment)
wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"] is True)
wait_until(
lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"] is True
)
for i in range(1, self.num_nodes):
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
self.nodes[i].admin_start_sync_file(0)
self.log.info("wait for node: %s", i)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
# 2: first node runnging, other nodes killed
self.log.info("kill node")
@ -56,22 +54,16 @@ class CrashTest(TestFramework):
for i in range(2, self.num_nodes):
self.start_storage_node(i)
self.nodes[i].wait_for_rpc_connection()
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
self.nodes[i].admin_start_sync_file(1)
self.nodes[i].stop(kill=True)
self.start_storage_node(i)
self.nodes[i].wait_for_rpc_connection()
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
self.nodes[i].admin_start_sync_file(1)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
# 4: node[1..] synced contract entries and killed
self.log.info("kill node 0")
@ -96,13 +88,9 @@ class CrashTest(TestFramework):
self.log.info("wait for node: %s", i)
self.start_storage_node(i)
self.nodes[i].wait_for_rpc_connection()
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
self.nodes[i].admin_start_sync_file(2)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
# 5: node[1..] synced contract entries and killed, sync disorder
self.nodes[0].stop(kill=True)
@ -137,21 +125,13 @@ class CrashTest(TestFramework):
self.log.info("wait for node: %s", i)
self.start_storage_node(i)
self.nodes[i].wait_for_rpc_connection()
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root1) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root1) is not None)
self.nodes[i].admin_start_sync_file(4)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root1)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root1)["finalized"])
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
self.nodes[i].admin_start_sync_file(3)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
if __name__ == "__main__":

View File

@ -45,13 +45,9 @@ class FuzzTest(TestFramework):
lock.release()
log.info("submit data via client %s", idx)
wait_until(
lambda: nodes[idx].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: nodes[idx].zgs_get_file_info(data_root) is not None)
segment = submit_data(nodes[idx], chunk_data)
wait_until(
lambda: nodes[idx].zgs_get_file_info(data_root)["finalized"]
)
wait_until(lambda: nodes[idx].zgs_get_file_info(data_root)["finalized"])
lock.acquire()
nodes_index.append(idx)
@ -65,17 +61,15 @@ class FuzzTest(TestFramework):
lambda: nodes[idx].zgs_get_file_info(data_root) is not None
)
def wait_finalized():
def wait_finalized():
ret = nodes[idx].zgs_get_file_info(data_root)
if ret["finalized"]:
return True
else:
nodes[idx].admin_start_sync_file(ret['tx']['seq'])
nodes[idx].admin_start_sync_file(ret["tx"]["seq"])
return False
wait_until(
lambda: wait_finalized(), timeout = 180
)
wait_until(lambda: wait_finalized(), timeout=180)
def run_small_chunk_size(nodes, contract, log):
sizes = [i for i in range(1, SAMLL_SIZE + 1)]
@ -84,7 +78,7 @@ class FuzzTest(TestFramework):
run_chunk_size(sizes, nodes, contract, log)
def run_large_chunk_size(nodes, contract, log):
sizes = [i for i in range(256 * 1024 * 256 - LARGE_SIZE, 256 * 1024 * 256 )]
sizes = [i for i in range(256 * 1024 * 256 - LARGE_SIZE, 256 * 1024 * 256)]
random.shuffle(sizes)
run_chunk_size(sizes, nodes, contract, log)

View File

@ -18,7 +18,6 @@ class LongTimeMineTest(TestFramework):
self.mine_period = 15
self.launch_wait_seconds = 15
def submit_data(self, item, size):
submissions_before = self.contract.num_submissions()
client = self.nodes[0]
@ -44,7 +43,10 @@ class LongTimeMineTest(TestFramework):
self.submit_data(b"\x11", 2000)
self.log.info("Start mine")
wait_until(lambda: int(blockchain.eth_blockNumber(), 16) > self.mine_period, timeout=180)
wait_until(
lambda: int(blockchain.eth_blockNumber(), 16) > self.mine_period,
timeout=180,
)
self.log.info("Wait for the first mine answer")
wait_until(lambda: self.mine_contract.last_mined_epoch() == 1)

View File

@ -15,7 +15,11 @@ class MineTest(TestFramework):
}
self.mine_period = int(45 / self.block_time)
self.launch_wait_seconds = 15
self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period)
self.log.info(
"Contract Info: Est. block time %.2f, Mine period %d",
self.block_time,
self.mine_period,
)
def submit_data(self, item, size):
submissions_before = self.contract.num_submissions()
@ -37,7 +41,11 @@ class MineTest(TestFramework):
first_block = self.contract.first_block()
self.log.info("Current block number %d", int(blockchain.eth_blockNumber(), 16))
self.log.info("Flow deployment block number %d, epoch 1 start %d", first_block, first_block + self.mine_period)
self.log.info(
"Flow deployment block number %d, epoch 1 start %d",
first_block,
first_block + self.mine_period,
)
wait_until(lambda: self.contract.epoch() >= 1, timeout=180)
quality = int(2**256 / 100 / estimate_st_performance())
@ -54,27 +62,39 @@ class MineTest(TestFramework):
self.contract.update_context()
self.log.info("Wait for the first mine answer")
wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1 and not self.mine_contract.can_submit(), timeout=180)
wait_until(
lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1
and not self.mine_contract.can_submit(),
timeout=180,
)
self.log.info("Wait for the second mine context release")
wait_until(lambda: self.contract.epoch() >= start_epoch + 2, timeout=180)
self.contract.update_context()
self.log.info("Wait for the second mine answer")
wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2 and not self.mine_contract.can_submit(), timeout=180)
wait_until(
lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2
and not self.mine_contract.can_submit(),
timeout=180,
)
self.nodes[0].miner_stop()
self.log.info("Wait for the third mine context release")
wait_until(lambda: self.contract.epoch() >= start_epoch + 3, timeout=180)
self.contract.update_context()
self.log.info("Submit the second data chunk")
self.submit_data(b"\x22", 2000)
# Now the storage node should have the latest flow, but the mining context is using an old one.
self.nodes[0].miner_start()
self.log.info("Wait for the third mine answer")
wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 3 and not self.mine_contract.can_submit(), timeout=180)
wait_until(
lambda: self.mine_contract.last_mined_epoch() == start_epoch + 3
and not self.mine_contract.can_submit(),
timeout=180,
)
self.log.info("Current block number %d", int(blockchain.eth_blockNumber(), 16))

View File

@ -2,13 +2,18 @@
from test_framework.test_framework import TestFramework
from config.node_config import MINER_ID, GENESIS_PRIV_KEY
from utility.submission import create_submission, submit_data
from utility.utils import wait_until, assert_equal, assert_greater_than, estimate_st_performance
from utility.utils import (
wait_until,
assert_equal,
assert_greater_than,
estimate_st_performance,
)
from test_framework.blockchain_node import BlockChainNodeType
import time
import math
PRICE_PER_SECTOR = math.ceil(10 * (10 ** 18) / (2 ** 30) * 256 / 12)
PRICE_PER_SECTOR = math.ceil(10 * (10**18) / (2**30) * 256 / 12)
class MineTest(TestFramework):
@ -23,17 +28,21 @@ class MineTest(TestFramework):
self.enable_market = True
self.mine_period = int(50 / self.block_time)
self.launch_wait_seconds = 15
self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period)
self.log.info(
"Contract Info: Est. block time %.2f, Mine period %d",
self.block_time,
self.mine_period,
)
def submit_data(self, item, size, no_submit = False):
def submit_data(self, item, size, no_submit=False):
submissions_before = self.contract.num_submissions()
client = self.nodes[0]
chunk_data = item * 256 * size
submissions, data_root = create_submission(chunk_data)
value = int(size * PRICE_PER_SECTOR * 1.1)
self.contract.submit(submissions, tx_prarams = {"value": value})
self.contract.submit(submissions, tx_prarams={"value": value})
wait_until(lambda: self.contract.num_submissions() == submissions_before + 1)
if not no_submit:
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
segment = submit_data(client, chunk_data)
@ -48,11 +57,15 @@ class MineTest(TestFramework):
difficulty = int(2**256 / 5 / estimate_st_performance())
self.mine_contract.set_quality(difficulty)
SECTORS_PER_PRICING = int(8 * ( 2 ** 30 ) / 256)
SECTORS_PER_PRICING = int(8 * (2**30) / 256)
first_block = self.contract.first_block()
self.log.info("Current block number %d", int(blockchain.eth_blockNumber(), 16))
self.log.info("Flow deployment block number %d, epoch 1 start %d, wait for epoch 1 start", first_block, first_block + self.mine_period)
self.log.info(
"Flow deployment block number %d, epoch 1 start %d, wait for epoch 1 start",
first_block,
first_block + self.mine_period,
)
wait_until(lambda: self.contract.epoch() >= 1, timeout=180)
self.log.info("Submit the actual data chunk (256 MB)")
@ -63,8 +76,12 @@ class MineTest(TestFramework):
# wait_until(lambda: self.contract.epoch() >= 1, timeout=180)
start_epoch = self.contract.epoch()
self.log.info("Submission Done, epoch is %d, current block number %d", start_epoch, int(blockchain.eth_blockNumber(), 16))
self.log.info(
"Submission Done, epoch is %d, current block number %d",
start_epoch,
int(blockchain.eth_blockNumber(), 16),
)
self.log.info("Wait for mine context release")
wait_until(lambda: self.contract.epoch() >= start_epoch + 1, timeout=180)
@ -72,27 +89,38 @@ class MineTest(TestFramework):
self.contract.update_context()
self.log.info("Wait for mine answer")
wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1 and not self.mine_contract.can_submit(), timeout=120)
wait_until(
lambda: self.mine_contract.last_mined_epoch() == start_epoch + 1
and not self.mine_contract.can_submit(),
timeout=120,
)
rewards = self.reward_contract.reward_distributes()
assert_equal(len(rewards), 4)
firstReward = rewards[0].args.amount
self.log.info("Received reward %d Gwei", firstReward / (10**9))
self.reward_contract.donate(10000 * 10 ** 18)
self.reward_contract.donate(10000 * 10**18)
self.log.info("Donation Done")
self.log.info("Submit the data hash only (8 GB)")
self.submit_data(b"\x11", int(SECTORS_PER_PRICING), no_submit=True)
current_epoch = self.contract.epoch()
assert_equal(current_epoch, start_epoch + 1);
self.log.info("Sumission Done, epoch is %d, current block number %d", self.contract.epoch(), int(blockchain.eth_blockNumber(), 16))
assert_equal(current_epoch, start_epoch + 1)
self.log.info(
"Sumission Done, epoch is %d, current block number %d",
self.contract.epoch(),
int(blockchain.eth_blockNumber(), 16),
)
self.log.info("Wait for mine context release")
wait_until(lambda: self.contract.epoch() >= start_epoch + 2, timeout=180)
self.contract.update_context()
self.log.info("Wait for mine answer")
wait_until(lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2 and not self.mine_contract.can_submit())
wait_until(
lambda: self.mine_contract.last_mined_epoch() == start_epoch + 2
and not self.mine_contract.can_submit()
)
assert_equal(self.contract.epoch(), start_epoch + 2)
rewards = self.reward_contract.reward_distributes()

View File

@ -7,6 +7,7 @@ from config.node_config import ZGS_KEY_FILE, ZGS_NODEID
from test_framework.test_framework import TestFramework
from utility.utils import p2p_port
class NetworkDiscoveryTest(TestFramework):
"""
This is to test whether community nodes could connect to each other via UDP discovery.
@ -24,7 +25,6 @@ class NetworkDiscoveryTest(TestFramework):
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": bootnode_port,
"network_enr_udp_port": bootnode_port,
# disable trusted nodes
"network_libp2p_nodes": [],
}
@ -37,7 +37,6 @@ class NetworkDiscoveryTest(TestFramework):
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": p2p_port(i),
"network_enr_udp_port": p2p_port(i),
# disable trusted nodes and enable bootnodes
"network_libp2p_nodes": [],
"network_boot_nodes": bootnodes,
@ -57,7 +56,13 @@ class NetworkDiscoveryTest(TestFramework):
total_connected += info["connectedPeers"]
self.log.info(
"Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)",
i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"],
i,
info["totalPeers"],
info["bannedPeers"],
info["disconnectedPeers"],
info["connectedPeers"],
info["connectedIncomingPeers"],
info["connectedOutgoingPeers"],
)
if total_connected >= self.num_nodes * (self.num_nodes - 1):
@ -66,5 +71,6 @@ class NetworkDiscoveryTest(TestFramework):
self.log.info("====================================")
self.log.info("All nodes connected to each other successfully")
if __name__ == "__main__":
NetworkDiscoveryTest().main()

View File

@ -7,6 +7,7 @@ from config.node_config import ZGS_KEY_FILE, ZGS_NODEID
from test_framework.test_framework import TestFramework
from utility.utils import p2p_port
class NetworkDiscoveryUpgradeTest(TestFramework):
"""
This is to test that low version community nodes could not connect to bootnodes.
@ -24,7 +25,6 @@ class NetworkDiscoveryUpgradeTest(TestFramework):
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": bootnode_port,
"network_enr_udp_port": bootnode_port,
# disable trusted nodes
"network_libp2p_nodes": [],
}
@ -37,11 +37,9 @@ class NetworkDiscoveryUpgradeTest(TestFramework):
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": p2p_port(i),
"network_enr_udp_port": p2p_port(i),
# disable trusted nodes and enable bootnodes
"network_libp2p_nodes": [],
"network_boot_nodes": bootnodes,
# disable network identity in ENR
"discv5_disable_enr_network_id": True,
}
@ -57,7 +55,13 @@ class NetworkDiscoveryUpgradeTest(TestFramework):
total_connected += info["connectedPeers"]
self.log.info(
"Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)",
i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"],
i,
info["totalPeers"],
info["bannedPeers"],
info["disconnectedPeers"],
info["connectedPeers"],
info["connectedIncomingPeers"],
info["connectedOutgoingPeers"],
)
# ENR incompatible and should not discover each other for TCP connection
@ -66,5 +70,6 @@ class NetworkDiscoveryUpgradeTest(TestFramework):
self.log.info("====================================")
self.log.info("ENR incompatible nodes do not connect to each other")
if __name__ == "__main__":
NetworkDiscoveryUpgradeTest().main()

View File

@ -7,6 +7,7 @@ from config.node_config import ZGS_KEY_FILE, ZGS_NODEID
from test_framework.test_framework import TestFramework
from utility.utils import p2p_port
class NetworkTcpShardTest(TestFramework):
"""
This is to test TCP connection for shard config mismatched peers of UDP discovery.
@ -24,12 +25,10 @@ class NetworkTcpShardTest(TestFramework):
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": bootnode_port,
"network_enr_udp_port": bootnode_port,
# disable trusted nodes
"network_libp2p_nodes": [],
# custom shard config
"shard_position": "0/4"
"shard_position": "0/4",
}
# setup node 1 & 2 as community nodes
@ -40,13 +39,11 @@ class NetworkTcpShardTest(TestFramework):
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": p2p_port(i),
"network_enr_udp_port": p2p_port(i),
# disable trusted nodes and enable bootnodes
"network_libp2p_nodes": [],
"network_boot_nodes": bootnodes,
# custom shard config
"shard_position": f"{i}/4"
"shard_position": f"{i}/4",
}
def run_test(self):
@ -60,7 +57,13 @@ class NetworkTcpShardTest(TestFramework):
info = self.nodes[i].rpc.admin_getNetworkInfo()
self.log.info(
"Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)",
i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"],
i,
info["totalPeers"],
info["bannedPeers"],
info["disconnectedPeers"],
info["connectedPeers"],
info["connectedIncomingPeers"],
info["connectedOutgoingPeers"],
)
if i == timeout_secs - 1:
@ -72,5 +75,6 @@ class NetworkTcpShardTest(TestFramework):
self.log.info("====================================")
self.log.info("All nodes discovered but not connected for each other")
if __name__ == "__main__":
NetworkTcpShardTest().main()

View File

@ -23,7 +23,11 @@ class PrunerTest(TestFramework):
self.mine_period = int(45 / self.block_time)
self.lifetime_seconds = 240
self.launch_wait_seconds = 15
self.log.info("Contract Info: Est. block time %.2f, Mine period %d", self.block_time, self.mine_period)
self.log.info(
"Contract Info: Est. block time %.2f, Mine period %d",
self.block_time,
self.mine_period,
)
def run_test(self):
client = self.nodes[0]
@ -31,7 +35,10 @@ class PrunerTest(TestFramework):
chunk_data = b"\x02" * 16 * 256 * 1024
# chunk_data = b"\x02" * 5 * 1024 * 1024 * 1024
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)})
self.contract.submit(
submissions,
tx_prarams={"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)},
)
wait_until(lambda: self.contract.num_submissions() == 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)

View File

@ -7,6 +7,7 @@ from utility.submission import create_submission
from utility.submission import submit_data
from utility.utils import wait_until
class RandomTest(TestFramework):
def setup_params(self):
self.num_blockchain_nodes = 1
@ -32,14 +33,18 @@ class RandomTest(TestFramework):
else:
size = random.randint(0, max_size)
no_data = random.random() <= no_data_ratio
self.log.info(f"choose {chosen_node}, seq={i}, size={size}, no_data={no_data}")
self.log.info(
f"choose {chosen_node}, seq={i}, size={size}, no_data={no_data}"
)
client = self.nodes[chosen_node]
chunk_data = random.randbytes(size)
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions)
wait_until(lambda: self.contract.num_submissions() == i + 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None, timeout=120)
wait_until(
lambda: client.zgs_get_file_info(data_root) is not None, timeout=120
)
if not no_data:
submit_data(client, chunk_data)
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
@ -47,8 +52,17 @@ class RandomTest(TestFramework):
for node_index in range(len(self.nodes)):
if node_index != chosen_node:
self.log.debug(f"check {node_index}")
wait_until(lambda: self.nodes[node_index].zgs_get_file_info(data_root) is not None, timeout=300)
wait_until(lambda: self.nodes[node_index].zgs_get_file_info(data_root)["finalized"], timeout=300)
wait_until(
lambda: self.nodes[node_index].zgs_get_file_info(data_root)
is not None,
timeout=300,
)
wait_until(
lambda: self.nodes[node_index].zgs_get_file_info(data_root)[
"finalized"
],
timeout=300,
)
# TODO(zz): This is a temp solution to trigger auto sync after all nodes started.
if i >= tx_count - 2:
continue
@ -72,7 +86,10 @@ class RandomTest(TestFramework):
if not no_data:
for node in self.nodes:
self.log.debug(f"check {data_root}, {node.index}")
wait_until(lambda: node.zgs_get_file_info(data_root)["finalized"], timeout=300)
wait_until(
lambda: node.zgs_get_file_info(data_root)["finalized"],
timeout=300,
)
if __name__ == "__main__":

View File

@ -32,8 +32,6 @@ class RootConsistencyTest(TestFramework):
assert_equal(contract_length, expected_length)
assert_equal(contract_root, node_root[2:])
def run_test(self):
self.assert_flow_status(1)
@ -48,7 +46,6 @@ class RootConsistencyTest(TestFramework):
self.submit_data(b"\x13", 512 + 256)
self.assert_flow_status(1024 + 512 + 256)
if __name__ == "__main__":

View File

@ -29,6 +29,7 @@ class RpcTest(TestFramework):
wait_until(lambda: client1.zgs_get_file_info(data_root) is not None)
assert_equal(client1.zgs_get_file_info(data_root)["finalized"], False)
assert_equal(client1.zgs_get_file_info(data_root)["pruned"], False)
wait_until(lambda: client2.zgs_get_file_info(data_root) is not None)
assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False)
@ -37,17 +38,13 @@ class RpcTest(TestFramework):
self.log.info("segment: %s", segment)
wait_until(lambda: client1.zgs_get_file_info(data_root)["finalized"])
assert_equal(
client1.zgs_download_segment(data_root, 0, 1), segment[0]["data"]
)
assert_equal(client1.zgs_download_segment(data_root, 0, 1), segment[0]["data"])
client2.admin_start_sync_file(0)
wait_until(lambda: client2.sync_status_is_completed_or_unknown(0))
wait_until(lambda: client2.zgs_get_file_info(data_root)["finalized"])
assert_equal(
client2.zgs_download_segment(data_root, 0, 1), segment[0]["data"]
)
assert_equal(client2.zgs_download_segment(data_root, 0, 1), segment[0]["data"])
self.__test_upload_file_with_cli(client1)
@ -89,9 +86,7 @@ class RpcTest(TestFramework):
)
)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(root)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(root)["finalized"])
assert_equal(
client1.zgs_download_segment(root, 0, 2),

View File

@ -41,7 +41,11 @@ class SubmissionTest(TestFramework):
for tx_offset in range(same_root_tx_count + 1):
tx_seq = next_tx_seq - 1 - tx_offset
# old txs are finalized after finalizing the new tx, so we may need to wait here.
wait_until(lambda: self.nodes[0].zgs_get_file_info_by_tx_seq(tx_seq)["finalized"])
wait_until(
lambda: self.nodes[0].zgs_get_file_info_by_tx_seq(tx_seq)[
"finalized"
]
)
# Send tx after uploading data
for _ in range(same_root_tx_count):
@ -57,7 +61,10 @@ class SubmissionTest(TestFramework):
client = self.nodes[node_idx]
wait_until(lambda: client.zgs_get_file_info_by_tx_seq(tx_seq) is not None)
wait_until(lambda: client.zgs_get_file_info_by_tx_seq(tx_seq)["finalized"] == data_finalized)
wait_until(
lambda: client.zgs_get_file_info_by_tx_seq(tx_seq)["finalized"]
== data_finalized
)
def submit_data(self, chunk_data, node_idx=0):
_, data_root = create_submission(chunk_data)

View File

@ -18,30 +18,30 @@ class ShardSubmitTest(TestFramework):
self.num_blockchain_nodes = 1
self.num_nodes = 4
self.zgs_node_configs[0] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "0/4"
"db_max_num_sectors": 2**30,
"shard_position": "0/4",
}
self.zgs_node_configs[1] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "1/4"
"db_max_num_sectors": 2**30,
"shard_position": "1/4",
}
self.zgs_node_configs[2] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "2/4"
"db_max_num_sectors": 2**30,
"shard_position": "2/4",
}
self.zgs_node_configs[3] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "3/4"
"db_max_num_sectors": 2**30,
"shard_position": "3/4",
}
def run_test(self):
data_size = [
256*960,
256*1024,
256 * 960,
256 * 1024,
2,
255,
256*960,
256*120,
256 * 960,
256 * 120,
256,
257,
1023,
@ -77,5 +77,6 @@ class ShardSubmitTest(TestFramework):
submit_data(client, chunk_data)
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
if __name__ == "__main__":
ShardSubmitTest().main()

View File

@ -13,16 +13,16 @@ class PrunerTest(TestFramework):
self.num_blockchain_nodes = 1
self.num_nodes = 4
self.zgs_node_configs[0] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "0/2"
"db_max_num_sectors": 2**30,
"shard_position": "0/2",
}
self.zgs_node_configs[1] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "1/2"
"db_max_num_sectors": 2**30,
"shard_position": "1/2",
}
self.zgs_node_configs[3] = {
"db_max_num_sectors": 2 ** 30,
"shard_position": "1/4"
"db_max_num_sectors": 2**30,
"shard_position": "1/4",
}
self.enable_market = True
@ -31,7 +31,10 @@ class PrunerTest(TestFramework):
chunk_data = b"\x02" * 8 * 256 * 1024
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions, tx_prarams = {"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)})
self.contract.submit(
submissions,
tx_prarams={"value": int(len(chunk_data) / 256 * PRICE_PER_SECTOR * 1.1)},
)
wait_until(lambda: self.contract.num_submissions() == 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
@ -57,10 +60,18 @@ class PrunerTest(TestFramework):
for i in range(len(segments)):
index_store = i % 2
index_empty = 1 - i % 2
seg0 = self.nodes[index_store].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
seg1 = self.nodes[index_empty].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
seg2 = self.nodes[2].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
seg3 = self.nodes[3].zgs_download_segment(data_root, i * 1024, (i + 1) * 1024)
seg0 = self.nodes[index_store].zgs_download_segment(
data_root, i * 1024, (i + 1) * 1024
)
seg1 = self.nodes[index_empty].zgs_download_segment(
data_root, i * 1024, (i + 1) * 1024
)
seg2 = self.nodes[2].zgs_download_segment(
data_root, i * 1024, (i + 1) * 1024
)
seg3 = self.nodes[3].zgs_download_segment(
data_root, i * 1024, (i + 1) * 1024
)
# base64 encoding size
assert_equal(len(seg0), 349528)
assert_equal(seg1, None)

View File

@ -5,6 +5,7 @@ import shutil
from test_framework.test_framework import TestFramework
from utility.utils import wait_until
class SnapshotTask(TestFramework):
def setup_params(self):
self.num_nodes = 2
@ -28,11 +29,11 @@ class SnapshotTask(TestFramework):
# Start the last node to verify historical file sync
self.nodes[1].shutdown()
shutil.rmtree(os.path.join(self.nodes[1].data_dir, 'db/data_db'))
shutil.rmtree(os.path.join(self.nodes[1].data_dir, "db/data_db"))
self.start_storage_node(1)
self.nodes[1].wait_for_rpc_connection()
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"])

View File

@ -77,9 +77,7 @@ class SubmissionTest(TestFramework):
continue
# Wait for log entry before file sync, otherwise, admin_startSyncFile will be failed.
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
self.nodes[i].admin_start_sync_file(submission_index - 1)
@ -89,15 +87,11 @@ class SubmissionTest(TestFramework):
)
)
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
assert_equal(
base64.b64decode(
self.nodes[i]
.zgs_download_segment(data_root, 0, 1)
.encode("utf-8")
self.nodes[i].zgs_download_segment(data_root, 0, 1).encode("utf-8")
),
first_entry,
)

View File

@ -3,17 +3,14 @@
from test_framework.test_framework import TestFramework
from utility.utils import wait_until
class AutoSyncHistoricalTest(TestFramework):
def setup_params(self):
self.num_nodes = 4
# Enable auto sync
for i in range(self.num_nodes):
self.zgs_node_configs[i] = {
"sync": {
"auto_sync_enabled": True
}
}
self.zgs_node_configs[i] = {"sync": {"auto_sync_enabled": True}}
def run_test(self):
# Stop the last node to verify historical file sync
@ -26,17 +23,36 @@ class AutoSyncHistoricalTest(TestFramework):
# Files should be available on other nodes via auto sync
for i in range(1, self.num_nodes - 1):
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"])
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"]
)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"]
)
# Start the last node to verify historical file sync
self.start_storage_node(self.num_nodes - 1)
self.nodes[self.num_nodes - 1].wait_for_rpc_connection()
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1)["finalized"])
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2)["finalized"])
wait_until(
lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1)
is not None
)
wait_until(
lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_1)[
"finalized"
]
)
wait_until(
lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2)
is not None
)
wait_until(
lambda: self.nodes[self.num_nodes - 1].zgs_get_file_info(data_root_2)[
"finalized"
]
)
if __name__ == "__main__":
AutoSyncHistoricalTest().main()

View File

@ -3,17 +3,14 @@
from test_framework.test_framework import TestFramework
from utility.utils import wait_until
class AutoSyncTest(TestFramework):
def setup_params(self):
self.num_nodes = 2
# Enable auto sync
for i in range(self.num_nodes):
self.zgs_node_configs[i] = {
"sync": {
"auto_sync_enabled": True
}
}
self.zgs_node_configs[i] = {"sync": {"auto_sync_enabled": True}}
def run_test(self):
# Submit and upload files on node 0
@ -26,5 +23,6 @@ class AutoSyncTest(TestFramework):
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2)["finalized"])
if __name__ == "__main__":
AutoSyncTest().main()

View File

@ -6,6 +6,7 @@ from test_framework.test_framework import TestFramework
from utility.submission import data_to_segments
from utility.utils import assert_equal, wait_until
class SyncChunksTest(TestFramework):
"""
By default, auto_sync_enabled and sync_file_on_announcement_enabled are both false,
@ -17,9 +18,7 @@ class SyncChunksTest(TestFramework):
# enable find chunks topic
for i in range(self.num_nodes):
self.zgs_node_configs[i] = {
"network_find_chunks_enabled": True
}
self.zgs_node_configs[i] = {"network_find_chunks_enabled": True}
def run_test(self):
client1 = self.nodes[0]
@ -35,20 +34,25 @@ class SyncChunksTest(TestFramework):
# Upload only 2nd segment to storage node
segments = data_to_segments(chunk_data)
self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
assert(client1.zgs_upload_segment(segments[1]) is None)
self.log.info(
"segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments]
)
assert client1.zgs_upload_segment(segments[1]) is None
# segment 0 is not able to download
assert(client1.zgs_download_segment_decoded(data_root, 0, 1024) is None)
assert client1.zgs_download_segment_decoded(data_root, 0, 1024) is None
# segment 1 is available to download
assert_equal(client1.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256])
assert_equal(
client1.zgs_download_segment_decoded(data_root, 1024, 2048),
chunk_data[1024 * 256 : 2048 * 256],
)
# segment 2 is not able to download
assert(client1.zgs_download_segment_decoded(data_root, 2048, 3072) is None)
assert client1.zgs_download_segment_decoded(data_root, 2048, 3072) is None
# Segment 1 should not be able to download on node 2
wait_until(lambda: client2.zgs_get_file_info(data_root) is not None)
assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False)
assert(client2.zgs_download_segment_decoded(data_root, 1024, 2048) is None)
assert client2.zgs_download_segment_decoded(data_root, 1024, 2048) is None
# Restart node 1 to check if the proof nodes are persisted.
self.stop_storage_node(0)
@ -56,12 +60,19 @@ class SyncChunksTest(TestFramework):
self.nodes[0].wait_for_rpc_connection()
# Trigger chunks sync by rpc
assert(client2.admin_start_sync_chunks(0, 1024, 2048) is None)
assert client2.admin_start_sync_chunks(0, 1024, 2048) is None
wait_until(lambda: client2.sync_status_is_completed_or_unknown(0))
wait_until(lambda: client2.zgs_download_segment_decoded(data_root, 1024, 2048) is not None)
wait_until(
lambda: client2.zgs_download_segment_decoded(data_root, 1024, 2048)
is not None
)
# Validate data
assert_equal(client2.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256])
assert_equal(
client2.zgs_download_segment_decoded(data_root, 1024, 2048),
chunk_data[1024 * 256 : 2048 * 256],
)
if __name__ == "__main__":
SyncChunksTest().main()

View File

@ -5,6 +5,7 @@ import time
from test_framework.test_framework import TestFramework
from utility.utils import assert_equal, wait_until
class SyncFileTest(TestFramework):
"""
By default, auto_sync_enabled and sync_file_on_announcement_enabled are both false,
@ -26,7 +27,7 @@ class SyncFileTest(TestFramework):
# restart client2
client2.start()
client2.wait_for_rpc_connection()
# File should not be auto sync on node 2 and there is no cached file locations
wait_until(lambda: client2.zgs_get_file_info(data_root) is not None)
time.sleep(3)
@ -35,7 +36,7 @@ class SyncFileTest(TestFramework):
# assert(client2.admin_get_file_location(0) is None)
# Trigger file sync by rpc
assert(client2.admin_start_sync_file(0) is None)
assert client2.admin_start_sync_file(0) is None
wait_until(lambda: client2.sync_status_is_completed_or_unknown(0))
wait_until(lambda: client2.zgs_get_file_info(data_root)["finalized"])
# file sync use ASK_FILE & ANSWER FILE protocol, and do not cache file announcement anymore.
@ -47,5 +48,6 @@ class SyncFileTest(TestFramework):
client1.zgs_download_segment(data_root, 0, 1024),
)
if __name__ == "__main__":
SyncFileTest().main()

View File

@ -6,8 +6,8 @@ from utility.run_all import run_all
if __name__ == "__main__":
run_all(
test_dir = os.path.dirname(__file__),
test_dir=os.path.dirname(__file__),
slow_tests={"mine_test.py", "random_test.py", "same_root_test.py"},
long_manual_tests={"fuzz_test.py"},
single_run_tests={"mine_with_market_test.py"},
)
)

View File

@ -12,11 +12,7 @@ from config.node_config import (
TX_PARAMS,
)
from utility.simple_rpc_proxy import SimpleRpcProxy
from utility.utils import (
initialize_config,
wait_until,
estimate_st_performance
)
from utility.utils import initialize_config, wait_until, estimate_st_performance
from test_framework.contracts import load_contract_metadata
@ -36,6 +32,7 @@ class BlockChainNodeType(Enum):
else:
raise AssertionError("Unsupported blockchain type")
@unique
class NodeType(Enum):
BlockChain = 0
@ -50,7 +47,7 @@ class TestNode:
def __init__(
self, node_type, index, data_dir, rpc_url, binary, config, log, rpc_timeout=10
):
assert os.path.exists(binary), ("Binary not found: %s" % binary)
assert os.path.exists(binary), "Binary not found: %s" % binary
self.node_type = node_type
self.index = index
self.data_dir = data_dir
@ -270,12 +267,14 @@ class BlockchainNode(TestNode):
def deploy_contract(name, args=None):
if args is None:
args = []
contract_interface = load_contract_metadata(path=self.contract_path, name=name)
contract_interface = load_contract_metadata(
path=self.contract_path, name=name
)
contract = w3.eth.contract(
abi=contract_interface["abi"],
bytecode=contract_interface["bytecode"],
)
tx_params = TX_PARAMS.copy()
del tx_params["gas"]
tx_hash = contract.constructor(*args).transact(tx_params)
@ -285,7 +284,7 @@ class BlockchainNode(TestNode):
abi=contract_interface["abi"],
)
return contract, tx_hash
def deploy_no_market():
self.log.debug("Start deploy contracts")
@ -301,12 +300,16 @@ class BlockchainNode(TestNode):
mine_contract, _ = deploy_contract("PoraMineTest", [0])
self.log.debug("Mine deployed")
mine_contract.functions.initialize(1, flow_contract.address, dummy_reward_contract.address).transact(TX_PARAMS)
mine_contract.functions.initialize(
1, flow_contract.address, dummy_reward_contract.address
).transact(TX_PARAMS)
mine_contract.functions.setDifficultyAdjustRatio(1).transact(TX_PARAMS)
mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS)
self.log.debug("Mine Initialized")
flow_initialize_hash = flow_contract.functions.initialize(dummy_market_contract.address, mine_period).transact(TX_PARAMS)
flow_initialize_hash = flow_contract.functions.initialize(
dummy_market_contract.address, mine_period
).transact(TX_PARAMS)
self.log.debug("Flow Initialized")
self.wait_for_transaction_receipt(w3, flow_initialize_hash)
@ -315,45 +318,60 @@ class BlockchainNode(TestNode):
# tx_hash = mine_contract.functions.setMiner(decode_hex(MINER_ID)).transact(TX_PARAMS)
# self.wait_for_transaction_receipt(w3, tx_hash)
return flow_contract, flow_initialize_hash, mine_contract, dummy_reward_contract
return (
flow_contract,
flow_initialize_hash,
mine_contract,
dummy_reward_contract,
)
def deploy_with_market(lifetime_seconds):
self.log.debug("Start deploy contracts")
mine_contract, _ = deploy_contract("PoraMineTest", [0])
self.log.debug("Mine deployed")
market_contract, _ = deploy_contract("FixedPrice", [])
self.log.debug("Market deployed")
reward_contract, _ = deploy_contract("ChunkLinearReward", [lifetime_seconds])
reward_contract, _ = deploy_contract(
"ChunkLinearReward", [lifetime_seconds]
)
self.log.debug("Reward deployed")
flow_contract, _ = deploy_contract("FixedPriceFlow", [0])
self.log.debug("Flow deployed")
mine_contract.functions.initialize(1, flow_contract.address, reward_contract.address).transact(TX_PARAMS)
mine_contract.functions.initialize(
1, flow_contract.address, reward_contract.address
).transact(TX_PARAMS)
mine_contract.functions.setDifficultyAdjustRatio(1).transact(TX_PARAMS)
mine_contract.functions.setTargetSubmissions(2).transact(TX_PARAMS)
self.log.debug("Mine Initialized")
market_contract.functions.initialize(int(lifetime_seconds * 256 * 10 * 10 ** 18 /
2 ** 30 / 12 / 31 / 86400),
flow_contract.address, reward_contract.address).transact(TX_PARAMS)
market_contract.functions.initialize(
int(lifetime_seconds * 256 * 10 * 10**18 / 2**30 / 12 / 31 / 86400),
flow_contract.address,
reward_contract.address,
).transact(TX_PARAMS)
self.log.debug("Market Initialized")
reward_contract.functions.initialize(market_contract.address, mine_contract.address).transact(TX_PARAMS)
reward_contract.functions.setBaseReward(10 ** 18).transact(TX_PARAMS)
reward_contract.functions.initialize(
market_contract.address, mine_contract.address
).transact(TX_PARAMS)
reward_contract.functions.setBaseReward(10**18).transact(TX_PARAMS)
self.log.debug("Reward Initialized")
flow_initialize_hash = flow_contract.functions.initialize(market_contract.address, mine_period).transact(TX_PARAMS)
flow_initialize_hash = flow_contract.functions.initialize(
market_contract.address, mine_period
).transact(TX_PARAMS)
self.log.debug("Flow Initialized")
self.wait_for_transaction_receipt(w3, flow_initialize_hash)
self.log.info("All contracts deployed")
return flow_contract, flow_initialize_hash, mine_contract, reward_contract
if enable_market:
return deploy_with_market(lifetime_seconds)
else:
@ -376,4 +394,7 @@ class BlockchainNode(TestNode):
w3.eth.wait_for_transaction_receipt(tx_hash)
def start(self):
super().start(self.blockchain_node_type == BlockChainNodeType.BSC or self.blockchain_node_type == BlockChainNodeType.ZG)
super().start(
self.blockchain_node_type == BlockChainNodeType.BSC
or self.blockchain_node_type == BlockChainNodeType.ZG
)

View File

@ -36,28 +36,36 @@ class ContractProxy:
tx_params = copy(TX_PARAMS)
tx_params["value"] = value
return getattr(contract.functions, fn_name)(**args).transact(tx_params)
def _logs(self, event_name, node_idx, **args):
assert node_idx < len(self.blockchain_nodes)
contract = self._get_contract(node_idx)
return getattr(contract.events, event_name).create_filter(from_block=0, to_block="latest").get_all_entries()
def transfer(self, value, node_idx = 0):
return (
getattr(contract.events, event_name)
.create_filter(from_block=0, to_block="latest")
.get_all_entries()
)
def transfer(self, value, node_idx=0):
tx_params = copy(TX_PARAMS)
tx_params["value"] = value
contract = self._get_contract(node_idx)
contract.receive.transact(tx_params)
def address(self):
return self.contract_address
class FlowContractProxy(ContractProxy):
def submit(
self, submission_nodes, node_idx=0, tx_prarams=None, parent_hash=None,
self,
submission_nodes,
node_idx=0,
tx_prarams=None,
parent_hash=None,
):
assert node_idx < len(self.blockchain_nodes)
@ -65,11 +73,12 @@ class FlowContractProxy(ContractProxy):
if tx_prarams is not None:
combined_tx_prarams.update(tx_prarams)
contract = self._get_contract(node_idx)
# print(contract.functions.submit(submission_nodes).estimate_gas(combined_tx_prarams))
tx_hash = contract.functions.submit(submission_nodes).transact(combined_tx_prarams)
tx_hash = contract.functions.submit(submission_nodes).transact(
combined_tx_prarams
)
receipt = self.blockchain_nodes[node_idx].wait_for_transaction_receipt(
contract.w3, tx_hash, parent_hash=parent_hash
)
@ -85,46 +94,46 @@ class FlowContractProxy(ContractProxy):
def epoch(self, node_idx=0):
return self.get_mine_context(node_idx)[0]
def update_context(self, node_idx=0):
return self._send("makeContext", node_idx)
def get_mine_context(self, node_idx=0):
return self._call("makeContextWithResult", node_idx)
def get_flow_root(self, node_idx=0):
return self._call("computeFlowRoot", node_idx)
def get_flow_length(self, node_idx=0):
return self._call("tree", node_idx)[0]
class MineContractProxy(ContractProxy):
def last_mined_epoch(self, node_idx=0):
return self._call("lastMinedEpoch", node_idx)
def can_submit(self, node_idx=0):
return self._call("canSubmit", node_idx)
def current_submissions(self, node_idx=0):
return self._call("currentSubmissions", node_idx)
def set_quality(self, quality, node_idx=0):
return self._send("setQuality", node_idx, _targetQuality=quality)
class RewardContractProxy(ContractProxy):
def reward_distributes(self, node_idx=0):
return self._logs("DistributeReward", node_idx)
def donate(self, value, node_idx = 0):
def donate(self, value, node_idx=0):
return self._send_payable("donate", node_idx, value)
def base_reward(self, node_idx = 0):
def base_reward(self, node_idx=0):
return self._call("baseReward", node_idx)
def first_rewardable_chunk(self, node_idx = 0):
def first_rewardable_chunk(self, node_idx=0):
return self._call("firstRewardableChunk", node_idx)
def reward_deadline(self, node_idx = 0):
def reward_deadline(self, node_idx=0):
return self._call("rewardDeadline", node_idx, 0)

View File

@ -1,6 +1,7 @@
from pathlib import Path
import json
def load_contract_metadata(path: str, name: str):
path = Path(path)
try:
@ -8,4 +9,3 @@ def load_contract_metadata(path: str, name: str):
return json.loads(open(found_file, "r").read())
except StopIteration:
raise Exception(f"Cannot found contract {name}'s metadata")

View File

@ -15,7 +15,11 @@ from pathlib import Path
from eth_utils import encode_hex
from test_framework.bsc_node import BSCNode
from test_framework.contract_proxy import FlowContractProxy, MineContractProxy, RewardContractProxy
from test_framework.contract_proxy import (
FlowContractProxy,
MineContractProxy,
RewardContractProxy,
)
from test_framework.zgs_node import ZgsNode
from test_framework.blockchain_node import BlockChainNodeType
from test_framework.conflux_node import ConfluxNode, connect_sample_nodes
@ -74,13 +78,17 @@ class TestFramework:
root_dir, "target", "release", "zgs_node" + binary_ext
)
self.__default_zgs_cli_binary__ = os.path.join(
tests_dir, "tmp", "0g-storage-client" + binary_ext
tests_dir, "tmp", "0g-storage-client" + binary_ext
)
def __setup_blockchain_node(self):
if self.blockchain_node_type == BlockChainNodeType.ZG:
zg_node_init_genesis(self.blockchain_binary, self.root_dir, self.num_blockchain_nodes)
self.log.info("0gchain genesis initialized for %s nodes" % self.num_blockchain_nodes)
zg_node_init_genesis(
self.blockchain_binary, self.root_dir, self.num_blockchain_nodes
)
self.log.info(
"0gchain genesis initialized for %s nodes" % self.num_blockchain_nodes
)
for i in range(self.num_blockchain_nodes):
if i in self.blockchain_node_configs:
@ -171,15 +179,20 @@ class TestFramework:
self.log.debug("Wait for 0gchain node to generate first block")
time.sleep(0.5)
for node in self.blockchain_nodes:
wait_until(lambda: node.net_peerCount() == self.num_blockchain_nodes - 1)
wait_until(
lambda: node.net_peerCount() == self.num_blockchain_nodes - 1
)
wait_until(lambda: node.eth_blockNumber() is not None)
wait_until(lambda: int(node.eth_blockNumber(), 16) > 0)
contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[0].setup_contract(self.enable_market, self.mine_period, self.lifetime_seconds)
contract, tx_hash, mine_contract, reward_contract = self.blockchain_nodes[
0
].setup_contract(self.enable_market, self.mine_period, self.lifetime_seconds)
self.contract = FlowContractProxy(contract, self.blockchain_nodes)
self.mine_contract = MineContractProxy(mine_contract, self.blockchain_nodes)
self.reward_contract = RewardContractProxy(reward_contract, self.blockchain_nodes)
self.reward_contract = RewardContractProxy(
reward_contract, self.blockchain_nodes
)
for node in self.blockchain_nodes[1:]:
node.wait_for_transaction(tx_hash)
@ -216,12 +229,14 @@ class TestFramework:
time.sleep(1)
node.start()
self.log.info("Wait the zgs_node launch for %d seconds", self.launch_wait_seconds)
self.log.info(
"Wait the zgs_node launch for %d seconds", self.launch_wait_seconds
)
time.sleep(self.launch_wait_seconds)
for node in self.nodes:
node.wait_for_rpc_connection()
def add_arguments(self, parser: argparse.ArgumentParser):
parser.add_argument(
"--conflux-binary",
@ -336,10 +351,12 @@ class TestFramework:
self.log.addHandler(ch)
def _check_cli_binary(self):
if Path(self.cli_binary).absolute() == Path(self.__default_zgs_cli_binary__).absolute() and not os.path.exists(self.cli_binary):
if Path(self.cli_binary).absolute() == Path(
self.__default_zgs_cli_binary__
).absolute() and not os.path.exists(self.cli_binary):
dir = Path(self.cli_binary).parent.absolute()
build_cli(dir)
assert os.path.exists(self.cli_binary), (
"zgs CLI binary not found: %s" % self.cli_binary
)
@ -353,14 +370,12 @@ class TestFramework:
file_to_upload,
):
self._check_cli_binary()
upload_args = [
self.cli_binary,
"upload",
"--url",
blockchain_node_rpc_url,
"--contract",
contract_address,
"--key",
encode_hex(key),
"--node",
@ -372,7 +387,9 @@ class TestFramework:
"--file",
]
output = tempfile.NamedTemporaryFile(dir=self.root_dir, delete=False, prefix="zgs_client_output_")
output = tempfile.NamedTemporaryFile(
dir=self.root_dir, delete=False, prefix="zgs_client_output_"
)
output_name = output.name
output_fileno = output.fileno()
@ -383,7 +400,7 @@ class TestFramework:
stdout=output_fileno,
stderr=output_fileno,
)
return_code = proc.wait(timeout=60)
output.seek(0)
@ -392,17 +409,27 @@ class TestFramework:
line = line.decode("utf-8")
self.log.debug("line: %s", line)
if "root" in line:
filtered_line = re.sub(r'\x1b\[([0-9,A-Z]{1,2}(;[0-9]{1,2})?(;[0-9]{3})?)?[m|K]?', '', line)
filtered_line = re.sub(
r"\x1b\[([0-9,A-Z]{1,2}(;[0-9]{1,2})?(;[0-9]{3})?)?[m|K]?",
"",
line,
)
index = filtered_line.find("root=")
if index > 0:
root = filtered_line[index + 5 : index + 5 + 66]
except Exception as ex:
self.log.error("Failed to upload file via CLI tool, output: %s", output_name)
self.log.error(
"Failed to upload file via CLI tool, output: %s", output_name
)
raise ex
finally:
output.close()
assert return_code == 0, "%s upload file failed, output: %s, log: %s" % (self.cli_binary, output_name, lines)
assert return_code == 0, "%s upload file failed, output: %s, log: %s" % (
self.cli_binary,
output_name,
lines,
)
return root
@ -410,8 +437,15 @@ class TestFramework:
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions)
self.num_deployed_contracts += 1
wait_until(lambda: self.contract.num_submissions() == self.num_deployed_contracts)
self.log.info("Submission completed, data root: %s, submissions(%s) = %s", data_root, len(submissions), submissions)
wait_until(
lambda: self.contract.num_submissions() == self.num_deployed_contracts
)
self.log.info(
"Submission completed, data root: %s, submissions(%s) = %s",
data_root,
len(submissions),
submissions,
)
return data_root
def __upload_file__(self, node_index: int, random_data_size: int) -> str:
@ -426,7 +460,9 @@ class TestFramework:
# Upload file to storage node
segments = submit_data(client, chunk_data)
self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
self.log.info(
"segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments]
)
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
return data_root
@ -452,7 +488,6 @@ class TestFramework:
if clean:
self.nodes[index].clean_data()
def start_storage_node(self, index):
self.nodes[index].start()
@ -484,7 +519,7 @@ class TestFramework:
if os.path.islink(dst):
os.remove(dst)
elif os.path.isdir(dst):
elif os.path.isdir(dst):
shutil.rmtree(dst)
elif os.path.exists(dst):
os.remove(dst)

View File

@ -3,24 +3,35 @@ import subprocess
import tempfile
from test_framework.blockchain_node import BlockChainNodeType, BlockchainNode
from utility.utils import blockchain_p2p_port, blockchain_rpc_port, blockchain_ws_port, blockchain_rpc_port_tendermint, pprof_port
from utility.utils import (
blockchain_p2p_port,
blockchain_rpc_port,
blockchain_ws_port,
blockchain_rpc_port_tendermint,
pprof_port,
)
from utility.build_binary import build_zg
def zg_node_init_genesis(binary: str, root_dir: str, num_nodes: int):
assert num_nodes > 0, "Invalid number of blockchain nodes: %s" % num_nodes
if not os.path.exists(binary):
build_zg(os.path.dirname(binary))
build_zg(os.path.dirname(binary))
shell_script = os.path.join(
os.path.dirname(os.path.realpath(__file__)), # test_framework folder
"..", "config", "0gchain-init-genesis.sh"
os.path.dirname(os.path.realpath(__file__)), # test_framework folder
"..",
"config",
"0gchain-init-genesis.sh",
)
zgchaind_dir = os.path.join(root_dir, "0gchaind")
os.mkdir(zgchaind_dir)
log_file = tempfile.NamedTemporaryFile(dir=zgchaind_dir, delete=False, prefix="init_genesis_", suffix=".log")
log_file = tempfile.NamedTemporaryFile(
dir=zgchaind_dir, delete=False, prefix="init_genesis_", suffix=".log"
)
p2p_port_start = blockchain_p2p_port(0)
ret = subprocess.run(
@ -31,7 +42,11 @@ def zg_node_init_genesis(binary: str, root_dir: str, num_nodes: int):
log_file.close()
assert ret.returncode == 0, "Failed to init 0gchain genesis, see more details in log file: %s" % log_file.name
assert ret.returncode == 0, (
"Failed to init 0gchain genesis, see more details in log file: %s"
% log_file.name
)
class ZGNode(BlockchainNode):
def __init__(
@ -61,19 +76,27 @@ class ZGNode(BlockchainNode):
self.config_file = None
self.args = [
binary, "start",
"--home", data_dir,
binary,
"start",
"--home",
data_dir,
# overwrite json rpc http port: 8545
"--json-rpc.address", "127.0.0.1:%s" % blockchain_rpc_port(index),
"--json-rpc.address",
"127.0.0.1:%s" % blockchain_rpc_port(index),
# overwrite json rpc ws port: 8546
"--json-rpc.ws-address", "127.0.0.1:%s" % blockchain_ws_port(index),
"--json-rpc.ws-address",
"127.0.0.1:%s" % blockchain_ws_port(index),
# overwrite p2p port: 26656
"--p2p.laddr", "tcp://127.0.0.1:%s" % blockchain_p2p_port(index),
"--p2p.laddr",
"tcp://127.0.0.1:%s" % blockchain_p2p_port(index),
# overwrite rpc port: 26657
"--rpc.laddr", "tcp://127.0.0.1:%s" % blockchain_rpc_port_tendermint(index),
"--rpc.laddr",
"tcp://127.0.0.1:%s" % blockchain_rpc_port_tendermint(index),
# overwrite pprof port: 6060
"--rpc.pprof_laddr", "127.0.0.1:%s" % pprof_port(index),
"--log_level", "debug"
"--rpc.pprof_laddr",
"127.0.0.1:%s" % pprof_port(index),
"--log_level",
"debug",
]
for k, v in updated_config.items():
@ -82,4 +105,4 @@ class ZGNode(BlockchainNode):
self.args.append(str(v))
def setup_config(self):
""" Already batch initialized by shell script in framework """
"""Already batch initialized by shell script in framework"""

View File

@ -11,6 +11,7 @@ from utility.utils import (
blockchain_rpc_port,
)
class ZgsNode(TestNode):
def __init__(
self,
@ -98,17 +99,21 @@ class ZgsNode(TestNode):
def zgs_download_segment(self, data_root, start_index, end_index):
return self.rpc.zgs_downloadSegment([data_root, start_index, end_index])
def zgs_download_segment_decoded(self, data_root: str, start_chunk_index: int, end_chunk_index: int) -> bytes:
encodedSegment = self.rpc.zgs_downloadSegment([data_root, start_chunk_index, end_chunk_index])
def zgs_download_segment_decoded(
self, data_root: str, start_chunk_index: int, end_chunk_index: int
) -> bytes:
encodedSegment = self.rpc.zgs_downloadSegment(
[data_root, start_chunk_index, end_chunk_index]
)
return None if encodedSegment is None else base64.b64decode(encodedSegment)
def zgs_get_file_info(self, data_root):
return self.rpc.zgs_getFileInfo([data_root])
return self.rpc.zgs_getFileInfo([data_root, True])
def zgs_get_file_info_by_tx_seq(self, tx_seq):
return self.rpc.zgs_getFileInfoByTxSeq([tx_seq])
def zgs_get_flow_context(self, tx_seq):
return self.rpc.zgs_getFlowContext([tx_seq])
@ -118,9 +123,13 @@ class ZgsNode(TestNode):
def admin_start_sync_file(self, tx_seq):
return self.rpc.admin_startSyncFile([tx_seq])
def admin_start_sync_chunks(self, tx_seq: int, start_chunk_index: int, end_chunk_index: int):
return self.rpc.admin_startSyncChunks([tx_seq, start_chunk_index, end_chunk_index])
def admin_start_sync_chunks(
self, tx_seq: int, start_chunk_index: int, end_chunk_index: int
):
return self.rpc.admin_startSyncChunks(
[tx_seq, start_chunk_index, end_chunk_index]
)
def admin_get_sync_status(self, tx_seq):
return self.rpc.admin_getSyncStatus([tx_seq])
@ -128,8 +137,8 @@ class ZgsNode(TestNode):
def sync_status_is_completed_or_unknown(self, tx_seq):
status = self.rpc.admin_getSyncStatus([tx_seq])
return status == "Completed" or status == "unknown"
def admin_get_file_location(self, tx_seq, all_shards = True):
def admin_get_file_location(self, tx_seq, all_shards=True):
return self.rpc.admin_getFileLocation([tx_seq, all_shards])
def clean_data(self):

View File

@ -9,31 +9,40 @@ from enum import Enum, unique
from utility.utils import is_windows_platform, wait_until
# v1.0.0-ci release
GITHUB_DOWNLOAD_URL="https://api.github.com/repos/0glabs/0g-storage-node/releases/152560136"
GITHUB_DOWNLOAD_URL = (
"https://api.github.com/repos/0glabs/0g-storage-node/releases/152560136"
)
CONFLUX_BINARY = "conflux.exe" if is_windows_platform() else "conflux"
BSC_BINARY = "geth.exe" if is_windows_platform() else "geth"
ZG_BINARY = "0gchaind.exe" if is_windows_platform() else "0gchaind"
CLIENT_BINARY = "0g-storage-client.exe" if is_windows_platform() else "0g-storage-client"
CLIENT_BINARY = (
"0g-storage-client.exe" if is_windows_platform() else "0g-storage-client"
)
CLI_GIT_REV = "98d74b7e7e6084fc986cb43ce2c66692dac094a6"
@unique
class BuildBinaryResult(Enum):
AlreadyExists = 0
Installed = 1
NotInstalled = 2
def build_conflux(dir: str) -> BuildBinaryResult:
# Download or build conflux binary if absent
result = __download_from_github(
dir=dir,
binary_name=CONFLUX_BINARY,
github_url=GITHUB_DOWNLOAD_URL,
github_url=GITHUB_DOWNLOAD_URL,
asset_name=__asset_name(CONFLUX_BINARY, zip=True),
)
if result == BuildBinaryResult.AlreadyExists or result == BuildBinaryResult.Installed:
if (
result == BuildBinaryResult.AlreadyExists
or result == BuildBinaryResult.Installed
):
return result
return __build_from_github(
@ -44,6 +53,7 @@ def build_conflux(dir: str) -> BuildBinaryResult:
compiled_relative_path=["target", "release"],
)
def build_bsc(dir: str) -> BuildBinaryResult:
# Download bsc binary if absent
result = __download_from_github(
@ -55,10 +65,13 @@ def build_bsc(dir: str) -> BuildBinaryResult:
# Requires to download binary successfully, since it is not ready to build
# binary from source code.
assert result != BuildBinaryResult.NotInstalled, "Cannot download binary from github [%s]" % BSC_BINARY
assert result != BuildBinaryResult.NotInstalled, (
"Cannot download binary from github [%s]" % BSC_BINARY
)
return result
def build_zg(dir: str) -> BuildBinaryResult:
# Download or build 0gchain binary if absent
result = __download_from_github(
@ -68,7 +81,10 @@ def build_zg(dir: str) -> BuildBinaryResult:
asset_name=__asset_name(ZG_BINARY, zip=True),
)
if result == BuildBinaryResult.AlreadyExists or result == BuildBinaryResult.Installed:
if (
result == BuildBinaryResult.AlreadyExists
or result == BuildBinaryResult.Installed
):
return result
return __build_from_github(
@ -79,17 +95,18 @@ def build_zg(dir: str) -> BuildBinaryResult:
compiled_relative_path=[],
)
def build_cli(dir: str) -> BuildBinaryResult:
# Build 0g-storage-client binary if absent
return __build_from_github(
dir=dir,
binary_name=CLIENT_BINARY,
github_url="https://github.com/0glabs/0g-storage-client.git",
git_rev=CLI_GIT_REV,
build_cmd="go build",
compiled_relative_path=[],
)
def __asset_name(binary_name: str, zip: bool = False) -> str:
sys = platform.system().lower()
if sys == "linux":
@ -102,24 +119,34 @@ def __asset_name(binary_name: str, zip: bool = False) -> str:
else:
raise RuntimeError("Unable to recognize platform")
def __build_from_github(dir: str, binary_name: str, github_url: str, build_cmd: str, compiled_relative_path: list[str], git_rev = None) -> BuildBinaryResult:
def __build_from_github(
dir: str,
binary_name: str,
github_url: str,
build_cmd: str,
compiled_relative_path: list[str],
git_rev=None,
) -> BuildBinaryResult:
if git_rev is None:
versioned_binary_name = binary_name
elif binary_name.endswith(".exe"):
versioned_binary_name = binary_name.removesuffix(".exe") + f"_{git_rev}.exe"
else:
versioned_binary_name = f"{binary_name}_{git_rev}"
binary_path = os.path.join(dir, binary_name)
versioned_binary_path = os.path.join(dir, versioned_binary_name)
if os.path.exists(versioned_binary_path):
__create_sym_link(versioned_binary_name, binary_name, dir)
return BuildBinaryResult.AlreadyExists
start_time = time.time()
# clone code from github to a temp folder
code_tmp_dir_name = (binary_name[:-4] if is_windows_platform() else binary_name) + "_tmp"
code_tmp_dir_name = (
binary_name[:-4] if is_windows_platform() else binary_name
) + "_tmp"
code_tmp_dir = os.path.join(dir, code_tmp_dir_name)
if os.path.exists(code_tmp_dir):
shutil.rmtree(code_tmp_dir)
@ -145,14 +172,22 @@ def __build_from_github(dir: str, binary_name: str, github_url: str, build_cmd:
shutil.rmtree(code_tmp_dir, ignore_errors=True)
print("Completed to build binary " + binary_name + ", Elapsed: " + str(int(time.time() - start_time)) + " seconds", flush=True)
print(
"Completed to build binary "
+ binary_name
+ ", Elapsed: "
+ str(int(time.time() - start_time))
+ " seconds",
flush=True,
)
return BuildBinaryResult.Installed
def __create_sym_link(src, dst, path = None):
def __create_sym_link(src, dst, path=None):
if src == dst:
return
origin_path = os.getcwd()
if path is not None:
os.chdir(path)
@ -171,16 +206,19 @@ def __create_sym_link(src, dst, path = None):
os.chdir(origin_path)
def __download_from_github(dir: str, binary_name: str, github_url: str, asset_name: str) -> BuildBinaryResult:
def __download_from_github(
dir: str, binary_name: str, github_url: str, asset_name: str
) -> BuildBinaryResult:
if not os.path.exists(dir):
os.makedirs(dir, exist_ok=True)
binary_path = os.path.join(dir, binary_name)
if os.path.exists(binary_path):
return BuildBinaryResult.AlreadyExists
print("Begin to download binary from github: %s" % binary_name, flush=True)
start_time = time.time()
req = requests.get(github_url)
@ -194,7 +232,7 @@ def __download_from_github(dir: str, binary_name: str, github_url: str, asset_na
if download_url is None:
print(f"Cannot find asset by name {asset_name}", flush=True)
return BuildBinaryResult.NotInstalled
content = requests.get(download_url).content
# Supports to read from zipped binary
@ -203,17 +241,24 @@ def __download_from_github(dir: str, binary_name: str, github_url: str, asset_na
with open(asset_path, "xb") as f:
f.write(content)
shutil.unpack_archive(asset_path, dir)
assert os.path.exists(binary_path), f"Cannot find binary after unzip, binary = {binary_name}, asset = {asset_name}"
assert os.path.exists(
binary_path
), f"Cannot find binary after unzip, binary = {binary_name}, asset = {asset_name}"
else:
with open(binary_path, "xb") as f:
f.write(content)
f.write(content)
if not is_windows_platform():
st = os.stat(binary_path)
os.chmod(binary_path, st.st_mode | stat.S_IEXEC)
wait_until(lambda: os.access(binary_path, os.X_OK), timeout=120)
print("Completed to download binary, Elapsed: " + str(int(time.time() - start_time)) + " seconds", flush=True)
print(
"Completed to download binary, Elapsed: "
+ str(int(time.time() - start_time))
+ " seconds",
flush=True,
)
return BuildBinaryResult.Installed

View File

@ -162,15 +162,20 @@ class MerkleTree:
n = len(data)
if n < ENTRY_SIZE or (n & (n - 1)) != 0:
raise Exception("Input length is not power of 2")
leaves = [Leaf.from_data(data[i:i + ENTRY_SIZE], tree.hasher) for i in range(0, n, ENTRY_SIZE)]
leaves = [
Leaf.from_data(data[i : i + ENTRY_SIZE], tree.hasher)
for i in range(0, n, ENTRY_SIZE)
]
tree.__leaves = leaves
nodes = leaves
while len(nodes) > 1:
next_nodes = []
for i in range(0, len(nodes), 2):
next_nodes.append(Node.from_children(nodes[i], nodes[i+1], tree.hasher))
next_nodes.append(
Node.from_children(nodes[i], nodes[i + 1], tree.hasher)
)
nodes = next_nodes

View File

@ -12,8 +12,20 @@ DEFAULT_PORT_MIN = 11000
DEFAULT_PORT_MAX = 65535
DEFAULT_PORT_RANGE = 500
def print_testcase_result(color, glyph, script, start_time):
print(color[1] + glyph + " Testcase " + script + "\telapsed: " + str(int(time.time() - start_time)) + " seconds" + color[0], flush=True)
print(
color[1]
+ glyph
+ " Testcase "
+ script
+ "\telapsed: "
+ str(int(time.time() - start_time))
+ " seconds"
+ color[0],
flush=True,
)
def run_single_test(py, script, test_dir, index, port_min, port_max):
try:
@ -58,7 +70,14 @@ def run_single_test(py, script, test_dir, index, port_min, port_max):
raise err
print_testcase_result(BLUE, TICK, script, start_time)
def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={}, long_manual_tests: set[str]={}, single_run_tests: set[str]={}):
def run_all(
test_dir: str,
test_subdirs: list[str] = [],
slow_tests: set[str] = {},
long_manual_tests: set[str] = {},
single_run_tests: set[str] = {},
):
tmp_dir = os.path.join(test_dir, "tmp")
if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir, exist_ok=True)
@ -102,7 +121,11 @@ def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={},
for file in os.listdir(subdir_path):
if file.endswith("_test.py"):
rel_path = os.path.join(subdir, file)
if rel_path not in slow_tests and rel_path not in long_manual_tests and rel_path not in single_run_tests:
if (
rel_path not in slow_tests
and rel_path not in long_manual_tests
and rel_path not in single_run_tests
):
TEST_SCRIPTS.append(rel_path)
executor = ProcessPoolExecutor(max_workers=options.max_workers)
@ -154,4 +177,3 @@ def run_all(test_dir: str, test_subdirs: list[str]=[], slow_tests: set[str]={},
for c in failed:
print(c)
sys.exit(1)

View File

@ -25,7 +25,13 @@ class RpcCaller:
if isinstance(parsed, Ok):
return parsed.result
else:
print("Failed to call RPC, method = %s(%s), error = %s" % (self.method, str(*args)[-1500:], parsed))
print(
"Failed to call RPC, method = %s(%s), error = %s"
% (self.method, str(*args)[-1500:], parsed)
)
except Exception as ex:
print("Failed to call RPC, method = %s(%s), exception = %s" % (self.method, str(*args)[-1500:], ex))
print(
"Failed to call RPC, method = %s(%s), exception = %s"
% (self.method, str(*args)[-1500:], ex)
)
return None

View File

@ -1,2 +1,2 @@
ENTRY_SIZE = 256
PORA_CHUNK_SIZE = 1024
PORA_CHUNK_SIZE = 1024

View File

@ -5,6 +5,7 @@ from math import log2
from utility.merkle_tree import add_0x_prefix, Leaf, MerkleTree
from utility.spec import ENTRY_SIZE, PORA_CHUNK_SIZE
def log2_pow2(n):
return int(log2(((n ^ (n - 1)) >> 1) + 1))
@ -106,29 +107,27 @@ def create_segment_node(data, offset, batch, size):
else:
tree.add_leaf(Leaf(segment_root(data[start:end])))
return tree.get_root_hash()
segment_root_cached_chunks = None
segment_root_cached_output = None
def segment_root(chunks):
global segment_root_cached_chunks, segment_root_cached_output
if segment_root_cached_chunks == chunks:
return segment_root_cached_output
data_len = len(chunks)
if data_len == 0:
return b"\x00" * 32
tree = MerkleTree()
for i in range(0, data_len, ENTRY_SIZE):
tree.encrypt(chunks[i : i + ENTRY_SIZE])
digest = tree.get_root_hash()
segment_root_cached_chunks = chunks
@ -241,4 +240,4 @@ def data_to_segments(data):
segments.append(segment)
idx += 1
return segments
return segments

View File

@ -5,6 +5,7 @@ import rtoml
import time
import sha3
class PortMin:
# Must be initialized with a unique integer for each process
n = 11000
@ -35,15 +36,19 @@ def blockchain_rpc_port(n):
def blockchain_rpc_port_core(n):
return PortMin.n + MAX_NODES + 3 * MAX_BLOCKCHAIN_NODES + n
def blockchain_ws_port(n):
return PortMin.n + MAX_NODES + 4 * MAX_BLOCKCHAIN_NODES + n
def blockchain_rpc_port_tendermint(n):
return PortMin.n + MAX_NODES + 5 * MAX_BLOCKCHAIN_NODES + n
def pprof_port(n):
return PortMin.n + MAX_NODES + 6 * MAX_BLOCKCHAIN_NODES + n
def wait_until(predicate, *, attempts=float("inf"), timeout=float("inf"), lock=None):
if attempts == float("inf") and timeout == float("inf"):
timeout = 60
@ -135,11 +140,12 @@ def assert_greater_than_or_equal(thing1, thing2):
if thing1 < thing2:
raise AssertionError("%s < %s" % (str(thing1), str(thing2)))
# 14900K has the performance point 100
# 14900K has the performance point 100
def estimate_st_performance():
hasher = sha3.keccak_256()
input = b"\xcc" * (1<<26)
input = b"\xcc" * (1 << 26)
start_time = time.perf_counter()
hasher.update(input)
digest = hasher.hexdigest()
return 10 / (time.perf_counter() - start_time)
return 10 / (time.perf_counter() - start_time)