Compare commits

...

27 Commits
v1.0.0 ... main

Author SHA1 Message Date
molla202
b857728660
grpc add update (#398)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
* Update config-mainnet-turbo.toml

* Update grpc config-mainnet-turbo.toml
2025-10-24 10:05:10 +08:00
0g-peterzhb
cf11e1b68a
add mainnet turbo config (#397) 2025-10-11 16:16:21 +08:00
0g-peterzhb
46de15a345
fix context error (#396)
* fix context error

* fix context error
2025-10-02 10:39:32 +08:00
0g-peterzhb
88287333b5
update test turbo config (#394) 2025-09-27 22:06:26 +08:00
0g-peterzhb
df570e34d2
a complete fix on null hash (#391)
* a complete fix on null hash

* simplify

* simplify

* fix tests

* fix tests

* fix tests

* no zero

* fix error

* remove unnecessary code

* len is fixed

* remove unnecessary code

* fix cicd
2025-09-18 14:24:50 +08:00
Jennifer Zelo
a3717d6bc1
docs: fix typos (#386) 2025-09-04 20:51:34 +08:00
CrazyFrog
55087eac7f
Update GitHub Actions in CI Workflows (#384)
* Update abi.yml

* Update cc.yml

* Update rust.yml

* Update tests.yml
2025-09-04 20:48:48 +08:00
0g-peterzhb
9a1edae9a2
fix lint (#383) 2025-08-11 09:34:03 +08:00
Ragnar
e41726de78
Update proof.rs (#371) 2025-07-09 20:13:46 +08:00
0g-peterzhb
ac6c2a4c10
fix the grcov version (#380) 2025-07-09 20:12:39 +08:00
Helen Grachtz
d581ad7ba5
fix: incorrect functions and code comments (#328)
* hashset_delay.rs

* Update lib.rs
2025-07-09 17:35:55 +08:00
emmmm
b156519e40
chore(ci): upgrade setup-node to v4 (#372) 2025-07-09 16:38:25 +08:00
CrazyFrog
1f8b822839
Update README.md (#376) 2025-07-09 16:37:05 +08:00
0g-peterzhb
1e18b454de
copy data (#379)
* copy from middle to previous and afterward tx seqs instead of only from the first seq

* fix action

* fix issue

* unnecessary loop

* add commont

* prevent attack on uploading multiple files to increase db read load

* fill all middle seqs
2025-07-09 13:18:10 +08:00
0g-peterzhb
3ba369e9e5
@peter/add grpc (#377)
* add grpc for uploading segments

* add reflection

* add upload segments grpc function

* format code

* fix lint

* fix test
2025-07-07 22:14:06 +08:00
0g-peterzhb
28654efde1
@peter/fix upload (#378)
* record error message when uploading segments
2025-07-03 19:42:06 +08:00
Himess
3de0dc5ba3
Update init.rs (#350) 2025-04-30 21:26:07 +08:00
0g-peterzhb
7972bf81d9
update condition to align with source change (#368) 2025-04-30 20:26:04 +08:00
James Niken
17cb7657cd
ci: bump actions/checkout to v4 (#365) 2025-04-30 15:27:56 +08:00
NeoByteX
d640eab3ad
Fixing Typos and Improving Code Consistency (#344)
* Update file_location_cache.rs

* Update pora.rs
2025-04-30 15:01:25 +08:00
lora
46bc97978e
docs: fix grammatical error in "Mining Reward" section (#327) 2025-04-30 14:59:51 +08:00
ricecodekhmer
fec6aa51c6
improve error message in Proof::new to include actual lengths (#362) 2025-04-30 14:58:50 +08:00
Anna Sholz
27bc9d1426
chore: Fixed error in variable name (entris to entries) (#345) 2025-04-30 14:57:10 +08:00
emmmm
dfc1fc40cf
fix issues (#358)
* Update rate_limiter.rs

* Update libp2p_event_handler.rs
2025-04-30 14:56:34 +08:00
sashaphmn
eb112c1899
docs: added a link to social network X (#338) 2025-04-30 14:55:30 +08:00
Donny
15f4f1eb64
docs: fix grammar in transaction docs (#359) 2025-04-30 14:50:55 +08:00
0g-peterzhb
229523ceec
update the dockerfile to build the node (#366) 2025-04-30 14:50:18 +08:00
50 changed files with 1692 additions and 358 deletions

View File

@ -2,6 +2,12 @@ name: Setup Rust (cache & toolchain)
runs:
using: composite
steps:
- name: Install protoc compiler
shell: bash
run: |
sudo apt-get update
sudo apt-get install -y protobuf-compiler
- name: Install toolchain 1.78.0
uses: actions-rs/toolchain@v1
with:

View File

@ -12,14 +12,14 @@ jobs:
steps:
- name: Clone current repository
uses: actions/checkout@v3
uses: actions/checkout@v5
- name: Get the Git revision from the current repository
id: get-rev
run: echo "rev=$(cat ./storage-contracts-abis/0g-storage-contracts-rev)" >> $GITHUB_OUTPUT
- name: Clone another repository
uses: actions/checkout@v3
uses: actions/checkout@v5
with:
repository: '0glabs/0g-storage-contracts'
path: '0g-storage-contracts'
@ -31,7 +31,7 @@ jobs:
git checkout ${{ steps.get-rev.outputs.rev }}
- name: Set up Node.js
uses: actions/setup-node@v3
uses: actions/setup-node@v4
with:
node-version: '18.17'
cache: 'yarn'
@ -45,4 +45,4 @@ jobs:
- name: Compare files
run: |
./scripts/check_abis.sh ./0g-storage-contracts/artifacts/
./scripts/check_abis.sh ./0g-storage-contracts/artifacts/

View File

@ -29,7 +29,7 @@ jobs:
swap-storage: true
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v5
with:
submodules: recursive
@ -45,13 +45,16 @@ jobs:
RUSTDOCFLAGS: '-Zprofile -Ccodegen-units=1 -Copt-level=0 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off -Zpanic_abort_tests'
- id: coverage
uses: actions-rs/grcov@v0.1
uses: SierraSoftworks/setup-grcov@v1
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
version: 0.9.1
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
with:
file: ${{ steps.coverage.outputs.report }}
# Disable to avoid CI failure as following:
# ['error'] There was an error running the uploader: Error uploading to https://codecov.io: Error: There was an error fetching the storage
# URL during POST: 404 - {'detail': ErrorDetail(string='Could not find a repository, try using repo upload token', code='not_found')}
# fail_ci_if_error: true
# fail_ci_if_error: true

View File

@ -21,7 +21,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v5
with:
submodules: recursive
- name: Setup Rust (cache & toolchain)
@ -37,7 +37,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v5
with:
submodules: recursive
- name: Setup Rust (cache & toolchain)
@ -53,7 +53,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v5
with:
submodules: recursive
- name: Setup Rust (cache & toolchain)
@ -69,4 +69,4 @@ jobs:
command: clippy
# blocks_in_conditions is triggered for tracing::instrument.
# This can be removed after the fix is released.
args: -- -D warnings
args: -- -D warnings

View File

@ -29,8 +29,9 @@ jobs:
swap-storage: true
- name: Checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v5
with:
persist-credentials: false # prevents writing the extraheader
submodules: recursive
- name: Setup Rust (cache & toolchain)
@ -66,4 +67,4 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: test_logs
path: /tmp/zgs_test_*
path: /tmp/zgs_test_*

2
.gitignore vendored
View File

@ -7,3 +7,5 @@ tests/tmp/**
.vscode/*.json
/0g-storage-contracts-dev
/run/.env
**.bin

202
Cargo.lock generated
View File

@ -551,6 +551,34 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
[[package]]
name = "axum"
version = "0.6.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf"
dependencies = [
"async-trait",
"axum-core 0.3.4",
"bitflags 1.3.2",
"bytes",
"futures-util",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.29",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite 0.2.14",
"rustversion",
"serde",
"sync_wrapper 0.1.2",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum"
version = "0.7.5"
@ -558,7 +586,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf"
dependencies = [
"async-trait",
"axum-core",
"axum-core 0.4.5",
"bytes",
"futures-util",
"http 1.2.0",
@ -578,6 +606,23 @@ dependencies = [
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http 0.2.12",
"http-body 0.4.6",
"mime",
"rustversion",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.4.5"
@ -682,7 +727,7 @@ dependencies = [
"lazy_static",
"lazycell",
"peeking_take_while",
"prettyplease",
"prettyplease 0.2.20",
"proc-macro2",
"quote",
"regex",
@ -1189,7 +1234,7 @@ dependencies = [
"futures-core",
"prost 0.13.4",
"prost-types 0.13.4",
"tonic",
"tonic 0.12.3",
"tracing-core",
]
@ -1213,7 +1258,7 @@ dependencies = [
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tonic 0.12.3",
"tracing",
"tracing-core",
"tracing-subscriber",
@ -2293,7 +2338,7 @@ dependencies = [
"ethers-core",
"ethers-etherscan",
"eyre",
"prettyplease",
"prettyplease 0.2.20",
"proc-macro2",
"quote",
"regex",
@ -3435,6 +3480,18 @@ dependencies = [
"tracing",
]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
"hyper 0.14.29",
"pin-project-lite 0.2.14",
"tokio",
"tokio-io-timeout",
]
[[package]]
name = "hyper-timeout"
version = "0.5.2"
@ -6104,6 +6161,16 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
[[package]]
name = "prettyplease"
version = "0.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86"
dependencies = [
"proc-macro2",
"syn 1.0.109",
]
[[package]]
name = "prettyplease"
version = "0.2.20"
@ -6277,6 +6344,16 @@ dependencies = [
"prost-derive 0.10.1",
]
[[package]]
name = "prost"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
dependencies = [
"bytes",
"prost-derive 0.11.9",
]
[[package]]
name = "prost"
version = "0.13.4"
@ -6329,6 +6406,28 @@ dependencies = [
"which",
]
[[package]]
name = "prost-build"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270"
dependencies = [
"bytes",
"heck 0.4.1",
"itertools 0.10.5",
"lazy_static",
"log",
"multimap",
"petgraph",
"prettyplease 0.1.25",
"prost 0.11.9",
"prost-types 0.11.9",
"regex",
"syn 1.0.109",
"tempfile",
"which",
]
[[package]]
name = "prost-codec"
version = "0.1.0"
@ -6368,6 +6467,19 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "prost-derive"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
dependencies = [
"anyhow",
"itertools 0.10.5",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "prost-derive"
version = "0.13.4"
@ -6401,6 +6513,15 @@ dependencies = [
"prost 0.10.4",
]
[[package]]
name = "prost-types"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
dependencies = [
"prost 0.11.9",
]
[[package]]
name = "prost-types"
version = "0.13.4"
@ -6866,11 +6987,12 @@ dependencies = [
[[package]]
name = "rpc"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"append_merkle",
"base64 0.13.1",
"chunk_pool",
"ethereum-types 0.14.1",
"file_location_cache",
"futures",
"futures-channel",
@ -6881,6 +7003,9 @@ dependencies = [
"miner",
"network",
"parking_lot 0.12.3",
"prost 0.11.9",
"prost-build 0.11.9",
"prost-types 0.11.9",
"serde",
"serde_json",
"shared_types",
@ -6889,6 +7014,9 @@ dependencies = [
"sync",
"task_executor",
"tokio",
"tonic 0.9.2",
"tonic-build",
"tonic-reflection",
"tracing",
]
@ -8188,6 +8316,34 @@ dependencies = [
"winnow 0.6.13",
]
[[package]]
name = "tonic"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a"
dependencies = [
"async-trait",
"axum 0.6.20",
"base64 0.21.7",
"bytes",
"futures-core",
"futures-util",
"h2 0.3.26",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.29",
"hyper-timeout 0.4.1",
"percent-encoding",
"pin-project 1.1.5",
"prost 0.11.9",
"tokio",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic"
version = "0.12.3"
@ -8196,7 +8352,7 @@ checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
dependencies = [
"async-stream",
"async-trait",
"axum",
"axum 0.7.5",
"base64 0.22.1",
"bytes",
"h2 0.4.7",
@ -8204,7 +8360,7 @@ dependencies = [
"http-body 1.0.1",
"http-body-util",
"hyper 1.5.2",
"hyper-timeout",
"hyper-timeout 0.5.2",
"hyper-util",
"percent-encoding",
"pin-project 1.1.5",
@ -8218,6 +8374,32 @@ dependencies = [
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07"
dependencies = [
"prettyplease 0.1.25",
"proc-macro2",
"prost-build 0.11.9",
"quote",
"syn 1.0.109",
]
[[package]]
name = "tonic-reflection"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0543d7092032041fbeac1f2c84304537553421a11a623c2301b12ef0264862c7"
dependencies = [
"prost 0.11.9",
"prost-types 0.11.9",
"tokio",
"tokio-stream",
"tonic 0.9.2",
]
[[package]]
name = "tower"
version = "0.4.13"
@ -8246,9 +8428,9 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]]
name = "tower-service"
version = "0.3.2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tracing"

29
Dockerfile Normal file
View File

@ -0,0 +1,29 @@
# ---------- Dockerfile ----------
FROM rust
# 0) Install build deps (same as you had)
RUN apt-get update && \
apt-get install -y clang cmake build-essential pkg-config libssl-dev
# 1) Copy sources and build the binary
WORKDIR /app
COPY . .
RUN cargo build --release
# 2) Keep the binary on $PATH (optional convenience)
RUN install -Dm755 target/release/zgs_node /usr/local/bin/zgs_node
# 3) Persist chain data
VOLUME ["/data"]
###############################################################################
# 4) Runtime flags grab everything from env vars that youll pass with
# `docker run -e …`. Shell-form CMD lets us interpolate ${…} at start-time.
###############################################################################
CMD zgs_node \
--config run/config-testnet-turbo.toml \
--log-config-file run/log_config \
--miner-key "${STORAGE_MINER_PRIVATE_KEY:?missing STORAGE_MINER_PRIVATE_KEY}" \
--blockchain-rpc-endpoint "${STORAGE_BLOCKCHAIN_RPC_ENDPOINT:?missing STORAGE_BLOCKCHAIN_RPC_ENDPOINT}" \
--network-enr-address "${STORAGE_ENR_ADDRESS:?missing STORAGE_ENR_ADDRESS}" \
--db-max-num-chunks "${STORAGE_DB_MAX_NUM_SECTORS:-8000000000}"

View File

@ -1,6 +0,0 @@
FROM rust
VOLUME ["/data"]
COPY . .
RUN apt-get update && apt-get install -y clang cmake build-essential pkg-config libssl-dev
RUN cargo build --release
CMD ["./target/release/zgs_node", "--config", "run/config-testnet-turbo.toml", "--log", "run/log_config"]

View File

@ -16,13 +16,13 @@ Across the two lanes, 0G Storage supports the following features:
* **General Purpose Design**: Supports atomic transactions, mutable key-value stores, and archive log systems, enabling a wide range of applications with various data types.
* **Validated Incentivization**: Utilizes the PoRA (Proof of Random Access) mining algorithm to mitigate the data outsourcing issue and to ensure rewards are distributed to nodes who contribute to the storage network.
For in-depth technical details about 0G Storage, please read our [Intro to 0G Storage](https://docs.0g.ai/0g-storage).
For in-depth technical details about 0G Storage, please read our [Intro to 0G Storage](https://docs.0g.ai/concepts/storage).
## Documentation
- If you want to run a node, please refer to the [Running a Node](https://docs.0g.ai/run-a-node/storage-node) guide.
- If you want to conduct local testing, please refer to [Onebox Testing](https://github.com/0glabs/0g-storage-node/blob/main/docs/onebox-test.md) guide.
- If you want to build a project using 0G storage, please refer to the [0G Storage SDK](https://docs.0g.ai/build-with-0g/storage-sdk) guide.
- If you want to build a project using 0G storage, please refer to the [0G Storage SDK](https://docs.0g.ai/developer-hub/building-on-0g/storage/sdk) guide.
## Support and Additional Resources
We want to do everything we can to help you be successful while working on your contribution and projects. Here you'll find various resources and communities that may help you complete a project or contribute to 0G.
@ -30,3 +30,4 @@ We want to do everything we can to help you be successful while working on your
### Communities
- [0G Telegram](https://t.me/web3_0glabs)
- [0G Discord](https://discord.com/invite/0glabs)
- [OG X](https://x.com/0G_labs)

View File

@ -16,12 +16,84 @@ use tracing::{trace, warn};
use crate::merkle_tree::MerkleTreeWrite;
pub use crate::merkle_tree::{
Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, ZERO_HASHES,
Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead, OptionalHash, ZERO_HASHES,
};
pub use crate::node_manager::{EmptyNodeDatabase, NodeDatabase, NodeManager, NodeTransaction};
pub use proof::{Proof, RangeProof};
pub use sha3::Sha3Algorithm;
// Helper functions for converting between H256 and OptionalHash types
use ethereum_types::H256;
impl AppendMerkleTree<OptionalHash, Sha3Algorithm> {
/// Convert a proof of OptionalHash to a proof of H256
pub fn convert_proof_to_h256(proof: Proof<OptionalHash>) -> Result<Proof<H256>, anyhow::Error> {
let lemma: Result<Vec<H256>, anyhow::Error> = proof
.lemma()
.iter()
.map(|oh| {
oh.0.ok_or_else(|| anyhow::anyhow!("Cannot convert null OptionalHash to H256"))
})
.collect();
Proof::new(lemma?, proof.path().to_vec())
}
/// Convert a range proof of OptionalHash to a range proof of H256
pub fn convert_range_proof_to_h256(
proof: RangeProof<OptionalHash>,
) -> Result<RangeProof<H256>, anyhow::Error> {
Ok(RangeProof {
left_proof: Self::convert_proof_to_h256(proof.left_proof)?,
right_proof: Self::convert_proof_to_h256(proof.right_proof)?,
})
}
/// Convert a Proof<H256> to Proof<OptionalHash>
pub fn convert_proof_from_h256(
proof: Proof<H256>,
) -> Result<Proof<OptionalHash>, anyhow::Error> {
let lemma = proof
.lemma()
.iter()
.map(|h| OptionalHash::some(*h))
.collect();
let path = proof.path().to_vec();
Proof::new(lemma, path)
}
/// Convert a RangeProof<H256> to RangeProof<OptionalHash>
pub fn convert_range_proof_from_h256(
range_proof: RangeProof<H256>,
) -> Result<RangeProof<OptionalHash>, anyhow::Error> {
Ok(RangeProof {
left_proof: Self::convert_proof_from_h256(range_proof.left_proof)?,
right_proof: Self::convert_proof_from_h256(range_proof.right_proof)?,
})
}
/// Generate a proof and convert it to H256
pub fn gen_proof_h256(&self, leaf_index: usize) -> Result<Proof<H256>, anyhow::Error> {
let proof = self.gen_proof(leaf_index)?;
Self::convert_proof_to_h256(proof)
}
/// Generate a range proof and convert it to H256
pub fn gen_range_proof_h256(
&self,
start_index: usize,
end_index: usize,
) -> Result<RangeProof<H256>, anyhow::Error> {
let proof = self.gen_range_proof(start_index, end_index)?;
Self::convert_range_proof_to_h256(proof)
}
/// Get the root as H256 (unwraps the OptionalHash)
pub fn root_h256(&self) -> H256 {
self.root().unwrap()
}
}
pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
/// Keep all the nodes in the latest version. `layers[0]` is the layer of leaves.
node_manager: NodeManager<E>,
@ -148,7 +220,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
pub fn append(&mut self, new_leaf: E) {
let start_time = Instant::now();
if new_leaf == E::null() {
if new_leaf.is_null() {
// appending null is not allowed.
return;
}
@ -162,7 +234,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
pub fn append_list(&mut self, leaf_list: Vec<E>) {
let start_time = Instant::now();
if leaf_list.contains(&E::null()) {
if leaf_list.iter().any(|leaf| leaf.is_null()) {
// appending null is not allowed.
return;
}
@ -181,7 +253,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
/// TODO: Optimize to avoid storing the `null` nodes?
pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> {
let start_time = Instant::now();
if subtree_root == E::null() {
if subtree_root.is_null() {
// appending null is not allowed.
bail!("subtree_root is null");
}
@ -197,7 +269,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> {
let start_time = Instant::now();
if subtree_list.iter().any(|(_, root)| root == &E::null()) {
if subtree_list.iter().any(|(_, root)| root.is_null()) {
// appending null is not allowed.
bail!("subtree_list contains null");
}
@ -217,7 +289,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
/// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves.
pub fn update_last(&mut self, updated_leaf: E) {
let start_time = Instant::now();
if updated_leaf == E::null() {
if updated_leaf.is_null() {
// updating to null is not allowed.
return;
}
@ -237,9 +309,9 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
/// Panics if the leaf is already set and different or the index is out of range.
/// TODO: Batch computing intermediate nodes.
pub fn fill_leaf(&mut self, index: usize, leaf: E) {
if leaf == E::null() {
if leaf.is_null() {
// fill leaf with null is not allowed.
} else if self.node(0, index) == E::null() {
} else if self.node(0, index).is_null() {
self.node_manager.start_transaction();
self.update_node(0, index, leaf);
self.recompute_after_fill_leaves(index, index + 1);
@ -332,7 +404,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
// skip padding node.
continue;
}
if self.node(i, position) == E::null() {
if self.node(i, position).is_null() {
self.update_node(i, position, data.clone());
updated_nodes.push((i, position, data))
} else if self.node(i, position) != data {
@ -357,7 +429,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
if position >= self.leaves() {
bail!("Out of bound: position={} end={}", position, self.leaves());
}
if self.node(0, position) != E::null() {
if !self.node(0, position).is_null() {
Ok(Some(self.node(0, position)))
} else {
// The leaf hash is unknown.
@ -472,7 +544,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
// Note that if we are recompute a range of an existing tree,
// we do not need to keep these possibly null parent. This is only saved
// for the case of constructing a new tree from the leaves.
let parent = if *left == E::null() || *right == E::null() {
let parent = if left.is_null() || right.is_null() {
E::null()
} else {
A::parent(left, right)
@ -483,7 +555,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
assert_eq!(chunk.len(), 1);
let r = &chunk[0];
// Same as above.
let parent = if *r == E::null() {
let parent = if r.is_null() {
E::null()
} else {
A::parent_single(r, height + self.leaf_height)
@ -501,8 +573,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
match parent_index.cmp(&self.layer_len(height + 1)) {
Ordering::Less => {
// We do not overwrite with null.
if parent != E::null() {
if self.node(height + 1, parent_index) == E::null()
if !parent.is_null() {
if self.node(height + 1, parent_index).is_null()
// The last node in a layer can be updated.
|| (self.node(height + 1, parent_index) != parent
&& parent_index == self.layer_len(height + 1) - 1)
@ -741,7 +813,7 @@ impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
type E = E;
fn node(&self, layer: usize, index: usize) -> Self::E {
match self.delta_nodes.get(layer, index).expect("range checked") {
Some(node) if *node != E::null() => node.clone(),
Some(node) if !node.is_null() => node.clone(),
_ => self
.node_manager
.get_node(layer, index)
@ -798,7 +870,7 @@ macro_rules! ensure_eq {
#[cfg(test)]
mod tests {
use crate::merkle_tree::MerkleTreeRead;
use crate::merkle_tree::{MerkleTreeRead, OptionalHash};
use crate::sha3::Sha3Algorithm;
use crate::AppendMerkleTree;
@ -812,21 +884,30 @@ mod tests {
for _ in 0..entry_len {
data.push(H256::random());
}
let mut merkle =
AppendMerkleTree::<H256, Sha3Algorithm>::new(vec![H256::zero()], 0, None);
merkle.append_list(data.clone());
let mut merkle = AppendMerkleTree::<OptionalHash, Sha3Algorithm>::new(
vec![OptionalHash::some(H256::zero())],
0,
None,
);
merkle.append_list(data.clone().into_iter().map(OptionalHash::some).collect());
merkle.commit(Some(0));
verify(&data, &mut merkle);
data.push(H256::random());
merkle.append(*data.last().unwrap());
merkle.append(OptionalHash::some(*data.last().unwrap()));
merkle.commit(Some(1));
verify(&data, &mut merkle);
for _ in 0..6 {
data.push(H256::random());
}
merkle.append_list(data[data.len() - 6..].to_vec());
merkle.append_list(
data[data.len() - 6..]
.iter()
.copied()
.map(OptionalHash::some)
.collect(),
);
merkle.commit(Some(2));
verify(&data, &mut merkle);
}
@ -840,9 +921,12 @@ mod tests {
for _ in 0..entry_len {
data.push(H256::random());
}
let mut merkle =
AppendMerkleTree::<H256, Sha3Algorithm>::new(vec![H256::zero()], 0, None);
merkle.append_list(data.clone());
let mut merkle = AppendMerkleTree::<OptionalHash, Sha3Algorithm>::new(
vec![OptionalHash::some(H256::zero())],
0,
None,
);
merkle.append_list(data.clone().into_iter().map(OptionalHash::some).collect());
merkle.commit(Some(0));
for i in (0..data.len()).step_by(6) {
@ -850,12 +934,17 @@ mod tests {
let range_proof = merkle.gen_range_proof(i + 1, end + 1).unwrap();
let mut new_data = Vec::new();
for _ in 0..3 {
new_data.push(H256::random());
new_data.push(OptionalHash::some(H256::random()));
}
merkle.append_list(new_data);
let seq = i as u64 / 6 + 1;
merkle.commit(Some(seq));
let r = range_proof.validate::<Sha3Algorithm>(&data[i..end], i + 1);
let optional_data: Vec<OptionalHash> = data[i..end]
.iter()
.copied()
.map(OptionalHash::some)
.collect();
let r = range_proof.validate::<Sha3Algorithm>(&optional_data, i + 1);
assert!(r.is_ok(), "{:?}", r);
merkle.fill_with_range_proof(range_proof).unwrap();
}
@ -865,7 +954,11 @@ mod tests {
#[test]
fn test_proof_at_version() {
let n = [2, 255, 256, 257];
let mut merkle = AppendMerkleTree::<H256, Sha3Algorithm>::new(vec![H256::zero()], 0, None);
let mut merkle = AppendMerkleTree::<OptionalHash, Sha3Algorithm>::new(
vec![OptionalHash::some(H256::zero())],
0,
None,
);
let mut start_pos = 0;
for (tx_seq, &entry_len) in n.iter().enumerate() {
@ -873,7 +966,7 @@ mod tests {
for _ in 0..entry_len {
data.push(H256::random());
}
merkle.append_list(data.clone());
merkle.append_list(data.clone().into_iter().map(OptionalHash::some).collect());
merkle.commit(Some(tx_seq as u64));
for i in (0..data.len()).step_by(6) {
let end = std::cmp::min(start_pos + i + 3, data.len());
@ -882,7 +975,12 @@ mod tests {
.unwrap()
.gen_range_proof(start_pos + i + 1, start_pos + end + 1)
.unwrap();
let r = range_proof.validate::<Sha3Algorithm>(&data[i..end], start_pos + i + 1);
let optional_data: Vec<OptionalHash> = data[i..end]
.iter()
.copied()
.map(OptionalHash::some)
.collect();
let r = range_proof.validate::<Sha3Algorithm>(&optional_data, start_pos + i + 1);
assert!(r.is_ok(), "{:?}", r);
merkle.fill_with_range_proof(range_proof).unwrap();
}
@ -891,16 +989,21 @@ mod tests {
}
}
fn verify(data: &[H256], merkle: &mut AppendMerkleTree<H256, Sha3Algorithm>) {
fn verify(data: &[H256], merkle: &mut AppendMerkleTree<OptionalHash, Sha3Algorithm>) {
for (i, item) in data.iter().enumerate() {
let proof = merkle.gen_proof(i + 1).unwrap();
let r = merkle.validate(&proof, item, i + 1);
let r = merkle.validate(&proof, &OptionalHash::some(*item), i + 1);
assert!(matches!(r, Ok(true)), "{:?}", r);
}
for i in (0..data.len()).step_by(6) {
let end = std::cmp::min(i + 3, data.len());
let range_proof = merkle.gen_range_proof(i + 1, end + 1).unwrap();
let r = range_proof.validate::<Sha3Algorithm>(&data[i..end], i + 1);
let optional_data: Vec<OptionalHash> = data[i..end]
.iter()
.copied()
.map(OptionalHash::some)
.collect();
let r = range_proof.validate::<Sha3Algorithm>(&optional_data, i + 1);
assert!(r.is_ok(), "{:?}", r);
merkle.fill_with_range_proof(range_proof).unwrap();
}

View File

@ -8,6 +8,173 @@ use std::fmt::Debug;
use std::hash::Hash;
use tracing::trace;
/// A wrapper around Option<H256> that properly handles null hashes
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct OptionalHash(pub Option<H256>);
impl OptionalHash {
pub fn some(hash: H256) -> Self {
OptionalHash(Some(hash))
}
pub fn none() -> Self {
OptionalHash(None)
}
pub fn is_some(&self) -> bool {
self.0.is_some()
}
pub fn is_none(&self) -> bool {
self.0.is_none()
}
pub fn unwrap(&self) -> H256 {
self.0.unwrap()
}
pub fn unwrap_or(&self, default: H256) -> H256 {
self.0.unwrap_or(default)
}
pub fn as_ref(&self) -> Option<&H256> {
self.0.as_ref()
}
/// Create OptionalHash from a byte slice
pub fn from_slice(bytes: &[u8]) -> Result<Self, &'static str> {
if bytes.len() != 32 {
return Err("Invalid byte length for H256");
}
let mut hash_bytes = [0u8; 32];
hash_bytes.copy_from_slice(bytes);
Ok(OptionalHash::some(H256(hash_bytes)))
}
/// Convert to bytes for storage (33 bytes: 1 flag + 32 hash)
pub fn as_bytes(&self) -> [u8; 33] {
let mut bytes = [0u8; 33];
match &self.0 {
Some(hash) => {
bytes[0] = 1; // Some flag
bytes[1..].copy_from_slice(hash.as_ref());
}
None => {
bytes[0] = 0; // None flag
// bytes[1..] remain zeros
}
}
bytes
}
/// Create OptionalHash from storage bytes (33 bytes)
pub fn from_bytes(bytes: &[u8; 33]) -> Result<Self, &'static str> {
match bytes[0] {
0 => Ok(OptionalHash::none()),
1 => {
let mut hash_bytes = [0u8; 32];
hash_bytes.copy_from_slice(&bytes[1..]);
Ok(OptionalHash::some(H256(hash_bytes)))
}
_ => Err("Invalid flag byte for OptionalHash"),
}
}
}
// Add From conversions for easier usage
impl From<H256> for OptionalHash {
fn from(hash: H256) -> Self {
OptionalHash::some(hash)
}
}
impl From<Option<H256>> for OptionalHash {
fn from(opt: Option<H256>) -> Self {
OptionalHash(opt)
}
}
impl From<OptionalHash> for Option<H256> {
fn from(opt_hash: OptionalHash) -> Self {
opt_hash.0
}
}
impl AsRef<[u8]> for OptionalHash {
fn as_ref(&self) -> &[u8] {
self.0.as_ref().unwrap().as_ref()
}
}
impl AsMut<[u8]> for OptionalHash {
fn as_mut(&mut self) -> &mut [u8] {
if self.0.is_none() {
self.0 = Some(H256::zero());
}
self.0.as_mut().unwrap().as_mut()
}
}
impl Encode for OptionalHash {
fn is_ssz_fixed_len() -> bool {
true
}
fn ssz_fixed_len() -> usize {
33 // 1 byte for Some/None flag + 32 bytes for hash
}
fn ssz_bytes_len(&self) -> usize {
33
}
fn ssz_append(&self, buf: &mut Vec<u8>) {
match &self.0 {
Some(hash) => {
buf.push(1); // Some flag
hash.ssz_append(buf);
}
None => {
buf.push(0); // None flag
buf.extend_from_slice(&[0u8; 32]); // Padding zeros
}
}
}
}
impl Decode for OptionalHash {
fn is_ssz_fixed_len() -> bool {
true
}
fn ssz_fixed_len() -> usize {
33 // 1 byte for Some/None flag + 32 bytes for hash
}
fn from_ssz_bytes(bytes: &[u8]) -> Result<Self, ssz::DecodeError> {
if bytes.len() != 33 {
return Err(ssz::DecodeError::InvalidByteLength {
len: bytes.len(),
expected: 33,
});
}
match bytes[0] {
0 => Ok(OptionalHash::none()),
1 => {
let hash = H256::from_ssz_bytes(&bytes[1..])?;
Ok(OptionalHash::some(hash))
}
_ => Err(ssz::DecodeError::BytesInvalid(
"Invalid flag byte for OptionalHash".to_string(),
)),
}
}
}
unsafe impl Send for OptionalHash {}
unsafe impl Sync for OptionalHash {}
pub trait HashElement:
Clone + Debug + Eq + Hash + AsRef<[u8]> + AsMut<[u8]> + Decode + Encode + Send + Sync
{
@ -18,13 +185,28 @@ pub trait HashElement:
}
}
impl HashElement for OptionalHash {
fn end_pad(height: usize) -> Self {
OptionalHash::some(ZERO_HASHES[height])
}
fn null() -> Self {
OptionalHash::none()
}
fn is_null(&self) -> bool {
self.is_none()
}
}
// Keep the H256 implementation for backward compatibility
impl HashElement for H256 {
fn end_pad(height: usize) -> Self {
ZERO_HASHES[height]
}
fn null() -> Self {
H256::repeat_byte(1)
H256::repeat_byte(0x01)
}
}
@ -70,7 +252,7 @@ pub trait MerkleTreeRead {
self.leaves()
);
}
if self.node(0, leaf_index) == Self::E::null() {
if self.node(0, leaf_index).is_null() {
bail!("Not ready to generate proof for leaf_index={}", leaf_index);
}
if self.height() == 1 {
@ -102,7 +284,7 @@ pub trait MerkleTreeRead {
index_in_layer >>= 1;
}
lemma.push(self.root());
if lemma.contains(&Self::E::null()) {
if lemma.iter().any(|e| e.is_null()) {
bail!(
"Not enough data to generate proof, lemma={:?} path={:?}",
lemma,

View File

@ -13,7 +13,11 @@ impl<T: HashElement> Proof<T> {
/// Creates new MT inclusion proof
pub fn new(hash: Vec<T>, path: Vec<bool>) -> Result<Proof<T>> {
if hash.len() != path.len() + 2 {
bail!("hash and path length mismatch");
bail!(
"Proof::new: expected hash length = path.len() + 2, but got {} and {}",
hash.len(),
path.len()
);
}
Ok(Proof { lemma: hash, path })
}
@ -198,20 +202,22 @@ impl<E: HashElement> RangeProof<E> {
ensure_eq!(self.left_proof.position(), start_position);
ensure_eq!(self.right_proof.position(), end_position);
let tree_depth = self.left_proof.path().len() + 1;
// TODO: We can avoid copying the first layer.
let mut children_layer = range_leaves.to_vec();
// Avoid copying the first layer by working directly with the slice
let mut children_layer = Vec::new();
let mut current_layer = range_leaves;
for height in 0..(tree_depth - 1) {
let mut parent_layer = Vec::new();
let start_index = if !self.left_proof.path()[height] {
// If the left-most node is the right child, its sibling is not within the data range and should be retrieved from the proof.
let parent = A::parent(&self.left_proof.lemma()[height + 1], &children_layer[0]);
let parent = A::parent(&self.left_proof.lemma()[height + 1], &current_layer[0]);
parent_layer.push(parent);
1
} else {
// The left-most node is the left child, its sibling is just the next child.
0
};
let mut iter = children_layer[start_index..].chunks_exact(2);
let mut iter = current_layer[start_index..].chunks_exact(2);
while let Some([left, right]) = iter.next() {
parent_layer.push(A::parent(left, right))
}
@ -223,10 +229,19 @@ impl<E: HashElement> RangeProof<E> {
}
}
children_layer = parent_layer;
current_layer = &children_layer;
}
// If no iterations occurred, the root should be computed from the original range_leaves
if children_layer.is_empty() {
ensure_eq!(range_leaves.len(), 1);
let computed_root = range_leaves[0].clone();
ensure_eq!(computed_root, self.root());
} else {
ensure_eq!(children_layer.len(), 1);
let computed_root = children_layer.pop().unwrap();
ensure_eq!(computed_root, self.root());
}
ensure_eq!(children_layer.len(), 1);
let computed_root = children_layer.pop().unwrap();
ensure_eq!(computed_root, self.root());
Ok(())
}

View File

@ -1,4 +1,4 @@
use crate::merkle_tree::ZERO_HASHES;
use crate::merkle_tree::{OptionalHash, ZERO_HASHES};
use crate::{Algorithm, HashElement};
use ethereum_types::H256;
use once_cell::sync::Lazy;
@ -50,3 +50,22 @@ impl Algorithm<H256> for Sha3Algorithm {
Self::leaf_raw(data)
}
}
impl Algorithm<OptionalHash> for Sha3Algorithm {
fn parent(left: &OptionalHash, right: &OptionalHash) -> OptionalHash {
match (&left.0, &right.0) {
(Some(l), Some(r)) => {
// Use the H256 implementation directly to ensure identical logic
let result = <Self as Algorithm<H256>>::parent(l, r);
OptionalHash::some(result)
}
_ => OptionalHash::none(),
}
}
fn leaf(data: &[u8]) -> OptionalHash {
// Use the H256 implementation directly to ensure identical logic
let result = <Self as Algorithm<H256>>::leaf(data);
OptionalHash::some(result)
}
}

View File

@ -61,7 +61,7 @@ where
self.insert_at(key, self.default_entry_timeout);
}
/// Inserts an entry that will expire at a given instant. If the entry already exists, the
/// Inserts an entry that will expire at a given duration. If the entry already exists, the
/// timeout is updated.
pub fn insert_at(&mut self, key: K, entry_duration: Duration) {
if self.contains(&key) {

View File

@ -78,7 +78,7 @@ pub fn try_create_int_counter(name: &str, help: &str) -> Result<IntCounter> {
Ok(counter)
}
/// Attempts to create an `IntGauge`, returning `Err` if the registry does not accept the counter
/// Attempts to create an `IntGauge`, returning `Err` if the registry does not accept the gauge
/// (potentially due to naming conflict).
pub fn try_create_int_gauge(name: &str, help: &str) -> Result<IntGauge> {
let opts = Opts::new(name, help);
@ -87,7 +87,7 @@ pub fn try_create_int_gauge(name: &str, help: &str) -> Result<IntGauge> {
Ok(gauge)
}
/// Attempts to create a `Gauge`, returning `Err` if the registry does not accept the counter
/// Attempts to create a `Gauge`, returning `Err` if the registry does not accept the gauge
/// (potentially due to naming conflict).
pub fn try_create_float_gauge(name: &str, help: &str) -> Result<Gauge> {
let opts = Opts::new(name, help);
@ -96,7 +96,7 @@ pub fn try_create_float_gauge(name: &str, help: &str) -> Result<Gauge> {
Ok(gauge)
}
/// Attempts to create a `Histogram`, returning `Err` if the registry does not accept the counter
/// Attempts to create a `Histogram`, returning `Err` if the registry does not accept the histogram
/// (potentially due to naming conflict).
pub fn try_create_histogram(name: &str, help: &str) -> Result<Histogram> {
let opts = HistogramOpts::new(name, help);
@ -105,7 +105,7 @@ pub fn try_create_histogram(name: &str, help: &str) -> Result<Histogram> {
Ok(histogram)
}
/// Attempts to create a `HistogramVec`, returning `Err` if the registry does not accept the counter
/// Attempts to create a `HistogramVec`, returning `Err` if the registry does not accept the histogram
/// (potentially due to naming conflict).
pub fn try_create_histogram_vec(
name: &str,
@ -144,7 +144,7 @@ pub fn try_create_float_gauge_vec(
Ok(counter_vec)
}
/// Attempts to create a `IntCounterVec`, returning `Err` if the registry does not accept the gauge
/// Attempts to create a `IntCounterVec`, returning `Err` if the registry does not accept the counter
/// (potentially due to naming conflict).
pub fn try_create_int_counter_vec(
name: &str,

View File

@ -20,7 +20,7 @@ pub fn init_once() {
extern "C" {
fn GFp_cpuid_setup();
}
static INIT: std::sync::Once = std::sync::ONCE_INIT;
static INIT: std::sync::Once = std::sync::Once::new();
INIT.call_once(|| unsafe { GFp_cpuid_setup() });
}
}

View File

@ -4,7 +4,7 @@
## Atomicity
When an application server linking with the 0G Storage key-value runtime starts a transaction using `BeginTx()` interface, it notifies the runtime that the transaction will work on the current state snapshot constructed by playing the log to the current tail. The further key-value operations before the invocation of `EndTx()` updates the key-values locally in the server without exposing the updates to the log. When `EndTx()` is invoked, the runtime composes a commit record containing the log position the transaction starts from and the read-write set of the transaction. This commit record is then appended to the log.
When an application server linking with the 0G Storage key-value runtime starts a transaction using `BeginTx()` interface, it notifies the runtime that the transaction will work on the current state snapshot constructed by playing the log to the current tail. The further key-value operations before the invocation of `EndTx()` update the key-values locally in the server without exposing the updates to the log. When `EndTx()` is invoked, the runtime composes a commit record containing the log position the transaction starts from and the read-write set of the transaction. This commit record is then appended to the log.
When an application server with the key-value runtime encounters the commit record during playing the log, it identifies a conflict window consisting of all the log entries between the start log position of the transaction and the position of the commit record. The log entries in the conflict window therefore contain the key-value operations concurrent with the transaction submitting the commit record. The runtime further detects whether these concurrent operations contain the updates on the keys belonging to the read set of the transaction. If yes, the transaction is aborted, otherwise committed successfully.

View File

@ -95,7 +95,7 @@ impl<'a> Miner<'a> {
.enumerate()
.zip(scratch_pad.iter().cycle())
.zip(availabilities.into_iter())
.filter_map(|(data, availiable)| availiable.then_some(data))
.filter_map(|(data, available)| available.then_some(data))
{
inc_counter(&PAD_MIX_COUNT);
// Rust can optimize this loop well.

View File

@ -26,6 +26,10 @@ pub type MineContextMessage = Option<PoraPuzzle>;
lazy_static! {
pub static ref EMPTY_HASH: H256 =
H256::from_str("c5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470").unwrap();
pub static ref COMPUTE_WORKER_CONTEXT_CALLER: Address =
"0x000000000000000000000000000000000000000A"
.parse()
.unwrap();
}
const PORA_VERSION: u64 = 1;
@ -139,6 +143,8 @@ impl MineContextWatcher {
}
let miner_id = self.miner_id.0;
// Use eth_call with specific caller address for read-only access
let WorkerContext {
context,
pora_target,
@ -147,6 +153,7 @@ impl MineContextWatcher {
} = self
.mine_contract
.compute_worker_context(miner_id)
.from(*COMPUTE_WORKER_CONTEXT_CALLER)
.call()
.await
.map_err(|e| format!("Failed to query mining context: {:?}", e))?;

View File

@ -390,7 +390,7 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
.gossipsub
.publish(topic.clone().into(), message_data.clone())
{
warn!(error = ?e, topic = ?topic.kind(), "Failed to publish message");
trace!(error = ?e, topic = ?topic.kind(), "Failed to publish message");
// add to metrics
if let Some(v) = metrics::get_int_gauge(
@ -1017,8 +1017,8 @@ impl std::convert::From<Request> for OutboundRequest {
/// The type of RPC responses the Behaviour informs it has received, and allows for sending.
///
// NOTE: This is an application-level wrapper over the lower network level responses that can be
// sent. The main difference is the absense of Pong and Metadata, which don't leave the
// Behaviour. For all protocol reponses managed by RPC see `RPCResponse` and
// sent. The main difference is the absence of Pong and Metadata, which don't leave the
// Behaviour. For all protocol responses managed by RPC see `RPCResponse` and
// `RPCCodedResponse`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Response {

View File

@ -141,7 +141,7 @@ impl RPCRateLimiterBuilder {
self.set_quota(protocol, Quota::one_every(time_period))
}
/// Allow `n` tokens to be use used every `time_period` for this `protocol`.
/// Allow `n` tokens to be used every `time_period` for this `protocol`.
pub fn n_every(self, protocol: Protocol, n: u64, time_period: Duration) -> Self {
self.set_quota(protocol, Quota::n_every(n, time_period))
}

View File

@ -512,7 +512,7 @@ impl Libp2pEventHandler {
info!(
?addr,
"Create public ip address to broadcase file announcement"
"Create public ip address to broadcast file announcement"
);
Some(addr)

View File

@ -299,7 +299,7 @@ impl RouterService {
}
NetworkMessage::Publish { messages } => {
if self.libp2p.swarm.connected_peers().next().is_none() {
// this is a boardcast message, when current node doesn't have any peers connected, try to connect any peer in config
// this is a broadcast message, when current node doesn't have any peers connected, try to connect any peer in config
for multiaddr in &self.config.libp2p_nodes {
match Swarm::dial(&mut self.libp2p.swarm, multiaddr.clone()) {
Ok(()) => {

View File

@ -1,7 +1,8 @@
[package]
name = "rpc"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
build = "build.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -28,3 +29,12 @@ merkle_tree = { path = "../../common/merkle_tree"}
futures-channel = "^0.3"
metrics = { workspace = true }
parking_lot = "0.12.3"
tonic = { version = "0.9.2", features = ["transport"] }
prost = "0.11.9"
prost-types = "0.11.9"
tonic-reflection = "0.9.2"
ethereum-types = "0.14"
[build-dependencies]
tonic-build = "0.9.2"
prost-build = "0.11.9"

7
node/rpc/build.rs Normal file
View File

@ -0,0 +1,7 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Compile proto/my_service.proto
tonic_build::configure()
.file_descriptor_set_path("proto/zgs_grpc_descriptor.bin")
.compile(&["proto/zgs_grpc.proto"], &["proto"])?;
Ok(())
}

View File

@ -0,0 +1,48 @@
syntax = "proto3";
package zgs_grpc;
option go_package = "github.com/0glabs/0g-storage-client/node/proto;zgs_grpc";
message Empty {}
/// 32-byte hash root
message DataRoot {
bytes value = 1;
}
/// A proof over a file-segment Merkle tree
message FileProof {
/// sequence of 32-byte hashes
repeated bytes lemma = 1;
/// bit-paths (left=false, right=true) alongside the lemmas
repeated bool path = 2;
}
/// A file segment plus its Merkle proof
message SegmentWithProof {
DataRoot root = 1; // file Merkle root
bytes data = 2; // raw segment bytes
uint64 index = 3; // segment index
FileProof proof = 4; // Merkle proof of this leaf
uint64 file_size = 5; // total file length
}
message UploadSegmentsByTxSeqRequest {
repeated SegmentWithProof segments = 1;
uint64 tx_seq = 2;
}
message PingRequest {
string message = 1;
}
message PingReply {
string message = 1;
}
// A trivial ping service
service ZgsGrpcService {
rpc Ping (PingRequest) returns (PingReply);
rpc UploadSegmentsByTxSeq(UploadSegmentsByTxSeqRequest) returns (Empty);
}

View File

@ -8,6 +8,7 @@ pub struct Config {
pub enabled: bool,
pub listen_address: SocketAddr,
pub listen_address_admin: SocketAddr,
pub listen_address_grpc: SocketAddr,
pub chunks_per_segment: usize,
pub max_request_body_size: u32,
pub max_cache_file_size: usize,
@ -19,6 +20,7 @@ impl Default for Config {
enabled: true,
listen_address: SocketAddr::from_str("0.0.0.0:5678").unwrap(),
listen_address_admin: SocketAddr::from_str("127.0.0.1:5679").unwrap(),
listen_address_grpc: SocketAddr::from_str("0.0.0.0:50051").unwrap(),
chunks_per_segment: 1024,
max_request_body_size: 100 * 1024 * 1024, // 100MB
max_cache_file_size: 10 * 1024 * 1024, // 10MB

View File

@ -8,10 +8,15 @@ mod config;
mod error;
mod middleware;
mod miner;
mod rpc_helper;
pub mod types;
mod zgs;
mod zgs_grpc;
use crate::miner::RpcServer as MinerRpcServer;
use crate::types::SegmentWithProof;
use crate::zgs_grpc::r#impl::ZgsGrpcServiceImpl;
use crate::zgs_grpc_proto::zgs_grpc_service_server::ZgsGrpcServiceServer;
use admin::RpcServer as AdminRpcServer;
use chunk_pool::MemoryChunkPool;
use file_location_cache::FileLocationCache;
@ -20,11 +25,14 @@ use jsonrpsee::core::RpcResult;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
use network::{NetworkGlobals, NetworkMessage, NetworkSender};
use std::error::Error;
use std::fmt::{Debug, Formatter, Result as FmtResult};
use std::sync::Arc;
use storage_async::Store;
use sync::{SyncRequest, SyncResponse, SyncSender};
use task_executor::ShutdownReason;
use tokio::sync::broadcast;
use tonic::transport::Server;
use tonic_reflection::server::Builder as ReflectionBuilder;
use zgs::RpcServer as ZgsRpcServer;
use zgs_miner::MinerMessage;
@ -33,6 +41,12 @@ pub use config::Config as RPCConfig;
pub use miner::RpcClient as ZgsMinerRpcClient;
pub use zgs::RpcClient as ZgsRPCClient;
pub mod zgs_grpc_proto {
tonic::include_proto!("zgs_grpc");
}
const DESCRIPTOR_SET: &[u8] = include_bytes!("../proto/zgs_grpc_descriptor.bin");
/// A wrapper around all the items required to spawn the HTTP server.
///
/// The server will gracefully handle the case where any fields are `None`.
@ -73,7 +87,7 @@ pub async fn run_server(
(run_server_all(ctx).await?, None)
};
info!("Server started");
info!("Rpc Server started");
Ok(handles)
}
@ -133,3 +147,76 @@ async fn run_server_public_private(
Ok((handle_public, Some(handle_private)))
}
pub async fn run_grpc_server(ctx: Context) -> Result<(), Box<dyn Error>> {
let grpc_addr = ctx.config.listen_address_grpc;
let reflection = ReflectionBuilder::configure()
.register_encoded_file_descriptor_set(DESCRIPTOR_SET)
.build()?;
let server = ZgsGrpcServiceServer::new(ZgsGrpcServiceImpl { ctx });
Server::builder()
.add_service(server)
.add_service(reflection)
.serve(grpc_addr)
.await?;
Ok(())
}
enum SegmentIndex {
Single(usize),
Range(usize, usize), // [start, end]
}
impl Debug for SegmentIndex {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
match self {
Self::Single(val) => write!(f, "{}", val),
Self::Range(start, end) => write!(f, "[{},{}]", start, end),
}
}
}
struct SegmentIndexArray {
items: Vec<SegmentIndex>,
}
impl Debug for SegmentIndexArray {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
match self.items.first() {
None => write!(f, "NULL"),
Some(first) if self.items.len() == 1 => write!(f, "{:?}", first),
_ => write!(f, "{:?}", self.items),
}
}
}
impl SegmentIndexArray {
fn new(segments: &[SegmentWithProof]) -> Self {
let mut items = Vec::new();
let mut current = match segments.first() {
None => return SegmentIndexArray { items },
Some(seg) => SegmentIndex::Single(seg.index),
};
for index in segments.iter().skip(1).map(|seg| seg.index) {
match current {
SegmentIndex::Single(val) if val + 1 == index => {
current = SegmentIndex::Range(val, index)
}
SegmentIndex::Range(start, end) if end + 1 == index => {
current = SegmentIndex::Range(start, index)
}
_ => {
items.push(current);
current = SegmentIndex::Single(index);
}
}
}
items.push(current);
SegmentIndexArray { items }
}
}

View File

@ -0,0 +1,99 @@
use crate::error;
use crate::types::SegmentWithProof;
use crate::Context;
use chunk_pool::SegmentInfo;
use jsonrpsee::core::RpcResult;
use shared_types::Transaction;
/// Put a single segment (mirrors your old `put_segment`)
pub async fn put_segment(ctx: &Context, segment: SegmentWithProof) -> RpcResult<()> {
debug!(root = %segment.root, index = %segment.index, "putSegment");
// fetch optional tx
let maybe_tx = ctx
.log_store
.get_tx_by_data_root(&segment.root, false)
.await?;
put_segment_with_maybe_tx(ctx, segment, maybe_tx).await
}
/// Put a segment, given an optional Transaction (mirrors `put_segment_with_maybe_tx`)
pub async fn put_segment_with_maybe_tx(
ctx: &Context,
segment: SegmentWithProof,
maybe_tx: Option<Transaction>,
) -> RpcResult<()> {
ctx.chunk_pool.validate_segment_size(&segment.data)?;
if let Some(tx) = &maybe_tx {
if tx.data_merkle_root != segment.root {
return Err(error::internal_error("data root and tx seq not match"));
}
}
// decide cache vs write
let need_cache = if ctx.chunk_pool.check_already_has_cache(&segment.root).await {
true
} else {
check_need_cache(ctx, &maybe_tx, segment.file_size).await?
};
segment.validate(ctx.config.chunks_per_segment)?;
let seg_info = SegmentInfo {
root: segment.root,
seg_data: segment.data,
seg_proof: segment.proof,
seg_index: segment.index,
chunks_per_segment: ctx.config.chunks_per_segment,
};
if need_cache {
ctx.chunk_pool.cache_chunks(seg_info).await?;
} else {
let file_id = chunk_pool::FileID {
root: seg_info.root,
tx_id: maybe_tx.unwrap().id(),
};
ctx.chunk_pool
.write_chunks(seg_info, file_id, segment.file_size)
.await?;
}
Ok(())
}
/// The old `check_need_cache`
pub async fn check_need_cache(
ctx: &Context,
maybe_tx: &Option<Transaction>,
file_size: usize,
) -> RpcResult<bool> {
if let Some(tx) = maybe_tx {
if tx.size != file_size as u64 {
return Err(error::invalid_params(
"file_size",
"segment file size not matched with tx file size",
));
}
if ctx.log_store.check_tx_completed(tx.seq).await? {
return Err(error::invalid_params(
"root",
"already uploaded and finalized",
));
}
if ctx.log_store.check_tx_pruned(tx.seq).await? {
return Err(error::invalid_params("root", "already pruned"));
}
Ok(false)
} else {
if file_size > ctx.config.max_cache_file_size {
return Err(error::invalid_params(
"file_size",
"caching of large file when tx is unavailable is not supported",
));
}
Ok(true)
}
}

View File

@ -1,5 +1,6 @@
use crate::error;
use crate::{error, zgs_grpc_proto};
use append_merkle::ZERO_HASHES;
use ethereum_types::H256 as EthH256;
use jsonrpsee::core::RpcResult;
use merkle_light::hash::Algorithm;
use merkle_light::merkle::{log2_pow2, next_pow2, MerkleTree};
@ -11,12 +12,14 @@ use shared_types::{
Transaction, CHUNK_SIZE,
};
use std::collections::HashSet;
use std::convert::TryFrom;
use std::hash::Hasher;
use std::net::IpAddr;
use std::time::Instant;
use storage::config::ShardConfig;
use storage::log_store::log_manager::bytes_to_entries;
use storage::H256;
use tonic::Status as GrpcStatus;
const ZERO_HASH: [u8; 32] = [
0xd3, 0x97, 0xb3, 0xb0, 0x43, 0xd8, 0x7f, 0xcd, 0x6f, 0xad, 0x12, 0x91, 0xff, 0xb, 0xfd, 0x16,
@ -76,6 +79,77 @@ pub struct SegmentWithProof {
pub file_size: usize,
}
/// Convert the proto DataRoot → your apps DataRoot
impl TryFrom<zgs_grpc_proto::DataRoot> for DataRoot {
type Error = GrpcStatus;
fn try_from(value: zgs_grpc_proto::DataRoot) -> Result<Self, GrpcStatus> {
let bytes = value.value;
if bytes.len() != 32 {
return Err(GrpcStatus::invalid_argument(format!(
"Invalid hash length: got {}, want 32",
bytes.len()
)));
}
// assume AppDataRoot is a newtype around H256:
let mut arr = [0u8; 32];
arr.copy_from_slice(&bytes);
Ok(EthH256(arr))
}
}
/// Convert proto FileProof → your apps FileProof
impl TryFrom<zgs_grpc_proto::FileProof> for FileProof {
type Error = GrpcStatus;
fn try_from(value: zgs_grpc_proto::FileProof) -> Result<Self, GrpcStatus> {
// turn each `bytes` into an H256
let mut lemma = Vec::with_capacity(value.lemma.len());
for bin in value.lemma {
if bin.len() != 32 {
return Err(GrpcStatus::invalid_argument(format!(
"Invalid hash length: got {}, want 32",
bin.len()
)));
}
let mut arr = [0u8; 32];
arr.copy_from_slice(&bin);
lemma.push(H256(arr));
}
Ok(FileProof {
lemma,
path: value.path,
})
}
}
/// Convert the full SegmentWithProof
impl TryFrom<zgs_grpc_proto::SegmentWithProof> for SegmentWithProof {
type Error = GrpcStatus;
fn try_from(grpc_segment: zgs_grpc_proto::SegmentWithProof) -> Result<Self, GrpcStatus> {
let root = grpc_segment.root.unwrap().try_into()?;
let data = grpc_segment.data;
// index is u64 in proto, usize in app
let index = grpc_segment.index.try_into().map_err(|_| {
GrpcStatus::invalid_argument(format!("Invalid segment index: {}", grpc_segment.index))
})?;
let proof = grpc_segment.proof.unwrap().try_into()?;
let file_size = grpc_segment.file_size.try_into().map_err(|_| {
GrpcStatus::invalid_argument(format!("Invalid file size: {}", grpc_segment.file_size))
})?;
Ok(SegmentWithProof {
root,
data,
index,
proof,
file_size,
})
}
}
impl SegmentWithProof {
/// Splits file into segments and returns the total number of segments and the last segment size.
pub fn split_file_into_segments(

View File

@ -1,12 +1,10 @@
use super::api::RpcServer;
use crate::error;
use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
use crate::Context;
use chunk_pool::{FileID, SegmentInfo};
use crate::{error, rpc_helper, SegmentIndexArray};
use jsonrpsee::core::async_trait;
use jsonrpsee::core::RpcResult;
use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
use std::fmt::{Debug, Formatter, Result};
use storage::config::ShardConfig;
use storage::log_store::tx_store::TxStatus;
use storage::{try_option, H256};
@ -39,7 +37,7 @@ impl RpcServer for RpcServerImpl {
async fn upload_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
info!(root = %segment.root, index = %segment.index, "zgs_uploadSegment");
self.put_segment(segment).await
rpc_helper::put_segment(&self.ctx, segment).await
}
async fn upload_segment_by_tx_seq(
@ -49,7 +47,7 @@ impl RpcServer for RpcServerImpl {
) -> RpcResult<()> {
info!(tx_seq = %tx_seq, index = %segment.index, "zgs_uploadSegmentByTxSeq");
let maybe_tx = self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?;
self.put_segment_with_maybe_tx(segment, maybe_tx).await
rpc_helper::put_segment_with_maybe_tx(&self.ctx, segment, maybe_tx).await
}
async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()> {
@ -61,7 +59,7 @@ impl RpcServer for RpcServerImpl {
info!(%root, ?indices, "zgs_uploadSegments");
for segment in segments.into_iter() {
self.put_segment(segment).await?;
rpc_helper::put_segment(&self.ctx, segment).await?;
}
Ok(())
@ -77,8 +75,17 @@ impl RpcServer for RpcServerImpl {
let maybe_tx = self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?;
for segment in segments.into_iter() {
self.put_segment_with_maybe_tx(segment, maybe_tx.clone())
.await?;
match rpc_helper::put_segment_with_maybe_tx(&self.ctx, segment, maybe_tx.clone()).await
{
Ok(()) => {} // success
Err(e)
if e.to_string()
.contains("segment has already been uploaded or is being uploaded") =>
{
debug!(?e, "duplicate segment - skipping");
}
Err(e) => return Err(e),
}
}
Ok(())
@ -224,44 +231,44 @@ impl RpcServer for RpcServerImpl {
}
impl RpcServerImpl {
async fn check_need_cache(
&self,
maybe_tx: &Option<Transaction>,
file_size: usize,
) -> RpcResult<bool> {
if let Some(tx) = maybe_tx {
if tx.size != file_size as u64 {
return Err(error::invalid_params(
"file_size",
"segment file size not matched with tx file size",
));
}
// async fn check_need_cache(
// &self,
// maybe_tx: &Option<Transaction>,
// file_size: usize,
// ) -> RpcResult<bool> {
// if let Some(tx) = maybe_tx {
// if tx.size != file_size as u64 {
// return Err(error::invalid_params(
// "file_size",
// "segment file size not matched with tx file size",
// ));
// }
// Transaction already finalized for the specified file data root.
if self.ctx.log_store.check_tx_completed(tx.seq).await? {
return Err(error::invalid_params(
"root",
"already uploaded and finalized",
));
}
// // Transaction already finalized for the specified file data root.
// if self.ctx.log_store.check_tx_completed(tx.seq).await? {
// return Err(error::invalid_params(
// "root",
// "already uploaded and finalized",
// ));
// }
if self.ctx.log_store.check_tx_pruned(tx.seq).await? {
return Err(error::invalid_params("root", "already pruned"));
}
// if self.ctx.log_store.check_tx_pruned(tx.seq).await? {
// return Err(error::invalid_params("root", "already pruned"));
// }
Ok(false)
} else {
//Check whether file is small enough to cache in the system
if file_size > self.ctx.config.max_cache_file_size {
return Err(error::invalid_params(
"file_size",
"caching of large file when tx is unavailable is not supported",
));
}
// Ok(false)
// } else {
// //Check whether file is small enough to cache in the system
// if file_size > self.ctx.config.max_cache_file_size {
// return Err(error::invalid_params(
// "file_size",
// "caching of large file when tx is unavailable is not supported",
// ));
// }
Ok(true)
}
}
// Ok(true)
// }
// }
async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult<FileInfo> {
let (finalized, pruned) = match self.ctx.log_store.get_store().get_tx_status(tx.seq)? {
@ -301,69 +308,69 @@ impl RpcServerImpl {
})
}
async fn put_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
debug!(root = %segment.root, index = %segment.index, "putSegment");
// async fn put_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
// debug!(root = %segment.root, index = %segment.index, "putSegment");
let maybe_tx = self
.ctx
.log_store
.get_tx_by_data_root(&segment.root, false)
.await?;
// let maybe_tx = self
// .ctx
// .log_store
// .get_tx_by_data_root(&segment.root, false)
// .await?;
self.put_segment_with_maybe_tx(segment, maybe_tx).await
}
// self.put_segment_with_maybe_tx(segment, maybe_tx).await
// }
async fn put_segment_with_maybe_tx(
&self,
segment: SegmentWithProof,
maybe_tx: Option<Transaction>,
) -> RpcResult<()> {
self.ctx.chunk_pool.validate_segment_size(&segment.data)?;
// async fn put_segment_with_maybe_tx(
// &self,
// segment: SegmentWithProof,
// maybe_tx: Option<Transaction>,
// ) -> RpcResult<()> {
// self.ctx.chunk_pool.validate_segment_size(&segment.data)?;
if let Some(tx) = &maybe_tx {
if tx.data_merkle_root != segment.root {
return Err(error::internal_error("data root and tx seq not match"));
}
}
// if let Some(tx) = &maybe_tx {
// if tx.data_merkle_root != segment.root {
// return Err(error::internal_error("data root and tx seq not match"));
// }
// }
let mut need_cache = false;
if self
.ctx
.chunk_pool
.check_already_has_cache(&segment.root)
.await
{
need_cache = true;
}
// let mut need_cache = false;
// if self
// .ctx
// .chunk_pool
// .check_already_has_cache(&segment.root)
// .await
// {
// need_cache = true;
// }
if !need_cache {
need_cache = self.check_need_cache(&maybe_tx, segment.file_size).await?;
}
// if !need_cache {
// need_cache = self.check_need_cache(&maybe_tx, segment.file_size).await?;
// }
segment.validate(self.ctx.config.chunks_per_segment)?;
// segment.validate(self.ctx.config.chunks_per_segment)?;
let seg_info = SegmentInfo {
root: segment.root,
seg_data: segment.data,
seg_proof: segment.proof,
seg_index: segment.index,
chunks_per_segment: self.ctx.config.chunks_per_segment,
};
// let seg_info = SegmentInfo {
// root: segment.root,
// seg_data: segment.data,
// seg_proof: segment.proof,
// seg_index: segment.index,
// chunks_per_segment: self.ctx.config.chunks_per_segment,
// };
if need_cache {
self.ctx.chunk_pool.cache_chunks(seg_info).await?;
} else {
let file_id = FileID {
root: seg_info.root,
tx_id: maybe_tx.unwrap().id(),
};
self.ctx
.chunk_pool
.write_chunks(seg_info, file_id, segment.file_size)
.await?;
}
Ok(())
}
// if need_cache {
// self.ctx.chunk_pool.cache_chunks(seg_info).await?;
// } else {
// let file_id = FileID {
// root: seg_info.root,
// tx_id: maybe_tx.unwrap().id(),
// };
// self.ctx
// .chunk_pool
// .write_chunks(seg_info, file_id, segment.file_size)
// .await?;
// }
// Ok(())
// }
async fn get_segment_by_tx_seq(
&self,
@ -436,61 +443,3 @@ impl RpcServerImpl {
}))
}
}
enum SegmentIndex {
Single(usize),
Range(usize, usize), // [start, end]
}
impl Debug for SegmentIndex {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
match self {
Self::Single(val) => write!(f, "{}", val),
Self::Range(start, end) => write!(f, "[{},{}]", start, end),
}
}
}
struct SegmentIndexArray {
items: Vec<SegmentIndex>,
}
impl Debug for SegmentIndexArray {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
match self.items.first() {
None => write!(f, "NULL"),
Some(first) if self.items.len() == 1 => write!(f, "{:?}", first),
_ => write!(f, "{:?}", self.items),
}
}
}
impl SegmentIndexArray {
fn new(segments: &[SegmentWithProof]) -> Self {
let mut items = Vec::new();
let mut current = match segments.first() {
None => return SegmentIndexArray { items },
Some(seg) => SegmentIndex::Single(seg.index),
};
for index in segments.iter().skip(1).map(|seg| seg.index) {
match current {
SegmentIndex::Single(val) if val + 1 == index => {
current = SegmentIndex::Range(val, index)
}
SegmentIndex::Range(start, end) if end + 1 == index => {
current = SegmentIndex::Range(start, index)
}
_ => {
items.push(current);
current = SegmentIndex::Single(index);
}
}
}
items.push(current);
SegmentIndexArray { items }
}
}

View File

@ -0,0 +1,62 @@
use crate::types::SegmentWithProof as RpcSegment;
use crate::zgs_grpc_proto::zgs_grpc_service_server::ZgsGrpcService;
use crate::zgs_grpc_proto::{Empty, PingReply, PingRequest, UploadSegmentsByTxSeqRequest};
use crate::{rpc_helper, Context, SegmentIndexArray};
pub struct ZgsGrpcServiceImpl {
pub ctx: Context,
}
#[tonic::async_trait]
impl ZgsGrpcService for ZgsGrpcServiceImpl {
async fn ping(
&self,
request: tonic::Request<PingRequest>,
) -> Result<tonic::Response<PingReply>, tonic::Status> {
let msg = request.into_inner().message;
let reply = PingReply {
message: format!("Echo: {}", msg),
};
Ok(tonic::Response::new(reply))
}
async fn upload_segments_by_tx_seq(
&self,
request: tonic::Request<UploadSegmentsByTxSeqRequest>,
) -> Result<tonic::Response<Empty>, tonic::Status> {
let req = request.into_inner();
let segments = req.segments;
let tx_seq = req.tx_seq;
let rpc_segments = segments
.into_iter()
.map(|s| {
RpcSegment::try_from(s)
.map_err(|e| tonic::Status::invalid_argument(format!("Invalid segment: {}", e)))
})
.collect::<Result<Vec<_>, _>>()?;
let indices = SegmentIndexArray::new(&rpc_segments);
info!(%tx_seq, ?indices, "grpc_zgs_uploadSegmentsByTxSeq");
let maybe_tx = self
.ctx
.log_store
.get_tx_by_seq_number(tx_seq)
.await
.map_err(|e| {
tonic::Status::internal(format!(
"Failed to get transaction by sequence number: {}",
e
))
})?;
for segment in rpc_segments.into_iter() {
rpc_helper::put_segment_with_maybe_tx(&self.ctx, segment, maybe_tx.clone())
.await
.map_err(|e| tonic::Status::internal(format!("Failed to put segment: {}", e)))?;
}
// Return an empty response
Ok(tonic::Response::new(Empty {}))
}
}

View File

@ -0,0 +1 @@
pub mod r#impl;

View File

@ -2,7 +2,7 @@ mod proof;
use anyhow::{anyhow, bail, Error};
use append_merkle::{
AppendMerkleTree, Proof as RawProof, RangeProof as RawRangeProof, Sha3Algorithm,
AppendMerkleTree, OptionalHash, Proof as RawProof, RangeProof as RawRangeProof, Sha3Algorithm,
};
use ethereum_types::{Address, H256, U256};
use merkle_light::merkle::MerkleTree;
@ -32,7 +32,7 @@ pub type DataRoot = H256;
pub type FlowProof = RawProof<H256>;
pub type FlowRangeProof = RawRangeProof<H256>;
pub type Merkle = AppendMerkleTree<H256, Sha3Algorithm>;
pub type Merkle = AppendMerkleTree<OptionalHash, Sha3Algorithm>;
// Each chunk is 32 bytes.
pub const CHUNK_SIZE: usize = 256;

View File

@ -9,6 +9,7 @@ pub fn cli_app() -> Command {
arg!(--"blockchain-rpc-endpoint" [URL] "Sets blockchain RPC endpoint (Default: http://127.0.0.1:8545)")
)
.arg(arg!(--"db-max-num-chunks" [NUM] "Sets the max number of chunks to store in db (Default: None)"))
.arg(arg!(--"network-enr-address" [URL] "Sets the network ENR address (Default: None)"))
.allow_external_subcommands(true)
.version(zgs_version::VERSION)
}

View File

@ -298,7 +298,7 @@ impl ClientBuilder {
mine_service_sender: mine_send,
};
let (rpc_handle, maybe_admin_rpc_handle) = rpc::run_server(ctx)
let (rpc_handle, maybe_admin_rpc_handle) = rpc::run_server(ctx.clone())
.await
.map_err(|e| format!("Unable to start HTTP RPC server: {:?}", e))?;
@ -307,6 +307,15 @@ impl ClientBuilder {
executor.spawn(admin_rpc_handle, "rpc_admin");
}
executor.spawn(
async move {
rpc::run_grpc_server(ctx.clone())
.await
.expect("Failed to start gRPC server");
},
"grpc",
);
Ok(self)
}

View File

@ -12,7 +12,9 @@ use crate::log_store::{
use crate::{try_option, ZgsKeyValueDB};
use any::Any;
use anyhow::{anyhow, bail, Result};
use append_merkle::{MerkleTreeRead, NodeDatabase, NodeTransaction};
use append_merkle::{
AppendMerkleTree, MerkleTreeRead, NodeDatabase, NodeTransaction, OptionalHash,
};
use itertools::Itertools;
use kvdb::DBTransaction;
use parking_lot::RwLock;
@ -72,7 +74,8 @@ impl FlowStore {
batch_index
)
})?;
merkle.gen_proof(sector_index)
let optional_proof = merkle.gen_proof(sector_index)?;
AppendMerkleTree::convert_proof_to_h256(optional_proof)
}
pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
@ -577,12 +580,12 @@ fn layer_size_key(layer: usize) -> Vec<u8> {
pub struct NodeDBTransaction(DBTransaction);
impl NodeDatabase<DataRoot> for FlowDBStore {
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<DataRoot>> {
impl NodeDatabase<OptionalHash> for FlowDBStore {
fn get_node(&self, layer: usize, pos: usize) -> Result<Option<OptionalHash>> {
Ok(self
.kvdb
.get(COL_FLOW_MPT_NODES, &encode_mpt_node_key(layer, pos))?
.map(|v| DataRoot::from_slice(&v)))
.map(|v| OptionalHash::from_bytes(v.as_slice().try_into().unwrap()).unwrap()))
}
fn get_layer_size(&self, layer: usize) -> Result<Option<usize>> {
@ -592,11 +595,11 @@ impl NodeDatabase<DataRoot> for FlowDBStore {
}
}
fn start_transaction(&self) -> Box<dyn NodeTransaction<DataRoot>> {
fn start_transaction(&self) -> Box<dyn NodeTransaction<OptionalHash>> {
Box::new(NodeDBTransaction(self.kvdb.transaction()))
}
fn commit(&self, tx: Box<dyn NodeTransaction<DataRoot>>) -> Result<()> {
fn commit(&self, tx: Box<dyn NodeTransaction<OptionalHash>>) -> Result<()> {
let db_tx: Box<NodeDBTransaction> = tx
.into_any()
.downcast()
@ -605,21 +608,21 @@ impl NodeDatabase<DataRoot> for FlowDBStore {
}
}
impl NodeTransaction<DataRoot> for NodeDBTransaction {
fn save_node(&mut self, layer: usize, pos: usize, node: &DataRoot) {
impl NodeTransaction<OptionalHash> for NodeDBTransaction {
fn save_node(&mut self, layer: usize, pos: usize, node: &OptionalHash) {
self.0.put(
COL_FLOW_MPT_NODES,
&encode_mpt_node_key(layer, pos),
node.as_bytes(),
&node.as_bytes(),
);
}
fn save_node_list(&mut self, nodes: &[(usize, usize, &DataRoot)]) {
fn save_node_list(&mut self, nodes: &[(usize, usize, &OptionalHash)]) {
for (layer_index, position, data) in nodes {
self.0.put(
COL_FLOW_MPT_NODES,
&encode_mpt_node_key(*layer_index, *position),
data.as_bytes(),
&data.as_bytes(),
);
}
}

View File

@ -204,9 +204,9 @@ impl EntryBatch {
}
}
}
Ok(Some(
try_option!(self.to_merkle_tree(is_first_chunk)?).root(),
))
Ok(try_option!(self.to_merkle_tree(is_first_chunk)?)
.root()
.into())
}
pub fn submit_seal_result(&mut self, answer: SealAnswer) -> Result<()> {
@ -243,7 +243,7 @@ impl EntryBatch {
pub fn to_merkle_tree(&self, is_first_chunk: bool) -> Result<Option<Merkle>> {
let initial_leaves = if is_first_chunk {
vec![H256::zero()]
vec![H256::zero().into()]
} else {
vec![]
};
@ -256,7 +256,7 @@ impl EntryBatch {
);
merkle.append_list(data_to_merkle_leaves(&leaf_data).expect("aligned"));
}
merkle.append_subtree(subtree.subtree_height, subtree.root)?;
merkle.append_subtree(subtree.subtree_height, subtree.root.into())?;
}
if merkle.leaves() != SECTORS_PER_LOAD {
let leaf_data = try_option!(

View File

@ -9,7 +9,7 @@ use crate::log_store::{
};
use crate::{try_option, ZgsKeyValueDB};
use anyhow::{anyhow, bail, Result};
use append_merkle::{Algorithm, MerkleTreeRead, Sha3Algorithm};
use append_merkle::{Algorithm, AppendMerkleTree, MerkleTreeRead, OptionalHash, Sha3Algorithm};
use ethereum_types::H256;
use kvdb_rocksdb::{Database, DatabaseConfig};
use merkle_light::merkle::{log2_pow2, MerkleTree};
@ -55,13 +55,10 @@ const PAD_DELAY: Duration = Duration::from_secs(2);
// Process at most 1M entries (256MB) pad data at a time.
const PAD_MAX_SIZE: usize = 1 << 20;
static PAD_SEGMENT_ROOT: Lazy<H256> = Lazy::new(|| {
Merkle::new(
data_to_merkle_leaves(&[0; ENTRY_SIZE * PORA_CHUNK_SIZE]).unwrap(),
0,
None,
)
.root()
static PAD_SEGMENT_ROOT: Lazy<OptionalHash> = Lazy::new(|| {
let h256_leaves = data_to_merkle_leaves(&[0; ENTRY_SIZE * PORA_CHUNK_SIZE]).unwrap();
Merkle::new(h256_leaves, 0, None).root()
});
pub struct UpdateFlowMessage {
pub pad_data: usize,
@ -130,7 +127,8 @@ impl MerkleManager {
fn try_initialize(&mut self, flow_store: &FlowStore) -> Result<()> {
if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 {
self.last_chunk_merkle.append(H256::zero());
self.last_chunk_merkle
.append(OptionalHash::some(H256::zero()));
self.pora_chunks_merkle
.update_last(self.last_chunk_merkle.root());
} else if self.last_chunk_merkle.leaves() != 0 {
@ -222,9 +220,17 @@ impl LogStoreChunkWrite for LogManager {
self.append_entries(flow_entry_array, &mut merkle)?;
if let Some(file_proof) = maybe_file_proof {
// Convert H256 proof to OptionalHash proof
let optional_proof = AppendMerkleTree::convert_proof_from_h256(file_proof)?;
// Convert H256 merkle nodes to OptionalHash merkle nodes
let optional_nodes: Vec<(usize, OptionalHash)> = tx
.merkle_nodes
.into_iter()
.map(|(depth, hash)| (depth, OptionalHash::some(hash)))
.collect();
merkle.pora_chunks_merkle.fill_with_file_proof(
file_proof,
tx.merkle_nodes,
optional_proof,
optional_nodes,
tx.start_entry_index,
)?;
}
@ -291,6 +297,7 @@ impl LogStoreWrite for LogManager {
if let Some(old_tx_seq) = maybe_same_data_tx_seq {
if self.check_tx_completed(old_tx_seq)? {
// copy and finalize once, then stop
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
}
}
@ -315,8 +322,18 @@ impl LogStoreWrite for LogManager {
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
// Check if there are other same-root transaction not finalized.
if same_root_seq_list.first() == Some(&tx_seq) {
// If this is the first tx with this data root, copy and finalize all same-root txs.
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
} else {
// If this is not the first tx with this data root, and the first one is not finalized.
let maybe_first_seq = same_root_seq_list.first().cloned();
if let Some(first_seq) = maybe_first_seq {
if !self.check_tx_completed(first_seq)? {
self.copy_tx_and_finalize(tx_seq, same_root_seq_list)?;
}
}
}
self.tx_store.finalize_tx(tx_seq)?;
Ok(())
} else {
@ -346,14 +363,25 @@ impl LogStoreWrite for LogManager {
// TODO: Should we double check the tx merkle root?
let tx_end_index = tx.start_entry_index + bytes_to_entries(tx.size);
if self.check_data_completed(tx.start_entry_index, tx_end_index)? {
self.tx_store.finalize_tx(tx_seq)?;
let same_root_seq_list = self
.tx_store
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
// Check if there are other same-root transaction not finalized.
if same_root_seq_list.first() == Some(&tx_seq) {
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
} else {
// If this is not the first tx with this data root, copy and finalize the first one.
let maybe_first_seq = same_root_seq_list.first().cloned();
if let Some(first_seq) = maybe_first_seq {
if !self.check_tx_completed(first_seq)? {
self.copy_tx_and_finalize(tx_seq, same_root_seq_list)?;
}
}
}
self.tx_store.finalize_tx(tx_seq)?;
metrics::FINALIZE_TX_WITH_HASH.update_since(start_time);
Ok(true)
} else {
@ -402,9 +430,9 @@ impl LogStoreWrite for LogManager {
// `merkle` is used in `validate_range_proof`.
let mut merkle = self.merkle.write();
if valid {
merkle
.pora_chunks_merkle
.fill_with_range_proof(data.proof.clone())?;
merkle.pora_chunks_merkle.fill_with_range_proof(
AppendMerkleTree::convert_range_proof_from_h256(data.proof.clone())?,
)?;
}
Ok(valid)
}
@ -615,7 +643,7 @@ impl LogStoreRead for LogManager {
let tx = self
.get_tx_by_seq_number(tx_seq)?
.ok_or_else(|| anyhow!("tx missing"))?;
let leaves = data_to_merkle_leaves(&data.chunks.data)?;
let leaves = data_to_merkle_leaves_h256(&data.chunks.data)?;
data.proof.validate::<Sha3Algorithm>(
&leaves,
(data.chunks.start_index + tx.start_entry_index) as usize,
@ -624,7 +652,7 @@ impl LogStoreRead for LogManager {
.merkle
.read_recursive()
.pora_chunks_merkle
.check_root(&data.proof.root()))
.check_root(&data.proof.root().into()))
}
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>> {
@ -664,7 +692,7 @@ impl LogStoreRead for LogManager {
fn get_context(&self) -> crate::error::Result<(DataRoot, u64)> {
let merkle = self.merkle.read_recursive();
Ok((
merkle.pora_chunks_merkle.root(),
merkle.pora_chunks_merkle.root().unwrap(),
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
))
}
@ -849,7 +877,9 @@ impl LogManager {
None => self.gen_proof_at_version(flow_index, None),
Some(root) => {
let merkle = self.merkle.read_recursive();
let tx_seq = merkle.pora_chunks_merkle.tx_seq_at_root(&root)?;
let tx_seq = merkle
.pora_chunks_merkle
.tx_seq_at_root(&OptionalHash::from(root))?;
self.gen_proof_at_version(flow_index, Some(tx_seq))
}
}
@ -863,11 +893,15 @@ impl LogManager {
let merkle = self.merkle.read_recursive();
let seg_index = sector_to_segment(flow_index);
let top_proof = match maybe_tx_seq {
None => merkle.pora_chunks_merkle.gen_proof(seg_index)?,
Some(tx_seq) => merkle
.pora_chunks_merkle
.at_version(tx_seq)?
.gen_proof(seg_index)?,
None => AppendMerkleTree::convert_proof_to_h256(
merkle.pora_chunks_merkle.gen_proof(seg_index)?,
)?,
Some(tx_seq) => AppendMerkleTree::convert_proof_to_h256(
merkle
.pora_chunks_merkle
.at_version(tx_seq)?
.gen_proof(seg_index)?,
)?,
};
// TODO(zz): Maybe we can decide that all proofs are at the PoRA chunk level, so
@ -884,13 +918,17 @@ impl LogManager {
.gen_proof_in_batch(seg_index, flow_index as usize % PORA_CHUNK_SIZE)?
} else {
match maybe_tx_seq {
None => merkle
.last_chunk_merkle
.gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
Some(tx_version) => merkle
.last_chunk_merkle
.at_version(tx_version)?
.gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
None => AppendMerkleTree::convert_proof_to_h256(
merkle
.last_chunk_merkle
.gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
)?,
Some(tx_version) => AppendMerkleTree::convert_proof_to_h256(
merkle
.last_chunk_merkle
.at_version(tx_version)?
.gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
)?,
}
};
entry_proof(&top_proof, &sub_proof)
@ -916,9 +954,10 @@ impl LogManager {
if merkle.last_chunk_merkle.leaves() + subtree_size <= PORA_CHUNK_SIZE {
merkle
.last_chunk_merkle
.append_subtree(subtree_depth, subtree_root)?;
.append_subtree(subtree_depth, OptionalHash::some(subtree_root))?;
if merkle.last_chunk_merkle.leaves() == subtree_size {
// `last_chunk_merkle` was empty, so this is a new leaf in the top_tree.
merkle
.pora_chunks_merkle
.append_subtree(1, merkle.last_chunk_merkle.root())?;
@ -938,9 +977,10 @@ impl LogManager {
// the chunks boundary.
assert_eq!(merkle.last_chunk_merkle.leaves(), 0);
assert!(subtree_size >= PORA_CHUNK_SIZE);
merkle
.pora_chunks_merkle
.append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?;
merkle.pora_chunks_merkle.append_subtree(
subtree_depth - log2_pow2(PORA_CHUNK_SIZE),
OptionalHash::some(subtree_root),
)?;
}
}
@ -975,9 +1015,8 @@ impl LogManager {
let mut completed_chunk_index = None;
if pad_data.len() < last_chunk_pad {
is_full_empty = false;
merkle
.last_chunk_merkle
.append_list(data_to_merkle_leaves(&pad_data)?);
let pad_leaves = data_to_merkle_leaves(&pad_data)?;
merkle.last_chunk_merkle.append_list(pad_leaves);
merkle
.pora_chunks_merkle
.update_last(merkle.last_chunk_merkle.root());
@ -985,9 +1024,8 @@ impl LogManager {
if last_chunk_pad != 0 {
is_full_empty = false;
// Pad the last chunk.
merkle
.last_chunk_merkle
.append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?);
let last_chunk_leaves = data_to_merkle_leaves(&pad_data[..last_chunk_pad])?;
merkle.last_chunk_merkle.append_list(last_chunk_leaves);
merkle
.pora_chunks_merkle
.update_last(merkle.last_chunk_merkle.root());
@ -997,7 +1035,7 @@ impl LogManager {
// Pad with more complete chunks.
let mut start_index = last_chunk_pad / ENTRY_SIZE;
while pad_data.len() >= (start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE {
merkle.pora_chunks_merkle.append(*PAD_SEGMENT_ROOT);
merkle.pora_chunks_merkle.append(PAD_SEGMENT_ROOT.clone());
start_index += PORA_CHUNK_SIZE;
}
assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
@ -1082,7 +1120,7 @@ impl LogManager {
if chunk_index < merkle.pora_chunks_merkle.leaves() as u64 {
merkle
.pora_chunks_merkle
.fill_leaf(chunk_index as usize, chunk_root);
.fill_leaf(chunk_index as usize, OptionalHash::some(chunk_root));
} else {
// TODO(zz): This assumption may be false in the future.
unreachable!("We always insert tx nodes before put_chunks");
@ -1173,8 +1211,8 @@ impl LogManager {
let mut to_tx_offset_list = Vec::with_capacity(to_tx_seq_list.len());
for seq in to_tx_seq_list {
// No need to copy data for completed tx.
if self.check_tx_completed(seq)? {
// No need to copy data for completed tx and itself
if self.check_tx_completed(seq)? || from_tx_seq == seq {
continue;
}
let tx = self
@ -1231,7 +1269,7 @@ impl LogManager {
let mut to_insert_subtrees = Vec::new();
let mut start_index = 0;
for (subtree_height, root) in subtree_list {
to_insert_subtrees.push((start_index, subtree_height, root));
to_insert_subtrees.push((start_index, subtree_height, root.unwrap()));
start_index += 1 << (subtree_height - 1);
}
self.flow_store
@ -1279,14 +1317,14 @@ macro_rules! try_option {
/// This should be called with input checked.
pub fn sub_merkle_tree(leaf_data: &[u8]) -> Result<FileMerkleTree> {
Ok(FileMerkleTree::new(
data_to_merkle_leaves(leaf_data)?
data_to_merkle_leaves_h256(leaf_data)?
.into_iter()
.map(|h| h.0)
.collect::<Vec<[u8; 32]>>(),
))
}
pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<OptionalHash>> {
let start_time = Instant::now();
if leaf_data.len() % ENTRY_SIZE != 0 {
bail!("merkle_tree: mismatched data size");
@ -1309,6 +1347,12 @@ pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
Ok(r)
}
/// Convenience function that combines data_to_merkle_leaves and conversion to H256
pub fn data_to_merkle_leaves_h256(leaf_data: &[u8]) -> Result<Vec<H256>> {
let optional_hashes = data_to_merkle_leaves(leaf_data)?;
Ok(optional_hashes.into_iter().map(|oh| oh.unwrap()).collect())
}
pub fn bytes_to_entries(size_bytes: u64) -> u64 {
if size_bytes % ENTRY_SIZE as u64 == 0 {
size_bytes / ENTRY_SIZE as u64

View File

@ -1,9 +1,9 @@
use crate::log_store::log_manager::{
data_to_merkle_leaves, sub_merkle_tree, tx_subtree_root_list_padded, LogConfig, LogManager,
PORA_CHUNK_SIZE,
data_to_merkle_leaves, data_to_merkle_leaves_h256, sub_merkle_tree,
tx_subtree_root_list_padded, LogConfig, LogManager, PORA_CHUNK_SIZE,
};
use crate::log_store::{LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite};
use append_merkle::{Algorithm, AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
use append_merkle::{Algorithm, AppendMerkleTree, MerkleTreeRead, OptionalHash, Sha3Algorithm};
use ethereum_types::H256;
use rand::random;
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
@ -22,11 +22,17 @@ fn test_put_get() {
data[i * CHUNK_SIZE] = random();
}
let (padded_chunks, _) = compute_padded_chunk_size(data_size);
let mut merkle = AppendMerkleTree::<H256, Sha3Algorithm>::new(vec![H256::zero()], 0, None);
merkle.append_list(data_to_merkle_leaves(&LogManager::padding_raw(start_offset - 1)).unwrap());
let mut merkle = AppendMerkleTree::<OptionalHash, Sha3Algorithm>::new(
vec![OptionalHash::some(H256::zero())],
0,
None,
);
let padding_leaves = data_to_merkle_leaves(&LogManager::padding_raw(start_offset - 1)).unwrap();
merkle.append_list(padding_leaves);
let mut data_padded = data.clone();
data_padded.append(&mut vec![0u8; CHUNK_SIZE]);
merkle.append_list(data_to_merkle_leaves(&data_padded).unwrap());
let data_leaves = data_to_merkle_leaves(&data_padded).unwrap();
merkle.append_list(data_leaves);
merkle.commit(Some(0));
let tx_merkle = sub_merkle_tree(&data).unwrap();
let tx = Transaction {
@ -78,16 +84,17 @@ fn test_put_get() {
.unwrap()
.unwrap();
assert_eq!(chunk_with_proof.chunk, chunk_array.chunk_at(i).unwrap());
assert_eq!(
chunk_with_proof.proof,
merkle.gen_proof(i + start_offset).unwrap()
merkle.gen_proof_h256(i + start_offset).unwrap()
);
let r = chunk_with_proof.proof.validate::<Sha3Algorithm>(
&Sha3Algorithm::leaf(&chunk_with_proof.chunk.0),
i + start_offset,
);
assert!(r.is_ok(), "proof={:?} \n r={:?}", chunk_with_proof.proof, r);
assert!(merkle.check_root(&chunk_with_proof.proof.root()));
assert!(merkle.check_root(&chunk_with_proof.proof.root().into()));
}
for i in (0..chunk_count).step_by(PORA_CHUNK_SIZE / 3) {
let end = std::cmp::min(i + PORA_CHUNK_SIZE, chunk_count);
@ -102,7 +109,7 @@ fn test_put_get() {
assert!(chunk_array_with_proof
.proof
.validate::<Sha3Algorithm>(
&data_to_merkle_leaves(&chunk_array_with_proof.chunks.data).unwrap(),
&data_to_merkle_leaves_h256(&chunk_array_with_proof.chunks.data).unwrap(),
i + start_offset
)
.is_ok());
@ -119,12 +126,12 @@ fn test_root() {
}
let mt = sub_merkle_tree(&data).unwrap();
println!("{:?} {}", mt.root(), hex::encode(mt.root()));
let append_mt = AppendMerkleTree::<H256, Sha3Algorithm>::new(
let append_mt = AppendMerkleTree::<OptionalHash, Sha3Algorithm>::new(
data_to_merkle_leaves(&data).unwrap(),
0,
None,
);
assert_eq!(mt.root(), append_mt.root().0);
assert_eq!(mt.root(), append_mt.root().unwrap().0);
}
}

View File

@ -6,7 +6,7 @@ use crate::log_store::log_manager::{
use crate::log_store::metrics;
use crate::{try_option, LogManager, ZgsKeyValueDB};
use anyhow::{anyhow, Result};
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
use append_merkle::{AppendMerkleTree, MerkleTreeRead, OptionalHash, Sha3Algorithm};
use ethereum_types::H256;
use merkle_light::merkle::log2_pow2;
use shared_types::{DataRoot, Transaction};
@ -329,7 +329,7 @@ impl TransactionStore {
&self,
pora_chunk_index: usize,
mut tx_seq: u64,
) -> Result<AppendMerkleTree<H256, Sha3Algorithm>> {
) -> Result<AppendMerkleTree<OptionalHash, Sha3Algorithm>> {
let last_chunk_start_index = pora_chunk_index as u64 * PORA_CHUNK_SIZE as u64;
let mut tx_list = Vec::new();
// Find the first tx within the last chunk.
@ -384,9 +384,13 @@ impl TransactionStore {
}
let mut merkle = if last_chunk_start_index == 0 {
// The first entry hash is initialized as zero.
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(vec![H256::zero()], 1, None)
AppendMerkleTree::<OptionalHash, Sha3Algorithm>::new_with_depth(
vec![H256::zero().into()],
1,
None,
)
} else {
AppendMerkleTree::<H256, Sha3Algorithm>::new_with_depth(
AppendMerkleTree::<OptionalHash, Sha3Algorithm>::new_with_depth(
vec![],
log2_pow2(PORA_CHUNK_SIZE) + 1,
None,
@ -400,9 +404,12 @@ impl TransactionStore {
cmp::min(first_subtree, PORA_CHUNK_SIZE) - (merkle.leaves() % first_subtree);
merkle.append_list(data_to_merkle_leaves(&LogManager::padding_raw(pad_len))?);
}
// Since we are building the last merkle with a given last tx_seq, it's ensured
// that appending subtrees will not go beyond the max size.
merkle.append_subtree_list(subtree_list)?;
// Convert H256 to OptionalHash for append_subtree_list
let subtree_list_optional_hash = subtree_list
.into_iter()
.map(|(depth, hash)| (depth, hash.into()))
.collect();
merkle.append_subtree_list(subtree_list_optional_hash)?;
merkle.commit(Some(tx_seq));
}
Ok(merkle)

View File

@ -0,0 +1,361 @@
# This is a TOML config file.
# For more information, see https://github.com/toml-lang/toml
#######################################################################
### Network Config Options ###
#######################################################################
# Data directory where node's keyfile is stored.
# network_dir = "network"
# IP address to listen on.
# network_listen_address = "0.0.0.0"
# The address to broadcast to peers about which address we are listening on. Generally,
# configure public IP address for UDP discovery. If not specified, program will try to
# detect public IP address automatically.
# network_enr_address = ""
# The tcp port to broadcast to peers in order to reach back for libp2p services.
# network_enr_tcp_port = 1234
# The udp port to broadcast to peers in order to reach back for discovery.
# network_enr_udp_port = 1234
# The TCP port that libp2p listens on.
# network_libp2p_port = 1234
# UDP port that discovery listens on.
# network_discovery_port = 1234
# Target number of connected peers. can be 100
# network_target_peers = 50
# List of nodes to bootstrap UDP discovery. Note, `network_enr_address` should be
# configured as well to enable UDP discovery.
network_boot_nodes = ["/ip4/34.66.131.173/udp/1234/p2p/16Uiu2HAmG81UgZ1JJLx9T2HqELgJNP36ChHzYkCdA9HdxvAbb5jQ","/ip4/34.60.163.4/udp/1234/p2p/16Uiu2HAmL3DoA7e7mbxs7CkeCPtNrAcfJFFtLpJDr2HWuR6QwJ8k","/ip4/34.169.236.186/udp/1234/p2p/16Uiu2HAm489RdhEgZUFmNTR4jdLEE4HjrvwaPCkEpSYSgvqi1CbR","/ip4/34.71.110.60/udp/1234/p2p/16Uiu2HAmBfGfbLNRegcqihiuXhgSXWNpgiGm6EwW2SYexfPUNUHQ"]
# List of libp2p nodes to initially connect to.
# network_libp2p_nodes = []
# Indicates if the user has set the network to be in private mode. Currently this
# prevents sending client identifying information over identify.
# network_private = false
# Disables the discovery protocol from starting.
# network_disable_discovery = false
#######################################################################
### UDP Discovery Config Options ###
#######################################################################
# The request timeout for each UDP request.
# discv5_request_timeout_secs = 5
# The timeout after which a `QueryPeer` in an ongoing query is marked unresponsive.
# Unresponsive peers don't count towards the parallelism limits for a query.
# Hence, we may potentially end up making more requests to good peers.
# discv5_query_peer_timeout_secs = 2
# The number of retries for each UDP request.
# discv5_request_retries = 1
# The number of peers to request in parallel in a single query.
# discv5_query_parallelism = 5
# Reports all discovered ENR's when traversing the DHT to the event stream.
# discv5_report_discovered_peers = false
# Disables the incoming packet filter.
# discv5_disable_packet_filter = false
# Disable to limit the number of IP addresses from the same
# /24 subnet in the kbuckets table. This is to mitigate eclipse attacks.
# discv5_disable_ip_limit = false
#######################################################################
### Log Sync Config Options ###
#######################################################################
# RPC endpoint to sync event logs on EVM compatible blockchain.
# blockchain_rpc_endpoint = "http://127.0.0.1:8545"
# Flow contract address to sync event logs.
log_contract_address = "0x62D4144dB0F0a6fBBaeb6296c785C71B3D57C526"
# Block number to sync event logs from blockchain. Generally, this is
# the block number when flow contract deployed.
log_sync_start_block_number = 2387557
# Number of blocks to confirm a transaction.
confirmation_block_count = 1
# Maximum number of event logs to poll at a time.
# log_page_size = 999
# Maximum data size to cache in memory (by default, 100MB).
# max_cache_data_size = 104857600
# TTL to cache data in memory.
# cache_tx_seq_ttl = 500
# The number of retries after a RPC request times out.
# rate_limit_retries = 100
# The nubmer of retries for rate limited responses.
# timeout_retries = 100
# The duration to wait before retry, in ms.
# initial_backoff = 500
# The duration between each paginated getLogs RPC call, in ms.
# This is set to avoid triggering the throttling mechanism in the RPC server.
# recover_query_delay = 50
# The counter assumed the finalized block behind the latest block.
# default_finalized_block_count = 100
# Remove finalized block trigger interval.
# remove_finalized_block_interval_minutes = 30
# Watch_loop (eth_getLogs) trigger interval.
# watch_loop_wait_time_ms = 500
#######################################################################
### Chunk Pool Config Options ###
#######################################################################
# Maximum number of threads to upload segments of a single file simultaneously.
# chunk_pool_write_window_size = 2
# Maximum data size of cached segment in pool (by default, 4MB).
# chunk_pool_max_cached_chunks_all = 4194304
# Maximum number of threads to upload segments for all files simultaneously.
# chunk_pool_max_writings = 64
# Expiration time to cache uploaded segments in memory.
# chunk_pool_expiration_time_secs = 300
#######################################################################
### DB Config Options ###
#######################################################################
# Directory to store data.
# db_dir = "db"
#######################################################################
### Misc Config Options ###
#######################################################################
# Log configuration file.
# log_config_file = "log_config"
# Log directory.
# log_directory = "log"
#######################################################################
### Mine Config Options ###
#######################################################################
# Mine contract address for incentive.
mine_contract_address = "0xCd01c5Cd953971CE4C2c9bFb95610236a7F414fe"
# Miner key is used to sign blockchain transaction for incentive.
# The value should be a hex string of length 64 without 0x prefix.
#
# Note, the corresponding address should have enough tokens to pay
# transaction gas fee.
# miner_key = ""
# Period for querying mine context on chain (in seconds)
#
# Note: During each query period, nodes will issue 3 `eth_call` requests.
# If your blockchain RPC endpoint is a public or priced node, please be
# cautious not to set the period too short.
#
# mine_context_query_seconds = 5
# CPU Usage percentage for PoRA mining. 100 means one CPU core is fully loaded.
#
# miner_cpu_percentage = 100
#######################################################################
### Sharding Config Options ###
#######################################################################
# The max number of chunk entries to store in db.
# Each entry is 256B, so the db size is roughly limited to
# `256 * db_max_num_sectors` Bytes.
# If this limit is reached, the node will update its `shard_position`
# and store only half data.
#
# db_max_num_sectors = 4000000000
# The format is <shard_id>/<shard_number>, where the shard number is 2^n.
# This only applies if there is no stored shard config in db.
# shard_position = "0/1"
reward_contract_address = "0x457aC76B58ffcDc118AABD6DbC63ff9072880870"
# The time interval to check if we should half `shard_position` to prune data.
#
# prune_check_time_s = 60
# The number of chunk entries to delete in a batch when we prune data.
#
# prune_batch_size = 1024
# The time interval to wait between each prune batch deletion to avoid
# IO resource exhaustion.
#
# prune_batch_wait_time_ms = 1000
#######################################################################
### Network Peer DB Config Options ###
#######################################################################
[network_peer_db]
# The maximum number of disconnected nodes to remember.
max_disconnected_peers = 10000
# The maximum number of banned nodes to remember.
max_banned_peers = 10000
#######################################################################
### Router Config Options ###
#######################################################################
[router]
# Timeout to publish file announcements in batch.
# batcher_timeout = "1s"
# Number of files in an announcement to publish in batch.
batcher_file_capacity = 10
# Number of announcements in a pubsub message to publish in batch.
batcher_announcement_capacity = 100
#######################################################################
### File Sync Config Options ###
#######################################################################
[sync]
# Enable file sync among peers automatically. When enabled, each node will store
# all files, and sufficient disk space is required.
auto_sync_enabled = true
# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
# announcements in the whole network, which leads to high latency or even timeout to sync files.
neighbors_only = true
# Maximum number of files in sync from other peers simultaneously. to watch, can increase
# max_sync_files = 8
# Enable to start a file sync via RPC (e.g. `admin_startSyncFile`).
# sync_file_by_rpc_enabled = true
# Maximum number of continous failures to terminate a file sync.
# max_request_failures = 5
# Timeout to dial peers.
# peer_connect_timeout = "15s"
# Timeout to disconnect peers.
# peer_disconnect_timeout = "15s"
# Timeout to find peers via FIND_FILE P2P pubsub message.
# peer_find_timeout = "120s"
# Timeout to download data from remote peer.
# peer_chunks_download_timeout = "15s"
# Maximum network bandwidth (B/s) to sync files. Default value is 0,
# which indicates no limitation. TODO: 50 MBps
# max_bandwidth_bytes = 50 * 1024 * 1024
# Maximum threads to sync files in sequence.
# max_sequential_workers = 0
# Maximum threads to sync files randomly.
# max_random_workers = 2
# Timeout to terminate a file sync in sequence.
# sequential_find_peer_timeout = "60s"
# Timeout to terminate a file sync randomly.
# random_find_peer_timeout = "500s"
#######################################################################
### File Location Cache Options ###
#######################################################################
# [file_location_cache]
# File location cache is a cache that maintains storage positions of files.
# Storage location information is represented by the IP address of the storage node and the timestamp indicating when the node declared that it stores the corresponding file.
# It has both a global capacity limit and a limit on the capacity for location information of each individual file.
# When the cache is full, the storage position information with oldest timestamp will be replaced.
# Global cache capacity.
# max_entries_total = 1000000
# Location information capacity for each file.
# max_entries_per_file = 4
# Validity period of location information.
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
# entry_expiration_time_secs = 86400
#######################################################################
### RPC Config Options ###
#######################################################################
[rpc]
# Whether to provide RPC service.
# enabled = true
# HTTP server address to bind for public RPC.
# listen_address = "0.0.0.0:5678"
# HTTP server address to bind for admin and debug RPC.
# listen_address_admin = "0.0.0.0:5679"
## Grpc server address to bind
# listen_address_grpc = "0.0.0.0:50051"
# Number of chunks for a single segment.
# chunks_per_segment = 1024
# Maximum data size of RPC request body (by default, 10MB).
# max_request_body_size = 10485760
# Maximum file size that allowed to cache in memory (by default, 10MB).
# max_cache_file_size = 10485760
#######################################################################
### Metrics Options ###
#######################################################################
# [metrics]
# Whether to enable metrics.
# enabled = false
# Interval to output metrics periodically, e.g. "10s", "30s" or "60s".
# report_interval = "10s"
# File name to output metrics periodically.
# file_report_output = ""
# Influxdb configurations to output metrics periodically.
# influxdb_report_host = ""
# influxdb_report_db = ""
# influxdb_report_username = ""
# influxdb_report_password = ""
# Storage node name as a tag.
# influxdb_report_node = ""

View File

@ -305,6 +305,9 @@ auto_sync_enabled = true
# HTTP server address to bind for admin and debug RPC.
# listen_address_admin = "127.0.0.1:5679"
## Grpc server address to bind
# listen_address_grpc = "0.0.0.0:50051"
# Number of chunks for a single segment.
# chunks_per_segment = 1024

View File

@ -33,7 +33,7 @@
# List of nodes to bootstrap UDP discovery. Note, `network_enr_address` should be
# configured as well to enable UDP discovery.
network_boot_nodes = ["/ip4/47.251.117.133/udp/1234/p2p/16Uiu2HAmTVDGNhkHD98zDnJxQWu3i1FL1aFYeh9wiQTNu4pDCgps","/ip4/47.76.61.226/udp/1234/p2p/16Uiu2HAm2k6ua2mGgvZ8rTMV8GhpW71aVzkQWy7D37TTDuLCpgmX"]
network_boot_nodes = ["/ip4/35.236.80.213/udp/1234/p2p/16Uiu2HAm1w2Lkr4vsnHUgHiyQBpVXmDuvuLP9SDUZaY5tkZudSME", "/ip4/34.102.76.235/udp/1234/p2p/16Uiu2HAmPQ9WTyYbstNPFX4Va8gH5cfkLJ5fJL9h7U4sgJyaHbcm"]
# List of libp2p nodes to initially connect to.
# network_libp2p_nodes = []
@ -80,14 +80,14 @@ network_boot_nodes = ["/ip4/47.251.117.133/udp/1234/p2p/16Uiu2HAmTVDGNhkHD98zDnJ
# blockchain_rpc_endpoint = "http://127.0.0.1:8545"
# Flow contract address to sync event logs.
log_contract_address = "0xbD2C3F0E65eDF5582141C35969d66e34629cC768"
log_contract_address = "0x22E03a6A89B950F1c82ec5e74F8eCa321a105296"
# Block number to sync event logs from blockchain. Generally, this is
# the block number when flow contract deployed.
log_sync_start_block_number = 595059
log_sync_start_block_number = 1
# Number of blocks to confirm a transaction.
# confirmation_block_count = 3
confirmation_block_count = 1
# Maximum number of event logs to poll at a time.
# log_page_size = 999
@ -125,13 +125,13 @@ log_sync_start_block_number = 595059
#######################################################################
# Maximum number of threads to upload segments of a single file simultaneously.
# chunk_pool_write_window_size = 4
chunk_pool_write_window_size = 2
# Maximum data size of cached segment in pool (by default, 4MB).
# chunk_pool_max_cached_chunks_all = 4194304
# Maximum number of threads to upload segments for all files simultaneously.
# chunk_pool_max_writings = 16
chunk_pool_max_writings = 128
# Expiration time to cache uploaded segments in memory.
# chunk_pool_expiration_time_secs = 300
@ -158,7 +158,7 @@ log_sync_start_block_number = 595059
#######################################################################
# Mine contract address for incentive.
mine_contract_address = "0x6815F41019255e00D6F34aAB8397a6Af5b6D806f"
mine_contract_address = "0x00A9E9604b0538e06b268Fb297Df333337f9593b"
# Miner key is used to sign blockchain transaction for incentive.
# The value should be a hex string of length 64 without 0x prefix.
@ -194,7 +194,7 @@ db_max_num_sectors = 4000000000
# This only applies if there is no stored shard config in db.
# shard_position = "0/2"
reward_contract_address = "0x51998C4d486F406a788B766d93510980ae1f9360"
reward_contract_address = "0xA97B57b4BdFEA2D0a25e535bd849ad4e6C440A69"
# The time interval to check if we should half `shard_position` to prune data.
#
# prune_check_time_s = 60
@ -215,10 +215,10 @@ reward_contract_address = "0x51998C4d486F406a788B766d93510980ae1f9360"
# [network_peer_db]
# The maximum number of disconnected nodes to remember.
# max_disconnected_peers = 500
max_disconnected_peers = 10000
# The maximum number of banned nodes to remember.
# max_banned_peers = 1000
max_banned_peers = 10000
#######################################################################
### Router Config Options ###
@ -244,6 +244,10 @@ batcher_announcement_capacity = 100
# all files, and sufficient disk space is required.
auto_sync_enabled = true
# Indicates whether to sync file from neighbor nodes only. This is to avoid flooding file
# announcements in the whole network, which leads to high latency or even timeout to sync files.
neighbors_only = true
# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 16
@ -317,6 +321,9 @@ auto_sync_enabled = true
# HTTP server address to bind for admin and debug RPC.
# listen_address_admin = "127.0.0.1:5679"
## Grpc server address to bind
# listen_address_grpc = "0.0.0.0:50051"
# Number of chunks for a single segment.
# chunks_per_segment = 1024

View File

@ -319,6 +319,9 @@
# HTTP server address to bind for admin and debug RPC.
# listen_address_admin = "127.0.0.1:5679"
## Grpc server address to bind
# listen_address_grpc = "0.0.0.0:50051"
# Number of chunks for a single segment.
# chunks_per_segment = 1024

View File

@ -65,11 +65,11 @@ class CliSubmissionTest(TestFramework):
wait_until(lambda: client.zgs_get_file_info(root) is not None)
wait_until(lambda: client.zgs_get_file_info(root)["finalized"])
num_of_entris = bytes_to_entries(size)
if num_of_entris > 1:
start_idx = random.randint(0, num_of_entris - 2)
num_of_entries = bytes_to_entries(size)
if num_of_entries > 1:
start_idx = random.randint(0, num_of_entries - 2)
end_idx = min(
random.randint(start_idx + 1, num_of_entris - 1), start_idx + ENTRY_SIZE
random.randint(start_idx + 1, num_of_entries - 1), start_idx + ENTRY_SIZE
)
assert_equal(
@ -94,9 +94,9 @@ class CliSubmissionTest(TestFramework):
wait_until(lambda: self.nodes[i].zgs_get_file_info(root)["finalized"])
# start_idx = random.randint(0, num_of_entris - 1)
# start_idx = random.randint(0, num_of_entries - 1)
# end_idx = min(
# random.randint(start_idx + 1, num_of_entris), start_idx + ENTRY_SIZE
# random.randint(start_idx + 1, num_of_entries), start_idx + ENTRY_SIZE
# )
assert_equal(

View File

@ -162,7 +162,7 @@ class TestNode:
self.stderr.seek(0)
stderr = self.stderr.read().decode("utf-8").strip()
# TODO: Check how to avoid `pthread lock: Invalid argument`.
if stderr != expected_stderr and stderr != "pthread lock: Invalid argument":
if stderr != expected_stderr and stderr != "pthread lock: Invalid argument" and "pthread_mutex_lock" not in stderr:
# print process status for debug
if self.return_code is None:
self.log.info("Process is still running")

View File

@ -8,6 +8,7 @@ from utility.utils import (
initialize_toml_config,
p2p_port,
rpc_port,
grpc_port,
blockchain_rpc_port,
)
@ -37,6 +38,7 @@ class ZgsNode(TestNode):
libp2p_nodes.append(f"/ip4/127.0.0.1/tcp/{p2p_port(i)}")
rpc_listen_address = f"127.0.0.1:{rpc_port(index)}"
grpc_listen_address = f"127.0.0.1:{grpc_port(index)}"
indexed_config = {
"network_libp2p_port": p2p_port(index),
@ -44,6 +46,7 @@ class ZgsNode(TestNode):
"rpc": {
"listen_address": rpc_listen_address,
"listen_address_admin": rpc_listen_address,
"listen_address_grpc": grpc_listen_address,
},
"network_libp2p_nodes": libp2p_nodes,
"log_contract_address": log_contract_address,

View File

@ -11,7 +11,7 @@ class PortMin:
n = 11000
MAX_NODES = 100
MAX_NODES = 50
MAX_BLOCKCHAIN_NODES = 50
@ -23,30 +23,33 @@ def p2p_port(n):
def rpc_port(n):
return PortMin.n + MAX_NODES + n
def grpc_port(n):
return PortMin.n + 2 * MAX_NODES + n
def blockchain_p2p_port(n):
assert n <= MAX_BLOCKCHAIN_NODES
return PortMin.n + MAX_NODES + MAX_BLOCKCHAIN_NODES + n
return PortMin.n + 3 * MAX_NODES + n
def blockchain_rpc_port(n):
return PortMin.n + MAX_NODES + 2 * MAX_BLOCKCHAIN_NODES + n
return PortMin.n + 3 * MAX_NODES + MAX_BLOCKCHAIN_NODES + n
def blockchain_rpc_port_core(n):
return PortMin.n + MAX_NODES + 3 * MAX_BLOCKCHAIN_NODES + n
return PortMin.n + 3 * MAX_NODES + 2 * MAX_BLOCKCHAIN_NODES + n
def blockchain_ws_port(n):
return PortMin.n + MAX_NODES + 4 * MAX_BLOCKCHAIN_NODES + n
return PortMin.n + 3 * MAX_NODES + 3 * MAX_BLOCKCHAIN_NODES + n
def blockchain_rpc_port_tendermint(n):
return PortMin.n + MAX_NODES + 5 * MAX_BLOCKCHAIN_NODES + n
return PortMin.n + 3 * MAX_NODES + 4 * MAX_BLOCKCHAIN_NODES + n
def pprof_port(n):
return PortMin.n + MAX_NODES + 6 * MAX_BLOCKCHAIN_NODES + n
return PortMin.n + 3 * MAX_NODES + 5 * MAX_BLOCKCHAIN_NODES + n
def wait_until(predicate, *, attempts=float("inf"), timeout=float("inf"), lock=None):