Compare commits

...

22 Commits

Author SHA1 Message Date
Peter Zhang
c8fbb13443 change to executor 2024-10-03 18:38:18 +08:00
Peter Zhang
7e0da61256 update store usage 2024-10-03 16:42:17 +08:00
Peter Zhang
522900ec32 range root should also add back missing padded data 2024-10-02 20:59:43 +08:00
Peter Zhang
7e19cd7e94 add missing padded subtree leaves to root map when recover the merkle 2024-10-02 20:59:43 +08:00
Peter Zhang
9960598d0e only async full empty segment 2024-10-02 20:59:43 +08:00
Peter Zhang
14d19b812e support async padding 2024-10-02 20:59:43 +08:00
MiniFrenchBread
949462084a
fix: finalize check on tx sync (#220)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* fix: finalize

* fix: check

* fix: end_segment_index

* fix: fmt
2024-09-30 18:43:11 +08:00
MiniFrenchBread
07ac814e57
test: use fixed 0g binary (#218)
* test: use latest 0g binary

* feat: use bug fixed 0g binary
2024-09-30 14:56:38 +08:00
peilun-conflux
69b71fc0b2
Fix issue in reverting the last incomplete tx. (#215) 2024-09-27 10:02:47 +08:00
Bo QIU
ae6ecfec96
reduce default sync threads (#214) 2024-09-26 14:55:00 +08:00
Bo QIU
ad80b22a1b
Optimize rpc config (#213) 2024-09-25 16:48:40 +08:00
Bo QIU
84c415e959
opt zgs version (#211) 2024-09-24 16:42:38 +08:00
Bo QIU
9cde84ae15
Refactor network peer db configs (#209) 2024-09-24 11:59:34 +08:00
Bo QIU
1dd7bf7734
Remove the 0gchain genesis init script for mac os (#208)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
* Remove the 0gchain genesis init script for mac os

* fix on mac
2024-09-20 19:16:53 +08:00
MiniFrenchBread
5849e9c2ba
fix: finalize file does not need to save (#206)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* fix: finalize file does not need to save

* fix: rust fmt
2024-09-19 19:14:45 +08:00
0g-peterzhb
b7badcda5e
do not fail on pruner first reward chunk revert error (#205) 2024-09-19 17:33:56 +08:00
bruno-valante
1434b94495
Resolve dependency issue from atty (#204)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
2024-09-14 19:06:52 +08:00
bruno-valante
da0e2c7031
Fix issues found in auditions (#203) 2024-09-14 18:19:16 +08:00
peilun-conflux
10bd71046b
Add some input validation for append_merkle. (#202)
* Add some input validation for `append_merkle`.

* Fix clippy.
2024-09-14 17:39:07 +08:00
Bo QIU
a153955246
Add log entry sync info in zgs_getStatus rpc (#200)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
2024-09-14 10:11:12 +08:00
Joel Liu
f878a4849c
Handle cases where sequence is not continuous during catch-up (#199)
* Handle cases where the sequence is not continuous during the catch-up process

* get block hash from rpc if not found in local cache
2024-09-14 09:05:11 +08:00
Bo QIU
a9f5169c15
Allow user to configure network bandwidth for file sync (#198)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
2024-09-13 16:58:27 +08:00
55 changed files with 1003 additions and 570 deletions

389
Cargo.lock generated
View File

@ -157,6 +157,55 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
[[package]]
name = "anstream"
version = "0.6.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1"
[[package]]
name = "anstyle-parse"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a"
dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8"
dependencies = [
"anstyle",
"windows-sys 0.52.0",
]
[[package]]
name = "anyhow"
version = "1.0.86"
@ -460,17 +509,6 @@ dependencies = [
"wildmatch",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi 0.1.19",
"libc",
"winapi",
]
[[package]]
name = "auto_impl"
version = "1.2.0"
@ -601,6 +639,9 @@ name = "bitflags"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
dependencies = [
"serde",
]
[[package]]
name = "bitmaps"
@ -690,9 +731,9 @@ dependencies = [
[[package]]
name = "bstr"
version = "1.9.1"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05efc5cfd9110c8416e471df0e96702d58690178e206e61b7173706673c93706"
checksum = "40723b8fb387abc38f4f4a37c09073622e41dd12327033091ef8950659e6dc0c"
dependencies = [
"memchr",
"serde",
@ -718,9 +759,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.6.0"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
dependencies = [
"serde",
]
@ -859,7 +900,7 @@ dependencies = [
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets 0.52.5",
"windows-targets 0.52.6",
]
[[package]]
@ -937,28 +978,30 @@ dependencies = [
[[package]]
name = "clap"
version = "3.2.25"
version = "4.5.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123"
checksum = "3e5a21b8495e732f1b3c364c9949b201ca7bae518c502c80256c96ad79eaf6ac"
dependencies = [
"atty",
"bitflags 1.3.2",
"clap_builder",
]
[[package]]
name = "clap_builder"
version = "4.5.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cf2dd12af7a047ad9d6da2b6b249759a22a7abc0f474c1dae1777afa4b21a73"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"indexmap 1.9.3",
"once_cell",
"strsim 0.10.0",
"termcolor",
"textwrap",
"strsim 0.11.1",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5"
dependencies = [
"os_str_bytes",
]
checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cmake"
@ -1021,6 +1064,12 @@ dependencies = [
"thiserror",
]
[[package]]
name = "colorchoice"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
[[package]]
name = "concurrent-queue"
version = "2.5.0"
@ -1032,11 +1081,12 @@ dependencies = [
[[package]]
name = "config"
version = "0.13.4"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23738e11972c7643e4ec947840fc463b6a571afcd3e735bdfce7d03c7a784aca"
checksum = "7328b20597b53c2454f0b1919720c25c7339051c02b72b7e05409e00b14132be"
dependencies = [
"async-trait",
"convert_case",
"json5",
"lazy_static",
"nom",
@ -1045,7 +1095,7 @@ dependencies = [
"rust-ini",
"serde",
"serde_json",
"toml 0.5.11",
"toml 0.8.14",
"yaml-rust",
]
@ -1068,6 +1118,26 @@ version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
[[package]]
name = "const-random"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359"
dependencies = [
"const-random-macro",
]
[[package]]
name = "const-random-macro"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
dependencies = [
"getrandom 0.2.15",
"once_cell",
"tiny-keccak",
]
[[package]]
name = "constant_time_eq"
version = "0.1.5"
@ -1082,6 +1152,15 @@ dependencies = [
"serde_json",
]
[[package]]
name = "convert_case"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "core-foundation"
version = "0.9.4"
@ -1136,19 +1215,19 @@ dependencies = [
[[package]]
name = "criterion"
version = "0.4.0"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb"
checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f"
dependencies = [
"anes",
"atty",
"cast",
"ciborium",
"clap",
"criterion-plot",
"is-terminal",
"itertools 0.10.5",
"lazy_static",
"num-traits",
"once_cell",
"oorandom",
"plotters",
"rayon",
@ -1611,9 +1690,12 @@ dependencies = [
[[package]]
name = "dlv-list"
version = "0.3.0"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257"
checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f"
dependencies = [
"const-random",
]
[[package]]
name = "dns-lookup"
@ -2466,12 +2548,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
[[package]]
name = "funty"
version = "1.1.0"
@ -2729,9 +2805,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "globset"
version = "0.4.14"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57da3b9b5b85bd66f31093f8c408b90a74431672542466497dcbdfdc02034be1"
checksum = "15f1ce686646e7f1e19bf7d5533fe443a45dbfb990e00629110797578b42fb19"
dependencies = [
"aho-corasick",
"bstr",
@ -2854,6 +2930,12 @@ dependencies = [
"ahash 0.7.8",
]
[[package]]
name = "hashbrown"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
[[package]]
name = "hashbrown"
version = "0.14.5"
@ -2921,15 +3003,6 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hermit-abi"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.9"
@ -3383,6 +3456,23 @@ version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]]
name = "is-terminal"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b"
dependencies = [
"hermit-abi 0.4.0",
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "itertools"
version = "0.10.5"
@ -3767,7 +3857,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e310b3a6b5907f99202fcdb4960ff45b93735d7c7d96b760fcff8db2dc0e103d"
dependencies = [
"cfg-if",
"windows-targets 0.52.5",
"windows-targets 0.52.6",
]
[[package]]
@ -4912,6 +5002,7 @@ dependencies = [
"directory",
"dirs 4.0.0",
"discv5",
"duration-str",
"error-chain",
"eth2_ssz",
"eth2_ssz_derive",
@ -4929,7 +5020,6 @@ dependencies = [
"lighthouse_metrics",
"lru",
"parking_lot 0.12.3",
"prometheus-client",
"rand 0.8.5",
"regex",
"serde",
@ -5197,20 +5287,14 @@ dependencies = [
[[package]]
name = "ordered-multimap"
version = "0.4.3"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccd746e37177e1711c20dd619a1620f34f5c8b569c53590a72dedd5344d8924a"
checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e"
dependencies = [
"dlv-list",
"hashbrown 0.12.3",
"hashbrown 0.13.2",
]
[[package]]
name = "os_str_bytes"
version = "6.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1"
[[package]]
name = "overload"
version = "0.1.1"
@ -5335,7 +5419,7 @@ dependencies = [
"libc",
"redox_syscall 0.5.2",
"smallvec",
"windows-targets 0.52.5",
"windows-targets 0.52.6",
]
[[package]]
@ -6100,19 +6184,6 @@ dependencies = [
"nibble_vec",
]
[[package]]
name = "rand"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"
dependencies = [
"fuchsia-cprng",
"libc",
"rand_core 0.3.1",
"rdrand",
"winapi",
]
[[package]]
name = "rand"
version = "0.7.3"
@ -6157,21 +6228,6 @@ dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "rand_core"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
dependencies = [
"rand_core 0.4.2",
]
[[package]]
name = "rand_core"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
[[package]]
name = "rand_core"
version = "0.5.1"
@ -6228,15 +6284,6 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "rdrand"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
@ -6310,15 +6357,6 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
[[package]]
name = "remove_dir_all"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [
"winapi",
]
[[package]]
name = "reqwest"
version = "0.11.27"
@ -6467,13 +6505,14 @@ dependencies = [
[[package]]
name = "ron"
version = "0.7.1"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88073939a61e5b7680558e6be56b419e208420c2adb92be54921fa6b72283f1a"
checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94"
dependencies = [
"base64 0.13.1",
"bitflags 1.3.2",
"base64 0.21.7",
"bitflags 2.6.0",
"serde",
"serde_derive",
]
[[package]]
@ -6548,9 +6587,9 @@ dependencies = [
[[package]]
name = "rust-ini"
version = "0.18.0"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6d5f2436026b4f6e79dc829837d467cc7e9a55ee40e750d716713540715a2df"
checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091"
dependencies = [
"cfg-if",
"ordered-multimap",
@ -7241,8 +7280,9 @@ dependencies = [
"serde_json",
"shared_types",
"static_assertions",
"tempdir",
"task_executor",
"tiny-keccak",
"tokio",
"tracing",
"typenum",
"zgs_seal",
@ -7287,6 +7327,12 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "strsim"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "strum"
version = "0.24.1"
@ -7468,26 +7514,17 @@ dependencies = [
"tracing",
]
[[package]]
name = "tempdir"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8"
dependencies = [
"rand 0.4.6",
"remove_dir_all",
]
[[package]]
name = "tempfile"
version = "3.10.1"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1"
checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64"
dependencies = [
"cfg-if",
"fastrand 2.1.0",
"once_cell",
"rustix 0.38.34",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@ -7501,21 +7538,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "termcolor"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755"
dependencies = [
"winapi-util",
]
[[package]]
name = "textwrap"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23d434d3f8967a09480fb04132ebe0a3e088c173e6d0ee7897abbdf4eab0f8b9"
[[package]]
name = "thiserror"
version = "1.0.61"
@ -8235,6 +8257,12 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "0.8.2"
@ -8508,7 +8536,7 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets 0.52.5",
"windows-targets 0.52.6",
]
[[package]]
@ -8526,7 +8554,16 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets 0.52.5",
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
dependencies = [
"windows-targets 0.52.6",
]
[[package]]
@ -8546,18 +8583,18 @@ dependencies = [
[[package]]
name = "windows-targets"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
"windows_aarch64_gnullvm 0.52.5",
"windows_aarch64_msvc 0.52.5",
"windows_i686_gnu 0.52.5",
"windows_aarch64_gnullvm 0.52.6",
"windows_aarch64_msvc 0.52.6",
"windows_i686_gnu 0.52.6",
"windows_i686_gnullvm",
"windows_i686_msvc 0.52.5",
"windows_x86_64_gnu 0.52.5",
"windows_x86_64_gnullvm 0.52.5",
"windows_x86_64_msvc 0.52.5",
"windows_i686_msvc 0.52.6",
"windows_x86_64_gnu 0.52.6",
"windows_x86_64_gnullvm 0.52.6",
"windows_x86_64_msvc 0.52.6",
]
[[package]]
@ -8568,9 +8605,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_msvc"
@ -8586,9 +8623,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_i686_gnu"
@ -8604,15 +8641,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
[[package]]
name = "windows_i686_gnu"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_msvc"
@ -8628,9 +8665,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
[[package]]
name = "windows_i686_msvc"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_x86_64_gnu"
@ -8646,9 +8683,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnullvm"
@ -8658,9 +8695,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_msvc"
@ -8676,9 +8713,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "winnow"

View File

@ -137,13 +137,20 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
}
/// Return the new merkle root.
pub fn append(&mut self, new_leaf: E) {
if new_leaf == E::null() {
// appending null is not allowed.
return;
}
self.layers[0].push(new_leaf);
self.recompute_after_append_leaves(self.leaves() - 1);
}
pub fn append_list(&mut self, mut leaf_list: Vec<E>) {
if leaf_list.contains(&E::null()) {
// appending null is not allowed.
return;
}
let start_index = self.leaves();
self.layers[0].append(&mut leaf_list);
self.recompute_after_append_leaves(start_index);
@ -155,6 +162,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
/// Other nodes in the subtree will be set to `null` nodes.
/// TODO: Optimize to avoid storing the `null` nodes?
pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> {
if subtree_root == E::null() {
// appending null is not allowed.
bail!("subtree_root is null");
}
let start_index = self.leaves();
self.append_subtree_inner(subtree_depth, subtree_root)?;
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
@ -162,6 +173,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> {
if subtree_list.iter().any(|(_, root)| root == &E::null()) {
// appending null is not allowed.
bail!("subtree_list contains null");
}
for (subtree_depth, subtree_root) in subtree_list {
let start_index = self.leaves();
self.append_subtree_inner(subtree_depth, subtree_root)?;
@ -173,6 +188,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
/// Change the value of the last leaf and return the new merkle root.
/// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves.
pub fn update_last(&mut self, updated_leaf: E) {
if updated_leaf == E::null() {
// updating to null is not allowed.
return;
}
if self.layers[0].is_empty() {
// Special case for the first data.
self.layers[0].push(updated_leaf);
@ -183,10 +202,12 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
/// Fill an unknown `null` leaf with its real value.
/// Panics if the leaf changes the merkle root or the index is out of range.
/// Panics if the leaf is already set and different or the index is out of range.
/// TODO: Batch computing intermediate nodes.
pub fn fill_leaf(&mut self, index: usize, leaf: E) {
if self.layers[0][index] == E::null() {
if leaf == E::null() {
// fill leaf with null is not allowed.
} else if self.layers[0][index] == E::null() {
self.layers[0][index] = leaf;
self.recompute_after_fill_leaves(index, index + 1);
} else if self.layers[0][index] != leaf {
@ -349,7 +370,6 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
right_most_nodes.push((layer.len() - 1, layer.last().unwrap().clone()));
}
let root = self.root().clone();
assert_eq!(root, right_most_nodes.last().unwrap().1);
self.delta_nodes_map
.insert(tx_seq, DeltaNodes::new(right_most_nodes));
self.root_to_tx_seq_map.insert(root, tx_seq);

View File

@ -74,10 +74,7 @@ pub trait MerkleTreeRead {
bail!("Not ready to generate proof for leaf_index={}", leaf_index);
}
if self.height() == 1 {
return Ok(Proof::new(
vec![self.root().clone(), self.root().clone()],
vec![],
));
return Proof::new(vec![self.root().clone(), self.root().clone()], vec![]);
}
let mut lemma: Vec<Self::E> = Vec::with_capacity(self.height()); // path + root
let mut path: Vec<bool> = Vec::with_capacity(self.height() - 2); // path - 1
@ -112,7 +109,7 @@ pub trait MerkleTreeRead {
path
);
}
Ok(Proof::new(lemma, path))
Proof::new(lemma, path)
}
fn gen_range_proof(&self, start_index: usize, end_index: usize) -> Result<RangeProof<Self::E>> {

View File

@ -11,9 +11,11 @@ pub struct Proof<T: HashElement> {
impl<T: HashElement> Proof<T> {
/// Creates new MT inclusion proof
pub fn new(hash: Vec<T>, path: Vec<bool>) -> Proof<T> {
assert_eq!(hash.len() - 2, path.len());
Proof { lemma: hash, path }
pub fn new(hash: Vec<T>, path: Vec<bool>) -> Result<Proof<T>> {
if hash.len() != path.len() + 2 {
bail!("hash and path length mismatch");
}
Ok(Proof { lemma: hash, path })
}
pub fn new_empty() -> Proof<T> {
@ -58,10 +60,10 @@ impl<T: HashElement> Proof<T> {
bail!("Invalid proof");
}
if *item != self.item() {
bail!("Proof item unmatch");
bail!("Proof item mismatch");
}
if position != self.position() {
bail!("Proof position unmatch");
bail!("Proof position mismatch");
}
Ok(())
}
@ -88,7 +90,7 @@ impl<T: HashElement> Proof<T> {
/// Return `Vec<(index_in_layer, data)>`.
pub fn proof_nodes_in_tree(&self) -> Vec<(usize, T)> {
let mut r = Vec::with_capacity(self.lemma.len());
let mut r = Vec::with_capacity(self.lemma.len() - 1);
let mut pos = 0;
r.push((0, self.root()));
for (i, is_left) in self.path.iter().rev().enumerate() {
@ -108,7 +110,7 @@ impl<T: HashElement> Proof<T> {
tx_merkle_nodes: Vec<(usize, T)>,
tx_merkle_nodes_size: usize,
) -> Vec<(usize, T)> {
let mut r = Vec::with_capacity(self.lemma.len());
let mut r = Vec::with_capacity(self.path.len());
let mut subtree_pos = 0;
let mut root_pos = 0;
let mut in_subtree = tx_merkle_nodes_size == 1;
@ -222,7 +224,7 @@ impl<E: HashElement> RangeProof<E> {
}
children_layer = parent_layer;
}
assert_eq!(children_layer.len(), 1);
ensure_eq!(children_layer.len(), 1);
let computed_root = children_layer.pop().unwrap();
ensure_eq!(computed_root, self.root());

View File

@ -7,41 +7,42 @@ use target_info::Target;
///
/// ## Example
///
/// `Lighthouse/v1.5.1-67da032+`
/// `v0.5.2` or `v0.5.2-1-67da032+`
pub const VERSION: &str = git_version!(
args = [
"--always",
"--dirty=+",
"--abbrev=7",
// NOTE: using --match instead of --exclude for compatibility with old Git
"--match=thiswillnevermatchlol"
// "--match=thiswillnevermatchlol"
"--tags",
],
prefix = "zgs/v0.0.1-",
// prefix = "zgs/v0.0.1-",
fallback = "unknown"
);
/// Returns `VERSION`, but with platform information appended to the end.
/// Returns `VERSION`, but with `zgs` prefix and platform information appended to the end.
///
/// ## Example
///
/// `zgs/v0.0.1-67da032+/x86_64-linux`
/// `zgs/v0.5.2/x86_64-linux`
pub fn version_with_platform() -> String {
format!("{}/{}-{}", VERSION, Target::arch(), Target::os())
format!("zgs/{}/{}-{}", VERSION, Target::arch(), Target::os())
}
#[cfg(test)]
mod test {
use super::*;
use regex::Regex;
// #[cfg(test)]
// mod test {
// use super::*;
// use regex::Regex;
#[test]
fn version_formatting() {
let re =
Regex::new(r"^zgs/v[0-9]+\.[0-9]+\.[0-9]+(-rc.[0-9])?-[[:xdigit:]]{7}\+?$").unwrap();
assert!(
re.is_match(VERSION),
"version doesn't match regex: {}",
VERSION
);
}
}
// #[test]
// fn version_formatting() {
// let re =
// Regex::new(r"^v[0-9]+\.[0-9]+\.[0-9]+(-rc.[0-9])?-[[:xdigit:]]{7}\+?$").unwrap();
// assert!(
// re.is_match(VERSION),
// "version doesn't match regex: {}",
// VERSION
// );
// }
// }

View File

@ -5,7 +5,7 @@ edition = "2021"
[dependencies]
anyhow = { version = "1.0.58", features = ["backtrace"] }
clap = { version = "3.2.5", features = ["cargo"] }
clap = { version = "4.5.17", features = ["cargo", "string"] }
ctrlc = "3.2.2"
error-chain = "0.12.4"
ethereum-types = "0.14"
@ -35,7 +35,7 @@ chunk_pool = { path = "./chunk_pool" }
itertools = "0.10.5"
serde = { version = "1.0.137", features = ["derive"] }
duration-str = "0.5.1"
config = "0.13.1"
config = "0.14"
public-ip = "0.2"
ethers = "2.0.14"
metrics = { workspace = true }

View File

@ -11,7 +11,7 @@ append_merkle = { path = "../../common/append_merkle" }
async-trait = "0.1.56"
ethereum-types = "0.14"
futures = "0.3.21"
jsonrpsee = { version = "0.14.0", features = ["full"] }
jsonrpsee = { version = "0.14", features = ["full"] }
shared_types = { path = "../shared_types" }
task_executor = { path = "../../common/task_executor" }
tokio = "1.19.2"

View File

@ -145,6 +145,15 @@ impl LogEntryFetcher {
}
};
let log_latest_block_number = match store.get_log_latest_block_number() {
Ok(Some(b)) => b,
Ok(None) => 0,
Err(e) => {
error!("get log latest block number error: e={:?}", e);
0
}
};
if let Some(processed_block_number) = processed_block_number {
let finalized_block_number =
match provider.get_block(BlockNumber::Finalized).await {
@ -168,25 +177,24 @@ impl LogEntryFetcher {
};
if let Some(finalized_block_number) = finalized_block_number {
if processed_block_number >= finalized_block_number {
let mut pending_keys = vec![];
for (key, _) in block_hash_cache.read().await.iter() {
if *key < finalized_block_number {
pending_keys.push(*key);
} else {
break;
}
let safe_block_number = std::cmp::min(
std::cmp::min(log_latest_block_number, finalized_block_number),
processed_block_number,
);
let mut pending_keys = vec![];
for (key, _) in block_hash_cache.read().await.iter() {
if *key < safe_block_number {
pending_keys.push(*key);
} else {
break;
}
}
for key in pending_keys.into_iter() {
if let Err(e) = store.delete_block_hash_by_number(key) {
error!(
"remove block tx for number {} error: e={:?}",
key, e
);
} else {
block_hash_cache.write().await.remove(&key);
}
for key in pending_keys.into_iter() {
if let Err(e) = store.delete_block_hash_by_number(key) {
error!("remove block tx for number {} error: e={:?}", key, e);
} else {
block_hash_cache.write().await.remove(&key);
}
}
}
@ -313,6 +321,7 @@ impl LogEntryFetcher {
&mut progress_reset_history,
watch_loop_wait_time_ms,
&block_hash_cache,
provider.as_ref(),
)
.await;
@ -384,6 +393,10 @@ impl LogEntryFetcher {
);
}
if block.logs_bloom.is_none() {
bail!("block {:?} logs bloom is none", block.number);
}
if from_block_number > 0 && block.parent_hash != parent_block_hash {
// reorg happened
let (parent_block_number, block_hash) = revert_one_block(
@ -412,13 +425,22 @@ impl LogEntryFetcher {
block.number
);
}
if Some(block.parent_hash) != parent_block_hash {
if parent_block_hash.is_none() || Some(block.parent_hash) != parent_block_hash {
bail!(
"parent block hash mismatch, expected {:?}, actual {}",
parent_block_hash,
block.parent_hash
);
}
if block_number == to_block_number && block.hash.is_none() {
bail!("block {:?} hash is none", block.number);
}
if block.logs_bloom.is_none() {
bail!("block {:?} logs bloom is none", block.number);
}
parent_block_hash = block.hash;
blocks.insert(block_number, block);
}
@ -470,7 +492,7 @@ impl LogEntryFetcher {
}
let tx = txs_hm[&log.transaction_index];
if log.transaction_hash != Some(tx.hash) {
if log.transaction_hash.is_none() || log.transaction_hash != Some(tx.hash) {
warn!(
"log tx hash mismatch, log transaction {:?}, block transaction {:?}",
log.transaction_hash,
@ -478,7 +500,9 @@ impl LogEntryFetcher {
);
return Ok(progress);
}
if log.transaction_index != tx.transaction_index {
if log.transaction_index.is_none()
|| log.transaction_index != tx.transaction_index
{
warn!(
"log tx index mismatch, log tx index {:?}, block transaction index {:?}",
log.transaction_index,
@ -565,6 +589,7 @@ async fn check_watch_process(
progress_reset_history: &mut BTreeMap<u64, (Instant, usize)>,
watch_loop_wait_time_ms: u64,
block_hash_cache: &Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
provider: &Provider<RetryClient<Http>>,
) {
let mut min_received_progress = None;
while let Ok(v) = watch_progress_rx.try_recv() {
@ -626,7 +651,21 @@ async fn check_watch_process(
tokio::time::sleep(Duration::from_secs(RETRY_WAIT_MS)).await;
}
} else {
panic!("parent block {} expect exist", *progress - 1);
warn!(
"get block hash for block {} from RPC, assume there is no org",
*progress - 1
);
match provider.get_block(*progress - 1).await {
Ok(Some(v)) => {
v.hash.expect("parent block hash expect exist");
}
Ok(None) => {
panic!("parent block {} expect exist", *progress - 1);
}
Err(e) => {
panic!("parent block {} expect exist, error {}", *progress - 1, e);
}
}
}
};
}

View File

@ -6,14 +6,16 @@ use ethereum_types::H256;
use ethers::{prelude::Middleware, types::BlockNumber};
use futures::FutureExt;
use jsonrpsee::tracing::{debug, error, warn};
use shared_types::{ChunkArray, Transaction};
use shared_types::{bytes_to_chunks, ChunkArray, Transaction};
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
use storage::log_store::log_manager::PORA_CHUNK_SIZE;
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
use task_executor::{ShutdownReason, TaskExecutor};
use thiserror::Error;
use tokio::sync::broadcast;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{oneshot, RwLock};
@ -25,6 +27,17 @@ const RETRY_WAIT_MS: u64 = 500;
const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
const CATCH_UP_END_GAP: u64 = 10;
/// Errors while handle data
#[derive(Error, Debug)]
pub enum HandleDataError {
/// Sequence Error
#[error("transaction seq is great than expected, expect block number {0}")]
SeqError(u64),
/// Other Errors
#[error("{0}")]
CommonError(#[from] anyhow::Error),
}
#[derive(Clone, Debug)]
pub enum LogSyncEvent {
/// Chain reorg detected without any operation yet.
@ -189,13 +202,51 @@ impl LogSyncManager {
} else {
// Keep catching-up data until we are close to the latest height.
loop {
log_sync_manager
// wait tx receipt is ready
if let Ok(Some(block)) = log_sync_manager
.log_fetcher
.provider()
.get_block_with_txs(finalized_block_number)
.await
{
if let Some(tx) = block.transactions.first() {
loop {
match log_sync_manager
.log_fetcher
.provider()
.get_transaction_receipt(tx.hash)
.await
{
Ok(Some(_)) => break,
_ => {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
}
}
}
while let Err(e) = log_sync_manager
.catch_up_data(
executor_clone.clone(),
start_block_number,
finalized_block_number,
)
.await?;
.await
{
match e {
HandleDataError::SeqError(block_number) => {
warn!("seq error occurred, retry from {}", block_number);
start_block_number = block_number;
tokio::time::sleep(Duration::from_secs(1)).await;
}
_ => {
return Err(e.into());
}
}
}
start_block_number = finalized_block_number.saturating_add(1);
let new_finalized_block =
@ -214,6 +265,18 @@ impl LogSyncManager {
warn!("catch_up_end send fails, possibly auto_sync is not enabled");
}
log_sync_manager
.log_fetcher
.start_remove_finalized_block_task(
&executor_clone,
log_sync_manager.store.clone(),
log_sync_manager.block_hash_cache.clone(),
log_sync_manager.config.default_finalized_block_count,
log_sync_manager
.config
.remove_finalized_block_interval_minutes,
);
let (watch_progress_tx, watch_progress_rx) =
tokio::sync::mpsc::unbounded_channel();
let watch_rx = log_sync_manager.log_fetcher.start_watch(
@ -296,7 +359,7 @@ impl LogSyncManager {
&mut self,
mut rx: UnboundedReceiver<LogFetchProgress>,
watch_progress_tx: &Option<UnboundedSender<u64>>,
) -> Result<()> {
) -> Result<(), HandleDataError> {
let mut log_latest_block_number =
if let Some(block_number) = self.store.get_log_latest_block_number()? {
block_number
@ -362,13 +425,15 @@ impl LogSyncManager {
} else {
continue;
}
} else {
return Err(HandleDataError::SeqError(log_latest_block_number));
}
}
}
if stop {
// Unexpected error.
bail!("log sync write error");
return Err(anyhow!("log sync write error").into());
}
if let Err(e) = self.event_send.send(LogSyncEvent::TxSynced { tx }) {
// TODO: Do we need to wait until all receivers are initialized?
@ -413,6 +478,36 @@ impl LogSyncManager {
error!("put_tx data error: e={:?}", e);
return false;
}
} else {
// check if current node need to save at least one segment
let store = self.store.clone();
let shard_config = store.get_shard_config();
let start_segment_index = tx.start_entry_index as usize / PORA_CHUNK_SIZE;
let sector_size = bytes_to_chunks(tx.size as usize);
let end_segment_index = start_segment_index
+ ((sector_size + PORA_CHUNK_SIZE - 1) / PORA_CHUNK_SIZE)
- 1;
let mut can_finalize = false;
if end_segment_index < shard_config.shard_id {
can_finalize = true;
} else {
// check if there is a number N between [start_segment_index, end_segment_index] that satisfy:
// N % num_shard = shard_id
let min_n_gte_start =
(start_segment_index + shard_config.num_shard - 1 - shard_config.shard_id)
/ shard_config.num_shard;
let max_n_lte_end =
(end_segment_index - shard_config.shard_id) / shard_config.num_shard;
if min_n_gte_start > max_n_lte_end {
can_finalize = true;
}
}
if can_finalize {
if let Err(e) = store.finalize_tx_with_hash(tx.seq, tx.hash()) {
error!("finalize file that does not need to store: e={:?}", e);
return false;
}
}
}
self.data_cache.garbage_collect(self.next_tx_seq);
self.next_tx_seq += 1;
@ -447,7 +542,7 @@ impl LogSyncManager {
executor_clone: TaskExecutor,
start_block_number: u64,
finalized_block_number: u64,
) -> Result<()> {
) -> Result<(), HandleDataError> {
if start_block_number < finalized_block_number {
let recover_rx = self.log_fetcher.start_recover(
start_block_number,
@ -457,14 +552,6 @@ impl LogSyncManager {
);
self.handle_data(recover_rx, &None).await?;
}
self.log_fetcher.start_remove_finalized_block_task(
&executor_clone,
self.store.clone(),
self.block_hash_cache.clone(),
self.config.default_finalized_block_count,
self.config.remove_finalized_block_interval_minutes,
);
Ok(())
}
}
@ -489,6 +576,10 @@ async fn get_start_block_number_with_hash(
.get(&block_number)
{
return Ok((block_number, val.block_hash));
} else {
warn!("get block hash for block {} from RPC", block_number);
let block_hash = log_sync_manager.get_block(block_number.into()).await?.1;
return Ok((block_number, block_hash));
}
}

View File

@ -98,7 +98,7 @@ impl MineRangeConfig {
let self_start_position = self.start_position?;
let self_end_position = self.end_position?;
if self.start_position >= self.end_position {
if self_start_position >= self_end_position {
return Some(false);
}
Some(

View File

@ -79,7 +79,7 @@ impl<'a> Miner<'a> {
inc_counter(&LOADING_COUNT);
let MineLoadChunk {
loaded_chunk,
avalibilities,
availabilities,
} = self
.loader
.load_sealed_data(recall_position / SECTORS_PER_LOAD as u64)
@ -92,8 +92,8 @@ impl<'a> Miner<'a> {
.into_iter()
.enumerate()
.zip(scratch_pad.iter().cycle())
.zip(avalibilities.into_iter())
.filter_map(|(data, avaliable)| avaliable.then_some(data))
.zip(availabilities.into_iter())
.filter_map(|(data, availiable)| availiable.then_some(data))
{
inc_counter(&PAD_MIX_COUNT);
// Rust can optimize this loop well.
@ -114,7 +114,7 @@ impl<'a> Miner<'a> {
difficulty_scale_x64.as_u128() as f64 / u64::MAX as f64
);
inc_counter(&HIT_COUNT);
// Undo mix data when find a valid solition
// Undo mix data when find a valid solution
for (x, y) in sealed_data.iter_mut().zip(scratch_pad.iter()) {
*x ^= y;
}
@ -171,7 +171,7 @@ impl<'a> Miner<'a> {
) -> U256 {
let mut hasher = Blake2b512::new();
hasher.update([0u8; 24]);
hasher.update(seal_index.to_be_bytes());
hasher.update((seal_index as u64).to_be_bytes());
hasher.update(pad_seed);
hasher.update([0u8; 32]);

View File

@ -31,12 +31,13 @@ impl RecallRange {
}
pub fn load_position(&self, seed: [u8; 32]) -> Option<u64> {
let (_, origin_recall_offset) = U256::from_big_endian(&seed)
.div_mod(U256::from((self.mining_length as usize) / SECTORS_PER_LOAD));
let origin_recall_offset = U256::from_big_endian(&seed)
.checked_rem(U256::from((self.mining_length as usize) / SECTORS_PER_LOAD))?;
let origin_recall_offset = origin_recall_offset.as_u64();
let recall_offset = (origin_recall_offset & self.shard_mask) | self.shard_id;
Some(self.start_position + recall_offset * SECTORS_PER_LOAD as u64)
self.start_position
.checked_add(recall_offset * SECTORS_PER_LOAD as u64)
}
pub fn difficulty_scale_x64(&self, flow_length: u64) -> U256 {

View File

@ -18,7 +18,7 @@ use crate::watcher::MineContextMessage;
use zgs_spec::{BYTES_PER_SEAL, SECTORS_PER_SEAL};
const SUBMISSION_RETIES: usize = 15;
const SUBMISSION_RETRIES: usize = 15;
pub struct Submitter {
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
@ -154,7 +154,7 @@ impl Submitter {
let pending_transaction: PendingTransaction<'_, _> = submission_call
.send()
.await
.map_err(|e| format!("Fail to send mine answer transaction: {:?}", e))?;
.map_err(|e| format!("Fail to send PoRA submission transaction: {:?}", e))?;
debug!(
"Signed submission transaction hash: {:?}",
@ -162,13 +162,13 @@ impl Submitter {
);
let receipt = pending_transaction
.retries(SUBMISSION_RETIES)
.retries(SUBMISSION_RETRIES)
.interval(Duration::from_secs(2))
.await
.map_err(|e| format!("Fail to execute mine answer transaction: {:?}", e))?
.map_err(|e| format!("Fail to execute PoRA submission transaction: {:?}", e))?
.ok_or(format!(
"Mine answer transaction dropped after {} retries",
SUBMISSION_RETIES
"PoRA submission transaction dropped after {} retries",
SUBMISSION_RETRIES
))?;
info!("Submit PoRA success, receipt: {:?}", receipt);

View File

@ -47,8 +47,6 @@ impl MineContextWatcher {
provider: Arc<MineServiceMiddleware>,
config: &MinerConfig,
) -> broadcast::Receiver<MineContextMessage> {
let provider = provider;
let mine_contract = PoraMine::new(config.mine_address, provider.clone());
let flow_contract = ZgsFlow::new(config.flow_address, provider.clone());

View File

@ -21,7 +21,6 @@ lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
lru = "0.7.7"
parking_lot = "0.12.1"
prometheus-client = "0.16.0"
rand = "0.8.5"
regex = "1.5.6"
serde = { version = "1.0.137", features = ["derive"] }
@ -41,6 +40,7 @@ unsigned-varint = { version = "=0.7.1", features = ["codec"] }
if-addrs = "0.10.1"
slog = "2.7.0"
igd = "0.12.1"
duration-str = "0.5.1"
[dependencies.libp2p]
version = "0.45.1"
@ -49,7 +49,7 @@ features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dn
[dev-dependencies]
exit-future = "0.2.0"
tempfile = "3.3.0"
tempfile = "3.12.0"
tracing-test = "0.2.2"
unused_port = { path = "../../common/unused_port" }
void = "1.0.2"

View File

@ -1,3 +1,4 @@
use crate::peer_manager::peerdb::PeerDBConfig;
use crate::types::GossipKind;
use crate::{Enr, PeerIdSerialized};
use directory::{
@ -126,6 +127,8 @@ pub struct Config {
/// The id of the storage network.
pub network_id: NetworkIdentity,
pub peer_db: PeerDBConfig,
}
impl Default for Config {
@ -204,6 +207,7 @@ impl Default for Config {
topics: Vec::new(),
metrics_enabled: false,
network_id: Default::default(),
peer_db: Default::default(),
}
}
}

View File

@ -74,8 +74,6 @@ impl<'de> Deserialize<'de> for PeerIdSerialized {
pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage};
pub use prometheus_client;
pub use behaviour::{BehaviourEvent, Gossipsub, PeerRequestId, Request, Response};
pub use config::Config as NetworkConfig;
pub use discovery::{CombinedKeyExt, EnrExt};

View File

@ -3,13 +3,15 @@ use crate::{
multiaddr::{Multiaddr, Protocol},
Enr, Gossipsub, PeerId,
};
use duration_str::deserialize_duration;
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use rand::seq::SliceRandom;
use score::{PeerAction, ReportSource, Score, ScoreState};
use std::cmp::Ordering;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, SocketAddr};
use std::time::Instant;
use std::{cmp::Ordering, time::Duration};
use sync_status::SyncStatus;
pub mod client;
@ -17,21 +19,41 @@ pub mod peer_info;
pub mod score;
pub mod sync_status;
/// Max number of disconnected nodes to remember.
const MAX_DC_PEERS: usize = 500;
/// The maximum number of banned nodes to remember.
pub const MAX_BANNED_PEERS: usize = 1000;
/// We ban an IP if there are more than `BANNED_PEERS_PER_IP_THRESHOLD` banned peers with this IP.
const BANNED_PEERS_PER_IP_THRESHOLD: usize = 5;
/// Relative factor of peers that are allowed to have a negative gossipsub score without penalizing
/// them in lighthouse.
const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1;
/// The time we allow peers to be in the dialing state in our PeerDb before we revert them to a
/// disconnected state.
const DIAL_TIMEOUT: u64 = 15;
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(default)]
pub struct PeerDBConfig {
/// The maximum number of disconnected nodes to remember.
pub max_disconnected_peers: usize,
/// The maximum number of banned nodes to remember.
pub max_banned_peers: usize,
/// We ban an IP if there are more than `BANNED_PEERS_PER_IP_THRESHOLD` banned peers with this IP.
pub banned_peers_per_ip_threshold: usize,
/// Relative factor of peers that are allowed to have a negative gossipsub score without penalizing them in lighthouse.
pub allowed_negative_gossipsub_factor: f32,
/// The time we allow peers to be in the dialing state in our PeerDb before we revert them to a disconnected state.
#[serde(deserialize_with = "deserialize_duration")]
pub dail_timeout: Duration,
}
impl Default for PeerDBConfig {
fn default() -> Self {
Self {
max_disconnected_peers: 500,
max_banned_peers: 1000,
banned_peers_per_ip_threshold: 5,
allowed_negative_gossipsub_factor: 0.1,
dail_timeout: Duration::from_secs(15),
}
}
}
/// Storage of known peers, their reputation and information
pub struct PeerDB {
config: PeerDBConfig,
/// The collection of known connected peers, their status and reputation
peers: HashMap<PeerId, PeerInfo>,
/// The number of disconnected nodes in the database.
@ -41,13 +63,14 @@ pub struct PeerDB {
}
impl PeerDB {
pub fn new(trusted_peers: Vec<PeerId>) -> Self {
pub fn new(config: PeerDBConfig, trusted_peers: Vec<PeerId>) -> Self {
// Initialize the peers hashmap with trusted peers
let peers = trusted_peers
.into_iter()
.map(|peer_id| (peer_id, PeerInfo::trusted_peer_info()))
.collect();
Self {
config,
disconnected_peers: 0,
banned_peers_count: BannedPeersCount::default(),
peers,
@ -316,9 +339,7 @@ impl PeerDB {
.iter()
.filter_map(|(peer_id, info)| {
if let PeerConnectionStatus::Dialing { since } = info.connection_status() {
if (*since) + std::time::Duration::from_secs(DIAL_TIMEOUT)
< std::time::Instant::now()
{
if (*since) + self.config.dail_timeout < std::time::Instant::now() {
return Some(*peer_id);
}
}
@ -422,7 +443,7 @@ impl PeerDB {
peers.sort_unstable_by(|(.., s1), (.., s2)| s2.partial_cmp(s1).unwrap_or(Ordering::Equal));
let mut to_ignore_negative_peers =
(target_peers as f32 * ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR).ceil() as usize;
(target_peers as f32 * self.config.allowed_negative_gossipsub_factor).ceil() as usize;
for (peer_id, info, score) in peers {
let previous_state = info.score_state();
@ -946,11 +967,11 @@ impl PeerDB {
let excess_peers = self
.banned_peers_count
.banned_peers()
.saturating_sub(MAX_BANNED_PEERS);
.saturating_sub(self.config.max_banned_peers);
let mut unbanned_peers = Vec::with_capacity(excess_peers);
// Remove excess banned peers
while self.banned_peers_count.banned_peers() > MAX_BANNED_PEERS {
while self.banned_peers_count.banned_peers() > self.config.max_banned_peers {
if let Some((to_drop, unbanned_ips)) = if let Some((id, info, _)) = self
.peers
.iter()
@ -982,7 +1003,7 @@ impl PeerDB {
}
// Remove excess disconnected peers
while self.disconnected_peers > MAX_DC_PEERS {
while self.disconnected_peers > self.config.max_disconnected_peers {
if let Some(to_drop) = self
.peers
.iter()
@ -1210,7 +1231,7 @@ mod tests {
}
fn get_db() -> PeerDB {
PeerDB::new(vec![])
PeerDB::new(PeerDBConfig::default(), vec![])
}
#[test]
@ -1265,7 +1286,7 @@ mod tests {
use std::collections::BTreeMap;
let mut peer_list = BTreeMap::new();
for id in 0..MAX_DC_PEERS + 1 {
for id in 0..pdb.config.max_disconnected_peers + 1 {
let new_peer = PeerId::random();
pdb.connect_ingoing(&new_peer, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_list.insert(id, new_peer);
@ -1276,11 +1297,15 @@ mod tests {
pdb.inject_disconnect(p);
// Allow the timing to update correctly
}
assert_eq!(pdb.disconnected_peers, MAX_DC_PEERS);
assert_eq!(pdb.disconnected_peers, pdb.config.max_disconnected_peers);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
// Only the oldest peer should have been removed
for (id, peer_id) in peer_list.iter().rev().take(MAX_DC_PEERS) {
for (id, peer_id) in peer_list
.iter()
.rev()
.take(pdb.config.max_disconnected_peers)
{
println!("Testing id {}", id);
assert!(
pdb.peer_info(peer_id).is_some(),
@ -1301,7 +1326,7 @@ mod tests {
use std::collections::BTreeMap;
let mut peer_list = BTreeMap::new();
for id in 0..MAX_DC_PEERS + 20 {
for id in 0..pdb.config.max_disconnected_peers + 20 {
let new_peer = PeerId::random();
pdb.connect_ingoing(&new_peer, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_list.insert(id, new_peer);
@ -1314,7 +1339,7 @@ mod tests {
println!("{}", pdb.disconnected_peers);
peer_list.clear();
for id in 0..MAX_DC_PEERS + 20 {
for id in 0..pdb.config.max_disconnected_peers + 20 {
let new_peer = PeerId::random();
pdb.connect_ingoing(&new_peer, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_list.insert(id, new_peer);
@ -1345,7 +1370,7 @@ mod tests {
fn test_disconnected_are_bounded() {
let mut pdb = get_db();
for _ in 0..MAX_DC_PEERS + 1 {
for _ in 0..pdb.config.max_disconnected_peers + 1 {
let p = PeerId::random();
pdb.connect_ingoing(&p, "/ip4/0.0.0.0".parse().unwrap(), None);
}
@ -1356,14 +1381,14 @@ mod tests {
}
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
assert_eq!(pdb.disconnected_peers, MAX_DC_PEERS);
assert_eq!(pdb.disconnected_peers, pdb.config.max_disconnected_peers);
}
#[test]
fn test_banned_are_bounded() {
let mut pdb = get_db();
for _ in 0..MAX_BANNED_PEERS + 1 {
for _ in 0..pdb.config.max_banned_peers + 1 {
let p = PeerId::random();
pdb.connect_ingoing(&p, "/ip4/0.0.0.0".parse().unwrap(), None);
}
@ -1374,7 +1399,10 @@ mod tests {
pdb.inject_disconnect(&p);
}
assert_eq!(pdb.banned_peers_count.banned_peers(), MAX_BANNED_PEERS);
assert_eq!(
pdb.banned_peers_count.banned_peers(),
pdb.config.max_banned_peers
);
}
#[test]
@ -1908,7 +1936,7 @@ mod tests {
#[allow(clippy::float_cmp)]
fn test_trusted_peers_score() {
let trusted_peer = PeerId::random();
let mut pdb: PeerDB = PeerDB::new(vec![trusted_peer]);
let mut pdb: PeerDB = PeerDB::new(PeerDBConfig::default(), vec![trusted_peer]);
pdb.connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap(), None);

View File

@ -84,6 +84,7 @@ impl<AppReqId: ReqId> Service<AppReqId> {
.iter()
.map(|x| PeerId::from(x.clone()))
.collect(),
config.peer_db,
config.network_id.clone(),
));

View File

@ -1,5 +1,6 @@
//! A collection of variables that are accessible outside of the network thread itself.
use crate::peer_manager::peerdb::PeerDB;
use crate::peer_manager::peerdb::PeerDBConfig;
use crate::Client;
use crate::EnrExt;
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
@ -34,6 +35,7 @@ impl NetworkGlobals {
tcp_port: u16,
udp_port: u16,
trusted_peers: Vec<PeerId>,
peer_db_config: PeerDBConfig,
network_id: NetworkIdentity,
) -> Self {
NetworkGlobals {
@ -42,7 +44,7 @@ impl NetworkGlobals {
listen_multiaddrs: RwLock::new(Vec::new()),
listen_port_tcp: AtomicU16::new(tcp_port),
listen_port_udp: AtomicU16::new(udp_port),
peers: RwLock::new(PeerDB::new(trusted_peers)),
peers: RwLock::new(PeerDB::new(peer_db_config, trusted_peers)),
gossipsub_subscriptions: RwLock::new(HashSet::new()),
network_id: RwLock::new(network_id),
}
@ -110,6 +112,13 @@ impl NetworkGlobals {
let enr_key: discv5::enr::CombinedKey =
discv5::enr::CombinedKey::from_libp2p(&keypair).unwrap();
let enr = discv5::enr::EnrBuilder::new("v4").build(&enr_key).unwrap();
NetworkGlobals::new(enr, 9000, 9000, vec![], Default::default())
NetworkGlobals::new(
enr,
9000,
9000,
vec![],
Default::default(),
Default::default(),
)
}
}

View File

@ -9,7 +9,7 @@ use common::{
swarm,
};
use network::{
peer_manager::{self, config::Config, PeerManagerEvent},
peer_manager::{config::Config, peerdb::PeerDBConfig, PeerManagerEvent},
NetworkGlobals, PeerAction, PeerInfo, PeerManager, ReportSource,
};
@ -101,7 +101,7 @@ async fn banned_peers_consistency() {
};
let excess_banned_peers = 15;
let peers_to_ban = peer_manager::peerdb::MAX_BANNED_PEERS + excess_banned_peers;
let peers_to_ban = PeerDBConfig::default().max_banned_peers + excess_banned_peers;
// Build all the dummy peers needed.
let (mut swarm_pool, peers) = {

View File

@ -198,11 +198,11 @@ impl Pruner {
))),
Ordering::Equal => Ok(None),
Ordering::Greater => {
bail!(
error!(
"Unexpected first_rewardable_chunk revert: old={} new={}",
self.first_rewardable_chunk,
new_first_rewardable
self.first_rewardable_chunk, new_first_rewardable
);
Ok(None)
}
}
}

View File

@ -423,7 +423,7 @@ impl Libp2pEventHandler {
let addr = self.get_listen_addr_or_add().await?;
let timestamp = timestamp_now();
let shard_config = self.store.get_store().flow().get_shard_config();
let shard_config = self.store.get_store().get_shard_config();
let msg = AnnounceFile {
tx_ids,
@ -699,7 +699,7 @@ impl Libp2pEventHandler {
}
// notify sync layer if shard config matches
let my_shard_config = self.store.get_store().flow().get_shard_config();
let my_shard_config = self.store.get_store().get_shard_config();
if my_shard_config.intersect(&announced_shard_config) {
for tx_id in msg.tx_ids.iter() {
self.send_to_sync(SyncMessage::AnnounceFileGossip {
@ -911,7 +911,9 @@ mod tests {
let (network_send, network_recv) = mpsc::unbounded_channel();
let (sync_send, sync_recv) = channel::Channel::unbounded("test");
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
let store = LogManager::memorydb(LogConfig::default()).unwrap();
let executor = runtime.task_executor.clone();
let store = LogManager::memorydb(LogConfig::default(), executor).unwrap();
Self {
runtime,
network_globals: Arc::new(network_globals),
@ -948,8 +950,14 @@ mod tests {
let keypair = Keypair::generate_secp256k1();
let enr_key = CombinedKey::from_libp2p(&keypair).unwrap();
let enr = EnrBuilder::new("v4").build(&enr_key).unwrap();
let network_globals =
NetworkGlobals::new(enr, 30000, 30000, vec![], Default::default());
let network_globals = NetworkGlobals::new(
enr,
30000,
30000,
vec![],
Default::default(),
Default::default(),
);
let listen_addr: Multiaddr = "/ip4/127.0.0.1/tcp/30000".parse().unwrap();
network_globals.listen_multiaddrs.write().push(listen_addr);

View File

@ -1,11 +1,27 @@
use std::net::SocketAddr;
use std::{net::SocketAddr, str::FromStr};
#[derive(Clone)]
use serde::{Deserialize, Serialize};
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
pub enabled: bool,
pub listen_address: SocketAddr,
pub listen_address_admin: Option<SocketAddr>,
pub listen_address_admin: SocketAddr,
pub chunks_per_segment: usize,
pub max_request_body_size: u32,
pub max_cache_file_size: usize,
}
impl Default for Config {
fn default() -> Self {
Self {
enabled: true,
listen_address: SocketAddr::from_str("0.0.0.0:5678").unwrap(),
listen_address_admin: SocketAddr::from_str("127.0.0.1:5679").unwrap(),
chunks_per_segment: 1024,
max_request_body_size: 100 * 1024 * 1024, // 100MB
max_cache_file_size: 10 * 1024 * 1024, // 10MB
}
}
}

View File

@ -20,7 +20,6 @@ use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
use network::NetworkGlobals;
use network::NetworkMessage;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;
use storage_async::Store;
use sync::{SyncRequest, SyncResponse, SyncSender};
@ -69,9 +68,10 @@ impl Context {
pub async fn run_server(
ctx: Context,
) -> Result<(HttpServerHandle, Option<HttpServerHandle>), Box<dyn Error>> {
let handles = match ctx.config.listen_address_admin {
Some(listen_addr_private) => run_server_public_private(ctx, listen_addr_private).await?,
None => (run_server_all(ctx).await?, None),
let handles = if ctx.config.listen_address.port() != ctx.config.listen_address_admin.port() {
run_server_public_private(ctx).await?
} else {
(run_server_all(ctx).await?, None)
};
info!("Server started");
@ -107,7 +107,6 @@ async fn run_server_all(ctx: Context) -> Result<HttpServerHandle, Box<dyn Error>
/// Run 2 RPC servers (public & private) for different namespace RPCs.
async fn run_server_public_private(
ctx: Context,
listen_addr_private: SocketAddr,
) -> Result<(HttpServerHandle, Option<HttpServerHandle>), Box<dyn Error>> {
// public rpc
let zgs = (zgs::RpcServerImpl { ctx: ctx.clone() }).into_rpc();
@ -127,7 +126,7 @@ async fn run_server_public_private(
.start(zgs)?;
let handle_private = server_builder(ctx.clone())
.build(listen_addr_private)
.build(ctx.config.listen_address_admin)
.await?
.start(admin)?;

View File

@ -29,6 +29,7 @@ pub struct Status {
pub connected_peers: usize,
pub log_sync_height: u64,
pub log_sync_block: H256,
pub next_tx_seq: u64,
pub network_identity: NetworkIdentity,
}

View File

@ -26,10 +26,13 @@ impl RpcServer for RpcServerImpl {
.get_sync_progress()?
.unwrap_or_default();
let next_tx_seq = self.ctx.log_store.get_store().next_tx_seq();
Ok(Status {
connected_peers: self.ctx.network_globals.connected_peers(),
log_sync_height: sync_progress.0,
log_sync_block: sync_progress.1,
next_tx_seq,
network_identity: self.ctx.network_globals.network_id(),
})
}
@ -180,7 +183,7 @@ impl RpcServer for RpcServerImpl {
async fn get_shard_config(&self) -> RpcResult<ShardConfig> {
debug!("zgs_getShardConfig");
let shard_config = self.ctx.log_store.get_store().flow().get_shard_config();
let shard_config = self.ctx.log_store.get_store().get_shard_config();
Ok(shard_config)
}

View File

@ -364,7 +364,7 @@ impl TryFrom<FileProof> for FlowProof {
if lemma.len() != value.path.len() + 2 {
Err(anyhow!("invalid file proof"))
} else {
Ok(Self::new(lemma, value.path))
Self::new(lemma, value.path)
}
}
}

View File

@ -1,6 +1,6 @@
use clap::{arg, command, Command};
pub fn cli_app<'a>() -> Command<'a> {
pub fn cli_app() -> Command {
command!()
.arg(arg!(-c --config <FILE> "Sets a custom config file"))
.arg(arg!(--"log-config-file" [FILE] "Sets log configuration file (Default: log_config)"))
@ -10,4 +10,5 @@ pub fn cli_app<'a>() -> Command<'a> {
)
.arg(arg!(--"db-max-num-chunks" [NUM] "Sets the max number of chunks to store in db (Default: None)"))
.allow_external_subcommands(true)
.version(zgs_version::VERSION)
}

View File

@ -89,9 +89,10 @@ impl ClientBuilder {
/// Initializes in-memory storage.
pub fn with_memory_store(mut self) -> Result<Self, String> {
let executor = require!("sync", self, runtime_context).clone().executor;
// TODO(zz): Set config.
let store = Arc::new(
LogManager::memorydb(LogConfig::default())
LogManager::memorydb(LogConfig::default(), executor)
.map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
);
@ -109,8 +110,9 @@ impl ClientBuilder {
/// Initializes RocksDB storage.
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
let executor = require!("sync", self, runtime_context).clone().executor;
let store = Arc::new(
LogManager::rocksdb(LogConfig::default(), &config.db_dir)
LogManager::rocksdb(LogConfig::default(), &config.db_dir, executor)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
);

View File

@ -48,7 +48,7 @@ macro_rules! build_config{
let mut config = RawConfiguration::default();
// read from config file
if let Some(config_file) = matches.value_of("config") {
if let Some(config_file) = matches.get_one::<String>("config") {
let config_value = std::fs::read_to_string(config_file)
.map_err(|e| format!("failed to read configuration file: {:?}", e))?
.parse::<toml::Value>()
@ -67,7 +67,7 @@ macro_rules! build_config{
// read from command line
$(
#[allow(unused_variables)]
if let Some(value) = matches.value_of(underscore_to_hyphen!(stringify!($name))) {
if let Some(value) = matches.get_one::<String>(&underscore_to_hyphen!(stringify!($name))) {
if_not_vector!($($type)+, THEN {
config.$name = if_option!($($type)+,
THEN{ Some(value.parse().map_err(|_| concat!("Invalid ", stringify!($name)).to_owned())?) }

View File

@ -7,7 +7,6 @@ use log_entry_sync::{CacheConfig, ContractAddress, LogSyncConfig};
use miner::MinerConfig;
use network::NetworkConfig;
use pruner::PrunerConfig;
use rpc::RPCConfig;
use shared_types::{NetworkIdentity, ProtocolVersion};
use std::net::IpAddr;
use std::time::Duration;
@ -96,6 +95,8 @@ impl ZgsConfig {
network_config.target_peers = self.network_target_peers;
network_config.private = self.network_private;
network_config.peer_db = self.network_peer_db;
Ok(network_config)
}
@ -105,32 +106,6 @@ impl ZgsConfig {
})
}
pub fn rpc_config(&self) -> Result<RPCConfig, String> {
let listen_address = self
.rpc_listen_address
.parse::<std::net::SocketAddr>()
.map_err(|e| format!("Unable to parse rpc_listen_address: {:?}", e))?;
let listen_address_admin = if self.rpc_listen_address_admin.is_empty() {
None
} else {
Some(
self.rpc_listen_address_admin
.parse::<std::net::SocketAddr>()
.map_err(|e| format!("Unable to parse rpc_listen_address_admin: {:?}", e))?,
)
};
Ok(RPCConfig {
enabled: self.rpc_enabled,
listen_address,
listen_address_admin,
max_request_body_size: self.max_request_body_size,
chunks_per_segment: self.rpc_chunks_per_segment,
max_cache_file_size: self.rpc_max_cache_file_size,
})
}
pub fn log_sync_config(&self) -> Result<LogSyncConfig, String> {
let contract_address = self
.log_contract_address

View File

@ -48,14 +48,6 @@ build_config! {
(remove_finalized_block_interval_minutes, (u64), 30)
(watch_loop_wait_time_ms, (u64), 500)
// rpc
(rpc_enabled, (bool), true)
(rpc_listen_address, (String), "0.0.0.0:5678".to_string())
(rpc_listen_address_admin, (String), "127.0.0.1:5679".to_string())
(max_request_body_size, (u32), 100*1024*1024) // 100MB
(rpc_chunks_per_segment, (usize), 1024)
(rpc_max_cache_file_size, (usize), 10*1024*1024) //10MB
// chunk pool
(chunk_pool_write_window_size, (usize), 4)
(chunk_pool_max_cached_chunks_all, (usize), 4*1024*1024) // 1G
@ -91,6 +83,9 @@ build_config! {
pub struct ZgsConfig {
pub raw_conf: RawConfiguration,
/// Network peer db config, configured by [network_peer_db] section by `config` crate.
pub network_peer_db: network::peer_manager::peerdb::PeerDBConfig,
// router config, configured by [router] section by `config` crate.
pub router: router::Config,
@ -100,6 +95,9 @@ pub struct ZgsConfig {
// file location cache config, configured by [file_location_cache] section by `config` crate.
pub file_location_cache: file_location_cache::Config,
// rpc config, configured by [rpc] section by `config` crate.
pub rpc: rpc::RPCConfig,
// metrics config, configured by [metrics] section by `config` crate.
pub metrics: metrics::MetricsConfiguration,
}

View File

@ -13,7 +13,6 @@ use std::error::Error;
async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client, String> {
let network_config = config.network_config().await?;
let storage_config = config.storage_config()?;
let rpc_config = config.rpc_config()?;
let log_sync_config = config.log_sync_config()?;
let miner_config = config.mine_config()?;
let router_config = config.router_config(&network_config)?;
@ -33,7 +32,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
.await?
.with_pruner(pruner_config)
.await?
.with_rpc(rpc_config, config.chunk_pool_config()?)
.with_rpc(config.rpc, config.chunk_pool_config()?)
.await?
.with_router(router_config)?
.build()

View File

@ -95,23 +95,22 @@ impl Store {
&self,
seal_index_max: usize,
) -> anyhow::Result<Option<Vec<SealTask>>> {
self.spawn(move |store| store.flow().pull_seal_chunk(seal_index_max))
self.spawn(move |store| store.pull_seal_chunk(seal_index_max))
.await
}
pub async fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> anyhow::Result<()> {
self.spawn(move |store| store.flow().submit_seal_result(answers))
self.spawn(move |store| store.submit_seal_result(answers))
.await
}
pub async fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
self.spawn(move |store| store.flow().load_sealed_data(chunk_index))
self.spawn(move |store| store.load_sealed_data(chunk_index))
.await
}
pub async fn get_num_entries(&self) -> Result<u64> {
self.spawn(move |store| store.flow().get_num_entries())
.await
self.spawn(move |store| store.get_num_entries()).await
}
pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> {
@ -122,7 +121,7 @@ impl Store {
pub async fn update_shard_config(&self, shard_config: ShardConfig) {
self.spawn(move |store| {
store.flow().update_shard_config(shard_config);
store.update_shard_config(shard_config);
Ok(())
})
.await

View File

@ -29,12 +29,13 @@ itertools = "0.13.0"
serde = { version = "1.0.197", features = ["derive"] }
parking_lot = "0.12.3"
serde_json = "1.0.127"
tokio = { version = "1.10.0", features = ["sync"] }
task_executor = { path = "../../common/task_executor" }
[dev-dependencies]
tempdir = "0.3.7"
rand = "0.8.5"
hex-literal = "0.3.4"
criterion = "0.4"
criterion = "0.5"
[[bench]]
name = "benchmark"

View File

@ -14,14 +14,18 @@ use storage::{
},
LogManager,
};
use task_executor::test_utils::TestRuntime;
fn write_performance(c: &mut Criterion) {
if Path::new("db_write").exists() {
fs::remove_dir_all("db_write").unwrap();
}
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
LogManager::rocksdb(LogConfig::default(), "db_write")
LogManager::rocksdb(LogConfig::default(), "db_write", executor)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
));
@ -105,8 +109,12 @@ fn read_performance(c: &mut Criterion) {
fs::remove_dir_all("db_read").unwrap();
}
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
LogManager::rocksdb(LogConfig::default(), "db_read")
LogManager::rocksdb(LogConfig::default(), "db_read", executor)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
));

View File

@ -261,30 +261,30 @@ mod tests {
#[test]
fn test_shard_intersect() {
// 1 shard
assert_eq!(new_config(0, 1).intersect(&new_config(0, 1)), true);
assert!(new_config(0, 1).intersect(&new_config(0, 1)));
// either is 1 shard
assert_eq!(new_config(0, 1).intersect(&new_config(0, 2)), true);
assert_eq!(new_config(0, 1).intersect(&new_config(1, 2)), true);
assert_eq!(new_config(0, 2).intersect(&new_config(0, 1)), true);
assert_eq!(new_config(1, 2).intersect(&new_config(0, 1)), true);
assert!(new_config(0, 1).intersect(&new_config(0, 2)));
assert!(new_config(0, 1).intersect(&new_config(1, 2)));
assert!(new_config(0, 2).intersect(&new_config(0, 1)));
assert!(new_config(1, 2).intersect(&new_config(0, 1)));
// same shards
assert_eq!(new_config(1, 4).intersect(&new_config(0, 4)), false);
assert_eq!(new_config(1, 4).intersect(&new_config(1, 4)), true);
assert_eq!(new_config(1, 4).intersect(&new_config(2, 4)), false);
assert_eq!(new_config(1, 4).intersect(&new_config(3, 4)), false);
assert!(!new_config(1, 4).intersect(&new_config(0, 4)));
assert!(new_config(1, 4).intersect(&new_config(1, 4)));
assert!(!new_config(1, 4).intersect(&new_config(2, 4)));
assert!(!new_config(1, 4).intersect(&new_config(3, 4)));
// left shards is less
assert_eq!(new_config(1, 2).intersect(&new_config(0, 4)), false);
assert_eq!(new_config(1, 2).intersect(&new_config(1, 4)), false);
assert_eq!(new_config(1, 2).intersect(&new_config(2, 4)), true);
assert_eq!(new_config(1, 2).intersect(&new_config(3, 4)), true);
assert!(!new_config(1, 2).intersect(&new_config(0, 4)));
assert!(!new_config(1, 2).intersect(&new_config(1, 4)));
assert!(new_config(1, 2).intersect(&new_config(2, 4)));
assert!(new_config(1, 2).intersect(&new_config(3, 4)));
// right shards is less
assert_eq!(new_config(1, 4).intersect(&new_config(0, 2)), true);
assert_eq!(new_config(1, 4).intersect(&new_config(1, 2)), false);
assert_eq!(new_config(2, 4).intersect(&new_config(0, 2)), false);
assert_eq!(new_config(2, 4).intersect(&new_config(1, 2)), true);
assert!(new_config(1, 4).intersect(&new_config(0, 2)));
assert!(!new_config(1, 4).intersect(&new_config(1, 2)));
assert!(!new_config(2, 4).intersect(&new_config(0, 2)));
assert!(new_config(2, 4).intersect(&new_config(1, 2)));
}
}

View File

@ -3,7 +3,8 @@ use super::{MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig;
use crate::error::Error;
use crate::log_store::log_manager::{
bytes_to_entries, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE,
bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
};
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
use crate::{try_option, ZgsKeyValueDB};
@ -11,7 +12,7 @@ use anyhow::{anyhow, bail, Result};
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
use itertools::Itertools;
use parking_lot::RwLock;
use shared_types::{ChunkArray, DataRoot, FlowProof};
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
use ssz::{Decode, Encode};
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
use std::cmp::Ordering;
@ -203,7 +204,7 @@ impl FlowRead for FlowStore {
for (seal_index, (sealed, validity)) in mine_chunk
.loaded_chunk
.iter_mut()
.zip(mine_chunk.avalibilities.iter_mut())
.zip(mine_chunk.availabilities.iter_mut())
.enumerate()
{
if let Some(data) = batch.get_sealed_data(seal_index as u16) {
@ -441,6 +442,10 @@ impl FlowDBStore {
// and they will be updated in the merkle tree with `fill_leaf` by the caller.
let mut leaf_list = Vec::new();
let mut expected_index = 0;
let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
let empty_root = *Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
let (index_bytes, root_bytes) = r?;
let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?;
@ -475,25 +480,26 @@ impl FlowDBStore {
expected_index += 1;
}
Ordering::Greater => {
bail!(
"unexpected chunk leaf in range, expected={}, get={}, range={:?}",
expected_index,
batch_index,
range_root,
);
while batch_index > expected_index {
// Fill the gap with empty leaves.
root_list.push((1, empty_root));
expected_index += 1;
}
range_root = None;
root_list.push((1, root));
expected_index += 1;
}
}
}
} else if expected_index == batch_index {
} else {
while batch_index > expected_index {
// Fill the gap with empty leaves.
root_list.push((1, empty_root));
expected_index += 1;
}
range_root = Some(BatchRoot::Multiple((subtree_depth, root)));
root_list.push((subtree_depth, root));
expected_index += 1 << (subtree_depth - 1);
} else {
bail!(
"unexpected range root: expected={} get={}",
expected_index,
batch_index
);
}
}
let extra_node_list = self.get_mpt_node_list()?;

View File

@ -189,15 +189,8 @@ impl EntryBatchData {
}
pub fn insert_data(&mut self, start_byte: usize, mut data: Vec<u8>) -> Result<Vec<u16>> {
assert!(start_byte % BYTES_PER_SECTOR == 0);
assert!(data.len() % BYTES_PER_SECTOR == 0);
if data.is_empty() || self.get(start_byte, data.len()) == Some(&data) {
// TODO(zz): This assumes the caller has processed chain reorg (truncate flow) before
// inserting new data, and the data of the same file are always inserted with the
// same pattern.
return Ok(vec![]);
}
assert_eq!(start_byte % BYTES_PER_SECTOR, 0);
assert_eq!(data.len() % BYTES_PER_SECTOR, 0);
// Check if the entry is completed
let (list, subtree_list) = if let EntryBatchData::Incomplete(x) = self {

View File

@ -127,6 +127,14 @@ impl EntryBatch {
/// Return `Error` if the new data overlaps with old data.
/// Convert `Incomplete` to `Completed` if the chunk is completed after the insertion.
pub fn insert_data(&mut self, offset: usize, data: Vec<u8>) -> Result<Vec<u16>> {
if data.is_empty()
|| self
.get_unsealed_data(offset, data.len() / BYTES_PER_SECTOR)
.as_ref()
== Some(&data)
{
return Ok(vec![]);
}
self.data.insert_data(offset * BYTES_PER_SECTOR, data)
}

View File

@ -1,3 +1,4 @@
use crate::config::ShardConfig;
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowStore};
use crate::log_store::tx_store::TransactionStore;
use crate::log_store::{
@ -20,11 +21,12 @@ use shared_types::{
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::path::Path;
use std::sync::mpsc;
use std::sync::Arc;
use tracing::{debug, error, info, instrument, trace, warn};
use super::tx_store::BlockHashAndSubmissionIndex;
use super::LogStoreInner;
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
/// 256 Bytes
pub const ENTRY_SIZE: usize = 256;
@ -45,11 +47,18 @@ pub const COL_NUM: u32 = 9;
// Process at most 1M entries (256MB) pad data at a time.
const PAD_MAX_SIZE: usize = 1 << 20;
pub struct UpdateFlowMessage {
pub root_map: BTreeMap<usize, (H256, usize)>,
pub pad_data: usize,
pub tx_start_flow_index: u64,
}
pub struct LogManager {
pub(crate) db: Arc<dyn ZgsKeyValueDB>,
tx_store: TransactionStore,
flow_store: FlowStore,
flow_store: Arc<FlowStore>,
merkle: RwLock<MerkleManager>,
sender: mpsc::Sender<UpdateFlowMessage>,
}
struct MerkleManager {
@ -139,16 +148,6 @@ pub struct LogConfig {
pub flow: FlowConfig,
}
impl LogStoreInner for LogManager {
fn flow(&self) -> &dyn super::Flow {
&self.flow_store
}
fn flow_mut(&mut self) -> &mut dyn super::Flow {
&mut self.flow_store
}
}
impl LogStoreChunkWrite for LogManager {
fn put_chunks(&self, tx_seq: u64, chunks: ChunkArray) -> Result<()> {
let mut merkle = self.merkle.write();
@ -252,7 +251,7 @@ impl LogStoreWrite for LogManager {
debug!("recovery with tx_seq={}", tx.seq);
} else {
// This is not supposed to happen since we have checked the tx seq in log entry sync.
error!("tx unmatch, expected={} get={:?}", expected_seq, tx);
error!("tx mismatch, expected={} get={:?}", expected_seq, tx);
bail!("unexpected tx!");
}
}
@ -389,6 +388,14 @@ impl LogStoreWrite for LogManager {
fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> {
self.tx_store.delete_block_hash_by_number(block_number)
}
fn update_shard_config(&self, shard_config: ShardConfig) {
self.flow_store.update_shard_config(shard_config)
}
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
self.flow_store.submit_seal_result(answers)
}
}
impl LogStoreChunkRead for LogManager {
@ -579,24 +586,48 @@ impl LogStoreRead for LogManager {
fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result<bool> {
self.tx_store.check_tx_pruned(tx_seq)
}
fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> {
self.flow_store.pull_seal_chunk(seal_index_max)
}
fn get_num_entries(&self) -> Result<u64> {
self.flow_store.get_num_entries()
}
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
self.flow_store.load_sealed_data(chunk_index)
}
fn get_shard_config(&self) -> ShardConfig {
self.flow_store.get_shard_config()
}
}
impl LogManager {
pub fn rocksdb(config: LogConfig, path: impl AsRef<Path>) -> Result<Self> {
pub fn rocksdb(
config: LogConfig,
path: impl AsRef<Path>,
executor: task_executor::TaskExecutor,
) -> Result<Self> {
let mut db_config = DatabaseConfig::with_columns(COL_NUM);
db_config.enable_statistics = true;
let db = Arc::new(Database::open(&db_config, path)?);
Self::new(db, config)
Self::new(db, config, executor)
}
pub fn memorydb(config: LogConfig) -> Result<Self> {
pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
let db = Arc::new(kvdb_memorydb::create(COL_NUM));
Self::new(db, config)
Self::new(db, config, executor)
}
fn new(db: Arc<dyn ZgsKeyValueDB>, config: LogConfig) -> Result<Self> {
fn new(
db: Arc<dyn ZgsKeyValueDB>,
config: LogConfig,
executor: task_executor::TaskExecutor,
) -> Result<Self> {
let tx_store = TransactionStore::new(db.clone())?;
let flow_store = FlowStore::new(db.clone(), config.flow);
let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow));
let mut initial_data = flow_store.get_chunk_root_list()?;
// If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
// first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
@ -700,14 +731,21 @@ impl LogManager {
pora_chunks_merkle,
last_chunk_merkle,
});
let log_manager = Self {
let (sender, receiver) = mpsc::channel();
let mut log_manager = Self {
db,
tx_store,
flow_store,
merkle,
sender,
};
log_manager.start_receiver(receiver, executor);
if let Some(tx) = last_tx_to_insert {
log_manager.revert_to(tx.seq - 1)?;
log_manager.put_tx(tx)?;
let mut merkle = log_manager.merkle.write();
for (index, h) in extra_leaves {
@ -727,6 +765,41 @@ impl LogManager {
Ok(log_manager)
}
fn start_receiver(
&mut self,
rx: mpsc::Receiver<UpdateFlowMessage>,
executor: task_executor::TaskExecutor,
) {
let flow_store = self.flow_store.clone();
executor.spawn(
async move {
loop {
match rx.recv() {
std::result::Result::Ok(data) => {
// Update the root index.
flow_store.put_batch_root_list(data.root_map).unwrap();
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
flow_store
.append_entries(ChunkArray {
data: vec![0; data.pad_data],
start_index: data.tx_start_flow_index,
})
.unwrap();
}
std::result::Result::Err(_) => {
error!("Receiver error");
}
};
}
},
"pad_tx",
);
// Wait for the spawned thread to finish
// let _ = handle.join().expect("Thread panicked");
}
fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
match maybe_root {
None => self.gen_proof_at_version(flow_index, None),
@ -863,6 +936,7 @@ impl LogManager {
);
if extra != 0 {
for pad_data in Self::padding((first_subtree_size - extra) as usize) {
let mut is_full_empty = true;
let mut root_map = BTreeMap::new();
// Update the in-memory merkle tree.
@ -874,6 +948,7 @@ impl LogManager {
let mut completed_chunk_index = None;
if pad_data.len() < last_chunk_pad {
is_full_empty = false;
merkle
.last_chunk_merkle
.append_list(data_to_merkle_leaves(&pad_data)?);
@ -882,6 +957,7 @@ impl LogManager {
.update_last(*merkle.last_chunk_merkle.root());
} else {
if last_chunk_pad != 0 {
is_full_empty = false;
// Pad the last chunk.
merkle
.last_chunk_merkle
@ -910,16 +986,26 @@ impl LogManager {
assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
}
// Update the root index.
self.flow_store.put_batch_root_list(root_map)?;
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
let data_size = pad_data.len() / ENTRY_SIZE;
self.flow_store.append_entries(ChunkArray {
data: pad_data,
start_index: tx_start_flow_index,
})?;
if is_full_empty {
self.sender.send(UpdateFlowMessage {
root_map,
pad_data: pad_data.len(),
tx_start_flow_index,
})?;
} else {
self.flow_store.put_batch_root_list(root_map).unwrap();
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
self.flow_store
.append_entries(ChunkArray {
data: pad_data.to_vec(),
start_index: tx_start_flow_index,
})
.unwrap();
}
tx_start_flow_index += data_size as u64;
if let Some(index) = completed_chunk_index {
self.complete_last_chunk_merkle(index, &mut *merkle)?;
@ -1173,7 +1259,7 @@ pub fn sub_merkle_tree(leaf_data: &[u8]) -> Result<FileMerkleTree> {
pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
if leaf_data.len() % ENTRY_SIZE != 0 {
bail!("merkle_tree: unmatch data size");
bail!("merkle_tree: mismatched data size");
}
// If the data size is small, using `rayon` would introduce more overhead.
let r = if leaf_data.len() >= ENTRY_SIZE * 8 {
@ -1211,7 +1297,7 @@ fn entry_proof(top_proof: &FlowProof, sub_proof: &FlowProof) -> Result<FlowProof
assert!(lemma.pop().is_some());
lemma.extend_from_slice(&top_proof.lemma()[1..]);
path.extend_from_slice(top_proof.path());
Ok(FlowProof::new(lemma, path))
FlowProof::new(lemma, path)
}
pub fn split_nodes(data_size: usize) -> Vec<usize> {

View File

@ -76,6 +76,14 @@ pub trait LogStoreRead: LogStoreChunkRead {
/// Return flow root and length.
fn get_context(&self) -> Result<(DataRoot, u64)>;
fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>>;
fn get_num_entries(&self) -> Result<u64>;
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
fn get_shard_config(&self) -> ShardConfig;
}
pub trait LogStoreChunkRead {
@ -145,6 +153,10 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
) -> Result<bool>;
fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()>;
fn update_shard_config(&self, shard_config: ShardConfig);
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
}
pub trait LogStoreChunkWrite {
@ -168,31 +180,22 @@ pub trait LogChunkStore: LogStoreChunkRead + LogStoreChunkWrite + Send + Sync +
impl<T: LogStoreChunkRead + LogStoreChunkWrite + Send + Sync + 'static> LogChunkStore for T {}
pub trait Store:
LogStoreRead + LogStoreWrite + LogStoreInner + config::Configurable + Send + Sync + 'static
LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static
{
}
impl<
T: LogStoreRead + LogStoreWrite + LogStoreInner + config::Configurable + Send + Sync + 'static,
> Store for T
{
}
pub trait LogStoreInner {
fn flow(&self) -> &dyn Flow;
fn flow_mut(&mut self) -> &mut dyn Flow;
}
impl<T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static> Store for T {}
pub struct MineLoadChunk {
// Use `Vec` instead of array to avoid thread stack overflow.
pub loaded_chunk: Vec<[u8; BYTES_PER_SEAL]>,
pub avalibilities: [bool; SEALS_PER_LOAD],
pub availabilities: [bool; SEALS_PER_LOAD],
}
impl Default for MineLoadChunk {
fn default() -> Self {
Self {
loaded_chunk: vec![[0u8; BYTES_PER_SEAL]; SEALS_PER_LOAD],
avalibilities: [false; SEALS_PER_LOAD],
availabilities: [false; SEALS_PER_LOAD],
}
}
}

View File

@ -8,11 +8,15 @@ use ethereum_types::H256;
use rand::random;
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
use std::cmp;
use task_executor::test_utils::TestRuntime;
#[test]
fn test_put_get() {
let config = LogConfig::default();
let store = LogManager::memorydb(config.clone()).unwrap();
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store = LogManager::memorydb(config.clone(), executor).unwrap();
let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
// Aligned with size.
let start_offset = 1024;
@ -169,8 +173,10 @@ fn test_put_tx() {
fn create_store() -> LogManager {
let config = LogConfig::default();
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
LogManager::memorydb(config).unwrap()
LogManager::memorydb(config, executor).unwrap()
}
fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) {

View File

@ -1,10 +1,14 @@
use std::sync::Arc;
use metrics::{register_timer, Counter, CounterUsize, Histogram, Sample, Timer};
use metrics::{
register_meter, register_timer, Counter, CounterUsize, Histogram, Meter, Sample, Timer,
};
lazy_static::lazy_static! {
pub static ref SERIAL_SYNC_FILE_COMPLETED: Arc<dyn Timer> = register_timer("sync_controllers_serial_sync_file_completed");
pub static ref SERIAL_SYNC_CHUNKS_COMPLETED: Arc<dyn Timer> = register_timer("sync_controllers_serial_sync_chunks_completed");
pub static ref SERIAL_SYNC_SEGMENT_BANDWIDTH: Arc<dyn Meter> = register_meter("sync_controllers_serial_sync_segment_bandwidth");
pub static ref SERIAL_SYNC_SEGMENT_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_controllers_serial_sync_segment_latency", 1024);
pub static ref SERIAL_SYNC_SEGMENT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_controllers_serial_sync_segment_timeout");
pub static ref SERIAL_SYNC_UNEXPECTED_ERRORS: Arc<dyn Counter<usize>> = CounterUsize::register("sync_controllers_serial_sync_unexpected_errors");

View File

@ -11,6 +11,7 @@ use network::{
};
use rand::Rng;
use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE};
use ssz::Encode;
use std::{sync::Arc, time::Instant};
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
use storage_async::Store;
@ -255,6 +256,17 @@ impl SerialSyncController {
/// Randomly select a peer to sync the next segment.
fn try_request_next(&mut self) {
// limits network bandwidth if configured
if self.config.max_bandwidth_bytes > 0 {
let m1 = metrics::SERIAL_SYNC_SEGMENT_BANDWIDTH.rate1() as u64;
if m1 > self.config.max_bandwidth_bytes {
self.state = SyncState::AwaitingDownload {
since: (Instant::now() + self.config.bandwidth_wait_timeout).into(),
};
return;
}
}
// request next chunk array
let from_chunk = self.next_chunk;
let to_chunk = std::cmp::min(from_chunk + PORA_CHUNK_SIZE as u64, self.goal.index_end);
@ -407,6 +419,8 @@ impl SerialSyncController {
}
pub async fn on_response(&mut self, from_peer_id: PeerId, response: ChunkArrayWithProof) {
metrics::SERIAL_SYNC_SEGMENT_BANDWIDTH.mark(response.ssz_bytes_len());
if self.handle_on_response_mismatch(from_peer_id) {
return;
}
@ -478,7 +492,7 @@ impl SerialSyncController {
metrics::SERIAL_SYNC_SEGMENT_LATENCY.update_since(since.0);
let shard_config = self.store.get_store().flow().get_shard_config();
let shard_config = self.store.get_store().get_shard_config();
let next_chunk = segment_to_sector(shard_config.next_segment_index(
sector_to_segment(from_chunk),
sector_to_segment(self.tx_start_chunk_in_flow),
@ -1608,7 +1622,7 @@ mod tests {
let num_chunks = 123;
let config = LogConfig::default();
let store = Arc::new(LogManager::memorydb(config).unwrap());
let store = Arc::new(LogManager::memorydb(config, task_executor.clone()).unwrap());
create_controller(task_executor, peer_id, store, tx_id, num_chunks)
}

View File

@ -43,6 +43,9 @@ pub struct Config {
pub peer_wait_outgoing_connection_timeout: Duration,
#[serde(deserialize_with = "deserialize_duration")]
pub peer_next_chunks_request_wait_timeout: Duration,
pub max_bandwidth_bytes: u64,
#[serde(deserialize_with = "deserialize_duration")]
pub bandwidth_wait_timeout: Duration,
// auto sync config
#[serde(deserialize_with = "deserialize_duration")]
@ -63,7 +66,7 @@ impl Default for Config {
// sync service config
heartbeat_interval: Duration::from_secs(5),
auto_sync_enabled: false,
max_sync_files: 32,
max_sync_files: 8,
sync_file_by_rpc_enabled: true,
sync_file_on_announcement_enabled: false,
@ -76,12 +79,14 @@ impl Default for Config {
peer_chunks_download_timeout: Duration::from_secs(15),
peer_wait_outgoing_connection_timeout: Duration::from_secs(10),
peer_next_chunks_request_wait_timeout: Duration::from_secs(3),
max_bandwidth_bytes: 0,
bandwidth_wait_timeout: Duration::from_secs(5),
// auto sync config
auto_sync_idle_interval: Duration::from_secs(3),
auto_sync_error_interval: Duration::from_secs(10),
max_sequential_workers: 0,
max_random_workers: 30,
max_random_workers: 2,
sequential_find_peer_timeout: Duration::from_secs(60),
random_find_peer_timeout: Duration::from_secs(500),
}

View File

@ -807,7 +807,7 @@ impl SyncService {
}
async fn tx_sync_start_index(store: &Store, tx: &Transaction) -> Result<Option<u64>> {
let shard_config = store.get_store().flow().get_shard_config();
let shard_config = store.get_store().get_shard_config();
let start_segment = sector_to_segment(tx.start_entry_index());
let end =
bytes_to_chunks(usize::try_from(tx.size).map_err(|e| anyhow!("tx size e={}", e))?);
@ -1294,7 +1294,9 @@ mod tests {
let config = LogConfig::default();
let store = Arc::new(LogManager::memorydb(config.clone()).unwrap());
let executor = runtime.task_executor.clone();
let store = Arc::new(LogManager::memorydb(config.clone(), executor).unwrap());
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
let file_location_cache: Arc<FileLocationCache> =

View File

@ -9,6 +9,8 @@ use storage::{
LogManager,
};
use task_executor::test_utils::TestRuntime;
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
/// The first store is for local node, and data not stored. The second store is for peers, and all
/// transactions are finalized for file sync.
@ -22,8 +24,11 @@ pub fn create_2_store(
Vec<Vec<u8>>,
) {
let config = LogConfig::default();
let mut store = LogManager::memorydb(config.clone()).unwrap();
let mut peer_store = LogManager::memorydb(config).unwrap();
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let mut store = LogManager::memorydb(config.clone(), executor.clone()).unwrap();
let mut peer_store = LogManager::memorydb(config, executor).unwrap();
let mut offset = 1;
let mut txs = vec![];
@ -115,7 +120,10 @@ pub mod tests {
impl TestStoreRuntime {
pub fn new_store() -> impl LogStore {
LogManager::memorydb(LogConfig::default()).unwrap()
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
LogManager::memorydb(LogConfig::default(), executor).unwrap()
}
pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime {

View File

@ -120,28 +120,6 @@ log_sync_start_block_number = 595059
# Watch_loop (eth_getLogs) trigger interval.
# watch_loop_wait_time_ms = 500
#######################################################################
### RPC Config Options ###
#######################################################################
# Whether to provide RPC service.
# rpc_enabled = true
# HTTP server address to bind for public RPC.
# rpc_listen_address = "0.0.0.0:5678"
# HTTP server address to bind for admin and debug RPC.
# rpc_listen_address_admin = "127.0.0.1:5679"
# Maximum data size of RPC request body (by default, 100MB).
# max_request_body_size = 104857600
# Number of chunks for a single segment.
# rpc_chunks_per_segment = 1024
# Maximum file size that allowed to cache in memory (by default, 10MB).
# rpc_max_cache_file_size = 10485760
#######################################################################
### Chunk Pool Config Options ###
#######################################################################
@ -218,6 +196,18 @@ reward_contract_address = "0x0496D0817BD8519e0de4894Dc379D35c35275609"
#
# prune_batch_wait_time_ms = 1000
#######################################################################
### Network Peer DB Config Options ###
#######################################################################
# [network_peer_db]
# The maximum number of disconnected nodes to remember.
# max_disconnected_peers = 500
# The maximum number of banned nodes to remember.
# max_banned_peers = 1000
#######################################################################
### Router Config Options ###
#######################################################################
@ -243,7 +233,7 @@ batcher_announcement_capacity = 100
auto_sync_enabled = true
# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 32
# max_sync_files = 8
# Enable to start a file sync via RPC (e.g. `admin_startSyncFile`).
# sync_file_by_rpc_enabled = true
@ -263,11 +253,15 @@ auto_sync_enabled = true
# Timeout to download data from remote peer.
# peer_chunks_download_timeout = "15s"
# Maximum network bandwidth (B/s) to sync files. Default value is 0,
# which indicates no limitation.
# max_bandwidth_bytes = 0
# Maximum threads to sync files in sequence.
# max_sequential_workers = 0
# Maximum threads to sync files randomly.
# max_random_workers = 30
# max_random_workers = 2
# Timeout to terminate a file sync in sequence.
# sequential_find_peer_timeout = "60s"
@ -296,6 +290,30 @@ auto_sync_enabled = true
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
# entry_expiration_time_secs = 86400
#######################################################################
### RPC Config Options ###
#######################################################################
# [rpc]
# Whether to provide RPC service.
# enabled = true
# HTTP server address to bind for public RPC.
# listen_address = "0.0.0.0:5678"
# HTTP server address to bind for admin and debug RPC.
# listen_address_admin = "127.0.0.1:5679"
# Number of chunks for a single segment.
# chunks_per_segment = 1024
# Maximum data size of RPC request body (by default, 100MB).
# max_request_body_size = 104857600
# Maximum file size that allowed to cache in memory (by default, 10MB).
# max_cache_file_size = 10485760
#######################################################################
### Metrics Options ###
#######################################################################

View File

@ -120,28 +120,6 @@ log_sync_start_block_number = 595059
# Watch_loop (eth_getLogs) trigger interval.
# watch_loop_wait_time_ms = 500
#######################################################################
### RPC Config Options ###
#######################################################################
# Whether to provide RPC service.
# rpc_enabled = true
# HTTP server address to bind for public RPC.
# rpc_listen_address = "0.0.0.0:5678"
# HTTP server address to bind for admin and debug RPC.
# rpc_listen_address_admin = "127.0.0.1:5679"
# Maximum data size of RPC request body (by default, 100MB).
# max_request_body_size = 104857600
# Number of chunks for a single segment.
# rpc_chunks_per_segment = 1024
# Maximum file size that allowed to cache in memory (by default, 10MB).
# rpc_max_cache_file_size = 10485760
#######################################################################
### Chunk Pool Config Options ###
#######################################################################
@ -230,6 +208,18 @@ reward_contract_address = "0x51998C4d486F406a788B766d93510980ae1f9360"
#
# prune_batch_wait_time_ms = 1000
#######################################################################
### Network Peer DB Config Options ###
#######################################################################
# [network_peer_db]
# The maximum number of disconnected nodes to remember.
# max_disconnected_peers = 500
# The maximum number of banned nodes to remember.
# max_banned_peers = 1000
#######################################################################
### Router Config Options ###
#######################################################################
@ -255,7 +245,7 @@ batcher_announcement_capacity = 100
auto_sync_enabled = true
# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 32
# max_sync_files = 8
# Enable to start a file sync via RPC (e.g. `admin_startSyncFile`).
# sync_file_by_rpc_enabled = true
@ -275,11 +265,15 @@ auto_sync_enabled = true
# Timeout to download data from remote peer.
# peer_chunks_download_timeout = "15s"
# Maximum network bandwidth (B/s) to sync files. Default value is 0,
# which indicates no limitation.
# max_bandwidth_bytes = 0
# Maximum threads to sync files in sequence.
# max_sequential_workers = 0
# Maximum threads to sync files randomly.
# max_random_workers = 30
# max_random_workers = 2
# Timeout to terminate a file sync in sequence.
# sequential_find_peer_timeout = "60s"
@ -308,6 +302,30 @@ auto_sync_enabled = true
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
# entry_expiration_time_secs = 86400
#######################################################################
### RPC Config Options ###
#######################################################################
# [rpc]
# Whether to provide RPC service.
# enabled = true
# HTTP server address to bind for public RPC.
# listen_address = "0.0.0.0:5678"
# HTTP server address to bind for admin and debug RPC.
# listen_address_admin = "127.0.0.1:5679"
# Number of chunks for a single segment.
# chunks_per_segment = 1024
# Maximum data size of RPC request body (by default, 100MB).
# max_request_body_size = 104857600
# Maximum file size that allowed to cache in memory (by default, 10MB).
# max_cache_file_size = 10485760
#######################################################################
### Metrics Options ###
#######################################################################

View File

@ -120,28 +120,6 @@
# Watch_loop (eth_getLogs) trigger interval.
# watch_loop_wait_time_ms = 500
#######################################################################
### RPC Config Options ###
#######################################################################
# Whether to provide RPC service.
# rpc_enabled = true
# HTTP server address to bind for public RPC.
# rpc_listen_address = "0.0.0.0:5678"
# HTTP server address to bind for admin and debug RPC.
# rpc_listen_address_admin = "127.0.0.1:5679"
# Maximum data size of RPC request body (by default, 100MB).
# max_request_body_size = 104857600
# Number of chunks for a single segment.
# rpc_chunks_per_segment = 1024
# Maximum file size that allowed to cache in memory (by default, 10MB).
# rpc_max_cache_file_size = 10485760
#######################################################################
### Chunk Pool Config Options ###
#######################################################################
@ -232,6 +210,18 @@
#
# prune_batch_wait_time_ms = 1000
#######################################################################
### Network Peer DB Config Options ###
#######################################################################
# [network_peer_db]
# The maximum number of disconnected nodes to remember.
# max_disconnected_peers = 500
# The maximum number of banned nodes to remember.
# max_banned_peers = 1000
#######################################################################
### Router Config Options ###
#######################################################################
@ -257,7 +247,7 @@
# auto_sync_enabled = false
# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 32
# max_sync_files = 8
# Enable to start a file sync via RPC (e.g. `admin_startSyncFile`).
# sync_file_by_rpc_enabled = true
@ -277,11 +267,15 @@
# Timeout to download data from remote peer.
# peer_chunks_download_timeout = "15s"
# Maximum network bandwidth (B/s) to sync files. Default value is 0,
# which indicates no limitation.
# max_bandwidth_bytes = 0
# Maximum threads to sync files in sequence.
# max_sequential_workers = 0
# Maximum threads to sync files randomly.
# max_random_workers = 30
# max_random_workers = 2
# Timeout to terminate a file sync in sequence.
# sequential_find_peer_timeout = "60s"
@ -310,6 +304,30 @@
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
# entry_expiration_time_secs = 86400
#######################################################################
### RPC Config Options ###
#######################################################################
# [rpc]
# Whether to provide RPC service.
# enabled = true
# HTTP server address to bind for public RPC.
# listen_address = "0.0.0.0:5678"
# HTTP server address to bind for admin and debug RPC.
# listen_address_admin = "127.0.0.1:5679"
# Number of chunks for a single segment.
# chunks_per_segment = 1024
# Maximum data size of RPC request body (by default, 100MB).
# max_request_body_size = 104857600
# Maximum file size that allowed to cache in memory (by default, 10MB).
# max_cache_file_size = 10485760
#######################################################################
### Metrics Options ###
#######################################################################

View File

@ -13,6 +13,12 @@ jq --version >/dev/null 2>&1 || sudo snap install jq -y
mkdir -p $ROOT_DIR
SED_I="sed -i"
OS_NAME=`uname -o`
if [[ "$OS_NAME" = "Darwin" ]]; then
SED_I="sed -i ''"
fi
# Init configs
for ((i=0; i<$NUM_NODES; i++)) do
$BINARY init node$i --home $ROOT_DIR/node$i --chain-id $CHAIN_ID
@ -22,10 +28,10 @@ for ((i=0; i<$NUM_NODES; i++)) do
TMP_GENESIS=$ROOT_DIR/node$i/config/tmp_genesis.json
# Replace stake with neuron
sed -in-place='' 's/stake/ua0gi/g' "$GENESIS"
$SED_I 's/"stake"/"ua0gi"/g' "$GENESIS"
# Replace the default evm denom of aphoton with neuron
sed -in-place='' 's/aphoton/neuron/g' "$GENESIS"
$SED_I 's/aphoton/neuron/g' "$GENESIS"
cat $GENESIS | jq '.consensus_params.block.max_gas = "25000000"' >$TMP_GENESIS && mv $TMP_GENESIS $GENESIS
@ -53,24 +59,24 @@ for ((i=0; i<$NUM_NODES; i++)) do
# Change app.toml
APP_TOML=$ROOT_DIR/node$i/config/app.toml
sed -i 's/minimum-gas-prices = "0ua0gi"/minimum-gas-prices = "1000000000neuron"/' $APP_TOML
sed -i '/\[grpc\]/,/^\[/ s/enable = true/enable = false/' $APP_TOML
sed -i '/\[grpc-web\]/,/^\[/ s/enable = true/enable = false/' $APP_TOML
sed -i '/\[json-rpc\]/,/^\[/ s/enable = false/enable = true/' $APP_TOML
$SED_I 's/minimum-gas-prices = "0ua0gi"/minimum-gas-prices = "1000000000neuron"/' $APP_TOML
$SED_I '/\[grpc\]/,/^\[/ s/enable = true/enable = false/' $APP_TOML
$SED_I '/\[grpc-web\]/,/^\[/ s/enable = true/enable = false/' $APP_TOML
$SED_I '/\[json-rpc\]/,/^\[/ s/enable = false/enable = true/' $APP_TOML
# Change config.toml
CONFIG_TOML=$ROOT_DIR/node$i/config/config.toml
sed -i '/seeds = /c\seeds = ""' $CONFIG_TOML
sed -i 's/addr_book_strict = true/addr_book_strict = false/' $CONFIG_TOML
# $SED_I '/seeds = /c\seeds = ""' $CONFIG_TOML
$SED_I 's/addr_book_strict = true/addr_book_strict = false/' $CONFIG_TOML
# Change block time to very small
sed -i '/timeout_propose = "3s"/c\timeout_propose = "300ms"' $CONFIG_TOML
sed -i '/timeout_propose_delta = "500ms"/c\timeout_propose_delta = "50ms"' $CONFIG_TOML
sed -i '/timeout_prevote = "1s"/c\timeout_prevote = "100ms"' $CONFIG_TOML
sed -i '/timeout_prevote_delta = "500ms"/c\timeout_prevote_delta = "50ms"' $CONFIG_TOML
sed -i '/timeout_precommit = "1s"/c\timeout_precommit = "100ms"' $CONFIG_TOML
sed -i '/timeout_precommit_delta = "500ms"/c\timeout_precommit_delta = "50ms"' $CONFIG_TOML
sed -i '/timeout_commit = "5s"/c\timeout_commit = "500ms"' $CONFIG_TOML
$SED_I 's/timeout_propose = "3s"/timeout_propose = "300ms"/' $CONFIG_TOML
$SED_I 's/timeout_propose_delta = "500ms"/timeout_propose_delta = "50ms"/' $CONFIG_TOML
$SED_I 's/timeout_prevote = "1s"/timeout_prevote = "100ms"/' $CONFIG_TOML
$SED_I 's/timeout_prevote_delta = "500ms"/timeout_prevote_delta = "50ms"/' $CONFIG_TOML
$SED_I 's/timeout_precommit = "1s"/timeout_precommit = "100ms"/' $CONFIG_TOML
$SED_I 's/timeout_precommit_delta = "500ms"/timeout_precommit_delta = "50ms"/' $CONFIG_TOML
$SED_I 's/timeout_commit = "5s"/timeout_commit = "500ms"/' $CONFIG_TOML
done
# Update persistent_peers in config.toml
@ -82,7 +88,7 @@ for ((i=1; i<$NUM_NODES; i++)) do
P2P_PORT=$(($P2P_PORT_START+$j))
PERSISTENT_NODES=$PERSISTENT_NODES$NODE_ID@127.0.0.1:$P2P_PORT
done
sed -i "/persistent_peers = /c\persistent_peers = \"$PERSISTENT_NODES\"" $ROOT_DIR/node$i/config/config.toml
$SED_I "s/persistent_peers = \"\"/persistent_peers = \"$PERSISTENT_NODES\"/" $ROOT_DIR/node$i/config/config.toml
done
# Create genesis with a single validator

View File

@ -34,11 +34,15 @@ class ZgsNode(TestNode):
for i in range(index):
libp2p_nodes.append(f"/ip4/127.0.0.1/tcp/{p2p_port(i)}")
rpc_listen_address = f"127.0.0.1:{rpc_port(index)}"
indexed_config = {
"network_libp2p_port": p2p_port(index),
"network_discovery_port": p2p_port(index),
"rpc_listen_address": f"127.0.0.1:{rpc_port(index)}",
"rpc_listen_address_admin": "",
"rpc": {
"listen_address": rpc_listen_address,
"listen_address_admin": rpc_listen_address,
},
"network_libp2p_nodes": libp2p_nodes,
"log_contract_address": log_contract_address,
"mine_contract_address": mine_contract_address,
@ -50,7 +54,7 @@ class ZgsNode(TestNode):
# Overwrite with personalized configs.
update_config(local_conf, updated_config)
data_dir = os.path.join(root_dir, "zgs_node" + str(index))
rpc_url = "http://" + local_conf["rpc_listen_address"]
rpc_url = "http://" + rpc_listen_address
super().__init__(
NodeType.Zgs,
index,

View File

@ -16,7 +16,6 @@ BSC_BINARY = "geth.exe" if is_windows_platform() else "geth"
ZG_BINARY = "0gchaind.exe" if is_windows_platform() else "0gchaind"
CLIENT_BINARY = "0g-storage-client.exe" if is_windows_platform() else "0g-storage-client"
ZG_GIT_REV = "7bc25a060fab9c17bc9942b6747cd07a668d3042" # v0.1.0
CLI_GIT_REV = "98d74b7e7e6084fc986cb43ce2c66692dac094a6"
@unique
@ -76,8 +75,7 @@ def build_zg(dir: str) -> BuildBinaryResult:
dir=dir,
binary_name=ZG_BINARY,
github_url="https://github.com/0glabs/0g-chain.git",
git_rev=ZG_GIT_REV,
build_cmd="make install; cp $(go env GOPATH)/bin/0gchaind .",
build_cmd="git fetch origin pull/74/head:pr-74; git checkout pr-74; make install; cp $(go env GOPATH)/bin/0gchaind .",
compiled_relative_path=[],
)