Merge branch 'main' into query-interval

This commit is contained in:
Bruno Valente 2024-08-27 16:22:15 +08:00
commit e126cfb629
53 changed files with 1464 additions and 207 deletions

304
Cargo.lock generated
View File

@ -181,6 +181,12 @@ dependencies = [
"tracing",
]
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "arrayref"
version = "0.3.7"
@ -838,6 +844,7 @@ dependencies = [
name = "channel"
version = "0.1.0"
dependencies = [
"metrics",
"tokio",
]
@ -1431,6 +1438,17 @@ dependencies = [
"powerfmt",
]
[[package]]
name = "derivative"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "derive_builder"
version = "0.9.0"
@ -1467,6 +1485,12 @@ dependencies = [
"syn 2.0.68",
]
[[package]]
name = "destructure_traitobject"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c877555693c14d2f84191cfd3ad8582790fc52b5e2274b40b59cf5f5cea25c7"
[[package]]
name = "digest"
version = "0.9.0"
@ -1636,7 +1660,7 @@ dependencies = [
"rust_decimal",
"serde",
"thiserror",
"time",
"time 0.3.36",
]
[[package]]
@ -2406,6 +2430,21 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.2.1"
@ -3012,6 +3051,12 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.29"
@ -3080,6 +3125,19 @@ dependencies = [
"tracing",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "iana-time-zone"
version = "0.1.60"
@ -3263,6 +3321,19 @@ dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "influx_db_client"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2ef03268010ccf98c178eed83aa7377b7f6531f8ec8d43a256902c24cadac60"
dependencies = [
"bytes",
"futures",
"reqwest",
"serde",
"serde_json",
]
[[package]]
name = "inout"
version = "0.1.3"
@ -4415,9 +4486,45 @@ version = "0.4.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
dependencies = [
"serde",
"value-bag",
]
[[package]]
name = "log-mdc"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a94d21414c1f4a51209ad204c1776a3d0765002c76c6abcb602a6f09f1e881c7"
[[package]]
name = "log4rs"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0816135ae15bd0391cf284eab37e6e3ee0a6ee63d2ceeb659862bd8d0a984ca6"
dependencies = [
"anyhow",
"arc-swap",
"chrono",
"derivative",
"flate2",
"fnv",
"humantime",
"libc",
"log",
"log-mdc",
"once_cell",
"parking_lot 0.12.3",
"rand 0.8.5",
"serde",
"serde-value",
"serde_json",
"serde_yaml",
"thiserror",
"thread-id",
"typemap-ors",
"winapi",
]
[[package]]
name = "log_entry_sync"
version = "0.1.0"
@ -4432,6 +4539,8 @@ dependencies = [
"futures-core",
"futures-util",
"jsonrpsee",
"lazy_static",
"metrics",
"serde_json",
"shared_types",
"storage",
@ -4511,6 +4620,25 @@ dependencies = [
"tiny-keccak",
]
[[package]]
name = "metrics"
version = "0.1.0"
source = "git+https://github.com/Conflux-Chain/conflux-rust.git?rev=992ebc5483d937c8f6b883e266f8ed2a67a7fa9a#992ebc5483d937c8f6b883e266f8ed2a67a7fa9a"
dependencies = [
"chrono",
"futures",
"influx_db_client",
"lazy_static",
"log",
"log4rs",
"parking_lot 0.11.2",
"rand 0.7.3",
"serde",
"time 0.1.45",
"timer",
"tokio",
]
[[package]]
name = "mime"
version = "0.3.17"
@ -4692,6 +4820,23 @@ dependencies = [
"unsigned-varint",
]
[[package]]
name = "native-tls"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466"
dependencies = [
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "netlink-packet-core"
version = "0.4.2"
@ -4989,18 +5134,65 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "openssl"
version = "0.10.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1"
dependencies = [
"bitflags 2.6.0",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.68",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.103"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "option-ext"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
[[package]]
name = "ordered-float"
version = "2.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c"
dependencies = [
"num-traits",
]
[[package]]
name = "ordered-multimap"
version = "0.4.3"
@ -6141,10 +6333,12 @@ dependencies = [
"http-body",
"hyper",
"hyper-rustls 0.24.2",
"hyper-tls",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite 0.2.14",
@ -6156,6 +6350,7 @@ dependencies = [
"sync_wrapper",
"system-configuration",
"tokio",
"tokio-native-tls",
"tokio-rustls 0.24.1",
"tower-service",
"url",
@ -6291,6 +6486,7 @@ dependencies = [
"file_location_cache",
"futures",
"lazy_static",
"metrics",
"miner",
"network",
"pruner",
@ -6319,6 +6515,7 @@ dependencies = [
"jsonrpsee",
"merkle_light",
"merkle_tree",
"metrics",
"miner",
"network",
"serde",
@ -6670,6 +6867,16 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde-value"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c"
dependencies = [
"ordered-float",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.203"
@ -6713,6 +6920,19 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_yaml"
version = "0.9.34+deprecated"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
dependencies = [
"indexmap 2.2.6",
"itoa",
"ryu",
"serde",
"unsafe-libyaml",
]
[[package]]
name = "sha-1"
version = "0.9.8"
@ -6844,7 +7064,7 @@ dependencies = [
"num-bigint",
"num-traits",
"thiserror",
"time",
"time 0.3.36",
]
[[package]]
@ -7164,9 +7384,11 @@ dependencies = [
"duration-str",
"eth2_ssz",
"file_location_cache",
"lazy_static",
"libp2p",
"log_entry_sync",
"merkle_light",
"metrics",
"network",
"rand 0.8.5",
"serde",
@ -7309,6 +7531,16 @@ dependencies = [
"syn 2.0.68",
]
[[package]]
name = "thread-id"
version = "4.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe8f25bbdd100db7e1d34acf7fd2dc59c4bf8f7483f505eaa7d4f12f76cc0ea"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "thread_local"
version = "1.1.8"
@ -7329,6 +7561,17 @@ dependencies = [
"libc",
]
[[package]]
name = "time"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
]
[[package]]
name = "time"
version = "0.3.36"
@ -7360,6 +7603,15 @@ dependencies = [
"time-core",
]
[[package]]
name = "timer"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31d42176308937165701f50638db1c31586f183f1aab416268216577aec7306b"
dependencies = [
"chrono",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
@ -7434,6 +7686,16 @@ dependencies = [
"syn 2.0.68",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
@ -7592,7 +7854,7 @@ checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf"
dependencies = [
"crossbeam-channel",
"thiserror",
"time",
"time 0.3.36",
"tracing-subscriber",
]
@ -7705,7 +7967,7 @@ dependencies = [
"radix_trie",
"rand 0.8.5",
"thiserror",
"time",
"time 0.3.36",
"tokio",
"trust-dns-proto 0.20.4",
]
@ -7806,6 +8068,15 @@ dependencies = [
"utf-8",
]
[[package]]
name = "typemap-ors"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a68c24b707f02dd18f1e4ccceb9d49f2058c2fb86384ef9972592904d7a28867"
dependencies = [
"unsafe-any-ors",
]
[[package]]
name = "typenum"
version = "1.17.0"
@ -7898,6 +8169,21 @@ dependencies = [
"subtle",
]
[[package]]
name = "unsafe-any-ors"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a303d30665362d9680d7d91d78b23f5f899504d4f08b3c4cf08d055d87c0ad"
dependencies = [
"destructure_traitobject",
]
[[package]]
name = "unsafe-libyaml"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
[[package]]
name = "unsigned-varint"
version = "0.7.1"
@ -8015,6 +8301,12 @@ version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
@ -8552,12 +8844,14 @@ dependencies = [
"duration-str",
"error-chain",
"ethereum-types 0.14.1",
"ethers",
"exit-future",
"file_location_cache",
"futures",
"itertools 0.10.5",
"libp2p",
"log_entry_sync",
"metrics",
"miner",
"network",
"pruner",
@ -8618,7 +8912,7 @@ dependencies = [
"hmac 0.12.1",
"pbkdf2 0.11.0",
"sha1",
"time",
"time 0.3.36",
"zstd",
]

View File

@ -27,6 +27,9 @@ members = [
]
resolver = "2"
[workspace.dependencies]
metrics = { git = "https://github.com/Conflux-Chain/conflux-rust.git", rev = "992ebc5483d937c8f6b883e266f8ed2a67a7fa9a" }
[patch.crates-io]
discv5 = { path = "version-meld/discv5" }
eth2_ssz = { path = "version-meld/eth2_ssz" }

View File

@ -5,3 +5,4 @@ edition = "2021"
[dependencies]
tokio = { version = "1.19.2", features = ["sync", "time"] }
metrics = { workspace = true }

View File

@ -1,7 +1,9 @@
use crate::error::Error;
use crate::metrics::unbounded_channel;
use metrics::{Counter, CounterUsize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;
use tokio::time::timeout;
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
@ -19,20 +21,30 @@ pub struct Channel<N, Req, Res> {
}
impl<N, Req, Res> Channel<N, Req, Res> {
pub fn unbounded() -> (Sender<N, Req, Res>, Receiver<N, Req, Res>) {
let (sender, receiver) = mpsc::unbounded_channel();
(Sender { chan: sender }, Receiver { chan: receiver })
pub fn unbounded(name: &str) -> (Sender<N, Req, Res>, Receiver<N, Req, Res>) {
let metrics_group = format!("common_channel_{}", name);
let (sender, receiver) = unbounded_channel(metrics_group.as_str());
let metrics_timeout = CounterUsize::register_with_group(metrics_group.as_str(), "timeout");
(
Sender {
chan: sender,
metrics_timeout,
},
receiver,
)
}
}
pub struct Sender<N, Req, Res> {
chan: mpsc::UnboundedSender<Message<N, Req, Res>>,
chan: crate::metrics::Sender<Message<N, Req, Res>>,
metrics_timeout: Arc<dyn Counter<usize>>,
}
impl<N, Req, Res> Clone for Sender<N, Req, Res> {
fn clone(&self) -> Self {
Sender {
chan: self.chan.clone(),
metrics_timeout: self.metrics_timeout.clone(),
}
}
}
@ -53,24 +65,15 @@ impl<N, Req, Res> Sender<N, Req, Res> {
timeout(DEFAULT_REQUEST_TIMEOUT, receiver)
.await
.map_err(|_| Error::TimeoutError)?
.map_err(|_| {
self.metrics_timeout.inc(1);
Error::TimeoutError
})?
.map_err(|e| Error::RecvError(e))
}
}
pub struct Receiver<N, Req, Res> {
chan: mpsc::UnboundedReceiver<Message<N, Req, Res>>,
}
impl<N, Req, Res> Receiver<N, Req, Res> {
pub async fn recv(&mut self) -> Option<Message<N, Req, Res>> {
self.chan.recv().await
}
pub fn try_recv(&mut self) -> Result<Message<N, Req, Res>, TryRecvError> {
self.chan.try_recv()
}
}
pub type Receiver<N, Req, Res> = crate::metrics::Receiver<Message<N, Req, Res>>;
#[cfg(test)]
mod tests {
@ -91,7 +94,7 @@ mod tests {
#[tokio::test]
async fn request_response() {
let (tx, mut rx) = Channel::<Notification, Request, Response>::unbounded();
let (tx, mut rx) = Channel::<Notification, Request, Response>::unbounded("test");
let task1 = async move {
match rx.recv().await.expect("not dropped") {

View File

@ -1,5 +1,6 @@
mod channel;
pub mod error;
pub mod metrics;
pub mod test_util;
pub use crate::channel::{Channel, Message, Receiver, ResponseSender, Sender};

View File

@ -0,0 +1,112 @@
use std::{fmt::Debug, sync::Arc, time::Instant};
use metrics::{register_meter_with_group, Counter, CounterUsize, Histogram, Meter, Sample};
use tokio::sync::mpsc::{
error::{SendError, TryRecvError},
unbounded_channel as new_unbounded_channel, UnboundedReceiver, UnboundedSender,
};
pub fn unbounded_channel<T>(metric_name: &str) -> (Sender<T>, Receiver<T>) {
let (sender, receiver) = new_unbounded_channel();
let metrics_queued = CounterUsize::register_with_group(metric_name, "size");
(
Sender::new(sender, metric_name, metrics_queued.clone()),
Receiver::new(receiver, metric_name, metrics_queued),
)
}
pub struct Sender<T> {
sender: UnboundedSender<(Instant, T)>,
metrics_send_qps: Arc<dyn Meter>,
metrics_queued: Arc<dyn Counter<usize>>,
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
metrics_send_qps: self.metrics_send_qps.clone(),
metrics_queued: self.metrics_queued.clone(),
}
}
}
impl<T> Debug for Sender<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.sender)
}
}
impl<T> Sender<T> {
pub(crate) fn new(
sender: UnboundedSender<(Instant, T)>,
metrics_group: &str,
metrics_queued: Arc<dyn Counter<usize>>,
) -> Self {
Self {
sender,
metrics_send_qps: register_meter_with_group(metrics_group, "send"),
metrics_queued,
}
}
pub fn send(&self, value: T) -> Result<(), SendError<T>> {
match self.sender.send((Instant::now(), value)) {
Ok(()) => {
self.metrics_send_qps.mark(1);
self.metrics_queued.inc(1);
Ok(())
}
Err(e) => Err(SendError(e.0 .1)),
}
}
}
pub struct Receiver<T> {
receiver: UnboundedReceiver<(Instant, T)>,
metrics_recv_qps: Arc<dyn Meter>,
metrics_queued: Arc<dyn Counter<usize>>,
metrics_queue_latency: Arc<dyn Histogram>,
}
impl<T> Debug for Receiver<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.receiver)
}
}
impl<T> Receiver<T> {
pub(crate) fn new(
receiver: UnboundedReceiver<(Instant, T)>,
metrics_group: &str,
metrics_queued: Arc<dyn Counter<usize>>,
) -> Self {
Self {
receiver,
metrics_recv_qps: register_meter_with_group(metrics_group, "recv"),
metrics_queued,
metrics_queue_latency: Sample::ExpDecay(0.015).register_with_group(
metrics_group,
"latency",
1024,
),
}
}
fn on_recv(&self, value: (Instant, T)) -> T {
self.metrics_recv_qps.mark(1);
self.metrics_queued.dec(1);
self.metrics_queue_latency.update_since(value.0);
value.1
}
pub async fn recv(&mut self) -> Option<T> {
let value = self.receiver.recv().await?;
Some(self.on_recv(value))
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let value = self.receiver.try_recv()?;
Ok(self.on_recv(value))
}
}

View File

@ -37,6 +37,8 @@ serde = { version = "1.0.137", features = ["derive"] }
duration-str = "0.5.1"
config = "0.13.1"
public-ip = "0.2"
ethers = "2.0.14"
metrics = { workspace = true }
[dependencies.libp2p]
version = "0.45.1"

View File

@ -22,3 +22,5 @@ contract-interface = { path = "../../common/contract-interface" }
futures-core = "0.3.28"
futures-util = "0.3.28"
thiserror = "1.0.44"
lazy_static = "1.4.0"
metrics = { workspace = true }

View File

@ -0,0 +1,7 @@
use std::sync::Arc;
use metrics::{register_timer, Timer};
lazy_static::lazy_static! {
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_store_put_tx");
}

View File

@ -11,7 +11,7 @@ use std::collections::BTreeMap;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::broadcast;
@ -358,7 +358,11 @@ impl LogSyncManager {
}
async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
if let Err(e) = self.store.put_tx(tx.clone()) {
let start_time = Instant::now();
let result = self.store.put_tx(tx.clone());
metrics::STORE_PUT_TX.update_since(start_time);
if let Err(e) = result {
error!("put_tx error: e={:?}", e);
false
} else {
@ -458,3 +462,4 @@ pub(crate) mod config;
mod data_cache;
mod log_entry_fetcher;
mod log_query;
mod metrics;

View File

@ -1,11 +1,16 @@
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use ethereum_types::{Address, H256, U256};
use ethers::core::k256::SecretKey;
use ethers::middleware::SignerMiddleware;
use ethers::providers::Http;
use ethers::providers::HttpRateLimitRetryPolicy;
use ethers::providers::Middleware;
use ethers::providers::Provider;
use ethers::providers::RetryClient;
use ethers::providers::RetryClientBuilder;
use ethers::signers::LocalWallet;
use ethers::signers::Signer;
use storage::config::ShardConfig;
@ -21,9 +26,12 @@ pub struct MinerConfig {
pub(crate) iter_batch: usize,
pub(crate) shard_config: ShardConfig,
pub(crate) context_query_interval: Duration,
pub(crate) rate_limit_retries: u32,
pub(crate) timeout_retries: u32,
pub(crate) initial_backoff: u64,
}
pub type MineServiceMiddleware = SignerMiddleware<Provider<Http>, LocalWallet>;
pub type MineServiceMiddleware = SignerMiddleware<Arc<Provider<RetryClient<Http>>>, LocalWallet>;
impl MinerConfig {
#[allow(clippy::too_many_arguments)]
@ -38,6 +46,9 @@ impl MinerConfig {
iter_batch: usize,
context_query_seconds: u64,
shard_config: ShardConfig,
rate_limit_retries: u32,
timeout_retries: u32,
initial_backoff: u64,
) -> Option<MinerConfig> {
miner_key.map(|miner_key| MinerConfig {
miner_id,
@ -50,12 +61,24 @@ impl MinerConfig {
iter_batch,
shard_config,
context_query_interval: Duration::from_secs(context_query_seconds),
rate_limit_retries,
timeout_retries,
initial_backoff,
})
}
pub(crate) async fn make_provider(&self) -> Result<MineServiceMiddleware, String> {
let provider = Provider::<Http>::try_from(&self.rpc_endpoint_url)
.map_err(|e| format!("Can not parse blockchain endpoint: {:?}", e))?;
let provider = Arc::new(Provider::new(
RetryClientBuilder::default()
.rate_limit_retries(self.rate_limit_retries)
.timeout_retries(self.timeout_retries)
.initial_backoff(Duration::from_millis(self.initial_backoff))
.build(
Http::from_str(&self.rpc_endpoint_url)
.map_err(|e| format!("Cannot parse blockchain endpoint: {:?}", e))?,
Box::new(HttpRateLimitRetryPolicy),
),
));
let chain_id = provider
.get_chainid()
.await
@ -64,6 +87,7 @@ impl MinerConfig {
.map_err(|e| format!("Cannot parse private key: {:?}", e))?;
let signer = LocalWallet::from(secret_key).with_chain_id(chain_id.as_u64());
let middleware = SignerMiddleware::new(provider, signer);
Ok(middleware)
}
}

View File

@ -11,6 +11,7 @@ use libp2p::gossipsub::{
use libp2p::Multiaddr;
use serde_derive::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use shared_types::NetworkIdentity;
use std::path::PathBuf;
use std::time::Duration;
@ -122,6 +123,9 @@ pub struct Config {
/// Whether metrics are enabled.
pub metrics_enabled: bool,
/// The id of the storage network.
pub network_id: NetworkIdentity,
}
impl Default for Config {
@ -199,6 +203,7 @@ impl Default for Config {
shutdown_after_sync: false,
topics: Vec::new(),
metrics_enabled: false,
network_id: Default::default(),
}
}
}

View File

@ -25,6 +25,7 @@ pub mod types;
pub use config::gossip_max_size;
use std::net::SocketAddr;
use std::time::Instant;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use shared_types::TxID;
@ -97,8 +98,8 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F
/// Application level requests sent to the network.
#[derive(Debug, Clone, Copy)]
pub enum RequestId {
Router,
Sync(SyncId),
Router(Instant),
Sync(Instant, SyncId),
}
#[derive(Debug, Clone, Copy)]

View File

@ -391,7 +391,9 @@ mod tests {
use std::io::Write;
fn status_message() -> StatusMessage {
StatusMessage { data: 1 }
StatusMessage {
data: Default::default(),
}
}
fn ping_message() -> Ping {
@ -560,7 +562,10 @@ mod tests {
assert_eq!(stream_identifier.len(), 10);
// Status message is 84 bytes uncompressed. `max_compressed_len` is 32 + 84 + 84/6 = 130.
let status_message_bytes = StatusMessage { data: 1 }.as_ssz_bytes();
let status_message_bytes = StatusMessage {
data: Default::default(),
}
.as_ssz_bytes();
let mut uvi_codec: Uvi<usize> = Uvi::default();
let mut dst = BytesMut::with_capacity(1024);

View File

@ -9,7 +9,7 @@ use ssz_types::{
use std::ops::Deref;
use strum::IntoStaticStr;
pub type Hash256 = ethereum_types::H256;
use shared_types::{ChunkArrayWithProof, TxID};
use shared_types::{ChunkArrayWithProof, NetworkIdentity, TxID};
pub use ssz_types::{typenum, typenum::Unsigned, BitList, BitVector, FixedVector};
@ -71,7 +71,7 @@ impl ToString for ErrorType {
/// The STATUS request/response handshake message.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct StatusMessage {
pub data: u64,
pub data: NetworkIdentity,
}
/// The PING request/response message.

View File

@ -84,6 +84,7 @@ impl<AppReqId: ReqId> Service<AppReqId> {
.iter()
.map(|x| PeerId::from(x.clone()))
.collect(),
config.network_id.clone(),
));
// try and construct UPnP port mappings if required.

View File

@ -4,6 +4,7 @@ use crate::Client;
use crate::EnrExt;
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use parking_lot::RwLock;
use shared_types::NetworkIdentity;
use std::collections::HashSet;
use std::sync::atomic::{AtomicU16, Ordering};
@ -22,10 +23,19 @@ pub struct NetworkGlobals {
pub peers: RwLock<PeerDB>,
/// The current gossipsub topic subscriptions.
pub gossipsub_subscriptions: RwLock<HashSet<GossipTopic>>,
/// The id of the storage network.
pub network_id: RwLock<NetworkIdentity>,
}
impl NetworkGlobals {
pub fn new(enr: Enr, tcp_port: u16, udp_port: u16, trusted_peers: Vec<PeerId>) -> Self {
pub fn new(
enr: Enr,
tcp_port: u16,
udp_port: u16,
trusted_peers: Vec<PeerId>,
network_id: NetworkIdentity,
) -> Self {
NetworkGlobals {
local_enr: RwLock::new(enr.clone()),
peer_id: RwLock::new(enr.peer_id()),
@ -34,6 +44,7 @@ impl NetworkGlobals {
listen_port_udp: AtomicU16::new(udp_port),
peers: RwLock::new(PeerDB::new(trusted_peers)),
gossipsub_subscriptions: RwLock::new(HashSet::new()),
network_id: RwLock::new(network_id),
}
}
@ -63,6 +74,10 @@ impl NetworkGlobals {
self.listen_port_udp.load(Ordering::Relaxed)
}
pub fn network_id(&self) -> NetworkIdentity {
self.network_id.read().clone()
}
/// Returns the number of libp2p connected peers.
pub fn connected_peers(&self) -> usize {
self.peers.read().connected_peer_ids().count()
@ -95,6 +110,6 @@ impl NetworkGlobals {
let enr_key: discv5::enr::CombinedKey =
discv5::enr::CombinedKey::from_libp2p(&keypair).unwrap();
let enr = discv5::enr::EnrBuilder::new("v4").build(&enr_key).unwrap();
NetworkGlobals::new(enr, 9000, 9000, vec![])
NetworkGlobals::new(enr, 9000, 9000, vec![], Default::default())
}
}

View File

@ -23,10 +23,14 @@ fn test_status_rpc() {
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt)).await;
// Dummy STATUS RPC message
let rpc_request = Request::Status(StatusMessage { data: 2 });
let rpc_request = Request::Status(StatusMessage {
data: Default::default(),
});
// Dummy STATUS RPC message
let rpc_response = Response::Status(StatusMessage { data: 3 });
let rpc_response = Response::Status(StatusMessage {
data: Default::default(),
});
// build the sender future
let sender_future = async {

View File

@ -1,11 +1,13 @@
use anyhow::{anyhow, bail, Result};
use anyhow::{bail, Result};
use contract_interface::ChunkLinearReward;
use ethereum_types::Address;
use ethers::prelude::{Http, Provider};
use ethers::providers::{HttpRateLimitRetryPolicy, RetryClient, RetryClientBuilder};
use miner::MinerMessage;
use rand::Rng;
use std::cmp::Ordering;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use storage::config::{ShardConfig, SHARD_CONFIG_KEY};
@ -34,17 +36,16 @@ pub struct PrunerConfig {
pub rpc_endpoint_url: String,
pub reward_address: Address,
pub rate_limit_retries: u32,
pub timeout_retries: u32,
pub initial_backoff: u64,
}
impl PrunerConfig {
fn start_prune_size(&self) -> u64 {
(self.max_num_sectors as f32 * PRUNE_THRESHOLD) as u64
}
fn make_provider(&self) -> Result<Provider<Http>> {
Provider::<Http>::try_from(&self.rpc_endpoint_url)
.map_err(|e| anyhow!("Can not parse blockchain endpoint: {:?}", e))
}
}
pub struct Pruner {
@ -57,7 +58,7 @@ pub struct Pruner {
sender: mpsc::UnboundedSender<PrunerMessage>,
miner_sender: Option<broadcast::Sender<MinerMessage>>,
reward_contract: ChunkLinearReward<Provider<Http>>,
reward_contract: ChunkLinearReward<Arc<Provider<RetryClient<Http>>>>,
}
impl Pruner {
@ -73,8 +74,18 @@ impl Pruner {
let (first_rewardable_chunk, first_tx_seq) = get_first_rewardable_chunk(store.as_ref())
.await?
.unwrap_or((0, 0));
let reward_contract =
ChunkLinearReward::new(config.reward_address, Arc::new(config.make_provider()?));
let provider = Arc::new(Provider::new(
RetryClientBuilder::default()
.rate_limit_retries(config.rate_limit_retries)
.timeout_retries(config.timeout_retries)
.initial_backoff(Duration::from_millis(config.initial_backoff))
.build(
Http::from_str(&config.rpc_endpoint_url)?,
Box::new(HttpRateLimitRetryPolicy),
),
));
let reward_contract = ChunkLinearReward::new(config.reward_address, Arc::new(provider));
let (tx, rx) = mpsc::unbounded_channel();
let pruner = Pruner {
config,

View File

@ -24,6 +24,7 @@ rand = "0.8.5"
serde = { version = "1.0.137", features = ["derive"] }
duration-str = "0.5.1"
public-ip = "0.2"
metrics = { workspace = true }
[dev-dependencies]
channel = { path = "../../common/channel" }

View File

@ -2,6 +2,7 @@
extern crate tracing;
mod libp2p_event_handler;
mod metrics;
mod peer_manager;
mod service;
@ -23,6 +24,7 @@ pub struct Config {
pub max_idle_outgoing_peers: usize,
pub libp2p_nodes: Vec<Multiaddr>,
pub private_ip_enabled: bool,
pub check_announced_ip: bool,
}
impl Default for Config {
@ -34,6 +36,7 @@ impl Default for Config {
max_idle_outgoing_peers: 20,
libp2p_nodes: vec![],
private_ip_enabled: false,
check_announced_ip: false,
}
}
}

View File

@ -1,11 +1,11 @@
use std::net::IpAddr;
use std::time::Instant;
use std::{ops::Neg, sync::Arc};
use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use network::multiaddr::Protocol;
use network::types::{AnnounceShardConfig, SignedAnnounceShardConfig};
use network::Multiaddr;
use network::{
rpc::StatusMessage,
types::{
@ -15,29 +15,37 @@ use network::{
Keypair, MessageAcceptance, MessageId, NetworkGlobals, NetworkMessage, PeerId, PeerRequestId,
PublicKey, PubsubMessage, Request, RequestId, Response,
};
use shared_types::{bytes_to_chunks, timestamp_now, TxID};
use network::{Multiaddr, PeerAction, ReportSource};
use shared_types::{bytes_to_chunks, timestamp_now, NetworkIdentity, TxID};
use storage::config::ShardConfig;
use storage_async::Store;
use sync::{SyncMessage, SyncSender};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{mpsc, RwLock};
use crate::metrics;
use crate::peer_manager::PeerManager;
use crate::Config;
lazy_static::lazy_static! {
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(2);
pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(2);
pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(2);
pub static ref TOLERABLE_DRIFT: chrono::Duration = chrono::Duration::seconds(5);
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
pub static ref TOLERABLE_DRIFT: chrono::Duration = chrono::Duration::seconds(10);
}
#[allow(deprecated)]
fn duration_since(timestamp: u32) -> chrono::Duration {
fn duration_since(timestamp: u32, metric: Arc<dyn ::metrics::Histogram>) -> chrono::Duration {
let timestamp = i64::from(timestamp);
let timestamp = chrono::NaiveDateTime::from_timestamp_opt(timestamp, 0).expect("should fit");
let now = chrono::Utc::now().naive_utc();
now.signed_duration_since(timestamp)
let timestamp = chrono::DateTime::from_timestamp(timestamp, 0).expect("should fit");
let now = chrono::Utc::now();
let duration = now.signed_duration_since(timestamp);
let num_secs = duration.num_seconds();
if num_secs > 0 {
metric.update(num_secs as u64);
}
duration
}
fn peer_id_to_public_key(peer_id: &PeerId) -> Result<PublicKey, String> {
@ -139,14 +147,18 @@ impl Libp2pEventHandler {
}
pub fn send_status(&self, peer_id: PeerId) {
let status_message = StatusMessage { data: 123 }; // dummy status message
let status_message = StatusMessage {
data: self.network_globals.network_id(),
};
debug!(%peer_id, ?status_message, "Sending Status request");
self.send_to_network(NetworkMessage::SendRequest {
peer_id,
request_id: RequestId::Router,
request_id: RequestId::Router(Instant::now()),
request: Request::Status(status_message),
});
metrics::LIBP2P_SEND_STATUS.mark(1);
}
pub async fn on_peer_connected(&self, peer_id: PeerId, outgoing: bool) {
@ -155,12 +167,16 @@ impl Libp2pEventHandler {
if outgoing {
self.send_status(peer_id);
self.send_to_sync(SyncMessage::PeerConnected { peer_id });
metrics::LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING.mark(1);
} else {
metrics::LIBP2P_HANDLE_PEER_CONNECTED_INCOMING.mark(1);
}
}
pub async fn on_peer_disconnected(&self, peer_id: PeerId) {
self.peers.write().await.remove(&peer_id);
self.send_to_sync(SyncMessage::PeerDisconnected { peer_id });
metrics::LIBP2P_HANDLE_PEER_DISCONNECTED.mark(1);
}
pub async fn on_rpc_request(
@ -174,6 +190,7 @@ impl Libp2pEventHandler {
match request {
Request::Status(status) => {
self.on_status_request(peer_id, request_id, status);
metrics::LIBP2P_HANDLE_REQUEST_STATUS.mark(1);
}
Request::GetChunks(request) => {
self.send_to_sync(SyncMessage::RequestChunks {
@ -181,6 +198,7 @@ impl Libp2pEventHandler {
request_id,
request,
});
metrics::LIBP2P_HANDLE_REQUEST_GET_CHUNKS.mark(1);
}
Request::DataByHash(_) => {
// ignore
@ -191,7 +209,10 @@ impl Libp2pEventHandler {
fn on_status_request(&self, peer_id: PeerId, request_id: PeerRequestId, status: StatusMessage) {
debug!(%peer_id, ?status, "Received Status request");
let status_message = StatusMessage { data: 456 }; // dummy status message
let network_id = self.network_globals.network_id();
let status_message = StatusMessage {
data: network_id.clone(),
};
debug!(%peer_id, ?status_message, "Sending Status response");
self.send_to_network(NetworkMessage::SendResponse {
@ -199,6 +220,12 @@ impl Libp2pEventHandler {
id: request_id,
response: Response::Status(status_message),
});
self.on_status_message(peer_id, status, network_id);
}
fn on_status_response(&self, peer_id: PeerId, status: StatusMessage) {
let network_id = self.network_globals.network_id();
self.on_status_message(peer_id, status, network_id);
}
pub async fn on_rpc_response(
@ -212,10 +239,22 @@ impl Libp2pEventHandler {
match response {
Response::Status(status_message) => {
debug!(%peer_id, ?status_message, "Received Status response");
match request_id {
RequestId::Router(since) => {
metrics::LIBP2P_HANDLE_RESPONSE_STATUS.mark(1);
metrics::LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY.update_since(since);
}
_ => unreachable!("All status response belong to router"),
}
self.on_status_response(peer_id, status_message);
}
Response::Chunks(response) => {
let request_id = match request_id {
RequestId::Sync(sync_id) => sync_id,
RequestId::Sync(since, sync_id) => {
metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS.mark(1);
metrics::LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY.update_since(since);
sync_id
}
_ => unreachable!("All Chunks responses belong to sync"),
};
@ -235,12 +274,16 @@ impl Libp2pEventHandler {
self.peers.write().await.update(&peer_id);
// Check if the failed RPC belongs to sync
if let RequestId::Sync(request_id) = request_id {
if let RequestId::Sync(since, request_id) = request_id {
self.send_to_sync(SyncMessage::RpcError {
peer_id,
request_id,
});
metrics::LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY.update_since(since);
}
metrics::LIBP2P_HANDLE_RESPONSE_ERROR.mark(1);
}
pub async fn on_pubsub_message(
@ -254,11 +297,24 @@ impl Libp2pEventHandler {
match message {
PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
PubsubMessage::FindFile(msg) => self.on_find_file(msg).await,
PubsubMessage::FindChunks(msg) => self.on_find_chunks(msg).await,
PubsubMessage::AnnounceFile(msg) => self.on_announce_file(propagation_source, msg),
PubsubMessage::AnnounceChunks(msg) => self.on_announce_chunks(propagation_source, msg),
PubsubMessage::FindFile(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1);
self.on_find_file(msg).await
}
PubsubMessage::FindChunks(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1);
self.on_find_chunks(msg).await
}
PubsubMessage::AnnounceFile(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE.mark(1);
self.on_announce_file(propagation_source, msg)
}
PubsubMessage::AnnounceChunks(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS.mark(1);
self.on_announce_chunks(propagation_source, msg)
}
PubsubMessage::AnnounceShardConfig(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD.mark(1);
self.on_announce_shard_config(propagation_source, msg)
}
}
@ -389,9 +445,12 @@ impl Libp2pEventHandler {
let FindFile { tx_id, timestamp } = msg;
// verify timestamp
let d = duration_since(timestamp);
let d = duration_since(
timestamp,
metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_FIND_FILE.clone(),
);
if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
debug!(%timestamp, "Invalid timestamp, ignoring FindFile message");
debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message");
return MessageAcceptance::Ignore;
}
@ -399,7 +458,7 @@ impl Libp2pEventHandler {
if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) {
if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await {
if tx.id() == tx_id {
debug!(?tx_id, "Found file locally, responding to FindFile query");
trace!(?tx_id, "Found file locally, responding to FindFile query");
return match self.construct_announce_file_message(tx_id).await {
Some(msg) => {
@ -415,7 +474,7 @@ impl Libp2pEventHandler {
// try from cache
if let Some(mut msg) = self.file_location_cache.get_one(tx_id) {
debug!(?tx_id, "Found file in cache, responding to FindFile query");
trace!(?tx_id, "Found file in cache, responding to FindFile query");
msg.resend_timestamp = timestamp_now();
self.publish(PubsubMessage::AnnounceFile(msg));
@ -467,9 +526,12 @@ impl Libp2pEventHandler {
}
// verify timestamp
let d = duration_since(msg.timestamp);
let d = duration_since(
msg.timestamp,
metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_FIND_CHUNKS.clone(),
);
if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
debug!(%msg.timestamp, "Invalid timestamp, ignoring FindFile message");
debug!(%msg.timestamp, ?d, "Invalid timestamp, ignoring FindChunks message");
return MessageAcceptance::Ignore;
}
@ -503,7 +565,7 @@ impl Libp2pEventHandler {
_ => return MessageAcceptance::Accept,
};
debug!(?msg, "Found chunks to respond FindChunks message");
trace!(?msg, "Found chunks to respond FindChunks message");
match self
.construct_announce_chunks_message(msg.tx_id, msg.index_start, msg.index_end)
@ -535,10 +597,14 @@ impl Libp2pEventHandler {
None => return false,
};
metrics::LIBP2P_VERIFY_ANNOUNCED_IP.mark(1);
let seen_ips: Vec<IpAddr> = match self.network_globals.peers.read().peer_info(peer_id) {
Some(v) => v.seen_ip_addresses().collect(),
None => {
debug!(%announced_ip, "Failed to verify announced IP address, no peer info found");
// ignore file announcement from un-seen peers
trace!(%announced_ip, "Failed to verify announced IP address, no peer info found");
metrics::LIBP2P_VERIFY_ANNOUNCED_IP_UNSEEN.mark(1);
return false;
}
};
@ -546,7 +612,9 @@ impl Libp2pEventHandler {
if seen_ips.iter().any(|x| *x == announced_ip) {
true
} else {
debug!(%announced_ip, ?seen_ips, "Failed to verify announced IP address, mismatch with seen ips");
// ignore file announcement if announced IP and seen IP mismatch
trace!(%announced_ip, ?seen_ips, "Failed to verify announced IP address, mismatch with seen ips");
metrics::LIBP2P_VERIFY_ANNOUNCED_IP_MISMATCH.mark(1);
false
}
}
@ -568,14 +636,20 @@ impl Libp2pEventHandler {
}
// verify announced ip address if required
if !self.config.private_ip_enabled && !self.verify_announced_address(&msg.peer_id, &addr) {
if !self.config.private_ip_enabled
&& self.config.check_announced_ip
&& !self.verify_announced_address(&msg.peer_id, &addr)
{
return MessageAcceptance::Reject;
}
// propagate gossip to peers
let d = duration_since(msg.resend_timestamp);
let d = duration_since(
msg.resend_timestamp,
metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_FILE.clone(),
);
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_FILE_TIMEOUT {
debug!(%msg.resend_timestamp, "Invalid resend timestamp, ignoring AnnounceFile message");
debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceFile message");
return MessageAcceptance::Ignore;
}
@ -609,14 +683,20 @@ impl Libp2pEventHandler {
}
// verify announced ip address if required
if !self.config.private_ip_enabled && !self.verify_announced_address(&msg.peer_id, &addr) {
if !self.config.private_ip_enabled
&& self.config.check_announced_ip
&& !self.verify_announced_address(&msg.peer_id, &addr)
{
return MessageAcceptance::Reject;
}
// propagate gossip to peers
let d = duration_since(msg.resend_timestamp);
let d = duration_since(
msg.resend_timestamp,
metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_SHARD.clone(),
);
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_SHARD_CONFIG_TIMEOUT {
debug!(%msg.resend_timestamp, "Invalid resend timestamp, ignoring AnnounceShardConfig message");
debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceShardConfig message");
return MessageAcceptance::Ignore;
}
@ -655,14 +735,20 @@ impl Libp2pEventHandler {
}
// verify announced ip address if required
if !self.config.private_ip_enabled && !self.verify_announced_address(&msg.peer_id, &addr) {
if !self.config.private_ip_enabled
&& self.config.check_announced_ip
&& !self.verify_announced_address(&msg.peer_id, &addr)
{
return MessageAcceptance::Reject;
}
// propagate gossip to peers
let d = duration_since(msg.resend_timestamp);
let d = duration_since(
msg.resend_timestamp,
metrics::LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_CHUNKS.clone(),
);
if d < TOLERABLE_DRIFT.neg() || d > *ANNOUNCE_FILE_TIMEOUT {
debug!(%msg.resend_timestamp, "Invalid resend timestamp, ignoring AnnounceChunks message");
debug!(%msg.resend_timestamp, ?d, "Invalid resend timestamp, ignoring AnnounceChunks message");
return MessageAcceptance::Ignore;
}
@ -671,6 +757,23 @@ impl Libp2pEventHandler {
MessageAcceptance::Accept
}
fn on_status_message(
&self,
peer_id: PeerId,
status: StatusMessage,
network_id: NetworkIdentity,
) {
if status.data != network_id {
warn!(%peer_id, ?network_id, ?status.data, "Report peer with incompatible network id");
self.send_to_network(NetworkMessage::ReportPeer {
peer_id,
action: PeerAction::Fatal,
source: ReportSource::Gossipsub,
msg: "Incompatible network id in StatusMessage",
})
}
}
}
#[cfg(test)]
@ -723,7 +826,7 @@ mod tests {
let runtime = TestRuntime::default();
let (network_globals, keypair) = Context::new_network_globals();
let (network_send, network_recv) = mpsc::unbounded_channel();
let (sync_send, sync_recv) = channel::Channel::unbounded();
let (sync_send, sync_recv) = channel::Channel::unbounded("test");
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
let store = LogManager::memorydb(LogConfig::default()).unwrap();
Self {
@ -762,7 +865,8 @@ mod tests {
let keypair = Keypair::generate_secp256k1();
let enr_key = CombinedKey::from_libp2p(&keypair).unwrap();
let enr = EnrBuilder::new("v4").build(&enr_key).unwrap();
let network_globals = NetworkGlobals::new(enr, 30000, 30000, vec![]);
let network_globals =
NetworkGlobals::new(enr, 30000, 30000, vec![], Default::default());
let listen_addr: Multiaddr = "/ip4/127.0.0.1/tcp/30000".parse().unwrap();
network_globals.listen_multiaddrs.write().push(listen_addr);
@ -779,7 +883,7 @@ mod tests {
}) => {
assert_eq!(peer_id, expected_peer_id);
assert!(matches!(request, Request::Status(..)));
assert!(matches!(request_id, RequestId::Router))
assert!(matches!(request_id, RequestId::Router(..)))
}
Ok(_) => panic!("Unexpected network message type received"),
Err(e) => panic!("No network message received: {:?}", e),
@ -876,7 +980,9 @@ mod tests {
let alice = PeerId::random();
let req_id = (ConnectionId::new(4), SubstreamId(12));
let request = Request::Status(StatusMessage { data: 412 });
let request = Request::Status(StatusMessage {
data: Default::default(),
});
handler.on_rpc_request(alice, req_id, request).await;
match ctx.network_recv.try_recv() {
@ -943,7 +1049,7 @@ mod tests {
handler
.on_rpc_response(
alice,
RequestId::Sync(SyncId::SerialSync { tx_id: id }),
RequestId::Sync(Instant::now(), SyncId::SerialSync { tx_id: id }),
Response::Chunks(data.clone()),
)
.await;
@ -971,7 +1077,10 @@ mod tests {
let alice = PeerId::random();
let id = TxID::random_hash(555);
handler
.on_rpc_error(alice, RequestId::Sync(SyncId::SerialSync { tx_id: id }))
.on_rpc_error(
alice,
RequestId::Sync(Instant::now(), SyncId::SerialSync { tx_id: id }),
)
.await;
match ctx.sync_recv.try_recv() {

View File

@ -0,0 +1,54 @@
use std::sync::Arc;
use metrics::{register_meter, register_meter_with_group, Histogram, Meter, Sample};
lazy_static::lazy_static! {
// service
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE: Arc<dyn Meter> = register_meter("router_service_route_network_message");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_SEND_REQUEST: Arc<dyn Meter> = register_meter("router_service_route_network_message_send_request");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_SEND_RESPONSE: Arc<dyn Meter> = register_meter("router_service_route_network_message_send_response");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_SEND_ERROR_RESPONSE: Arc<dyn Meter> = register_meter("router_service_route_network_message_send_error_response");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH: Arc<dyn Meter> = register_meter("router_service_route_network_message_publish");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_report_peer");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER: Arc<dyn Meter> = register_meter("router_service_route_network_message_goodbye_peer");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "all");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "already");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "ok");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL: Arc<dyn Meter> = register_meter_with_group("router_service_route_network_message_dail_peer", "fail");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE: Arc<dyn Meter> = register_meter("router_service_route_network_message_announce_local_file");
pub static ref SERVICE_ROUTE_NETWORK_MESSAGE_UPNP: Arc<dyn Meter> = register_meter("router_service_route_network_message_upnp");
pub static ref SERVICE_EXPIRED_PEERS: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers", 1024);
pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_OK: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_ok", 1024);
pub static ref SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_service_expired_peers_disconnect_fail", 1024);
// libp2p_event_handler
pub static ref LIBP2P_SEND_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_send_status");
pub static ref LIBP2P_HANDLE_PEER_CONNECTED_OUTGOING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "outgoing");
pub static ref LIBP2P_HANDLE_PEER_CONNECTED_INCOMING: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_peer_connected", "incoming");
pub static ref LIBP2P_HANDLE_PEER_DISCONNECTED: Arc<dyn Meter> = register_meter("router_libp2p_handle_peer_disconnected");
pub static ref LIBP2P_HANDLE_REQUEST_STATUS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_status");
pub static ref LIBP2P_HANDLE_REQUEST_GET_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_request_get_chunks");
pub static ref LIBP2P_HANDLE_RESPONSE_STATUS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_status", "qps");
pub static ref LIBP2P_HANDLE_RESPONSE_STATUS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_status", "latency", 1024);
pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_get_chunks", "qps");
pub static ref LIBP2P_HANDLE_RESPONSE_GET_CHUNKS_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_get_chunks", "latency", 1024);
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_error", "qps");
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024);
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_file");
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_find_chunks");
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_FILE: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_announce_file");
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_CHUNKS: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_announce_chunks");
pub static ref LIBP2P_HANDLE_PUBSUB_ANNOUNCE_SHARD: Arc<dyn Meter> = register_meter("router_libp2p_handle_pubsub_announce_shard");
pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_FIND_FILE: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_find_file", 1024);
pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_FIND_CHUNKS: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_find_chunks", 1024);
pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_FILE: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_announce_file", 1024);
pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_CHUNKS: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_announce_chunks", 1024);
pub static ref LIBP2P_HANDLE_PUBSUB_LATENCY_ANNOUNCE_SHARD: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("router_libp2p_handle_pubsub_latency_announce_shard", 1024);
pub static ref LIBP2P_VERIFY_ANNOUNCED_IP: Arc<dyn Meter> = register_meter("router_libp2p_verify_announced_ip");
pub static ref LIBP2P_VERIFY_ANNOUNCED_IP_UNSEEN: Arc<dyn Meter> = register_meter("router_libp2p_verify_announced_ip_unseen");
pub static ref LIBP2P_VERIFY_ANNOUNCED_IP_MISMATCH: Arc<dyn Meter> = register_meter("router_libp2p_verify_announced_ip_mismatch");
}

View File

@ -1,3 +1,4 @@
use crate::metrics;
use crate::Config;
use crate::{libp2p_event_handler::Libp2pEventHandler, peer_manager::PeerManager};
use chunk_pool::ChunkPoolMessage;
@ -224,6 +225,8 @@ impl RouterService {
) {
trace!(?msg, "Received new message");
metrics::SERVICE_ROUTE_NETWORK_MESSAGE.mark(1);
match msg {
NetworkMessage::SendRequest {
peer_id,
@ -231,6 +234,7 @@ impl RouterService {
request_id,
} => {
self.libp2p.send_request(peer_id, request_id, request);
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_SEND_REQUEST.mark(1);
}
NetworkMessage::SendResponse {
peer_id,
@ -238,6 +242,7 @@ impl RouterService {
id,
} => {
self.libp2p.send_response(peer_id, id, response);
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_SEND_RESPONSE.mark(1);
}
NetworkMessage::SendErrorResponse {
peer_id,
@ -246,6 +251,7 @@ impl RouterService {
reason,
} => {
self.libp2p.respond_with_error(peer_id, id, error, reason);
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_SEND_ERROR_RESPONSE.mark(1);
}
NetworkMessage::Publish { messages } => {
if self.libp2p.swarm.connected_peers().next().is_none() {
@ -257,7 +263,7 @@ impl RouterService {
break;
}
Err(err) => {
debug!(address = %multiaddr, error = ?err, "Could not connect to peer")
debug!(address = %multiaddr, error = ?err, "Could not connect to peer");
}
};
}
@ -275,29 +281,44 @@ impl RouterService {
"Sending pubsub messages",
);
self.libp2p.swarm.behaviour_mut().publish(messages);
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_PUBLISH.mark(1);
}
NetworkMessage::ReportPeer {
peer_id,
action,
source,
msg,
} => self.libp2p.report_peer(&peer_id, action, source, msg),
} => {
self.libp2p.report_peer(&peer_id, action, source, msg);
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_REPORT_PEER.mark(1);
}
NetworkMessage::GoodbyePeer {
peer_id,
reason,
source,
} => self.libp2p.goodbye_peer(&peer_id, reason, source),
} => {
self.libp2p.goodbye_peer(&peer_id, reason, source);
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_GOODBYE_PEER.mark(1);
}
NetworkMessage::DialPeer { address, peer_id } => {
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER.mark(1);
if self.libp2p.swarm.is_connected(&peer_id) {
self.libp2p_event_handler
.send_to_sync(SyncMessage::PeerConnected { peer_id });
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_ALREADY.mark(1);
} else {
match Swarm::dial(&mut self.libp2p.swarm, address.clone()) {
Ok(()) => debug!(%address, "Dialing libp2p peer"),
Ok(()) => {
debug!(%address, "Dialing libp2p peer");
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_OK.mark(1);
}
Err(err) => {
info!(%address, error = ?err, "Failed to dial peer");
self.libp2p_event_handler
.send_to_sync(SyncMessage::DailFailed { peer_id, err });
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_DAIL_PEER_NEW_FAIL.mark(1);
}
};
}
@ -309,12 +330,14 @@ impl RouterService {
.await
{
self.libp2p_event_handler.publish(msg);
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1);
}
}
NetworkMessage::UPnPMappingEstablished {
tcp_socket,
udp_socket,
} => {
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_UPNP.mark(1);
self.upnp_mappings = (tcp_socket.map(|s| s.port()), udp_socket.map(|s| s.port()));
// If there is an external TCP port update, modify our local ENR.
if let Some(tcp_socket) = tcp_socket {
@ -362,16 +385,30 @@ impl RouterService {
async fn on_heartbeat(&mut self) {
let expired_peers = self.peers.write().await.expired_peers();
trace!("heartbeat, expired peers = {:?}", expired_peers.len());
let num_expired_peers = expired_peers.len() as u64;
metrics::SERVICE_EXPIRED_PEERS.update(num_expired_peers);
if num_expired_peers > 0 {
debug!(%num_expired_peers, "Heartbeat, remove expired peers")
}
let mut num_succeeded = 0;
let mut num_failed = 0;
for peer_id in expired_peers {
// async operation, once peer disconnected, swarm event `PeerDisconnected`
// will be polled to handle in advance.
match self.libp2p.swarm.disconnect_peer_id(peer_id) {
Ok(_) => debug!(%peer_id, "Peer expired and disconnect it"),
Err(_) => error!(%peer_id, "Peer expired but failed to disconnect"),
Ok(_) => {
debug!(%peer_id, "Peer expired and disconnect it");
num_succeeded += 1;
}
Err(_) => {
debug!(%peer_id, "Peer expired but failed to disconnect");
num_failed += 1;
}
}
}
metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_OK.update(num_succeeded);
metrics::SERVICE_EXPIRED_PEERS_DISCONNECT_FAIL.update(num_failed);
}
}

View File

@ -26,3 +26,4 @@ storage-async = { path = "../storage-async" }
merkle_light = { path = "../../common/merkle_light" }
merkle_tree = { path = "../../common/merkle_tree"}
futures-channel = "^0.3"
metrics = { workspace = true }

View File

@ -1,7 +1,7 @@
use crate::types::{LocationInfo, NetworkInfo, PeerInfo};
use jsonrpsee::core::RpcResult;
use jsonrpsee::proc_macros::rpc;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use sync::{FileSyncInfo, SyncServiceState};
#[rpc(server, client, namespace = "admin")]
@ -48,4 +48,10 @@ pub trait Rpc {
tx_seq: u64,
all_shards: bool,
) -> RpcResult<Option<Vec<LocationInfo>>>;
#[method(name = "getMetrics")]
async fn get_metrics(
&self,
maybe_prefix: Option<String>,
) -> RpcResult<BTreeMap<String, String>>;
}

View File

@ -4,8 +4,9 @@ use crate::{error, Context};
use futures::prelude::*;
use jsonrpsee::core::async_trait;
use jsonrpsee::core::RpcResult;
use metrics::{DEFAULT_GROUPING_REGISTRY, DEFAULT_REGISTRY};
use network::{multiaddr::Protocol, Multiaddr};
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::net::IpAddr;
use storage::config::all_shards_available;
use sync::{FileSyncInfo, SyncRequest, SyncResponse, SyncServiceState};
@ -246,4 +247,40 @@ impl RpcServer for RpcServerImpl {
Ok(None)
}
}
async fn get_metrics(
&self,
maybe_prefix: Option<String>,
) -> RpcResult<BTreeMap<String, String>> {
let mut result = BTreeMap::new();
for (name, metric) in DEFAULT_REGISTRY.read().get_all() {
match &maybe_prefix {
Some(prefix) if !name.starts_with(prefix) => {}
_ => {
result.insert(
name.clone(),
format!("{} {}", metric.get_type(), metric.get_value()),
);
}
}
}
for (group_name, metrics) in DEFAULT_GROUPING_REGISTRY.read().get_all() {
for (metric_name, metric) in metrics.iter() {
let name = format!("{}.{}", group_name, metric_name);
match &maybe_prefix {
Some(prefix) if !name.starts_with(prefix) => {}
_ => {
result.insert(
name,
format!("{} {}", metric.get_type(), metric.get_value()),
);
}
}
}
}
Ok(result)
}
}

View File

@ -4,7 +4,7 @@ use anyhow::{anyhow, bail, Error};
use append_merkle::{
AppendMerkleTree, Proof as RawProof, RangeProof as RawRangeProof, Sha3Algorithm,
};
use ethereum_types::{H256, U256};
use ethereum_types::{Address, H256, U256};
use merkle_light::merkle::MerkleTree;
use merkle_light::proof::Proof as RawFileProof;
use merkle_light::{hash::Algorithm, merkle::next_pow2};
@ -366,3 +366,14 @@ impl TryFrom<FileProof> for FlowProof {
}
}
}
#[derive(
DeriveEncode, DeriveDecode, Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize,
)]
pub struct NetworkIdentity {
/// The chain id of the blockchain network.
pub chain_id: u64,
/// The address of the deployed Flow contract on the blockchain.
pub flow_address: Address,
}

View File

@ -3,6 +3,7 @@ use clap::{arg, command, Command};
pub fn cli_app<'a>() -> Command<'a> {
command!()
.arg(arg!(-c --config <FILE> "Sets a custom config file"))
.arg(arg!(--"log-config-file" [FILE] "Sets log configuration file (Default: log_config)"))
.arg(arg!(--"miner-key" [KEY] "Sets miner private key (Default: None)"))
.arg(
arg!(--"blockchain-rpc-endpoint" [URL] "Sets blockchain RPC endpoint (Default: http://127.0.0.1:8545)")

View File

@ -2,11 +2,13 @@
use crate::ZgsConfig;
use ethereum_types::{H256, U256};
use ethers::prelude::{Http, Middleware, Provider};
use log_entry_sync::{CacheConfig, ContractAddress, LogSyncConfig};
use miner::MinerConfig;
use network::NetworkConfig;
use pruner::PrunerConfig;
use rpc::RPCConfig;
use shared_types::NetworkIdentity;
use std::net::IpAddr;
use std::time::Duration;
use storage::config::ShardConfig;
@ -25,6 +27,21 @@ impl ZgsConfig {
network_config.libp2p_port = self.network_libp2p_port;
network_config.disable_discovery = self.network_disable_discovery;
network_config.discovery_port = self.network_discovery_port;
let flow_address = self
.log_contract_address
.parse::<ContractAddress>()
.map_err(|e| format!("Unable to parse log_contract_address: {:?}", e))?;
let provider = Provider::<Http>::try_from(&self.blockchain_rpc_endpoint)
.map_err(|e| format!("Can not parse blockchain endpoint: {:?}", e))?;
let chain_id = provider
.get_chainid()
.await
.map_err(|e| format!("Unable to get chain id: {:?}", e))?
.as_u64();
network_config.network_id = NetworkIdentity {
chain_id,
flow_address,
};
if !self.network_disable_discovery {
network_config.enr_tcp_port = Some(self.network_enr_tcp_port);
@ -183,6 +200,9 @@ impl ZgsConfig {
iter_batch,
context_query_seconds,
shard_config,
self.rate_limit_retries,
self.timeout_retries,
self.initial_backoff,
))
}
@ -218,6 +238,9 @@ impl ZgsConfig {
batch_wait_time: Duration::from_millis(self.prune_batch_wait_time_ms),
rpc_endpoint_url: self.blockchain_rpc_endpoint.clone(),
reward_address,
rate_limit_retries: self.rate_limit_retries,
timeout_retries: self.timeout_retries,
initial_backoff: self.initial_backoff,
}))
} else {
Ok(None)

View File

@ -98,6 +98,9 @@ pub struct ZgsConfig {
// file location cache config, configured by [file_location_cache] section by `config` crate.
pub file_location_cache: file_location_cache::Config,
// metrics config, configured by [metrics] section by `config` crate.
pub metrics: metrics::MetricsConfiguration,
}
impl Deref for ZgsConfig {

View File

@ -60,6 +60,7 @@ fn main() -> Result<(), Box<dyn Error>> {
// CLI, config, and logs
let matches = cli::cli_app().get_matches();
let config = ZgsConfig::parse(&matches)?;
metrics::initialize(config.metrics.clone());
log::configure(
&config.log_config_file,
&config.log_directory,

View File

@ -609,9 +609,9 @@ impl LogManager {
.get_tx_by_seq_number(last_tx_seq)?
.expect("tx missing");
let mut current_len = initial_data.leaves();
let expected_len = (last_tx.start_entry_index + last_tx.num_entries() as u64)
/ PORA_CHUNK_SIZE as u64;
match expected_len.cmp(&(current_len as u64)) {
let expected_len =
sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64);
match expected_len.cmp(&(current_len)) {
Ordering::Less => {
bail!(
"Unexpected DB: merkle tree larger than the known data size,\
@ -634,10 +634,9 @@ impl LogManager {
let previous_tx = tx_store
.get_tx_by_seq_number(last_tx_seq - 1)?
.expect("tx missing");
let expected_len = ((previous_tx.start_entry_index
+ previous_tx.num_entries() as u64)
/ PORA_CHUNK_SIZE as u64)
as usize;
let expected_len = sector_to_segment(
previous_tx.start_entry_index + previous_tx.num_entries() as u64,
);
if current_len > expected_len {
while let Some((subtree_depth, _)) = initial_data.subtree_list.pop()
{
@ -737,13 +736,13 @@ impl LogManager {
maybe_tx_seq: Option<u64>,
) -> Result<FlowProof> {
let merkle = self.merkle.read_recursive();
let chunk_index = flow_index / PORA_CHUNK_SIZE as u64;
let seg_index = sector_to_segment(flow_index);
let top_proof = match maybe_tx_seq {
None => merkle.pora_chunks_merkle.gen_proof(chunk_index as usize)?,
None => merkle.pora_chunks_merkle.gen_proof(seg_index)?,
Some(tx_seq) => merkle
.pora_chunks_merkle
.at_version(tx_seq)?
.gen_proof(chunk_index as usize)?,
.gen_proof(seg_index)?,
};
// TODO(zz): Maybe we can decide that all proofs are at the PoRA chunk level, so
@ -753,11 +752,11 @@ impl LogManager {
// and `flow_index` must be within a complete PoRA chunk. For possible future usages,
// we'll need to find the flow length at the given root and load a partial chunk
// if `flow_index` is in the last chunk.
let sub_proof = if chunk_index as usize != merkle.pora_chunks_merkle.leaves() - 1
let sub_proof = if seg_index != merkle.pora_chunks_merkle.leaves() - 1
|| merkle.last_chunk_merkle.leaves() == 0
{
self.flow_store
.gen_proof_in_batch(chunk_index as usize, flow_index as usize % PORA_CHUNK_SIZE)?
.gen_proof_in_batch(seg_index, flow_index as usize % PORA_CHUNK_SIZE)?
} else {
match maybe_tx_seq {
None => merkle
@ -1236,3 +1235,11 @@ pub fn tx_subtree_root_list_padded(data: &[u8]) -> Vec<(usize, DataRoot)> {
root_list
}
pub fn sector_to_segment(sector_index: u64) -> usize {
(sector_index / PORA_CHUNK_SIZE as u64) as usize
}
pub fn segment_to_sector(segment_index: usize) -> usize {
segment_index * PORA_CHUNK_SIZE
}

View File

@ -20,6 +20,8 @@ tracing = "0.1.35"
eth2_ssz = "0.4.0"
serde = { version = "1.0.137", features = ["derive"] }
duration-str = "0.5.1"
lazy_static = "1.4.0"
metrics = { workspace = true }
[dev-dependencies]
merkle_light = { path = "../../common/merkle_light" }

View File

@ -1,7 +1,7 @@
use crate::{controllers::SyncState, Config, SyncRequest, SyncResponse, SyncSender};
use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender};
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, fmt::Debug, sync::Arc};
use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration};
use storage_async::Store;
use tokio::sync::RwLock;
@ -15,18 +15,23 @@ pub enum SyncResult {
/// Supports to sync files concurrently.
#[derive(Clone)]
pub struct Batcher {
pub(crate) config: Config,
capacity: usize,
find_peer_timeout: Duration,
tasks: Arc<RwLock<HashSet<u64>>>, // files to sync
store: Store,
sync_send: SyncSender,
}
impl Batcher {
pub fn new(config: Config, capacity: usize, store: Store, sync_send: SyncSender) -> Self {
pub fn new(
capacity: usize,
find_peer_timeout: Duration,
store: Store,
sync_send: SyncSender,
) -> Self {
Self {
config,
capacity,
find_peer_timeout,
tasks: Default::default(),
store,
sync_send,
@ -128,7 +133,7 @@ impl Batcher {
// finding peers timeout
Some(SyncState::FindingPeers { origin, .. })
if origin.elapsed() > self.config.find_peer_timeout =>
if origin.elapsed() > self.find_peer_timeout =>
{
debug!(%tx_seq, "Terminate file sync due to finding peers timeout");
self.terminate_file_sync(tx_seq, false).await;
@ -137,7 +142,7 @@ impl Batcher {
// connecting peers timeout
Some(SyncState::ConnectingPeers { origin, .. })
if origin.elapsed() > self.config.find_peer_timeout =>
if origin.elapsed() > self.find_peer_timeout =>
{
debug!(%tx_seq, "Terminate file sync due to connecting peers timeout");
self.terminate_file_sync(tx_seq, false).await;

View File

@ -1,5 +1,8 @@
use super::{batcher::Batcher, sync_store::SyncStore};
use crate::{auto_sync::batcher::SyncResult, Config, SyncSender};
use crate::{
auto_sync::{batcher::SyncResult, metrics},
Config, SyncSender,
};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::sync::{
@ -19,6 +22,7 @@ pub struct RandomBatcherState {
#[derive(Clone)]
pub struct RandomBatcher {
config: Config,
batcher: Batcher,
sync_store: Arc<SyncStore>,
}
@ -31,7 +35,13 @@ impl RandomBatcher {
sync_store: Arc<SyncStore>,
) -> Self {
Self {
batcher: Batcher::new(config, config.max_random_workers, store, sync_send),
config,
batcher: Batcher::new(
config.max_random_workers,
config.random_find_peer_timeout,
store,
sync_send,
),
sync_store,
}
}
@ -53,10 +63,16 @@ impl RandomBatcher {
// disable file sync until catched up
if !catched_up.load(Ordering::Relaxed) {
trace!("Cannot sync file in catch-up phase");
sleep(self.batcher.config.auto_sync_idle_interval).await;
sleep(self.config.auto_sync_idle_interval).await;
continue;
}
if let Ok(state) = self.get_state().await {
metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64);
metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64);
metrics::RANDOM_STATE_TXS_PENDING.update(state.pending_txs as u64);
}
match self.sync_once().await {
Ok(true) => {}
Ok(false) => {
@ -64,11 +80,11 @@ impl RandomBatcher {
"File sync still in progress or idle, state = {:?}",
self.get_state().await
);
sleep(self.batcher.config.auto_sync_idle_interval).await;
sleep(self.config.auto_sync_idle_interval).await;
}
Err(err) => {
warn!(%err, "Failed to sync file once, state = {:?}", self.get_state().await);
sleep(self.batcher.config.auto_sync_error_interval).await;
sleep(self.config.auto_sync_error_interval).await;
}
}
}
@ -86,6 +102,11 @@ impl RandomBatcher {
};
debug!(%tx_seq, ?sync_result, "Completed to sync file, state = {:?}", self.get_state().await);
match sync_result {
SyncResult::Completed => metrics::RANDOM_SYNC_RESULT_COMPLETED.mark(1),
SyncResult::Failed => metrics::RANDOM_SYNC_RESULT_FAILED.inc(1),
SyncResult::Timeout => metrics::RANDOM_SYNC_RESULT_TIMEOUT.inc(1),
}
match sync_result {
SyncResult::Completed => self.sync_store.remove_tx(tx_seq).await?,

View File

@ -2,7 +2,7 @@ use super::{
batcher::{Batcher, SyncResult},
sync_store::SyncStore,
};
use crate::{Config, SyncSender};
use crate::{auto_sync::metrics, Config, SyncSender};
use anyhow::Result;
use log_entry_sync::LogSyncEvent;
use serde::{Deserialize, Serialize};
@ -23,6 +23,7 @@ use tokio::{
/// Supports to sync files in sequence concurrently.
#[derive(Clone)]
pub struct SerialBatcher {
config: Config,
batcher: Batcher,
/// Next tx seq to sync.
@ -80,13 +81,17 @@ impl SerialBatcher {
sync_send: SyncSender,
sync_store: Arc<SyncStore>,
) -> Result<Self> {
let capacity = config.max_sequential_workers;
// continue file sync from break point in db
let (next_tx_seq, max_tx_seq) = sync_store.get_tx_seq_range().await?;
Ok(Self {
batcher: Batcher::new(config, capacity, store, sync_send),
config,
batcher: Batcher::new(
config.max_sequential_workers,
config.sequential_find_peer_timeout,
store,
sync_send,
),
next_tx_seq: Arc::new(AtomicU64::new(next_tx_seq.unwrap_or(0))),
max_tx_seq: Arc::new(AtomicU64::new(max_tx_seq.unwrap_or(u64::MAX))),
pending_completed_txs: Default::default(),
@ -136,10 +141,19 @@ impl SerialBatcher {
// disable file sync until catched up
if !catched_up.load(Ordering::Relaxed) {
trace!("Cannot sync file in catch-up phase");
sleep(self.batcher.config.auto_sync_idle_interval).await;
sleep(self.config.auto_sync_idle_interval).await;
continue;
}
// update metrics
let state = self.get_state().await;
metrics::SEQUENTIAL_STATE_TXS_SYNCING.update(state.tasks.len() as u64);
if state.max != u64::MAX {
metrics::SEQUENTIAL_STATE_GAP_NEXT_MAX.update((state.max - state.next) as usize);
}
metrics::SEQUENTIAL_STATE_TXS_PENDING.update(state.pendings.len() as u64);
metrics::SEQUENTIAL_STATE_GAP_NEXT_DB.update((state.next - state.next_in_db) as usize);
// sync files
match self.sync_once().await {
Ok(true) => {}
@ -148,11 +162,11 @@ impl SerialBatcher {
"File sync still in progress or idle, state = {:?}",
self.get_state().await
);
sleep(self.batcher.config.auto_sync_idle_interval).await;
sleep(self.config.auto_sync_idle_interval).await;
}
Err(err) => {
warn!(%err, "Failed to sync file once, state = {:?}", self.get_state().await);
sleep(self.batcher.config.auto_sync_error_interval).await;
sleep(self.config.auto_sync_error_interval).await;
}
}
}
@ -247,6 +261,12 @@ impl SerialBatcher {
};
info!(%tx_seq, ?sync_result, "Completed to sync file, state = {:?}", self.get_state().await);
match sync_result {
SyncResult::Completed => metrics::SEQUENTIAL_SYNC_RESULT_COMPLETED.mark(1),
SyncResult::Failed => metrics::SEQUENTIAL_SYNC_RESULT_FAILED.inc(1),
SyncResult::Timeout => metrics::SEQUENTIAL_SYNC_RESULT_TIMEOUT.inc(1),
}
self.pending_completed_txs
.write()
.await

View File

@ -21,6 +21,7 @@ pub struct AutoSyncManager {
pub serial: SerialBatcher,
pub random: RandomBatcher,
pub file_announcement_send: UnboundedSender<u64>,
pub catched_up: Arc<AtomicBool>,
}
impl AutoSyncManager {
@ -52,11 +53,12 @@ impl AutoSyncManager {
executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random");
// handle on catched up notification
let catched_up_cloned = catched_up.clone();
executor.spawn(
async move {
if catch_up_end_recv.await.is_ok() {
info!("log entry catched up");
catched_up.store(true, Ordering::Relaxed);
catched_up_cloned.store(true, Ordering::Relaxed);
}
},
"auto_sync_wait_for_catchup",
@ -66,6 +68,7 @@ impl AutoSyncManager {
serial,
random,
file_announcement_send: send,
catched_up,
})
}
}

View File

@ -0,0 +1,24 @@
use std::sync::Arc;
use metrics::{register_meter, Counter, CounterUsize, Gauge, GaugeUsize, Histogram, Meter, Sample};
lazy_static::lazy_static! {
// sequential auto sync
pub static ref SEQUENTIAL_STATE_TXS_SYNCING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_sequential_state_txs_syncing", 1024);
pub static ref SEQUENTIAL_STATE_GAP_NEXT_MAX: Arc<dyn Gauge<usize>> = GaugeUsize::register("sync_auto_sequential_state_gap_next_max");
pub static ref SEQUENTIAL_STATE_TXS_PENDING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_sequential_state_txs_pending", 1024);
pub static ref SEQUENTIAL_STATE_GAP_NEXT_DB: Arc<dyn Gauge<usize>> = GaugeUsize::register("sync_auto_sequential_state_gap_next_db");
pub static ref SEQUENTIAL_SYNC_RESULT_COMPLETED: Arc<dyn Meter> = register_meter("sync_auto_sequential_sync_result_completed");
pub static ref SEQUENTIAL_SYNC_RESULT_FAILED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_failed");
pub static ref SEQUENTIAL_SYNC_RESULT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_timeout");
// random auto sync
pub static ref RANDOM_STATE_TXS_SYNCING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_syncing", 1024);
pub static ref RANDOM_STATE_TXS_READY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_ready", 1024);
pub static ref RANDOM_STATE_TXS_PENDING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_pending", 1024);
pub static ref RANDOM_SYNC_RESULT_COMPLETED: Arc<dyn Meter> = register_meter("sync_auto_random_sync_result_completed");
pub static ref RANDOM_SYNC_RESULT_FAILED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_failed");
pub static ref RANDOM_SYNC_RESULT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_timeout");
}

View File

@ -2,5 +2,6 @@ mod batcher;
pub mod batcher_random;
pub mod batcher_serial;
pub mod manager;
mod metrics;
pub mod sync_store;
mod tx_store;

View File

@ -0,0 +1,11 @@
use std::sync::Arc;
use metrics::{register_timer, Counter, CounterUsize, Histogram, Sample, Timer};
lazy_static::lazy_static! {
pub static ref SERIAL_SYNC_FILE_COMPLETED: Arc<dyn Timer> = register_timer("sync_controllers_serial_sync_file_completed");
pub static ref SERIAL_SYNC_CHUNKS_COMPLETED: Arc<dyn Timer> = register_timer("sync_controllers_serial_sync_chunks_completed");
pub static ref SERIAL_SYNC_SEGMENT_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_controllers_serial_sync_segment_latency", 1024);
pub static ref SERIAL_SYNC_SEGMENT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_controllers_serial_sync_segment_timeout");
pub static ref SERIAL_SYNC_UNEXPECTED_ERRORS: Arc<dyn Counter<usize>> = CounterUsize::register("sync_controllers_serial_sync_unexpected_errors");
}

View File

@ -1,3 +1,4 @@
mod metrics;
mod peers;
mod serial;
@ -17,10 +18,12 @@ pub struct FileSyncGoal {
pub index_start: u64,
/// Chunk index to sync to (exclusive).
pub index_end: u64,
/// `true` if we are syncing all the needed data of this file.
pub all_chunks: bool,
}
impl FileSyncGoal {
pub fn new(num_chunks: u64, index_start: u64, index_end: u64) -> Self {
pub fn new(num_chunks: u64, index_start: u64, index_end: u64, all_chunks: bool) -> Self {
assert!(
index_start < index_end && index_end <= num_chunks,
"invalid index_end"
@ -29,15 +32,16 @@ impl FileSyncGoal {
num_chunks,
index_start,
index_end,
all_chunks,
}
}
pub fn new_file(num_chunks: u64) -> Self {
Self::new(num_chunks, 0, num_chunks)
Self::new(num_chunks, 0, num_chunks, true)
}
pub fn is_all_chunks(&self) -> bool {
self.index_start == 0 && self.index_end == self.num_chunks
self.all_chunks
}
}

View File

@ -1,6 +1,6 @@
use crate::context::SyncNetworkContext;
use crate::controllers::peers::{PeerState, SyncPeers};
use crate::controllers::{FileSyncGoal, FileSyncInfo};
use crate::controllers::{metrics, FileSyncGoal, FileSyncInfo};
use crate::{Config, InstantWrapper};
use file_location_cache::FileLocationCache;
use libp2p::swarm::DialError;
@ -12,7 +12,7 @@ use network::{
use rand::Rng;
use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE};
use std::{sync::Arc, time::Instant};
use storage::log_store::log_manager::PORA_CHUNK_SIZE;
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
use storage_async::Store;
#[derive(Clone, Debug, PartialEq, Eq)]
@ -139,7 +139,7 @@ impl SerialSyncController {
if let Some((start, end)) = maybe_range {
// Sync new chunks regardless of previously downloaded file or chunks.
// It's up to client to avoid duplicated chunks sync.
self.goal = FileSyncGoal::new(self.goal.num_chunks, start, end);
self.goal = FileSyncGoal::new(self.goal.num_chunks, start, end, false);
self.next_chunk = start;
} else if self.goal.is_all_chunks() {
// retry the failed file sync at break point
@ -258,7 +258,8 @@ impl SerialSyncController {
// request next chunk array
let from_chunk = self.next_chunk;
let to_chunk = std::cmp::min(from_chunk + PORA_CHUNK_SIZE as u64, self.goal.index_end);
let request_id = network::RequestId::Sync(RequestId::SerialSync { tx_id: self.tx_id });
let request_id =
network::RequestId::Sync(Instant::now(), RequestId::SerialSync { tx_id: self.tx_id });
// TODO: It's possible that we read it while `nex_tx_seq - 1` is still being committed.
// We can wait for its commitment, but this will slow down this state machine.
// Or we can use `next_tx_seq - 2`, but for a restarted node without receiving new
@ -311,15 +312,15 @@ impl SerialSyncController {
.peers
.add_new_peer_with_config(peer_id, addr.clone(), shard_config)
{
info!(%self.tx_seq, %peer_id, %addr, "Found new peer");
debug!(%self.tx_seq, %peer_id, %addr, "Found new peer");
true
} else {
// e.g. multiple `AnnounceFile` messages propagated
debug!(%self.tx_seq, %peer_id, %addr, "Found an existing peer");
trace!(%self.tx_seq, %peer_id, %addr, "Found an existing peer");
false
}
} else {
debug!(%self.tx_seq, %peer_id, %addr, "Found peer without shard config");
info!(%self.tx_seq, %peer_id, %addr, "Found peer without shard config");
false
}
}
@ -406,7 +407,6 @@ impl SerialSyncController {
}
pub async fn on_response(&mut self, from_peer_id: PeerId, response: ChunkArrayWithProof) {
debug!(%self.tx_seq, %from_peer_id, "Received RPC response");
if self.handle_on_response_mismatch(from_peer_id) {
return;
}
@ -429,6 +429,7 @@ impl SerialSyncController {
let data_len = response.chunks.data.len();
if data_len == 0 || data_len % CHUNK_SIZE > 0 {
warn!(%from_peer_id, %self.tx_seq, %data_len, "Invalid chunk response data length");
metrics::SERIAL_SYNC_UNEXPECTED_ERRORS.inc(1);
self.ban_peer(from_peer_id, "Invalid chunk response data length");
self.state = SyncState::Idle;
return;
@ -466,6 +467,7 @@ impl SerialSyncController {
}
Err(err) => {
warn!(%err, %self.tx_seq, "Failed to validate chunks response");
metrics::SERIAL_SYNC_UNEXPECTED_ERRORS.inc(1);
self.ban_peer(from_peer_id, "Chunk array validation failed");
self.state = SyncState::Idle;
return;
@ -474,11 +476,13 @@ impl SerialSyncController {
self.failures = 0;
metrics::SERIAL_SYNC_SEGMENT_LATENCY.update_since(since.0);
let shard_config = self.store.get_store().flow().get_shard_config();
let next_chunk = shard_config.next_segment_index(
(from_chunk / PORA_CHUNK_SIZE as u64) as usize,
(self.tx_start_chunk_in_flow / PORA_CHUNK_SIZE as u64) as usize,
) * PORA_CHUNK_SIZE;
let next_chunk = segment_to_sector(shard_config.next_segment_index(
sector_to_segment(from_chunk),
sector_to_segment(self.tx_start_chunk_in_flow),
));
// store in db
match self
.store
@ -488,6 +492,7 @@ impl SerialSyncController {
Ok(true) => self.next_chunk = next_chunk as u64,
Ok(false) => {
warn!(%self.tx_seq, ?self.tx_id, "Transaction reverted while storing chunks");
metrics::SERIAL_SYNC_UNEXPECTED_ERRORS.inc(1);
self.state = SyncState::Failed {
reason: FailureReason::TxReverted(self.tx_id),
};
@ -495,6 +500,7 @@ impl SerialSyncController {
}
Err(err) => {
error!(%err, %self.tx_seq, "Unexpected DB error while storing chunks");
metrics::SERIAL_SYNC_UNEXPECTED_ERRORS.inc(1);
self.state = SyncState::Failed {
reason: FailureReason::DBError(err.to_string()),
};
@ -511,6 +517,7 @@ impl SerialSyncController {
// completed to download chunks
if !self.goal.is_all_chunks() {
self.state = SyncState::Completed;
metrics::SERIAL_SYNC_CHUNKS_COMPLETED.update_since(self.since.0);
return;
}
@ -523,15 +530,18 @@ impl SerialSyncController {
Ok(true) => {
info!(%self.tx_seq, "Succeeded to finalize file");
self.state = SyncState::Completed;
metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0);
}
Ok(false) => {
warn!(?self.tx_id, %self.tx_seq, "Transaction reverted during finalize_tx");
metrics::SERIAL_SYNC_UNEXPECTED_ERRORS.inc(1);
self.state = SyncState::Failed {
reason: FailureReason::TxReverted(self.tx_id),
};
}
Err(err) => {
error!(%err, %self.tx_seq, "Unexpected error during finalize_tx");
metrics::SERIAL_SYNC_UNEXPECTED_ERRORS.inc(1);
self.state = SyncState::Failed {
reason: FailureReason::DBError(err.to_string()),
};
@ -566,12 +576,11 @@ impl SerialSyncController {
/// Randomly select a `Connected` peer to sync chunks.
fn select_peer_for_request(&self, request: &GetChunksRequest) -> Option<PeerId> {
let segment_index =
(request.index_start + self.tx_start_chunk_in_flow) / PORA_CHUNK_SIZE as u64;
let segment_index = sector_to_segment(request.index_start + self.tx_start_chunk_in_flow);
let mut peers = self.peers.filter_peers(vec![PeerState::Connected]);
peers.retain(|peer_id| match self.peers.shard_config(peer_id) {
Some(v) => v.in_range(segment_index),
Some(v) => v.in_range(segment_index as u64),
None => false,
});
@ -675,6 +684,7 @@ impl SerialSyncController {
debug!(%self.tx_seq, "No peer to continue downloading and try to find other peers to download");
self.state = SyncState::Idle;
} else if since.elapsed() >= self.config.peer_chunks_download_timeout {
metrics::SERIAL_SYNC_SEGMENT_TIMEOUT.inc(1);
self.handle_response_failure(peer_id, "RPC timeout");
} else {
completed = true;
@ -874,7 +884,7 @@ mod tests {
);
match request_id {
network::RequestId::Sync(sync_id) => match sync_id {
network::RequestId::Sync(_, sync_id) => match sync_id {
network::SyncId::SerialSync { tx_id } => {
assert_eq!(tx_id, controller.tx_id);
}

View File

@ -52,7 +52,9 @@ pub struct Config {
pub max_sequential_workers: usize,
pub max_random_workers: usize,
#[serde(deserialize_with = "deserialize_duration")]
pub find_peer_timeout: Duration,
pub sequential_find_peer_timeout: Duration,
#[serde(deserialize_with = "deserialize_duration")]
pub random_find_peer_timeout: Duration,
}
impl Default for Config {
@ -61,26 +63,27 @@ impl Default for Config {
// sync service config
heartbeat_interval: Duration::from_secs(5),
auto_sync_enabled: false,
max_sync_files: 16,
max_sync_files: 32,
sync_file_by_rpc_enabled: true,
sync_file_on_announcement_enabled: false,
// serial sync config
max_chunks_to_request: 2 * 1024,
max_request_failures: 5,
peer_connect_timeout: Duration::from_secs(5),
peer_disconnect_timeout: Duration::from_secs(5),
peer_find_timeout: Duration::from_secs(5),
peer_chunks_download_timeout: Duration::from_secs(5),
peer_connect_timeout: Duration::from_secs(15),
peer_disconnect_timeout: Duration::from_secs(15),
peer_find_timeout: Duration::from_secs(30),
peer_chunks_download_timeout: Duration::from_secs(15),
peer_wait_outgoing_connection_timeout: Duration::from_secs(10),
peer_next_chunks_request_wait_timeout: Duration::from_secs(3),
// auto sync config
auto_sync_idle_interval: Duration::from_secs(3),
auto_sync_error_interval: Duration::from_secs(10),
max_sequential_workers: 8,
max_random_workers: 4,
find_peer_timeout: Duration::from_secs(10),
max_sequential_workers: 24,
max_random_workers: 8,
sequential_find_peer_timeout: Duration::from_secs(60),
random_find_peer_timeout: Duration::from_secs(500),
}
}
}
@ -110,6 +113,7 @@ impl InstantWrapper {
#[serde(rename_all = "camelCase")]
pub struct SyncServiceState {
pub num_syncing: usize,
pub catched_up: Option<bool>,
pub auto_sync_serial: Option<SerialBatcherState>,
pub auto_sync_random: Option<RandomBatcherState>,
}

View File

@ -4,7 +4,7 @@ use crate::controllers::{
FailureReason, FileSyncGoal, FileSyncInfo, SerialSyncController, SyncState,
};
use crate::{Config, SyncServiceState};
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use file_location_cache::FileLocationCache;
use libp2p::swarm::DialError;
use log_entry_sync::LogSyncEvent;
@ -14,13 +14,16 @@ use network::{
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
PeerRequestId, SyncId as RequestId,
};
use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, TxID};
use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, Transaction, TxID};
use std::sync::atomic::Ordering;
use std::{
cmp,
collections::{hash_map::Entry, HashMap},
sync::Arc,
};
use storage::config::ShardConfig;
use storage::error::Result as StorageResult;
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
use storage::log_store::Store as LogStore;
use storage_async::Store;
use tokio::sync::{broadcast, mpsc, oneshot};
@ -156,7 +159,7 @@ impl SyncService {
event_recv: broadcast::Receiver<LogSyncEvent>,
catch_up_end_recv: oneshot::Receiver<()>,
) -> Result<SyncSender> {
let (sync_send, sync_recv) = channel::Channel::unbounded();
let (sync_send, sync_recv) = channel::Channel::unbounded("sync");
let store = Store::new(store, executor.clone());
// init auto sync
@ -275,11 +278,13 @@ impl SyncService {
let state = match &self.auto_sync_manager {
Some(manager) => SyncServiceState {
num_syncing: self.controllers.len(),
catched_up: Some(manager.catched_up.load(Ordering::Relaxed)),
auto_sync_serial: Some(manager.serial.get_state().await),
auto_sync_random: manager.random.get_state().await.ok(),
},
None => SyncServiceState {
num_syncing: self.controllers.len(),
catched_up: None,
auto_sync_serial: None,
auto_sync_random: None,
},
@ -630,9 +635,19 @@ impl SyncService {
bail!("File already exists");
}
let (index_start, index_end) = match maybe_range {
Some((start, end)) => (start, end),
None => (0, num_chunks),
let (index_start, index_end, all_chunks) = match maybe_range {
Some((start, end)) => (start, end, false),
None => {
let start = match Self::tx_sync_start_index(&self.store, &tx).await? {
Some(s) => s,
None => {
debug!(%tx.seq, "No more data needed");
self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await?;
return Ok(());
}
};
(start, num_chunks, true)
}
};
if index_start >= index_end || index_end > num_chunks {
@ -643,7 +658,7 @@ impl SyncService {
self.config,
tx.id(),
tx.start_entry_index(),
FileSyncGoal::new(num_chunks, index_start, index_end),
FileSyncGoal::new(num_chunks, index_start, index_end, all_chunks),
self.ctx.clone(),
self.store.clone(),
self.file_location_cache.clone(),
@ -749,7 +764,7 @@ impl SyncService {
to_terminate.push(*tx_seq);
}
}
} else {
} else if self.controllers.contains_key(&min_tx_seq) {
to_terminate.push(min_tx_seq);
}
@ -757,9 +772,12 @@ impl SyncService {
self.controllers.remove(tx_seq);
}
debug!(?to_terminate, "File sync terminated");
let num_terminated = to_terminate.len();
if num_terminated > 0 {
debug!(?to_terminate, "File sync terminated");
}
to_terminate.len()
num_terminated
}
fn on_heartbeat(&mut self) {
@ -787,6 +805,35 @@ impl SyncService {
self.controllers.remove(&tx_seq);
}
}
async fn tx_sync_start_index(store: &Store, tx: &Transaction) -> Result<Option<u64>> {
let shard_config = store.get_store().flow().get_shard_config();
let start_segment = sector_to_segment(tx.start_entry_index());
let end =
bytes_to_chunks(usize::try_from(tx.size).map_err(|e| anyhow!("tx size e={}", e))?);
let mut start = if shard_config.in_range(start_segment as u64) {
0
} else {
segment_to_sector(shard_config.next_segment_index(0, start_segment))
};
while start < end {
if store
.get_chunks_by_tx_and_index_range(
tx.seq,
start,
cmp::min(start + PORA_CHUNK_SIZE, end),
)
.await?
.is_none()
{
return Ok(Some(start as u64));
}
start = segment_to_sector(
shard_config.next_segment_index(sector_to_segment(start as u64), start_segment),
);
}
Ok(None)
}
}
#[cfg(test)]
@ -906,7 +953,7 @@ mod tests {
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (_, sync_recv) = channel::Channel::unbounded();
let (_, sync_recv) = channel::Channel::unbounded("test");
let mut sync = SyncService {
config: Config::default(),
@ -935,7 +982,7 @@ mod tests {
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (_, sync_recv) = channel::Channel::unbounded();
let (_, sync_recv) = channel::Channel::unbounded("test");
let mut sync = SyncService {
config: Config::default(),
@ -1703,7 +1750,7 @@ mod tests {
};
let sync_id = match request_id {
network::RequestId::Sync(sync_id) => sync_id,
network::RequestId::Sync(_, sync_id) => sync_id,
_ => unreachable!("All Chunks responses belong to sync"),
};

View File

@ -0,0 +1,264 @@
# This is a TOML config file.
# For more information, see https://github.com/toml-lang/toml
#######################################################################
### Network Config Options ###
#######################################################################
# Data directory where node's keyfile is stored.
# network_dir = "network"
# IP address to listen on.
# network_listen_address = "0.0.0.0"
# The address to broadcast to peers about which address we are listening on. Generally,
# configure public IP address for UDP discovery. If not specified, program will try to
# detect public IP address automatically.
# network_enr_address = ""
# The tcp port to broadcast to peers in order to reach back for libp2p services.
# network_enr_tcp_port = 1234
# The udp port to broadcast to peers in order to reach back for discovery.
# network_enr_udp_port = 1234
# The TCP port that libp2p listens on.
# network_libp2p_port = 1234
# UDP port that discovery listens on.
# network_discovery_port = 1234
# Target number of connected peers.
# network_target_peers = 50
# List of nodes to bootstrap UDP discovery. Note, `network_enr_address` should be
# configured as well to enable UDP discovery.
network_boot_nodes = ["/ip4/35.95.5.134/udp/1234/p2p/16Uiu2HAmFGrDV8wKToa1dd8uh6bz8bSY28n33iRP3pvfeBU6ysCw","/ip4/35.84.189.77/udp/1234/p2p/16Uiu2HAmF7t5iuRoWLMvQVfHbbJr5TFgHo2oU1CDxJm56eLdxRAY"]
# List of libp2p nodes to initially connect to.
# network_libp2p_nodes = []
# Indicates if the user has set the network to be in private mode. Currently this
# prevents sending client identifying information over identify.
# network_private = false
# Disables the discovery protocol from starting.
# network_disable_discovery = false
#######################################################################
### UDP Discovery Config Options ###
#######################################################################
# The request timeout for each UDP request.
# discv5_request_timeout_secs = 5
# The timeout after which a `QueryPeer` in an ongoing query is marked unresponsive.
# Unresponsive peers don't count towards the parallelism limits for a query.
# Hence, we may potentially end up making more requests to good peers.
# discv5_query_peer_timeout_secs = 2
# The number of retries for each UDP request.
# discv5_request_retries = 1
# The number of peers to request in parallel in a single query.
# discv5_query_parallelism = 5
# Reports all discovered ENR's when traversing the DHT to the event stream.
# discv5_report_discovered_peers = false
# Disables the incoming packet filter.
# discv5_disable_packet_filter = false
# Disable to limit the number of IP addresses from the same
# /24 subnet in the kbuckets table. This is to mitigate eclipse attacks.
# discv5_disable_ip_limit = false
#######################################################################
### Log Sync Config Options ###
#######################################################################
# RPC endpoint to sync event logs on EVM compatible blockchain.
# blockchain_rpc_endpoint = "http://127.0.0.1:8545"
# Flow contract address to sync event logs.
log_contract_address = "0x0460aA47b41a66694c0a73f667a1b795A5ED3556"
# Block number to sync event logs from blockchain. Generally, this is
# the block number when flow contract deployed.
log_sync_start_block_number = 595059
# Number of blocks to confirm a transaction.
confirmation_block_count = 6
# Maximum number of event logs to poll at a time.
# log_page_size = 999
# Maximum data size to cache in memory (by default, 100MB).
# max_cache_data_size = 104857600
# TTL to cache data in memory.
# cache_tx_seq_ttl = 500
# The number of retries after a RPC request times out.
# rate_limit_retries = 100
# The nubmer of retries for rate limited responses.
# timeout_retries = 100
# The duration to wait before retry, in ms.
# initial_backoff = 500
# The duration between each paginated getLogs RPC call, in ms.
# This is set to avoid triggering the throttling mechanism in the RPC server.
# recover_query_delay = 50
# The counter assumed the finalized block behind the latest block.
# default_finalized_block_count = 100
# Remove finalized block trigger interval.
# remove_finalized_block_interval_minutes = 30
# Watch_loop (eth_getLogs) trigger interval.
# watch_loop_wait_time_ms = 500
#######################################################################
### RPC Config Options ###
#######################################################################
# Whether to provide RPC service.
# rpc_enabled = true
# HTTP server address to bind for public RPC.
# rpc_listen_address = "0.0.0.0:5678"
# HTTP server address to bind for admin and debug RPC.
# rpc_listen_address_admin = "127.0.0.1:5679"
# Maximum data size of RPC request body (by default, 100MB).
# max_request_body_size = 104857600
# Number of chunks for a single segment.
# rpc_chunks_per_segment = 1024
# Maximum file size that allowed to cache in memory (by default, 10MB).
# rpc_max_cache_file_size = 10485760
#######################################################################
### Chunk Pool Config Options ###
#######################################################################
# Maximum number of threads to upload segments of a single file simultaneously.
# chunk_pool_write_window_size = 4
# Maximum data size of cached segment in pool (by default, 4MB).
# chunk_pool_max_cached_chunks_all = 4194304
# Maximum number of threads to upload segments for all files simultaneously.
# chunk_pool_max_writings = 16
# Expiration time to cache uploaded segments in memory.
# chunk_pool_expiration_time_secs = 300
#######################################################################
### DB Config Options ###
#######################################################################
# Directory to store data.
# db_dir = "db"
#######################################################################
### Misc Config Options ###
#######################################################################
# Log configuration file.
# log_config_file = "log_config"
# Log directory.
# log_directory = "log"
#######################################################################
### Mine Config Options ###
#######################################################################
# Mine contract address for incentive.
mine_contract_address = "0x1785c8683b3c527618eFfF78d876d9dCB4b70285"
# Miner key is used to sign blockchain transaction for incentive.
# The value should be a hex string of length 64 without 0x prefix.
#
# Note, the corresponding address should have enough tokens to pay
# transaction gas fee.
# miner_key = ""
#######################################################################
### Sharding Config Options ###
#######################################################################
# The max number of chunk entries to store in db.
# Each entry is 256B, so the db size is roughly limited to
# `256 * db_max_num_sectors` Bytes.
# If this limit is reached, the node will update its `shard_position`
# and store only half data.
#
db_max_num_sectors = 1000000000
# The format is <shard_id>/<shard_number>, where the shard number is 2^n.
# This only applies if there is no stored shard config in db.
# shard_position = "0/2"
reward_contract_address = "0x0496D0817BD8519e0de4894Dc379D35c35275609"
# The time interval to check if we should half `shard_position` to prune data.
#
# prune_check_time_s = 60
# The number of chunk entries to delete in a batch when we prune data.
#
# prune_batch_size = 1024
# The time interval to wait between each prune batch deletion to avoid
# IO resource exhaustion.
#
# prune_batch_wait_time_ms = 1000
#######################################################################
### File Sync Config Options ###
#######################################################################
[sync]
# Enable file sync among peers automatically. When enabled, each node will store
# all files, and sufficient disk space is required.
auto_sync_enabled = true
# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 32
# Enable to start a file sync via RPC (e.g. `admin_startSyncFile`).
# sync_file_by_rpc_enabled = true
# Enable to start a file sync automatically when a file announcement P2P message received.
# sync_file_on_announcement_enabled = false
# Maximum threads to sync files in sequence.
# max_sequential_workers = 24
# Maximum threads to sync files randomly.
# max_random_workers = 8
#######################################################################
### File Location Cache Options ###
#######################################################################
# [file_location_cache]
# File location cache is a cache that maintains storage positions of files.
# Storage location information is represented by the IP address of the storage node and the timestamp indicating when the node declared that it stores the corresponding file.
# It has both a global capacity limit and a limit on the capacity for location information of each individual file.
# When the cache is full, the storage position information with oldest timestamp will be replaced.
# Global cache capacity.
# max_entries_total = 4096
# Location information capacity for each file.
# max_entries_per_file = 4
# Validity period of location information.
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
# entry_expiration_time_secs = 3600

View File

@ -33,7 +33,7 @@
# List of nodes to bootstrap UDP discovery. Note, `network_enr_address` should be
# configured as well to enable UDP discovery.
network_boot_nodes = ["/ip4/54.219.26.22/udp/1234/p2p/16Uiu2HAmTVDGNhkHD98zDnJxQWu3i1FL1aFYeh9wiQTNu4pDCgps","/ip4/52.52.127.117/udp/1234/p2p/16Uiu2HAkzRjxK2gorngB1Xq84qDrT4hSVznYDHj6BkbaE4SGx9oS","/ip4/18.167.69.68/udp/1234/p2p/16Uiu2HAm2k6ua2mGgvZ8rTMV8GhpW71aVzkQWy7D37TTDuLCpgmX"]
network_boot_nodes = ["/ip4/54.219.26.22/udp/1234/p2p/16Uiu2HAmTVDGNhkHD98zDnJxQWu3i1FL1aFYeh9wiQTNu4pDCgps","/ip4/52.52.127.117/udp/1234/p2p/16Uiu2HAkzRjxK2gorngB1Xq84qDrT4hSVznYDHj6BkbaE4SGx9oS","/ip4/18.162.65.205/udp/1234/p2p/16Uiu2HAm2k6ua2mGgvZ8rTMV8GhpW71aVzkQWy7D37TTDuLCpgmX"]
# List of libp2p nodes to initially connect to.
# network_libp2p_nodes = []
@ -80,14 +80,14 @@ network_boot_nodes = ["/ip4/54.219.26.22/udp/1234/p2p/16Uiu2HAmTVDGNhkHD98zDnJxQ
# blockchain_rpc_endpoint = "http://127.0.0.1:8545"
# Flow contract address to sync event logs.
log_contract_address = "0x8873cc79c5b3b5666535C825205C9a128B1D75F1"
log_contract_address = "0xbD2C3F0E65eDF5582141C35969d66e34629cC768"
# Block number to sync event logs from blockchain. Generally, this is
# the block number when flow contract deployed.
log_sync_start_block_number = 802
log_sync_start_block_number = 595059
# Number of blocks to confirm a transaction.
# confirmation_block_count = 12
confirmation_block_count = 6
# Maximum number of event logs to poll at a time.
# log_page_size = 999
@ -179,11 +179,8 @@ log_sync_start_block_number = 802
### Mine Config Options ###
#######################################################################
# Mine contract address for PoRA.
mine_contract_address = "0x6176AA095C47A7F79deE2ea473B77ebf50035421"
# Reward contract address for incentive.
reward_contract_address = "0x4a62e08198b8B2a791532280bEA976EE3b024d79"
# Mine contract address for incentive.
mine_contract_address = "0x6815F41019255e00D6F34aAB8397a6Af5b6D806f"
# Miner key is used to sign blockchain transaction for incentive.
# The value should be a hex string of length 64 without 0x prefix.
@ -213,12 +210,13 @@ reward_contract_address = "0x4a62e08198b8B2a791532280bEA976EE3b024d79"
# If this limit is reached, the node will update its `shard_position`
# and store only half data.
#
# db_max_num_sectors = 1000000000
db_max_num_sectors = 1000000000
# The format is <shard_id>/<shard_number>, where the shard number is 2^n.
# This only applies if there is no stored shard config in db.
# shard_position = "0/2"
reward_contract_address = "0x51998C4d486F406a788B766d93510980ae1f9360"
# The time interval to check if we should half `shard_position` to prune data.
#
# prune_check_time_s = 60
@ -242,11 +240,7 @@ reward_contract_address = "0x4a62e08198b8B2a791532280bEA976EE3b024d79"
auto_sync_enabled = true
# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 16
# Timeout to terminate a file sync when automatically sync from other peers.
# If timeout, terminated file sync will be triggered later.
# find_peer_timeout = "10s"
# max_sync_files = 32
# Enable to start a file sync via RPC (e.g. `admin_startSyncFile`).
# sync_file_by_rpc_enabled = true
@ -255,7 +249,28 @@ auto_sync_enabled = true
# sync_file_on_announcement_enabled = false
# Maximum threads to sync files in sequence.
# max_sequential_workers = 8
# max_sequential_workers = 24
# Maximum threads to sync files randomly.
# max_random_workers = 4
# max_random_workers = 8
#######################################################################
### File Location Cache Options ###
#######################################################################
# [file_location_cache]
# File location cache is a cache that maintains storage positions of files.
# Storage location information is represented by the IP address of the storage node and the timestamp indicating when the node declared that it stores the corresponding file.
# It has both a global capacity limit and a limit on the capacity for location information of each individual file.
# When the cache is full, the storage position information with oldest timestamp will be replaced.
# Global cache capacity.
# max_entries_total = 4096
# Location information capacity for each file.
# max_entries_per_file = 4
# Validity period of location information.
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
# entry_expiration_time_secs = 3600

View File

@ -242,11 +242,7 @@
# auto_sync_enabled = false
# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 16
# Timeout to terminate a file sync when automatically sync from other peers.
# If timeout, terminated file sync will be triggered later.
# find_peer_timeout = "10s"
# max_sync_files = 32
# Enable to start a file sync via RPC (e.g. `admin_startSyncFile`).
# sync_file_by_rpc_enabled = true
@ -255,10 +251,10 @@
# sync_file_on_announcement_enabled = false
# Maximum threads to sync files in sequence.
# max_sequential_workers = 8
# max_sequential_workers = 24
# Maximum threads to sync files randomly.
# max_random_workers = 4
# max_random_workers = 8
#######################################################################
### File Location Cache Options ###

View File

@ -1 +1 @@
debug,hyper=info,h2=info,rpc=info,discv5=info,router=info,jsonrpsee_http_server=info
info

1
run/log_config_debug Normal file
View File

@ -0,0 +1 @@
debug,hyper=info,h2=info,rpc=info,discv5=info,jsonrpsee_http_server=info

View File

@ -68,7 +68,7 @@ class ZgsNode(TestNode):
os.mkdir(self.data_dir)
log_config_path = os.path.join(self.data_dir, self.config["log_config_file"])
with open(log_config_path, "w") as f:
f.write("debug,hyper=info,h2=info")
f.write("trace,hyper=info,h2=info")
initialize_toml_config(self.config_file, self.config)

View File

@ -1,6 +1,6 @@
use super::*;
use core::num::NonZeroUsize;
use ethereum_types::{H256, U128, U256};
use ethereum_types::{H160, H256, U128, U256};
use smallvec::SmallVec;
use std::sync::Arc;
@ -277,6 +277,27 @@ impl Decode for H256 {
}
}
impl Decode for H160 {
fn is_ssz_fixed_len() -> bool {
true
}
fn ssz_fixed_len() -> usize {
20
}
fn from_ssz_bytes(bytes: &[u8]) -> Result<Self, DecodeError> {
let len = bytes.len();
let expected = <Self as Decode>::ssz_fixed_len();
if len != expected {
Err(DecodeError::InvalidByteLength { len, expected })
} else {
Ok(H160::from_slice(bytes))
}
}
}
impl Decode for U256 {
fn is_ssz_fixed_len() -> bool {
true

View File

@ -1,6 +1,6 @@
use super::*;
use core::num::NonZeroUsize;
use ethereum_types::{H256, U128, U256};
use ethereum_types::{H160, H256, U128, U256};
use smallvec::SmallVec;
use std::sync::Arc;
@ -323,6 +323,24 @@ impl Encode for H256 {
}
}
impl Encode for H160 {
fn is_ssz_fixed_len() -> bool {
true
}
fn ssz_fixed_len() -> usize {
20
}
fn ssz_bytes_len(&self) -> usize {
20
}
fn ssz_append(&self, buf: &mut Vec<u8>) {
buf.extend_from_slice(self.as_bytes());
}
}
impl Encode for U256 {
fn is_ssz_fixed_len() -> bool {
true