Compare commits

...

24 Commits

Author SHA1 Message Date
Peter Zhang
832fa8eab5 add detailed metrics in slow operations 2024-10-31 18:09:54 +08:00
Peter Zhang
924c354acf add detailed metrics in slow operations 2024-10-31 18:09:54 +08:00
Peter Zhang
cc73628c02 add detailed metrics in slow operations 2024-10-31 18:09:54 +08:00
Peter Zhang
7c915196ce add detailed metrics for storage layer 2024-10-31 18:09:54 +08:00
Peter Zhang
8bd2b079e6 add detailed metrics for storage layer 2024-10-31 18:09:54 +08:00
Peter Zhang
9d2303b9b3 add detailed metrics for storage layer 2024-10-31 18:09:54 +08:00
Peter Zhang
f03fdc68e7 add detailed metrics for storage layer 2024-10-31 18:09:54 +08:00
Peter Zhang
f15c27c325 add detailed metrics in slow operations 2024-10-31 18:09:53 +08:00
Peter Zhang
20a3bd8b1d add detailed metrics in slow operations 2024-10-31 18:09:12 +08:00
Peter Zhang
ebfbf65340 format code 2024-10-31 18:08:01 +08:00
Peter Zhang
3653d34048 add detailed metrics in slow operations 2024-10-31 18:07:59 +08:00
boqiu
d326dc87c3 Add comments 2024-10-31 18:03:00 +08:00
boqiu
9474daabde fix random test failure 2024-10-31 18:03:00 +08:00
boqiu
6df5b2aaf9 fmt code 2024-10-31 18:03:00 +08:00
boqiu
e1a9cfde64 Add py test for auto sync v2 2024-10-31 18:03:00 +08:00
boqiu
9fde9a7239 fix unit test failures 2024-10-31 18:03:00 +08:00
boqiu
f9ef93dd9c Mark peer connected if FileAnnouncement RPC message received 2024-10-31 18:03:00 +08:00
boqiu
cc846de1f5 do not propagate FindFile to whole network 2024-10-31 18:03:00 +08:00
boqiu
fb65d257be Disable sequential sync and store new file in v2 sync store 2024-10-31 18:03:00 +08:00
boqiu
8f1fe118ef handle NewFile in sync service to write in db 2024-10-31 18:03:00 +08:00
boqiu
d9030de505 Add new P2P protocol NewFile 2024-10-31 18:03:00 +08:00
0g-peterzhb
9eea71e97d
separate data db from flow db (#252)
* separate data db from flow db
2024-10-31 15:44:26 +08:00
Bo QIU
bb6e1457b7
Refuse network identity incompatible nodes in UDP discovery layer (#253)
* Add python test for UDP discovery

* Refuse nodes with incompatible ENR
2024-10-30 17:26:02 +08:00
peilun-conflux
da2cdec8a1
Remove unused. (#251)
2024-10-29 23:23:53 +08:00
33 changed files with 524 additions and 342 deletions

3
.gitignore vendored
View File

@ -4,5 +4,6 @@
/.idea
tests/**/__pycache__
tests/tmp/**
tests/config/zgs
.vscode/*.json
/0g-storage-contracts-dev
/0g-storage-contracts-dev

3
Cargo.lock generated
View File

@ -226,6 +226,7 @@ dependencies = [
"itertools 0.13.0",
"lazy_static",
"lru 0.12.5",
"metrics",
"once_cell",
"serde",
"tiny-keccak",
@ -7300,8 +7301,10 @@ dependencies = [
"kvdb",
"kvdb-memorydb",
"kvdb-rocksdb",
"lazy_static",
"merkle_light",
"merkle_tree",
"metrics",
"once_cell",
"parking_lot 0.12.3",
"rand 0.8.5",

View File

@ -13,5 +13,8 @@ serde = { version = "1.0.137", features = ["derive"] }
lazy_static = "1.4.0"
tracing = "0.1.36"
once_cell = "1.19.0"
metrics = { workspace = true }
itertools = "0.13.0"
lru = "0.12.5"

View File

@ -1,4 +1,5 @@
mod merkle_tree;
mod metrics;
mod node_manager;
mod proof;
mod sha3;
@ -10,6 +11,7 @@ use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;
use tracing::{trace, warn};
use crate::merkle_tree::MerkleTreeWrite;
@ -145,6 +147,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
pub fn append(&mut self, new_leaf: E) {
let start_time = Instant::now();
if new_leaf == E::null() {
// appending null is not allowed.
return;
@ -152,10 +155,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.node_manager.start_transaction();
self.node_manager.push_node(0, new_leaf);
self.recompute_after_append_leaves(self.leaves() - 1);
self.node_manager.commit();
metrics::APPEND.update_since(start_time);
}
pub fn append_list(&mut self, leaf_list: Vec<E>) {
let start_time = Instant::now();
if leaf_list.contains(&E::null()) {
// appending null is not allowed.
return;
@ -165,6 +171,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.node_manager.append_nodes(0, &leaf_list);
self.recompute_after_append_leaves(start_index);
self.node_manager.commit();
metrics::APPEND_LIST.update_since(start_time);
}
/// Append a leaf list by providing their intermediate node hash.
@ -173,6 +180,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
/// Other nodes in the subtree will be set to `null` nodes.
/// TODO: Optimize to avoid storing the `null` nodes?
pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> {
let start_time = Instant::now();
if subtree_root == E::null() {
// appending null is not allowed.
bail!("subtree_root is null");
@ -182,10 +190,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.append_subtree_inner(subtree_depth, subtree_root)?;
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
self.node_manager.commit();
metrics::APPEND_SUBTREE.update_since(start_time);
Ok(())
}
pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> {
let start_time = Instant::now();
if subtree_list.iter().any(|(_, root)| root == &E::null()) {
// appending null is not allowed.
bail!("subtree_list contains null");
@ -197,12 +208,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
}
self.node_manager.commit();
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
Ok(())
}
/// Change the value of the last leaf and return the new merkle root.
/// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves.
pub fn update_last(&mut self, updated_leaf: E) {
let start_time = Instant::now();
if updated_leaf == E::null() {
// updating to null is not allowed.
return;
@ -216,6 +230,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
self.recompute_after_append_leaves(self.leaves() - 1);
self.node_manager.commit();
metrics::UPDATE_LAST.update_since(start_time);
}
/// Fill an unknown `null` leaf with its real value.

View File

@ -0,0 +1,11 @@
use std::sync::Arc;
use metrics::{register_timer, Timer};
lazy_static::lazy_static! {
pub static ref APPEND: Arc<dyn Timer> = register_timer("append_merkle_append");
pub static ref APPEND_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_list");
pub static ref APPEND_SUBTREE: Arc<dyn Timer> = register_timer("append_merkle_append_subtree");
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_subtree_list");
pub static ref UPDATE_LAST: Arc<dyn Timer> = register_timer("append_merkle_update_last");
}
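For reference, a minimal sketch of how these timers are consumed by the instrumented code in this change: each timer is registered once in a lazy_static block, and the hot path records elapsed wall time with update_since. The timer name and function below are illustrative, not part of the change.

use std::sync::Arc;
use std::time::Instant;
use metrics::{register_timer, Timer};

lazy_static::lazy_static! {
    // Illustrative timer; the real registrations are listed above.
    pub static ref EXAMPLE_OP: Arc<dyn Timer> = register_timer("append_merkle_example_op");
}

fn instrumented_op() {
    let start_time = Instant::now();
    // ... the work being measured ...
    EXAMPLE_OP.update_since(start_time);
}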

View File

@ -1,5 +1,5 @@
use crate::sync_manager::log_query::LogQuery;
use crate::sync_manager::RETRY_WAIT_MS;
use crate::sync_manager::{metrics, RETRY_WAIT_MS};
use crate::ContractAddress;
use anyhow::{anyhow, bail, Result};
use append_merkle::{Algorithm, Sha3Algorithm};
@ -14,15 +14,12 @@ use shared_types::{DataRoot, Transaction};
use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
use task_executor::TaskExecutor;
use tokio::{
sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
RwLock,
},
time::Instant,
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
RwLock,
};
pub struct LogEntryFetcher {
@ -242,6 +239,7 @@ impl LogEntryFetcher {
);
let (mut block_hash_sent, mut block_number_sent) = (None, None);
while let Some(maybe_log) = stream.next().await {
let start_time = Instant::now();
match maybe_log {
Ok(log) => {
let sync_progress =
@ -301,6 +299,7 @@ impl LogEntryFetcher {
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
}
}
metrics::RECOVER_LOG.update_since(start_time);
}
info!("log recover end");

View File

@ -1,7 +1,13 @@
use std::sync::Arc;
use metrics::{register_timer, Timer};
use metrics::{register_timer, Gauge, GaugeUsize, Timer};
lazy_static::lazy_static! {
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_store_put_tx");
pub static ref LOG_MANAGER_HANDLE_DATA_TRANSACTION: Arc<dyn Timer> = register_timer("log_manager_handle_data_transaction");
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_manager_put_tx_inner");
pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes");
pub static ref RECOVER_LOG: Arc<dyn Timer> = register_timer("log_entry_sync_manager_recover_log");
}

View File

@ -26,6 +26,7 @@ const RETRY_WAIT_MS: u64 = 500;
// Each tx has less than 10KB, so the cache size should be acceptable.
const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
const CATCH_UP_END_GAP: u64 = 10;
const CHECK_ROOT_INTERVAL: u64 = 500;
/// Errors while handle data
#[derive(Error, Debug)]
@ -408,6 +409,7 @@ impl LogSyncManager {
}
LogFetchProgress::Transaction((tx, block_number)) => {
let mut stop = false;
let start_time = Instant::now();
match self.put_tx(tx.clone()).await {
Some(false) => stop = true,
Some(true) => {
@ -441,6 +443,8 @@ impl LogSyncManager {
// no receivers will be created.
warn!("log sync broadcast error, error={:?}", e);
}
metrics::LOG_MANAGER_HANDLE_DATA_TRANSACTION.update_since(start_time);
}
LogFetchProgress::Reverted(reverted) => {
self.process_reverted(reverted).await;
@ -453,7 +457,6 @@ impl LogSyncManager {
async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
let start_time = Instant::now();
let result = self.store.put_tx(tx.clone());
metrics::STORE_PUT_TX.update_since(start_time);
if let Err(e) = result {
error!("put_tx error: e={:?}", e);
@ -513,38 +516,45 @@ impl LogSyncManager {
// Check if the computed data root matches on-chain state.
// If the call fails, we won't check the root here and return `true` directly.
let flow_contract = self.log_fetcher.flow_contract();
match flow_contract
.get_flow_root_by_tx_seq(tx.seq.into())
.call()
.await
{
Ok(contract_root_bytes) => {
let contract_root = H256::from_slice(&contract_root_bytes);
// contract_root is zero for tx submitted before upgrading.
if !contract_root.is_zero() {
match self.store.get_context() {
Ok((local_root, _)) => {
if contract_root != local_root {
error!(
?contract_root,
?local_root,
"local flow root and on-chain flow root mismatch"
);
return false;
if self.next_tx_seq % CHECK_ROOT_INTERVAL == 0 {
let flow_contract = self.log_fetcher.flow_contract();
match flow_contract
.get_flow_root_by_tx_seq(tx.seq.into())
.call()
.await
{
Ok(contract_root_bytes) => {
let contract_root = H256::from_slice(&contract_root_bytes);
// contract_root is zero for tx submitted before upgrading.
if !contract_root.is_zero() {
match self.store.get_context() {
Ok((local_root, _)) => {
if contract_root != local_root {
error!(
?contract_root,
?local_root,
"local flow root and on-chain flow root mismatch"
);
return false;
}
}
Err(e) => {
warn!(?e, "fail to read the local flow root");
}
}
Err(e) => {
warn!(?e, "fail to read the local flow root");
}
}
}
}
Err(e) => {
warn!(?e, "fail to read the on-chain flow root");
Err(e) => {
warn!(?e, "fail to read the on-chain flow root");
}
}
}
metrics::STORE_PUT_TX_SPEED_IN_BYTES
.update((tx.size * 1000 / start_time.elapsed().as_micros() as u64) as usize);
metrics::STORE_PUT_TX.update_since(start_time);
true
}
}
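A worked example for the speed gauge above, assuming tx.size is a byte count: dividing by the elapsed microseconds and scaling by 1000 yields throughput in bytes per millisecond.

// Worked example with illustrative numbers: a 4 MiB transaction stored in 2 seconds.
let size_bytes: u64 = 4 * 1024 * 1024;   // tx.size, assumed to be in bytes
let elapsed_micros: u64 = 2_000_000;     // start_time.elapsed().as_micros()
let speed = (size_bytes * 1000 / elapsed_micros) as usize;
assert_eq!(speed, 2097);                 // bytes per millisecond, roughly 2 MiB/s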

View File

@ -267,7 +267,7 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
discovery_enabled: !config.disable_discovery,
metrics_enabled: config.metrics_enabled,
target_peer_count: config.target_peers,
..Default::default()
..config.peer_manager
};
let slot_duration = std::time::Duration::from_secs(12);

View File

@ -1,6 +1,5 @@
use crate::peer_manager::peerdb::PeerDBConfig;
use crate::types::GossipKind;
use crate::{Enr, PeerIdSerialized};
use crate::{peer_manager, Enr, PeerIdSerialized};
use directory::{
DEFAULT_BEACON_NODE_DIR, DEFAULT_HARDCODED_NETWORK, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR,
};
@ -128,7 +127,12 @@ pub struct Config {
/// The id of the storage network.
pub network_id: NetworkIdentity,
pub peer_db: PeerDBConfig,
pub peer_db: peer_manager::peerdb::PeerDBConfig,
pub peer_manager: peer_manager::config::Config,
/// Whether to disable network identity in ENR.
/// This is for test purpose only.
pub disable_enr_network_id: bool,
}
impl Default for Config {
@ -208,6 +212,8 @@ impl Default for Config {
metrics_enabled: false,
network_id: Default::default(),
peer_db: Default::default(),
peer_manager: Default::default(),
disable_enr_network_id: false,
}
}
}

View File

@ -1,9 +1,10 @@
//! Helper functions and an extension trait for Ethereum 2 ENRs.
pub use discv5::enr::{CombinedKey, EnrBuilder};
use ssz::Encode;
use super::enr_ext::CombinedKeyExt;
use super::ENR_FILENAME;
use super::enr_ext::{CombinedKeyExt, ENR_CONTENT_KEY_NETWORK_ID};
use super::{EnrExt, ENR_FILENAME};
use crate::types::Enr;
use crate::NetworkConfig;
use discv5::enr::EnrKey;
@ -32,7 +33,9 @@ pub fn use_or_load_enr(
Ok(disk_enr) => {
// if the same node id, then we may need to update our sequence number
if local_enr.node_id() == disk_enr.node_id() {
if compare_enr(local_enr, &disk_enr) {
if compare_enr(local_enr, &disk_enr)
&& is_disk_enr_network_id_unchanged(&disk_enr, config)
{
debug!(file = ?enr_f, "ENR loaded from disk");
// the stored ENR has the same configuration, use it
*local_enr = disk_enr;
@ -94,6 +97,13 @@ pub fn create_enr_builder_from_config<T: EnrKey>(
let tcp_port = config.enr_tcp_port.unwrap_or(config.libp2p_port);
builder.tcp(tcp_port);
}
// add network identity info in ENR if not disabled
if !config.disable_enr_network_id {
builder.add_value(
ENR_CONTENT_KEY_NETWORK_ID,
&config.network_id.as_ssz_bytes(),
);
}
builder
}
@ -117,6 +127,14 @@ fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool {
&& (local_enr.udp().is_none() || local_enr.udp() == disk_enr.udp())
}
fn is_disk_enr_network_id_unchanged(disk_enr: &Enr, config: &NetworkConfig) -> bool {
match disk_enr.network_identity() {
Some(Ok(id)) => !config.disable_enr_network_id && id == config.network_id,
Some(Err(_)) => false,
None => config.disable_enr_network_id,
}
}
/// Loads enr from the given directory
pub fn load_enr_from_disk(dir: &Path) -> Result<Enr, String> {
let enr_f = dir.join(ENR_FILENAME);

View File

@ -2,8 +2,12 @@
use crate::{Enr, Multiaddr, PeerId};
use discv5::enr::{CombinedKey, CombinedPublicKey};
use libp2p::core::{identity::Keypair, identity::PublicKey, multiaddr::Protocol};
use shared_types::NetworkIdentity;
use ssz::Decode;
use tiny_keccak::{Hasher, Keccak};
pub(crate) const ENR_CONTENT_KEY_NETWORK_ID: &'static str = "network_identity";
/// Extend ENR for libp2p types.
pub trait EnrExt {
/// The libp2p `PeerId` for the record.
@ -24,6 +28,9 @@ pub trait EnrExt {
/// Returns any multiaddrs that contain the TCP protocol.
fn multiaddr_tcp(&self) -> Vec<Multiaddr>;
/// Returns network identity in content.
fn network_identity(&self) -> Option<Result<NetworkIdentity, ssz::DecodeError>>;
}
/// Extend ENR CombinedPublicKey for libp2p types.
@ -189,6 +196,12 @@ impl EnrExt for Enr {
}
multiaddrs
}
/// Returns network identity in content.
fn network_identity(&self) -> Option<Result<NetworkIdentity, ssz::DecodeError>> {
let value = self.get(ENR_CONTENT_KEY_NETWORK_ID)?;
Some(NetworkIdentity::from_ssz_bytes(value))
}
}
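The three-way return value is what the callers above (use_or_load_enr and the bootnode check in discovery) branch on. A hedged sketch of the intended interpretation, using a hypothetical helper that is not part of the change:

// Hypothetical helper showing how callers interpret `network_identity()`.
fn classify(enr: &Enr) -> &'static str {
    match enr.network_identity() {
        // Key present and decodable: compare against the local NetworkIdentity.
        Some(Ok(_id)) => "accept only if it equals the local network id",
        // Key present but not valid SSZ: refuse the peer.
        Some(Err(_)) => "refuse: undecodable network identity",
        // Key absent: an older node; accepted only when ENR network id is disabled locally.
        None => "refuse unless disable_enr_network_id is set",
    }
}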
impl CombinedKeyPublicExt for CombinedPublicKey {

View File

@ -139,6 +139,7 @@ impl Discovery {
udp = ?local_enr.udp(),
tcp = ?local_enr.tcp(),
udp4_socket = ?local_enr.udp_socket(),
network_id = ?local_enr.network_identity(),
"ENR Initialised",
);
@ -158,6 +159,7 @@ impl Discovery {
ip = ?bootnode_enr.ip(),
udp = ?bootnode_enr.udp(),
tcp = ?bootnode_enr.tcp(),
network_id = ?bootnode_enr.network_identity(),
"Adding node to routing table",
);
let repr = bootnode_enr.to_string();
@ -205,13 +207,37 @@ impl Discovery {
match result {
Ok(enr) => {
debug!(
multiaddr = %original_addr.to_string(),
node_id = %enr.node_id(),
peer_id = %enr.peer_id(),
ip = ?enr.ip(),
udp = ?enr.udp(),
tcp = ?enr.tcp(),
"Adding node to routing table",
network_id = ?enr.network_identity(),
"Adding bootnode to routing table",
);
// check network identity in bootnode ENR if required
if !config.disable_enr_network_id {
match enr.network_identity() {
Some(Ok(id)) => {
if id != config.network_id {
error!(bootnode=?id, local=?config.network_id, "Bootnode network identity mismatch");
continue;
}
}
Some(Err(err)) => {
error!(?err, "Failed to decode bootnode network identity");
continue;
}
None => {
error!("Bootnode has no network identity");
continue;
}
}
}
// add bootnode into routing table
let _ = discv5.add_enr(enr).map_err(|e| {
error!(
addr = %original_addr.to_string(),
@ -401,10 +427,16 @@ impl Discovery {
// Generate a random target node id.
let random_node = NodeId::random();
// only discover nodes with same network identity
let local_network_id = self.network_globals.network_id();
let predicate = move |enr: &Enr| -> bool {
matches!(enr.network_identity(), Some(Ok(id)) if id == local_network_id)
};
// Build the future
let query_future = self
.discv5
.find_node_predicate(random_node, Box::new(|_| true), target_peers)
.find_node_predicate(random_node, Box::new(predicate), target_peers)
.map(|v| QueryResult {
query_type: query,
result: v,

View File

@ -1,3 +1,8 @@
use std::time::Duration;
use duration_str::deserialize_duration;
use serde::{Deserialize, Serialize};
/// The time in seconds between re-status's peers.
pub const DEFAULT_STATUS_INTERVAL: u64 = 300;
@ -11,9 +16,14 @@ pub const DEFAULT_PING_INTERVAL_INBOUND: u64 = 20;
pub const DEFAULT_TARGET_PEERS: usize = 50;
/// Configurations for the PeerManager.
#[derive(Debug)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
/* Peer count related configurations */
/// The heartbeat performs regular updates such as updating reputations and performing discovery
/// requests. This defines the interval in seconds.
#[serde(deserialize_with = "deserialize_duration")]
pub heartbeat_interval: Duration,
/// Whether discovery is enabled.
pub discovery_enabled: bool,
/// Whether metrics are enabled.
@ -35,6 +45,7 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Config {
heartbeat_interval: Duration::from_secs(30),
discovery_enabled: true,
metrics_enabled: false,
target_peer_count: DEFAULT_TARGET_PEERS,
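With #[serde(default)] and deserialize_duration, the heartbeat interval can be given as a human-readable string in the node config (the python test config below passes "1s" under [network_peer_manager]). A minimal sketch, assuming the `toml` crate is used to parse that section into this Config:

// Sketch only; Config is network::peer_manager::config::Config.
let cfg: Config = toml::from_str(r#"heartbeat_interval = "1s""#).expect("valid config");
assert_eq!(cfg.heartbeat_interval, std::time::Duration::from_secs(1));
// Omitted fields fall back to Config::default() via #[serde(default)].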

View File

@ -30,10 +30,6 @@ use std::net::IpAddr;
pub mod config;
mod network_behaviour;
/// The heartbeat performs regular updates such as updating reputations and performing discovery
/// requests. This defines the interval in seconds.
const HEARTBEAT_INTERVAL: u64 = 30;
/// This is used in the pruning logic. We avoid pruning peers on sync-committees if doing so would
/// lower our peer count below this number. Instead we favour a non-uniform distribution of subnet
/// peers.
@ -105,6 +101,7 @@ impl PeerManager {
network_globals: Arc<NetworkGlobals>,
) -> error::Result<Self> {
let config::Config {
heartbeat_interval,
discovery_enabled,
metrics_enabled,
target_peer_count,
@ -114,7 +111,7 @@ impl PeerManager {
} = cfg;
// Set up the peer manager heartbeat interval
let heartbeat = tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL));
let heartbeat = tokio::time::interval(heartbeat_interval);
Ok(PeerManager {
network_globals,

View File

@ -112,8 +112,13 @@ impl ClientBuilder {
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
let executor = require!("sync", self, runtime_context).clone().executor;
let store = Arc::new(
LogManager::rocksdb(config.log_config.clone(), &config.db_dir, executor)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
LogManager::rocksdb(
config.log_config.clone(),
config.db_dir.join("flow_db"),
config.db_dir.join("data_db"),
executor,
)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
);
self.store = Some(store.clone());

View File

@ -5,10 +5,11 @@ use ethereum_types::{H256, U256};
use ethers::prelude::{Http, Middleware, Provider};
use log_entry_sync::{CacheConfig, ContractAddress, LogSyncConfig};
use miner::MinerConfig;
use network::NetworkConfig;
use network::{EnrExt, NetworkConfig};
use pruner::PrunerConfig;
use shared_types::{NetworkIdentity, ProtocolVersion};
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
use storage::config::ShardConfig;
use storage::log_store::log_manager::LogConfig;
@ -38,7 +39,7 @@ impl ZgsConfig {
.await
.map_err(|e| format!("Unable to get chain id: {:?}", e))?
.as_u64();
network_config.network_id = NetworkIdentity {
let local_network_id = NetworkIdentity {
chain_id,
flow_address,
p2p_protocol_version: ProtocolVersion {
@ -47,6 +48,7 @@ impl ZgsConfig {
build: network::PROTOCOL_VERSION[2],
},
};
network_config.network_id = local_network_id.clone();
if !self.network_disable_discovery {
network_config.enr_tcp_port = Some(self.network_enr_tcp_port);
@ -82,7 +84,13 @@ impl ZgsConfig {
.collect::<Result<_, _>>()
.map_err(|e| format!("Unable to parse network_libp2p_nodes: {:?}", e))?;
network_config.discv5_config.table_filter = |_| true;
network_config.discv5_config.table_filter = if self.discv5_disable_enr_network_id {
Arc::new(|_| true)
} else {
Arc::new(
move |enr| matches!(enr.network_identity(), Some(Ok(id)) if id == local_network_id),
)
};
network_config.discv5_config.request_timeout =
Duration::from_secs(self.discv5_request_timeout_secs);
network_config.discv5_config.query_peer_timeout =
@ -97,6 +105,8 @@ impl ZgsConfig {
network_config.private = self.network_private;
network_config.peer_db = self.network_peer_db;
network_config.peer_manager = self.network_peer_manager;
network_config.disable_enr_network_id = self.discv5_disable_enr_network_id;
Ok(network_config)
}

View File

@ -28,6 +28,7 @@ build_config! {
(discv5_report_discovered_peers, (bool), false)
(discv5_disable_packet_filter, (bool), false)
(discv5_disable_ip_limit, (bool), false)
(discv5_disable_enr_network_id, (bool), false)
// log sync
(blockchain_rpc_endpoint, (String), "http://127.0.0.1:8545".to_string())
@ -87,6 +88,9 @@ pub struct ZgsConfig {
/// Network peer db config, configured by [network_peer_db] section by `config` crate.
pub network_peer_db: network::peer_manager::peerdb::PeerDBConfig,
/// Network peer manager config, configured by [network_peer_manager] section by `config` crate.
pub network_peer_manager: network::peer_manager::config::Config,
// router config, configured by [router] section by `config` crate.
pub router: router::Config,

View File

@ -31,6 +31,8 @@ parking_lot = "0.12.3"
serde_json = "1.0.127"
tokio = { version = "1.38.0", features = ["full"] }
task_executor = { path = "../../common/task_executor" }
lazy_static = "1.4.0"
metrics = { workspace = true }
once_cell = { version = "1.19.0", features = [] }
[dev-dependencies]

View File

@ -25,9 +25,14 @@ fn write_performance(c: &mut Criterion) {
let executor = runtime.task_executor.clone();
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
LogManager::rocksdb(LogConfig::default(), "db_write", executor)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
LogManager::rocksdb(
LogConfig::default(),
"db_flow_write",
"db_data_write",
executor,
)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
));
let chunk_count = 2048;
@ -114,9 +119,14 @@ fn read_performance(c: &mut Criterion) {
let executor = runtime.task_executor.clone();
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
LogManager::rocksdb(LogConfig::default(), "db_read", executor)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
LogManager::rocksdb(
LogConfig::default(),
"db_flow_read",
"db_data_read",
executor,
)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
));
let tx_size = 1000;

View File

@ -63,22 +63,22 @@ impl<T: ?Sized + Configurable> ConfigurableExt for T {}
impl Configurable for LogManager {
fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.db.get(COL_MISC, key)?)
Ok(self.flow_db.get(COL_MISC, key)?)
}
fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.db.put(COL_MISC, key, value)?;
self.flow_db.put(COL_MISC, key, value)?;
Ok(())
}
fn remove_config(&self, key: &[u8]) -> Result<()> {
Ok(self.db.delete(COL_MISC, key)?)
Ok(self.flow_db.delete(COL_MISC, key)?)
}
fn exec_configs(&self, tx: ConfigTx) -> Result<()> {
let mut db_tx = self.db.transaction();
let mut db_tx = self.flow_db.transaction();
db_tx.ops = tx.ops;
self.db.write(db_tx)?;
self.flow_db.write(db_tx)?;
Ok(())
}

View File

@ -1,68 +1,66 @@
use super::load_chunk::EntryBatch;
use super::seal_task_manager::SealTaskManager;
use super::{MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig;
use crate::error::Error;
use crate::log_store::load_chunk::EntryBatch;
use crate::log_store::log_manager::{
bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
bytes_to_entries, COL_ENTRY_BATCH, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE,
};
use crate::log_store::seal_task_manager::SealTaskManager;
use crate::log_store::{
metrics, FlowRead, FlowSeal, FlowWrite, MineLoadChunk, SealAnswer, SealTask,
};
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
use crate::{try_option, ZgsKeyValueDB};
use any::Any;
use anyhow::{anyhow, bail, Result};
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase, NodeTransaction};
use append_merkle::{MerkleTreeRead, NodeDatabase, NodeTransaction};
use itertools::Itertools;
use kvdb::DBTransaction;
use parking_lot::RwLock;
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
use shared_types::{ChunkArray, DataRoot, FlowProof};
use ssz::{Decode, Encode};
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::{any, cmp, mem};
use std::time::Instant;
use std::{any, cmp};
use tracing::{debug, error, trace};
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
pub struct FlowStore {
db: Arc<FlowDBStore>,
data_db: Arc<FlowDBStore>,
seal_manager: SealTaskManager,
config: FlowConfig,
}
impl FlowStore {
pub fn new(db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
pub fn new(data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
Self {
db,
data_db,
seal_manager: Default::default(),
config,
}
}
pub fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
self.db.put_batch_root_list(root_map)
}
pub fn insert_subtree_list_for_batch(
&self,
batch_index: usize,
subtree_list: Vec<(usize, usize, DataRoot)>,
) -> Result<()> {
let start_time = Instant::now();
let mut batch = self
.db
.data_db
.get_entry_batch(batch_index as u64)?
.unwrap_or_else(|| EntryBatch::new(batch_index as u64));
batch.set_subtree_list(subtree_list);
self.db.put_entry_raw(vec![(batch_index as u64, batch)])?;
self.data_db
.put_entry_raw(vec![(batch_index as u64, batch)])?;
metrics::INSERT_SUBTREE_LIST.update_since(start_time);
Ok(())
}
pub fn gen_proof_in_batch(&self, batch_index: usize, sector_index: usize) -> Result<FlowProof> {
let batch = self
.db
.data_db
.get_entry_batch(batch_index as u64)?
.ok_or_else(|| anyhow!("batch missing, index={}", batch_index))?;
let merkle = batch.to_merkle_tree(batch_index == 0)?.ok_or_else(|| {
@ -74,21 +72,9 @@ impl FlowStore {
merkle.gen_proof(sector_index)
}
pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
self.db.put_mpt_node_list(node_list)
}
pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
self.seal_manager.delete_batch_list(batch_list);
self.db.delete_batch_list(batch_list)
}
pub fn get_raw_batch(&self, batch_index: u64) -> Result<Option<EntryBatch>> {
self.db.get_entry_batch(batch_index)
}
pub fn get_batch_root(&self, batch_index: u64) -> Result<Option<DataRoot>> {
self.db.get_batch_root(batch_index)
self.data_db.delete_batch_list(batch_list)
}
}
@ -134,7 +120,7 @@ impl FlowRead for FlowStore {
length -= 1;
}
let entry_batch = try_option!(self.db.get_entry_batch(chunk_index)?);
let entry_batch = try_option!(self.data_db.get_entry_batch(chunk_index)?);
let mut entry_batch_data =
try_option!(entry_batch.get_unsealed_data(offset as usize, length as usize));
data.append(&mut entry_batch_data);
@ -163,7 +149,7 @@ impl FlowRead for FlowStore {
let chunk_index = start_entry_index / self.config.batch_size as u64;
if let Some(mut data_list) = self
.db
.data_db
.get_entry_batch(chunk_index)?
.map(|b| b.into_data_list(start_entry_index))
{
@ -187,13 +173,8 @@ impl FlowRead for FlowStore {
Ok(entry_list)
}
/// Return the list of all stored chunk roots.
fn get_chunk_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>> {
self.db.get_batch_root_list()
}
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
let batch = try_option!(self.db.get_entry_batch(chunk_index)?);
let batch = try_option!(self.data_db.get_entry_batch(chunk_index)?);
let mut mine_chunk = MineLoadChunk::default();
for (seal_index, (sealed, validity)) in mine_chunk
.loaded_chunk
@ -211,7 +192,7 @@ impl FlowRead for FlowStore {
fn get_num_entries(&self) -> Result<u64> {
// This is an over-estimation as it assumes each batch is full.
self.db
self.data_db
.kvdb
.num_keys(COL_ENTRY_BATCH)
.map(|num_batches| num_batches * PORA_CHUNK_SIZE as u64)
@ -227,6 +208,7 @@ impl FlowWrite for FlowStore {
/// Return the roots of completed chunks. The order is guaranteed to be increasing
/// by chunk index.
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
let start_time = Instant::now();
let mut to_seal_set = self.seal_manager.to_seal_set.write();
trace!("append_entries: {} {}", data.start_index, data.data.len());
if data.data.len() % BYTES_PER_SECTOR != 0 {
@ -251,7 +233,7 @@ impl FlowWrite for FlowStore {
// TODO: Try to avoid loading from db if possible.
let mut batch = self
.db
.data_db
.get_entry_batch(chunk_index)?
.unwrap_or_else(|| EntryBatch::new(chunk_index));
let completed_seals = batch.insert_data(
@ -269,12 +251,14 @@ impl FlowWrite for FlowStore {
batch_list.push((chunk_index, batch));
}
self.db.put_entry_batch_list(batch_list)
metrics::APPEND_ENTRIES.update_since(start_time);
self.data_db.put_entry_batch_list(batch_list)
}
fn truncate(&self, start_index: u64) -> crate::error::Result<()> {
let mut to_seal_set = self.seal_manager.to_seal_set.write();
let to_reseal = self.db.truncate(start_index, self.config.batch_size)?;
let to_reseal = self.data_db.truncate(start_index, self.config.batch_size)?;
to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL));
let new_seal_version = self.seal_manager.inc_seal_version();
@ -304,7 +288,7 @@ impl FlowSeal for FlowStore {
let mut tasks = Vec::with_capacity(SEALS_PER_LOAD);
let batch_data = self
.db
.data_db
.get_entry_batch((first_index / SEALS_PER_LOAD) as u64)?
.expect("Lost data chunk in to_seal_set");
@ -343,7 +327,7 @@ impl FlowSeal for FlowStore {
.chunk_by(|answer| answer.seal_index / SEALS_PER_LOAD as u64)
{
let mut batch_chunk = self
.db
.data_db
.get_entry_batch(load_index)?
.expect("Can not find chunk data");
for answer in answers_in_chunk {
@ -359,7 +343,7 @@ impl FlowSeal for FlowStore {
to_seal_set.remove(&idx);
}
self.db.put_entry_raw(updated_chunk)?;
self.data_db.put_entry_raw(updated_chunk)?;
Ok(())
}
@ -378,6 +362,7 @@ impl FlowDBStore {
&self,
batch_list: Vec<(u64, EntryBatch)>,
) -> Result<Vec<(u64, DataRoot)>> {
let start_time = Instant::now();
let mut completed_batches = Vec::new();
let mut tx = self.kvdb.transaction();
for (batch_index, batch) in batch_list {
@ -388,16 +373,11 @@ impl FlowDBStore {
);
if let Some(root) = batch.build_root(batch_index == 0)? {
trace!("complete batch: index={}", batch_index);
tx.put(
COL_ENTRY_BATCH_ROOT,
// (batch_index, subtree_depth)
&encode_batch_root_key(batch_index as usize, 1),
root.as_bytes(),
);
completed_batches.push((batch_index, root));
}
}
self.kvdb.write(tx)?;
metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time);
Ok(completed_batches)
}
@ -419,94 +399,6 @@ impl FlowDBStore {
Ok(Some(EntryBatch::from_ssz_bytes(&raw).map_err(Error::from)?))
}
fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
let mut tx = self.kvdb.transaction();
for (batch_index, (root, subtree_depth)) in root_map {
tx.put(
COL_ENTRY_BATCH_ROOT,
&encode_batch_root_key(batch_index, subtree_depth),
root.as_bytes(),
);
}
Ok(self.kvdb.write(tx)?)
}
fn get_batch_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>> {
let mut range_root = None;
// A list of `BatchRoot` that can reconstruct the whole merkle tree structure.
let mut root_list = Vec::new();
// A list of leaf `(index, root_hash)` in the subtrees of some nodes in `root_list`,
// and they will be updated in the merkle tree with `fill_leaf` by the caller.
let mut leaf_list = Vec::new();
let mut expected_index = 0;
let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
let empty_root = Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
let (index_bytes, root_bytes) = r?;
let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?;
let root = DataRoot::from_slice(root_bytes.as_ref());
debug!(
"load root depth={}, index expected={} get={} root={:?}",
subtree_depth, expected_index, batch_index, root,
);
if subtree_depth == 1 {
if range_root.is_none() {
// This is expected to be the next leaf.
if batch_index == expected_index {
root_list.push((1, root));
expected_index += 1;
} else {
bail!(
"unexpected chunk leaf, expected={}, get={}",
expected_index,
batch_index
);
}
} else {
match batch_index.cmp(&expected_index) {
Ordering::Less => {
// This leaf is within a subtree whose root is known.
leaf_list.push((batch_index, root));
}
Ordering::Equal => {
// A subtree range ends.
range_root = None;
root_list.push((1, root));
expected_index += 1;
}
Ordering::Greater => {
while batch_index > expected_index {
// Fill the gap with empty leaves.
root_list.push((1, empty_root));
expected_index += 1;
}
range_root = None;
root_list.push((1, root));
expected_index += 1;
}
}
}
} else {
while batch_index > expected_index {
// Fill the gap with empty leaves.
root_list.push((1, empty_root));
expected_index += 1;
}
range_root = Some(BatchRoot::Multiple((subtree_depth, root)));
root_list.push((subtree_depth, root));
expected_index += 1 << (subtree_depth - 1);
}
}
let extra_node_list = self.get_mpt_node_list()?;
Ok(MerkleTreeInitialData {
subtree_list: root_list,
known_leaves: leaf_list,
extra_mpt_nodes: extra_node_list,
})
}
fn truncate(&self, start_index: u64, batch_size: usize) -> crate::error::Result<Vec<usize>> {
let mut tx = self.kvdb.transaction();
let mut start_batch_index = start_index / batch_size as u64;
@ -547,38 +439,11 @@ impl FlowDBStore {
};
for batch_index in start_batch_index as usize..=end {
tx.delete(COL_ENTRY_BATCH, &batch_index.to_be_bytes());
tx.delete_prefix(COL_ENTRY_BATCH_ROOT, &batch_index.to_be_bytes());
}
self.kvdb.write(tx)?;
Ok(index_to_reseal)
}
fn put_mpt_node_list(&self, mpt_node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
let mut tx = self.kvdb.transaction();
for (layer_index, position, data) in mpt_node_list {
tx.put(
COL_FLOW_MPT_NODES,
&encode_mpt_node_key(layer_index, position),
data.as_bytes(),
);
}
Ok(self.kvdb.write(tx)?)
}
fn get_mpt_node_list(&self) -> Result<Vec<(usize, usize, DataRoot)>> {
let mut node_list = Vec::new();
for r in self.kvdb.iter(COL_FLOW_MPT_NODES) {
let (index_bytes, node_bytes) = r?;
let (layer_index, position) = decode_mpt_node_key(index_bytes.as_ref())?;
node_list.push((
layer_index,
position,
DataRoot::from_slice(node_bytes.as_ref()),
));
}
Ok(node_list)
}
fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
let mut tx = self.kvdb.transaction();
for i in batch_list {
@ -586,16 +451,6 @@ impl FlowDBStore {
}
Ok(self.kvdb.write(tx)?)
}
fn get_batch_root(&self, batch_index: u64) -> Result<Option<DataRoot>> {
Ok(self
.kvdb
.get(
COL_ENTRY_BATCH_ROOT,
&encode_batch_root_key(batch_index as usize, 1),
)?
.map(|v| DataRoot::from_slice(&v)))
}
}
#[derive(DeriveEncode, DeriveDecode, Clone, Debug)]
@ -641,37 +496,12 @@ fn decode_batch_index(data: &[u8]) -> Result<usize> {
try_decode_usize(data)
}
/// For the same batch_index, we want to process the larger subtree_depth first in iteration.
fn encode_batch_root_key(batch_index: usize, subtree_depth: usize) -> Vec<u8> {
let mut key = batch_index.to_be_bytes().to_vec();
key.extend_from_slice(&(usize::MAX - subtree_depth).to_be_bytes());
key
}
fn decode_batch_root_key(data: &[u8]) -> Result<(usize, usize)> {
if data.len() != mem::size_of::<usize>() * 2 {
bail!("invalid data length");
}
let batch_index = try_decode_usize(&data[..mem::size_of::<u64>()])?;
let subtree_depth = usize::MAX - try_decode_usize(&data[mem::size_of::<u64>()..])?;
Ok((batch_index, subtree_depth))
}
fn encode_mpt_node_key(layer_index: usize, position: usize) -> Vec<u8> {
let mut key = layer_index.to_be_bytes().to_vec();
key.extend_from_slice(&position.to_be_bytes());
key
}
fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
if data.len() != mem::size_of::<usize>() * 2 {
bail!("invalid data length");
}
let layer_index = try_decode_usize(&data[..mem::size_of::<u64>()])?;
let position = try_decode_usize(&data[mem::size_of::<u64>()..])?;
Ok((layer_index, position))
}
fn layer_size_key(layer: usize) -> Vec<u8> {
let mut key = "layer_size".as_bytes().to_vec();
key.extend_from_slice(&layer.to_be_bytes());

View File

@ -1,5 +1,3 @@
use super::tx_store::BlockHashAndSubmissionIndex;
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig;
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
use crate::log_store::tx_store::TransactionStore;
@ -22,12 +20,17 @@ use shared_types::{
ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
};
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::path::Path;
use std::sync::mpsc;
use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, error, info, instrument, trace, warn};
use crate::log_store::metrics;
use crate::log_store::tx_store::BlockHashAndSubmissionIndex;
use crate::log_store::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
/// 256 Bytes
pub const ENTRY_SIZE: usize = 256;
/// 1024 Entries.
@ -56,13 +59,12 @@ static PAD_SEGMENT_ROOT: Lazy<H256> = Lazy::new(|| {
.root()
});
pub struct UpdateFlowMessage {
pub root_map: BTreeMap<usize, (H256, usize)>,
pub pad_data: usize,
pub tx_start_flow_index: u64,
}
pub struct LogManager {
pub(crate) db: Arc<dyn ZgsKeyValueDB>,
pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
tx_store: TransactionStore,
flow_store: Arc<FlowStore>,
merkle: RwLock<MerkleManager>,
@ -189,6 +191,7 @@ impl LogStoreChunkWrite for LogManager {
chunks: ChunkArray,
maybe_file_proof: Option<FlowProof>,
) -> Result<bool> {
let start_time = Instant::now();
let mut merkle = self.merkle.write();
let tx = self
.tx_store
@ -214,13 +217,13 @@ impl LogStoreChunkWrite for LogManager {
self.append_entries(flow_entry_array, &mut merkle)?;
if let Some(file_proof) = maybe_file_proof {
let updated_node_list = merkle.pora_chunks_merkle.fill_with_file_proof(
merkle.pora_chunks_merkle.fill_with_file_proof(
file_proof,
tx.merkle_nodes,
tx.start_entry_index,
)?;
self.flow_store.put_mpt_node_list(updated_node_list)?;
}
metrics::PUT_CHUNKS.update_since(start_time);
Ok(true)
}
@ -251,6 +254,7 @@ impl LogStoreWrite for LogManager {
/// `put_tx` for the last tx when we restart the node to ensure that it succeeds.
///
fn put_tx(&self, tx: Transaction) -> Result<()> {
let start_time = Instant::now();
let mut merkle = self.merkle.write();
debug!("put_tx: tx={:?}", tx);
let expected_seq = self.tx_store.next_tx_seq();
@ -280,6 +284,7 @@ impl LogStoreWrite for LogManager {
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
}
}
metrics::PUT_TX.update_since(start_time);
Ok(())
}
@ -310,6 +315,7 @@ impl LogStoreWrite for LogManager {
}
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> crate::error::Result<bool> {
let start_time = Instant::now();
trace!(
"finalize_tx_with_hash: tx_seq={} tx_hash={:?}",
tx_seq,
@ -338,6 +344,7 @@ impl LogStoreWrite for LogManager {
if same_root_seq_list.first() == Some(&tx_seq) {
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
}
metrics::FINALIZE_TX_WITH_HASH.update_since(start_time);
Ok(true)
} else {
bail!("finalize tx hash with data missing: tx_seq={}", tx_seq)
@ -385,10 +392,9 @@ impl LogStoreWrite for LogManager {
// `merkle` is used in `validate_range_proof`.
let mut merkle = self.merkle.write();
if valid {
let updated_nodes = merkle
merkle
.pora_chunks_merkle
.fill_with_range_proof(data.proof.clone())?;
self.flow_store.put_mpt_node_list(updated_nodes)?;
}
Ok(valid)
}
@ -615,28 +621,33 @@ impl LogStoreRead for LogManager {
impl LogManager {
pub fn rocksdb(
config: LogConfig,
path: impl AsRef<Path>,
flow_path: impl AsRef<Path>,
data_path: impl AsRef<Path>,
executor: task_executor::TaskExecutor,
) -> Result<Self> {
let mut db_config = DatabaseConfig::with_columns(COL_NUM);
db_config.enable_statistics = true;
let db = Arc::new(Database::open(&db_config, path)?);
Self::new(db, config, executor)
let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?);
let data_db_source = Arc::new(Database::open(&db_config, data_path)?);
Self::new(flow_db_source, data_db_source, config, executor)
}
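A sketch of a caller under the new two-path signature, mirroring the ClientBuilder change above; db_dir and executor are assumed to come from the surrounding node setup.

// Sketch only: open the flow and data stores under one parent directory.
fn open_store(
    db_dir: &std::path::Path,
    executor: task_executor::TaskExecutor,
) -> Result<LogManager, String> {
    LogManager::rocksdb(
        LogConfig::default(),
        db_dir.join("flow_db"),
        db_dir.join("data_db"),
        executor,
    )
    .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
}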
pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
let db = Arc::new(kvdb_memorydb::create(COL_NUM));
Self::new(db, config, executor)
let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM));
let data_db = Arc::new(kvdb_memorydb::create(COL_NUM));
Self::new(flow_db, data_db, config, executor)
}
fn new(
db: Arc<dyn ZgsKeyValueDB>,
flow_db_source: Arc<dyn ZgsKeyValueDB>,
data_db_source: Arc<dyn ZgsKeyValueDB>,
config: LogConfig,
executor: task_executor::TaskExecutor,
) -> Result<Self> {
let tx_store = TransactionStore::new(db.clone())?;
let flow_db = Arc::new(FlowDBStore::new(db.clone()));
let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
let tx_store = TransactionStore::new(flow_db_source.clone())?;
let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
let flow_store = Arc::new(FlowStore::new(data_db.clone(), config.flow.clone()));
// If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
// first and call `put_tx` later.
let next_tx_seq = tx_store.next_tx_seq();
@ -740,7 +751,7 @@ impl LogManager {
let (sender, receiver) = mpsc::channel();
let mut log_manager = Self {
db,
flow_db: flow_db_source,
tx_store,
flow_store,
merkle,
@ -774,8 +785,6 @@ impl LogManager {
loop {
match rx.recv() {
std::result::Result::Ok(data) => {
// Update the root index.
flow_store.put_batch_root_list(data.root_map).unwrap();
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
@ -848,21 +857,7 @@ impl LogManager {
.gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
}
};
let r = entry_proof(&top_proof, &sub_proof);
if r.is_err() {
let raw_batch = self.flow_store.get_raw_batch(seg_index as u64)?.unwrap();
let db_root = self.flow_store.get_batch_root(seg_index as u64)?;
error!(
?r,
?db_root,
?seg_index,
"gen proof error: top_leaves={}, last={}, raw_batch={}",
merkle.pora_chunks_merkle.leaves(),
merkle.last_chunk_merkle.leaves(),
serde_json::to_string(&raw_batch).unwrap(),
);
}
r
entry_proof(&top_proof, &sub_proof)
}
#[instrument(skip(self, merkle))]
@ -875,10 +870,10 @@ impl LogManager {
if merkle_list.is_empty() {
return Ok(());
}
let start_time = Instant::now();
self.pad_tx(tx_start_index, &mut *merkle)?;
let mut batch_root_map = BTreeMap::new();
for (subtree_depth, subtree_root) in merkle_list {
let subtree_size = 1 << (subtree_depth - 1);
if merkle.last_chunk_merkle.leaves() + subtree_size <= PORA_CHUNK_SIZE {
@ -896,10 +891,6 @@ impl LogManager {
.update_last(merkle.last_chunk_merkle.root());
}
if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
batch_root_map.insert(
merkle.pora_chunks_merkle.leaves() - 1,
(merkle.last_chunk_merkle.root(), 1),
);
self.complete_last_chunk_merkle(
merkle.pora_chunks_merkle.leaves() - 1,
&mut *merkle,
@ -910,22 +901,20 @@ impl LogManager {
// the chunks boundary.
assert_eq!(merkle.last_chunk_merkle.leaves(), 0);
assert!(subtree_size >= PORA_CHUNK_SIZE);
batch_root_map.insert(
merkle.pora_chunks_merkle.leaves(),
(subtree_root, subtree_depth - log2_pow2(PORA_CHUNK_SIZE)),
);
merkle
.pora_chunks_merkle
.append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?;
}
}
self.flow_store.put_batch_root_list(batch_root_map)?;
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
Ok(())
}
#[instrument(skip(self, merkle))]
fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
// Check if we need to pad the flow.
let start_time = Instant::now();
let mut tx_start_flow_index =
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
let pad_size = tx_start_index - tx_start_flow_index;
@ -937,7 +926,6 @@ impl LogManager {
if pad_size != 0 {
for pad_data in Self::padding(pad_size as usize) {
let mut is_full_empty = true;
let mut root_map = BTreeMap::new();
// Update the in-memory merkle tree.
let last_chunk_pad = if merkle.last_chunk_merkle.leaves() == 0 {
@ -965,10 +953,6 @@ impl LogManager {
merkle
.pora_chunks_merkle
.update_last(merkle.last_chunk_merkle.root());
root_map.insert(
merkle.pora_chunks_merkle.leaves() - 1,
(merkle.last_chunk_merkle.root(), 1),
);
completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1);
}
@ -976,10 +960,6 @@ impl LogManager {
let mut start_index = last_chunk_pad / ENTRY_SIZE;
while pad_data.len() >= (start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE {
merkle.pora_chunks_merkle.append(*PAD_SEGMENT_ROOT);
root_map.insert(
merkle.pora_chunks_merkle.leaves() - 1,
(*PAD_SEGMENT_ROOT, 1),
);
start_index += PORA_CHUNK_SIZE;
}
assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
@ -988,12 +968,10 @@ impl LogManager {
let data_size = pad_data.len() / ENTRY_SIZE;
if is_full_empty {
self.sender.send(UpdateFlowMessage {
root_map,
pad_data: pad_data.len(),
tx_start_flow_index,
})?;
} else {
self.flow_store.put_batch_root_list(root_map).unwrap();
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
@ -1014,6 +992,8 @@ impl LogManager {
merkle.pora_chunks_merkle.leaves(),
merkle.last_chunk_merkle.leaves()
);
metrics::PAD_TX.update_since(start_time);
Ok(())
}
@ -1142,6 +1122,8 @@ impl LogManager {
}
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
let start_time = Instant::now();
let mut merkle = self.merkle.write();
let shard_config = self.flow_store.get_shard_config();
// We have all the data need for this tx, so just copy them.
@ -1190,6 +1172,8 @@ impl LogManager {
for (seq, _) in to_tx_offset_list {
self.tx_store.finalize_tx(seq)?;
}
metrics::COPY_TX_AND_FINALIZE.update_since(start_time);
Ok(())
}
@ -1262,6 +1246,7 @@ pub fn sub_merkle_tree(leaf_data: &[u8]) -> Result<FileMerkleTree> {
}
pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
let start_time = Instant::now();
if leaf_data.len() % ENTRY_SIZE != 0 {
bail!("merkle_tree: mismatched data size");
}
@ -1277,6 +1262,8 @@ pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
.map(Sha3Algorithm::leaf)
.collect()
};
metrics::DATA_TO_MERKLE_LEAVES.update_since(start_time);
Ok(r)
}

View File

@ -0,0 +1,39 @@
use std::sync::Arc;
use metrics::{register_timer, Timer};
lazy_static::lazy_static! {
pub static ref PUT_TX: Arc<dyn Timer> = register_timer("log_store_put_tx");
pub static ref PUT_CHUNKS: Arc<dyn Timer> = register_timer("log_store_put_chunks");
pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");
pub static ref CHECK_TX_COMPLETED: Arc<dyn Timer> =
register_timer("log_store_log_manager_check_tx_completed");
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> =
register_timer("log_store_log_manager_append_subtree_list");
pub static ref DATA_TO_MERKLE_LEAVES: Arc<dyn Timer> =
register_timer("log_store_log_manager_data_to_merkle_leaves");
pub static ref COPY_TX_AND_FINALIZE: Arc<dyn Timer> =
register_timer("log_store_log_manager_copy_tx_and_finalize");
pub static ref PAD_TX: Arc<dyn Timer> = register_timer("log_store_log_manager_pad_tx");
pub static ref PUT_BATCH_ROOT_LIST: Arc<dyn Timer> = register_timer("log_store_flow_store_put_batch_root_list");
pub static ref INSERT_SUBTREE_LIST: Arc<dyn Timer> =
register_timer("log_store_flow_store_insert_subtree_list");
pub static ref PUT_MPT_NODE: Arc<dyn Timer> = register_timer("log_store_flow_store_put_mpt_node");
pub static ref PUT_ENTRY_BATCH_LIST: Arc<dyn Timer> =
register_timer("log_store_flow_store_put_entry_batch_list");
pub static ref APPEND_ENTRIES: Arc<dyn Timer> = register_timer("log_store_flow_store_append_entries");
pub static ref FINALIZE_TX_WITH_HASH: Arc<dyn Timer> = register_timer("log_store_log_manager_finalize_tx_with_hash");
}

View File

@ -1,5 +1,5 @@
use crate::config::ShardConfig;
use append_merkle::MerkleTreeInitialData;
use ethereum_types::H256;
use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
@ -15,6 +15,7 @@ pub mod config;
mod flow_store;
mod load_chunk;
pub mod log_manager;
mod metrics;
mod seal_task_manager;
#[cfg(test)]
mod tests;
@ -211,8 +212,6 @@ pub trait FlowRead {
/// For simplicity, `index_start` and `index_end` must be at the batch boundaries.
fn get_available_entries(&self, index_start: u64, index_end: u64) -> Result<Vec<ChunkArray>>;
fn get_chunk_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>>;
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
// An estimation of the number of entries in the flow db.

View File

@ -3,6 +3,7 @@ use crate::log_store::log_manager::{
data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
};
use crate::log_store::metrics;
use crate::{try_option, LogManager, ZgsKeyValueDB};
use anyhow::{anyhow, Result};
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
@ -15,6 +16,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tracing::{error, instrument};
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
@ -51,6 +53,8 @@ impl TransactionStore {
#[instrument(skip(self))]
/// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root.
pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> {
let start_time = Instant::now();
let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) {
// The last tx is inserted again, so no need to process it.
@ -86,6 +90,7 @@ impl TransactionStore {
);
self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
self.kvdb.write(db_tx)?;
metrics::TX_STORE_PUT.update_since(start_time);
Ok(old_tx_seq_list)
}
@ -175,8 +180,12 @@ impl TransactionStore {
}
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
== Some(vec![TX_STATUS_FINALIZED]))
let start_time = Instant::now();
let res = self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
== Some(vec![TX_STATUS_FINALIZED]);
metrics::CHECK_TX_COMPLETED.update_since(start_time);
Ok(res)
}
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {

View File

@ -3,6 +3,10 @@ from web3 import Web3
ZGS_CONFIG = {
"log_config_file": "log_config",
"confirmation_block_count": 1,
"discv5_disable_ip_limit": True,
"network_peer_manager": {
"heartbeat_interval": "1s"
},
"router": {
"private_ip_enabled": True,
},
@ -18,6 +22,8 @@ ZGS_CONFIG = {
}
}
ZGS_NODEID = "16Uiu2HAmLkGFUbNFYdhuSbTQ5hmnPjFXx2zUDtwQ2uihHpN9YNNe"
BSC_CONFIG = dict(
NetworkId=1000,
HTTPPort=8545,

View File

@ -0,0 +1,74 @@
#!/usr/bin/env python3
import os
import time
from config.node_config import ZGS_NODEID
from test_framework.test_framework import TestFramework
from utility.utils import p2p_port
class NetworkDiscoveryTest(TestFramework):
"""
This is to test whether community nodes could connect to each other via UDP discovery.
"""
def setup_params(self):
# 1 bootnode and 2 community nodes
self.num_nodes = 3
# setup for node 0 as bootnode
tests_dir = os.path.dirname(__file__)
network_dir = os.path.join(tests_dir, "config", "zgs", "network")
bootnode_port = p2p_port(0)
self.zgs_node_configs[0] = {
# load pre-defined keypair
"network_dir": network_dir,
# enable UDP discovery relevant configs
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": bootnode_port,
"network_enr_udp_port": bootnode_port,
# disable trusted nodes
"network_libp2p_nodes": [],
}
# setup node 1 & 2 as community nodes
bootnodes = [f"/ip4/127.0.0.1/udp/{bootnode_port}/p2p/{ZGS_NODEID}"]
for i in range(1, self.num_nodes):
self.zgs_node_configs[i] = {
# enable UDP discovery relevant configs
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": p2p_port(i),
"network_enr_udp_port": p2p_port(i),
# disable trusted nodes and enable bootnodes
"network_libp2p_nodes": [],
"network_boot_nodes": bootnodes,
}
def run_test(self):
timeout_secs = 10
for iter in range(timeout_secs + 1):
assert iter < timeout_secs, "Timeout to discover nodes for peer connection"
time.sleep(1)
self.log.info("==================================== iter %s", iter)
total_connected = 0
for i in range(self.num_nodes):
info = self.nodes[i].rpc.admin_getNetworkInfo()
total_connected += info["connectedPeers"]
self.log.info(
"Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)",
i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"],
)
if total_connected >= self.num_nodes * (self.num_nodes - 1):
break
self.log.info("====================================")
self.log.info("All nodes connected to each other successfully")
if __name__ == "__main__":
NetworkDiscoveryTest().main()

View File

@ -0,0 +1,74 @@
#!/usr/bin/env python3
import os
import time
from config.node_config import ZGS_NODEID
from test_framework.test_framework import TestFramework
from utility.utils import p2p_port
class NetworkDiscoveryUpgradeTest(TestFramework):
"""
This is to test that low version community nodes could not connect to bootnodes.
"""
def setup_params(self):
# 1 bootnode and 1 community node
self.num_nodes = 2
# setup for node 0 as bootnode
tests_dir = os.path.dirname(__file__)
network_dir = os.path.join(tests_dir, "config", "zgs", "network")
bootnode_port = p2p_port(0)
self.zgs_node_configs[0] = {
# load pre-defined keypair
"network_dir": network_dir,
# enable UDP discovery relevant configs
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": bootnode_port,
"network_enr_udp_port": bootnode_port,
# disable trusted nodes
"network_libp2p_nodes": [],
}
# setup node 1 as community node
bootnodes = [f"/ip4/127.0.0.1/udp/{bootnode_port}/p2p/{ZGS_NODEID}"]
for i in range(1, self.num_nodes):
self.zgs_node_configs[i] = {
# enable UDP discovery relevant configs
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": p2p_port(i),
"network_enr_udp_port": p2p_port(i),
# disable trusted nodes and enable bootnodes
"network_libp2p_nodes": [],
"network_boot_nodes": bootnodes,
# disable network identity in ENR
"discv5_disable_enr_network_id": True,
}
def run_test(self):
for iter in range(10):
time.sleep(1)
self.log.info("==================================== iter %s", iter)
total_connected = 0
for i in range(self.num_nodes):
info = self.nodes[i].rpc.admin_getNetworkInfo()
total_connected += info["connectedPeers"]
self.log.info(
"Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)",
i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"],
)
# ENR incompatible and should not discover each other for TCP connection
assert total_connected == 0, "Nodes connected unexpectedly"
self.log.info("====================================")
self.log.info("ENR incompatible nodes do not connect to each other")
if __name__ == "__main__":
NetworkDiscoveryUpgradeTest().main()

View File

@ -3,6 +3,7 @@
from test_framework.test_framework import TestFramework
from utility.utils import wait_until
class AutoRandomSyncV2Test(TestFramework):
def setup_params(self):
self.num_nodes = 4
@ -30,5 +31,6 @@ class AutoRandomSyncV2Test(TestFramework):
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
if __name__ == "__main__":
AutoRandomSyncV2Test().main()

View File

@ -2,7 +2,7 @@ use crate::{
kbucket::MAX_NODES_PER_BUCKET, Enr, Executor, PermitBanList, RateLimiter, RateLimiterBuilder,
};
///! A set of configuration parameters to tune the discovery protocol.
use std::time::Duration;
use std::{sync::Arc, time::Duration};
/// Configuration parameters that define the performance of the gossipsub network.
#[derive(Clone)]
@ -57,7 +57,7 @@ pub struct Discv5Config {
/// A filter used to decide whether to insert nodes into our local routing table. Nodes can be
/// excluded if they do not pass this filter. The default is to accept all nodes.
pub table_filter: fn(&Enr) -> bool,
pub table_filter: Arc<dyn Fn(&Enr) -> bool + Send + Sync>,
/// The time between pings to ensure connectivity amongst connected nodes. Default: 300
/// seconds.
@ -123,7 +123,7 @@ impl Default for Discv5Config {
query_parallelism: 3,
ip_limit: false,
incoming_bucket_limit: MAX_NODES_PER_BUCKET,
table_filter: |_| true,
table_filter: Arc::new(|_| true),
ping_interval: Duration::from_secs(300),
report_discovered_peers: true,
filter_rate_limiter,
@ -242,8 +242,8 @@ impl Discv5ConfigBuilder {
/// A filter used to decide whether to insert nodes into our local routing table. Nodes can be
/// excluded if they do not pass this filter.
pub fn table_filter(&mut self, filter: fn(&Enr) -> bool) -> &mut Self {
self.config.table_filter = filter;
pub fn table_filter<F>(&mut self, filter: F) -> &mut Self where F: Fn(&Enr) -> bool + Send + Sync + 'static {
self.config.table_filter = Arc::new(filter);
self
}
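Switching table_filter from a plain fn pointer to Arc<dyn Fn(&Enr) -> bool + Send + Sync> is what allows the filter to capture the node's local NetworkIdentity (see the run_config.rs change above). A hedged usage sketch, with builder and local_network_id assumed in scope:

// Sketch: a capturing closure is now accepted where only a plain `fn` pointer was before.
builder.table_filter(move |enr: &Enr| {
    matches!(enr.network_identity(), Some(Ok(id)) if id == local_network_id)
});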

View File

@ -754,6 +754,7 @@ impl Handler {
// failed.
enr.node_id() == node_address.node_id
&& (enr.udp_socket().is_none() || enr.udp_socket() == Some(node_address.socket_addr))
&& enr.get("network_identity").is_some()
}
/// Handle a message that contains an authentication header.

View File

@ -1264,7 +1264,12 @@ impl Service {
"Session established with Node: {}, direction: {}",
node_id, direction
);
self.connection_updated(node_id, ConnectionStatus::Connected(enr, direction));
// requires network identity in ENR, so as to refuse low version peers.
match enr.get("network_identity") {
Some(_) => self.connection_updated(node_id, ConnectionStatus::Connected(enr, direction)),
None => debug!(ip=?enr.ip(), "No network identity in peer ENR"),
}
}
/// A session could not be established or an RPC request timed-out (after a few retries, if