Mirror of https://github.com/0glabs/0g-storage-node.git (synced 2025-04-04 15:35:18 +00:00)

Compare commits: 8f8b6db72a ... 20dfb5ab39 (25 commits)

Commits:

- 20dfb5ab39
- 832fa8eab5
- 924c354acf
- cc73628c02
- 7c915196ce
- 8bd2b079e6
- 9d2303b9b3
- f03fdc68e7
- f15c27c325
- 20a3bd8b1d
- ebfbf65340
- 3653d34048
- d326dc87c3
- 9474daabde
- 6df5b2aaf9
- e1a9cfde64
- 9fde9a7239
- f9ef93dd9c
- cc846de1f5
- fb65d257be
- 8f1fe118ef
- d9030de505
- 9eea71e97d
- bb6e1457b7
- da2cdec8a1
.gitignore (vendored): 1 line added

@@ -4,5 +4,6 @@
 /.idea
 tests/**/__pycache__
 tests/tmp/**
+tests/config/zgs
 .vscode/*.json
 /0g-storage-contracts-dev
@@ -267,7 +267,7 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
             discovery_enabled: !config.disable_discovery,
             metrics_enabled: config.metrics_enabled,
             target_peer_count: config.target_peers,
-            ..Default::default()
+            ..config.peer_manager
         };

         let slot_duration = std::time::Duration::from_secs(12);
@@ -1,6 +1,5 @@
-use crate::peer_manager::peerdb::PeerDBConfig;
 use crate::types::GossipKind;
-use crate::{Enr, PeerIdSerialized};
+use crate::{peer_manager, Enr, PeerIdSerialized};
 use directory::{
     DEFAULT_BEACON_NODE_DIR, DEFAULT_HARDCODED_NETWORK, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR,
 };

@@ -128,7 +127,12 @@ pub struct Config {
     /// The id of the storage network.
     pub network_id: NetworkIdentity,

-    pub peer_db: PeerDBConfig,
+    pub peer_db: peer_manager::peerdb::PeerDBConfig,
+    pub peer_manager: peer_manager::config::Config,
+
+    /// Whether to disable network identity in ENR.
+    /// This is for test purpose only.
+    pub disable_enr_network_id: bool,
 }

 impl Default for Config {

@@ -208,6 +212,8 @@ impl Default for Config {
             metrics_enabled: false,
             network_id: Default::default(),
             peer_db: Default::default(),
+            peer_manager: Default::default(),
+            disable_enr_network_id: false,
         }
     }
 }
@@ -1,9 +1,10 @@
 //! Helper functions and an extension trait for Ethereum 2 ENRs.

 pub use discv5::enr::{CombinedKey, EnrBuilder};
+use ssz::Encode;

-use super::enr_ext::CombinedKeyExt;
-use super::ENR_FILENAME;
+use super::enr_ext::{CombinedKeyExt, ENR_CONTENT_KEY_NETWORK_ID};
+use super::{EnrExt, ENR_FILENAME};
 use crate::types::Enr;
 use crate::NetworkConfig;
 use discv5::enr::EnrKey;

@@ -32,7 +33,9 @@ pub fn use_or_load_enr(
         Ok(disk_enr) => {
             // if the same node id, then we may need to update our sequence number
             if local_enr.node_id() == disk_enr.node_id() {
-                if compare_enr(local_enr, &disk_enr) {
+                if compare_enr(local_enr, &disk_enr)
+                    && is_disk_enr_network_id_unchanged(&disk_enr, config)
+                {
                     debug!(file = ?enr_f, "ENR loaded from disk");
                     // the stored ENR has the same configuration, use it
                     *local_enr = disk_enr;

@@ -94,6 +97,13 @@ pub fn create_enr_builder_from_config<T: EnrKey>(
         let tcp_port = config.enr_tcp_port.unwrap_or(config.libp2p_port);
         builder.tcp(tcp_port);
     }
+    // add network identity info in ENR if not disabled
+    if !config.disable_enr_network_id {
+        builder.add_value(
+            ENR_CONTENT_KEY_NETWORK_ID,
+            &config.network_id.as_ssz_bytes(),
+        );
+    }
     builder
 }

@@ -117,6 +127,14 @@ fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool {
         && (local_enr.udp().is_none() || local_enr.udp() == disk_enr.udp())
 }

+fn is_disk_enr_network_id_unchanged(disk_enr: &Enr, config: &NetworkConfig) -> bool {
+    match disk_enr.network_identity() {
+        Some(Ok(id)) => !config.disable_enr_network_id && id == config.network_id,
+        Some(Err(_)) => false,
+        None => config.disable_enr_network_id,
+    }
+}
+
 /// Loads enr from the given directory
 pub fn load_enr_from_disk(dir: &Path) -> Result<Enr, String> {
     let enr_f = dir.join(ENR_FILENAME);
@@ -2,8 +2,12 @@
 use crate::{Enr, Multiaddr, PeerId};
 use discv5::enr::{CombinedKey, CombinedPublicKey};
 use libp2p::core::{identity::Keypair, identity::PublicKey, multiaddr::Protocol};
+use shared_types::NetworkIdentity;
+use ssz::Decode;
 use tiny_keccak::{Hasher, Keccak};

+pub(crate) const ENR_CONTENT_KEY_NETWORK_ID: &'static str = "network_identity";
+
 /// Extend ENR for libp2p types.
 pub trait EnrExt {
     /// The libp2p `PeerId` for the record.

@@ -24,6 +28,9 @@ pub trait EnrExt {

     /// Returns any multiaddrs that contain the TCP protocol.
     fn multiaddr_tcp(&self) -> Vec<Multiaddr>;
+
+    /// Returns network identity in content.
+    fn network_identity(&self) -> Option<Result<NetworkIdentity, ssz::DecodeError>>;
 }

 /// Extend ENR CombinedPublicKey for libp2p types.

@@ -189,6 +196,12 @@ impl EnrExt for Enr {
         }
         multiaddrs
     }
+
+    /// Returns network identity in content.
+    fn network_identity(&self) -> Option<Result<NetworkIdentity, ssz::DecodeError>> {
+        let value = self.get(ENR_CONTENT_KEY_NETWORK_ID)?;
+        Some(NetworkIdentity::from_ssz_bytes(value))
+    }
 }

 impl CombinedKeyPublicExt for CombinedPublicKey {
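The ENR changes above store the node's network identity as SSZ-encoded bytes under the `network_identity` key and read it back through the new `EnrExt::network_identity()` method. Below is a minimal round-trip sketch of that encoding, using a hypothetical, simplified stand-in for `shared_types::NetworkIdentity` (the real type also carries the flow contract address and the full protocol version) and assuming the same `ssz`/`ssz_derive` crates already imported in this diff:

```rust
use ssz::{Decode, Encode};
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};

// Hypothetical, simplified stand-in for shared_types::NetworkIdentity.
#[derive(Debug, PartialEq, DeriveEncode, DeriveDecode)]
struct NetworkIdentity {
    chain_id: u64,
    protocol_major: u32,
}

fn main() {
    let id = NetworkIdentity { chain_id: 1000, protocol_major: 1 };

    // What the ENR builder writes under the "network_identity" key.
    let bytes = id.as_ssz_bytes();

    // What EnrExt::network_identity() does with the raw value read back from the ENR.
    let decoded = NetworkIdentity::from_ssz_bytes(&bytes).expect("valid SSZ");
    assert_eq!(id, decoded);
}
```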
@@ -139,6 +139,7 @@ impl Discovery {
             udp = ?local_enr.udp(),
             tcp = ?local_enr.tcp(),
             udp4_socket = ?local_enr.udp_socket(),
+            network_id = ?local_enr.network_identity(),
             "ENR Initialised",
         );

@@ -158,6 +159,7 @@ impl Discovery {
                 ip = ?bootnode_enr.ip(),
                 udp = ?bootnode_enr.udp(),
                 tcp = ?bootnode_enr.tcp(),
+                network_id = ?bootnode_enr.network_identity(),
                 "Adding node to routing table",
             );
             let repr = bootnode_enr.to_string();

@@ -205,13 +207,37 @@ impl Discovery {
             match result {
                 Ok(enr) => {
                     debug!(
+                        multiaddr = %original_addr.to_string(),
                         node_id = %enr.node_id(),
                         peer_id = %enr.peer_id(),
                         ip = ?enr.ip(),
                         udp = ?enr.udp(),
                         tcp = ?enr.tcp(),
-                        "Adding node to routing table",
+                        network_id = ?enr.network_identity(),
+                        "Adding bootnode to routing table",
                     );

+                    // check network identity in bootnode ENR if required
+                    if !config.disable_enr_network_id {
+                        match enr.network_identity() {
+                            Some(Ok(id)) => {
+                                if id != config.network_id {
+                                    error!(bootnode=?id, local=?config.network_id, "Bootnode network identity mismatch");
+                                    continue;
+                                }
+                            }
+                            Some(Err(err)) => {
+                                error!(?err, "Failed to decode bootnode network identity");
+                                continue;
+                            }
+                            None => {
+                                error!("Bootnode has no network identity");
+                                continue;
+                            }
+                        }
+                    }
+
+                    // add bootnode into routing table
                     let _ = discv5.add_enr(enr).map_err(|e| {
                         error!(
                             addr = %original_addr.to_string(),

@@ -401,10 +427,16 @@ impl Discovery {
         // Generate a random target node id.
         let random_node = NodeId::random();

+        // only discover nodes with same network identity
+        let local_network_id = self.network_globals.network_id();
+        let predicate = move |enr: &Enr| -> bool {
+            matches!(enr.network_identity(), Some(Ok(id)) if id == local_network_id)
+        };
+
         // Build the future
         let query_future = self
             .discv5
-            .find_node_predicate(random_node, Box::new(|_| true), target_peers)
+            .find_node_predicate(random_node, Box::new(predicate), target_peers)
             .map(|v| QueryResult {
                 query_type: query,
                 result: v,
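The bootnode handling above boils down to a small acceptance rule: with the check enabled, a bootnode is only added when its ENR carries a decodable identity equal to the local one; a mismatch, a decode error, or a missing value all skip the node. A standalone sketch of that rule with the types reduced to plain values (the real code matches on the `Option<Result<NetworkIdentity, ssz::DecodeError>>` returned by `enr.network_identity()`):

```rust
/// Simplified mirror of the bootnode filter introduced above.
/// `advertised` is what the bootnode ENR carries: missing, undecodable, or an id.
fn accept_bootnode(
    disable_enr_network_id: bool,
    local_id: u64,
    advertised: Option<Result<u64, ()>>,
) -> bool {
    if disable_enr_network_id {
        return true; // check disabled: accept regardless of what the ENR carries
    }
    match advertised {
        Some(Ok(id)) => id == local_id, // must match the local network identity
        Some(Err(())) => false,         // present but undecodable: reject
        None => false,                  // no identity advertised: reject
    }
}

fn main() {
    assert!(accept_bootnode(false, 1000, Some(Ok(1000))));
    assert!(!accept_bootnode(false, 1000, Some(Ok(16600))));
    assert!(!accept_bootnode(false, 1000, None));
    assert!(accept_bootnode(true, 1000, None));
}
```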
@@ -1,3 +1,8 @@
+use std::time::Duration;
+
+use duration_str::deserialize_duration;
+use serde::{Deserialize, Serialize};
+
 /// The time in seconds between re-status's peers.
 pub const DEFAULT_STATUS_INTERVAL: u64 = 300;

@@ -11,9 +16,14 @@ pub const DEFAULT_PING_INTERVAL_INBOUND: u64 = 20;
 pub const DEFAULT_TARGET_PEERS: usize = 50;

 /// Configurations for the PeerManager.
-#[derive(Debug)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[serde(default)]
 pub struct Config {
     /* Peer count related configurations */
+    /// The heartbeat performs regular updates such as updating reputations and performing discovery
+    /// requests. This defines the interval in seconds.
+    #[serde(deserialize_with = "deserialize_duration")]
+    pub heartbeat_interval: Duration,
     /// Whether discovery is enabled.
     pub discovery_enabled: bool,
     /// Whether metrics are enabled.

@@ -35,6 +45,7 @@ pub struct Config {
 impl Default for Config {
     fn default() -> Self {
         Config {
+            heartbeat_interval: Duration::from_secs(30),
             discovery_enabled: true,
             metrics_enabled: false,
             target_peer_count: DEFAULT_TARGET_PEERS,
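Because the struct now derives `Serialize`/`Deserialize` with `#[serde(default)]` and parses `heartbeat_interval` through `duration_str::deserialize_duration`, the interval can be written as a human-readable string such as "1s" (the test config later in this change set does exactly that under `network_peer_manager`). A minimal sketch of that deserialization path, assuming the `serde`, `duration_str`, and `toml` crates; the struct here is a simplified stand-in, not the full config:

```rust
use std::time::Duration;

use duration_str::deserialize_duration;
use serde::Deserialize;

// Simplified stand-in for the peer manager config fields relevant here.
#[derive(Debug, Deserialize)]
#[serde(default)]
struct PeerManagerConfig {
    #[serde(deserialize_with = "deserialize_duration")]
    heartbeat_interval: Duration,
    discovery_enabled: bool,
}

impl Default for PeerManagerConfig {
    fn default() -> Self {
        Self {
            heartbeat_interval: Duration::from_secs(30),
            discovery_enabled: true,
        }
    }
}

fn main() {
    // A [network_peer_manager]-style fragment; "1s" is parsed by duration_str.
    let cfg: PeerManagerConfig = toml::from_str(r#"heartbeat_interval = "1s""#).unwrap();
    assert_eq!(cfg.heartbeat_interval, Duration::from_secs(1));
    assert!(cfg.discovery_enabled); // absent field is filled from Default
}
```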
@@ -30,10 +30,6 @@ use std::net::IpAddr;
 pub mod config;
 mod network_behaviour;

-/// The heartbeat performs regular updates such as updating reputations and performing discovery
-/// requests. This defines the interval in seconds.
-const HEARTBEAT_INTERVAL: u64 = 30;
-
 /// This is used in the pruning logic. We avoid pruning peers on sync-committees if doing so would
 /// lower our peer count below this number. Instead we favour a non-uniform distribution of subnet
 /// peers.

@@ -105,6 +101,7 @@ impl PeerManager {
         network_globals: Arc<NetworkGlobals>,
     ) -> error::Result<Self> {
         let config::Config {
+            heartbeat_interval,
             discovery_enabled,
             metrics_enabled,
             target_peer_count,

@@ -114,7 +111,7 @@ impl PeerManager {
         } = cfg;

         // Set up the peer manager heartbeat interval
-        let heartbeat = tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL));
+        let heartbeat = tokio::time::interval(heartbeat_interval);

         Ok(PeerManager {
             network_globals,
@@ -112,7 +112,12 @@ impl ClientBuilder {
     pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
         let executor = require!("sync", self, runtime_context).clone().executor;
         let store = Arc::new(
-            LogManager::rocksdb(config.log_config.clone(), &config.db_dir, executor)
+            LogManager::rocksdb(
+                config.log_config.clone(),
+                config.db_dir.join("flow_db"),
+                config.db_dir.join("data_db"),
+                executor,
+            )
             .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
         );
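The builder above now derives two database locations from the single configured `db_dir`: a `flow_db` sub-directory for flow metadata and a `data_db` sub-directory for entry data (names taken from the diff). A small self-contained sketch of that path derivation; the example directory is illustrative only:

```rust
use std::path::{Path, PathBuf};

// Mirrors how the client builder derives the two database locations
// from the single configured `db_dir`.
fn db_paths(db_dir: &Path) -> (PathBuf, PathBuf) {
    (db_dir.join("flow_db"), db_dir.join("data_db"))
}

fn main() {
    // Hypothetical data directory, for illustration.
    let (flow_db, data_db) = db_paths(Path::new("/data/zgs/db"));
    assert!(flow_db.ends_with("flow_db"));
    assert!(data_db.ends_with("data_db"));
    println!("flow db: {}, data db: {}", flow_db.display(), data_db.display());
}
```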
@@ -5,10 +5,11 @@ use ethereum_types::{H256, U256};
 use ethers::prelude::{Http, Middleware, Provider};
 use log_entry_sync::{CacheConfig, ContractAddress, LogSyncConfig};
 use miner::MinerConfig;
-use network::NetworkConfig;
+use network::{EnrExt, NetworkConfig};
 use pruner::PrunerConfig;
 use shared_types::{NetworkIdentity, ProtocolVersion};
 use std::net::IpAddr;
+use std::sync::Arc;
 use std::time::Duration;
 use storage::config::ShardConfig;
 use storage::log_store::log_manager::LogConfig;

@@ -38,7 +39,7 @@ impl ZgsConfig {
             .await
             .map_err(|e| format!("Unable to get chain id: {:?}", e))?
             .as_u64();
-        network_config.network_id = NetworkIdentity {
+        let local_network_id = NetworkIdentity {
             chain_id,
             flow_address,
             p2p_protocol_version: ProtocolVersion {

@@ -47,6 +48,7 @@ impl ZgsConfig {
                 build: network::PROTOCOL_VERSION[2],
             },
         };
+        network_config.network_id = local_network_id.clone();

         if !self.network_disable_discovery {
             network_config.enr_tcp_port = Some(self.network_enr_tcp_port);

@@ -82,7 +84,13 @@ impl ZgsConfig {
             .collect::<Result<_, _>>()
             .map_err(|e| format!("Unable to parse network_libp2p_nodes: {:?}", e))?;

-        network_config.discv5_config.table_filter = |_| true;
+        network_config.discv5_config.table_filter = if self.discv5_disable_enr_network_id {
+            Arc::new(|_| true)
+        } else {
+            Arc::new(
+                move |enr| matches!(enr.network_identity(), Some(Ok(id)) if id == local_network_id),
+            )
+        };
         network_config.discv5_config.request_timeout =
             Duration::from_secs(self.discv5_request_timeout_secs);
         network_config.discv5_config.query_peer_timeout =

@@ -97,6 +105,8 @@ impl ZgsConfig {
         network_config.private = self.network_private;

         network_config.peer_db = self.network_peer_db;
+        network_config.peer_manager = self.network_peer_manager;
+        network_config.disable_enr_network_id = self.discv5_disable_enr_network_id;

         Ok(network_config)
     }
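The discv5 table filter above switches between an accept-all closure and one that requires a matching network identity, driven by the new `discv5_disable_enr_network_id` flag. A generic sketch of that closure-swapping pattern with plain stand-in types (the real filter receives an `Enr` and decodes the identity via `EnrExt::network_identity()`):

```rust
use std::sync::Arc;

// Stand-in for the identity carried by a peer record; the real code decodes
// an SSZ-encoded NetworkIdentity out of the ENR.
type PeerRecord = Option<u64>;
type TableFilter = Arc<dyn Fn(&PeerRecord) -> bool + Send + Sync>;

fn table_filter(disable_check: bool, local_id: u64) -> TableFilter {
    if disable_check {
        // test-only escape hatch: accept every record
        Arc::new(|_: &PeerRecord| true)
    } else {
        // only admit records that advertise the same network identity
        Arc::new(move |record: &PeerRecord| matches!(record, Some(id) if *id == local_id))
    }
}

fn main() {
    let strict = table_filter(false, 1000);
    assert!(strict(&Some(1000)));
    assert!(!strict(&Some(16600)));
    assert!(!strict(&None));

    let permissive = table_filter(true, 1000);
    assert!(permissive(&None));
}
```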
@@ -28,6 +28,7 @@ build_config! {
     (discv5_report_discovered_peers, (bool), false)
     (discv5_disable_packet_filter, (bool), false)
     (discv5_disable_ip_limit, (bool), false)
+    (discv5_disable_enr_network_id, (bool), false)

     // log sync
     (blockchain_rpc_endpoint, (String), "http://127.0.0.1:8545".to_string())

@@ -87,6 +88,9 @@ pub struct ZgsConfig {
     /// Network peer db config, configured by [network_peer_db] section by `config` crate.
     pub network_peer_db: network::peer_manager::peerdb::PeerDBConfig,

+    /// Network peer manager config, configured by [network_peer_manager] section by `config` crate.
+    pub network_peer_manager: network::peer_manager::config::Config,
+
     // router config, configured by [router] section by `config` crate.
     pub router: router::Config,

@@ -25,7 +25,12 @@ fn write_performance(c: &mut Criterion) {
     let executor = runtime.task_executor.clone();

     let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
-        LogManager::rocksdb(LogConfig::default(), "db_write", executor)
+        LogManager::rocksdb(
+            LogConfig::default(),
+            "db_flow_write",
+            "db_data_write",
+            executor,
+        )
         .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
         .unwrap(),
     ));

@@ -114,7 +119,12 @@ fn read_performance(c: &mut Criterion) {
     let executor = runtime.task_executor.clone();

     let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
-        LogManager::rocksdb(LogConfig::default(), "db_read", executor)
+        LogManager::rocksdb(
+            LogConfig::default(),
+            "db_flow_read",
+            "db_data_read",
+            executor,
+        )
         .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
         .unwrap(),
     ));
@@ -63,22 +63,22 @@ impl<T: ?Sized + Configurable> ConfigurableExt for T {}

 impl Configurable for LogManager {
     fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        Ok(self.db.get(COL_MISC, key)?)
+        Ok(self.flow_db.get(COL_MISC, key)?)
     }

     fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> {
-        self.db.put(COL_MISC, key, value)?;
+        self.flow_db.put(COL_MISC, key, value)?;
         Ok(())
     }

     fn remove_config(&self, key: &[u8]) -> Result<()> {
-        Ok(self.db.delete(COL_MISC, key)?)
+        Ok(self.flow_db.delete(COL_MISC, key)?)
     }

     fn exec_configs(&self, tx: ConfigTx) -> Result<()> {
-        let mut db_tx = self.db.transaction();
+        let mut db_tx = self.flow_db.transaction();
         db_tx.ops = tx.ops;
-        self.db.write(db_tx)?;
+        self.flow_db.write(db_tx)?;

         Ok(())
     }
@@ -2,8 +2,7 @@ use crate::config::ShardConfig;
 use crate::error::Error;
 use crate::log_store::load_chunk::EntryBatch;
 use crate::log_store::log_manager::{
-    bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
-    COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
+    bytes_to_entries, COL_ENTRY_BATCH, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE,
 };
 use crate::log_store::seal_task_manager::SealTaskManager;
 use crate::log_store::{
@@ -12,45 +11,36 @@ use crate::log_store::{
 use crate::{try_option, ZgsKeyValueDB};
 use any::Any;
 use anyhow::{anyhow, bail, Result};
-use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase, NodeTransaction};
+use append_merkle::{MerkleTreeRead, NodeDatabase, NodeTransaction};
 use itertools::Itertools;
 use kvdb::DBTransaction;
 use parking_lot::RwLock;
-use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
+use shared_types::{ChunkArray, DataRoot, FlowProof};
 use ssz::{Decode, Encode};
 use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
-use std::cmp::Ordering;
-use std::collections::BTreeMap;
 use std::fmt::Debug;
 use std::sync::Arc;
 use std::time::Instant;
-use std::{any, cmp, mem};
+use std::{any, cmp};
 use tracing::{debug, error, trace};
 use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};

 pub struct FlowStore {
-    db: Arc<FlowDBStore>,
+    data_db: Arc<FlowDBStore>,
     seal_manager: SealTaskManager,
     config: FlowConfig,
 }

 impl FlowStore {
-    pub fn new(db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
+    pub fn new(data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
         Self {
-            db,
+            data_db,
             seal_manager: Default::default(),
             config,
         }
     }

-    pub fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
-        let start_time = Instant::now();
-        let res = self.db.put_batch_root_list(root_map);
-
-        metrics::PUT_BATCH_ROOT_LIST.update_since(start_time);
-        res
-    }
-
     pub fn insert_subtree_list_for_batch(
         &self,
         batch_index: usize,
@@ -58,20 +48,19 @@ impl FlowStore {
     ) -> Result<()> {
         let start_time = Instant::now();
         let mut batch = self
-            .db
+            .data_db
             .get_entry_batch(batch_index as u64)?
             .unwrap_or_else(|| EntryBatch::new(batch_index as u64));
         batch.set_subtree_list(subtree_list);
-        self.db.put_entry_raw(vec![(batch_index as u64, batch)])?;
+        self.data_db
+            .put_entry_raw(vec![(batch_index as u64, batch)])?;
         metrics::INSERT_SUBTREE_LIST.update_since(start_time);

         Ok(())
     }

     pub fn gen_proof_in_batch(&self, batch_index: usize, sector_index: usize) -> Result<FlowProof> {
         let batch = self
-            .db
+            .data_db
             .get_entry_batch(batch_index as u64)?
             .ok_or_else(|| anyhow!("batch missing, index={}", batch_index))?;
         let merkle = batch.to_merkle_tree(batch_index == 0)?.ok_or_else(|| {
@@ -83,24 +72,9 @@ impl FlowStore {
         merkle.gen_proof(sector_index)
     }

-    pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
-        let start_time = Instant::now();
-        let res = self.db.put_mpt_node_list(node_list);
-        metrics::PUT_MPT_NODE.update_since(start_time);
-        res
-    }
-
     pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
         self.seal_manager.delete_batch_list(batch_list);
-        self.db.delete_batch_list(batch_list)
-    }
-
-    pub fn get_raw_batch(&self, batch_index: u64) -> Result<Option<EntryBatch>> {
-        self.db.get_entry_batch(batch_index)
-    }
-
-    pub fn get_batch_root(&self, batch_index: u64) -> Result<Option<DataRoot>> {
-        self.db.get_batch_root(batch_index)
+        self.data_db.delete_batch_list(batch_list)
     }
 }
@@ -146,7 +120,7 @@ impl FlowRead for FlowStore {
             length -= 1;
         }

-        let entry_batch = try_option!(self.db.get_entry_batch(chunk_index)?);
+        let entry_batch = try_option!(self.data_db.get_entry_batch(chunk_index)?);
         let mut entry_batch_data =
             try_option!(entry_batch.get_unsealed_data(offset as usize, length as usize));
         data.append(&mut entry_batch_data);

@@ -175,7 +149,7 @@ impl FlowRead for FlowStore {
         let chunk_index = start_entry_index / self.config.batch_size as u64;

         if let Some(mut data_list) = self
-            .db
+            .data_db
             .get_entry_batch(chunk_index)?
             .map(|b| b.into_data_list(start_entry_index))
         {

@@ -199,13 +173,8 @@ impl FlowRead for FlowStore {
         Ok(entry_list)
     }

-    /// Return the list of all stored chunk roots.
-    fn get_chunk_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>> {
-        self.db.get_batch_root_list()
-    }
-
     fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
-        let batch = try_option!(self.db.get_entry_batch(chunk_index)?);
+        let batch = try_option!(self.data_db.get_entry_batch(chunk_index)?);
         let mut mine_chunk = MineLoadChunk::default();
         for (seal_index, (sealed, validity)) in mine_chunk
             .loaded_chunk

@@ -223,7 +192,7 @@ impl FlowRead for FlowStore {

     fn get_num_entries(&self) -> Result<u64> {
         // This is an over-estimation as it assumes each batch is full.
-        self.db
+        self.data_db
             .kvdb
             .num_keys(COL_ENTRY_BATCH)
             .map(|num_batches| num_batches * PORA_CHUNK_SIZE as u64)

@@ -264,7 +233,7 @@ impl FlowWrite for FlowStore {

         // TODO: Try to avoid loading from db if possible.
         let mut batch = self
-            .db
+            .data_db
             .get_entry_batch(chunk_index)?
             .unwrap_or_else(|| EntryBatch::new(chunk_index));
         let completed_seals = batch.insert_data(

@@ -284,12 +253,12 @@ impl FlowWrite for FlowStore {
         }

         metrics::APPEND_ENTRIES.update_since(start_time);
-        self.db.put_entry_batch_list(batch_list)
+        self.data_db.put_entry_batch_list(batch_list)
     }

     fn truncate(&self, start_index: u64) -> crate::error::Result<()> {
         let mut to_seal_set = self.seal_manager.to_seal_set.write();
-        let to_reseal = self.db.truncate(start_index, self.config.batch_size)?;
+        let to_reseal = self.data_db.truncate(start_index, self.config.batch_size)?;

         to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL));
         let new_seal_version = self.seal_manager.inc_seal_version();

@@ -319,7 +288,7 @@ impl FlowSeal for FlowStore {
         let mut tasks = Vec::with_capacity(SEALS_PER_LOAD);

         let batch_data = self
-            .db
+            .data_db
             .get_entry_batch((first_index / SEALS_PER_LOAD) as u64)?
             .expect("Lost data chunk in to_seal_set");

@@ -358,7 +327,7 @@ impl FlowSeal for FlowStore {
             .chunk_by(|answer| answer.seal_index / SEALS_PER_LOAD as u64)
         {
             let mut batch_chunk = self
-                .db
+                .data_db
                 .get_entry_batch(load_index)?
                 .expect("Can not find chunk data");
             for answer in answers_in_chunk {

@@ -374,7 +343,7 @@ impl FlowSeal for FlowStore {
             to_seal_set.remove(&idx);
         }

-        self.db.put_entry_raw(updated_chunk)?;
+        self.data_db.put_entry_raw(updated_chunk)?;

         Ok(())
     }
@@ -404,12 +373,6 @@ impl FlowDBStore {
             );
             if let Some(root) = batch.build_root(batch_index == 0)? {
                 trace!("complete batch: index={}", batch_index);
-                tx.put(
-                    COL_ENTRY_BATCH_ROOT,
-                    // (batch_index, subtree_depth)
-                    &encode_batch_root_key(batch_index as usize, 1),
-                    root.as_bytes(),
-                );
                 completed_batches.push((batch_index, root));
             }
         }
@@ -436,94 +399,6 @@ impl FlowDBStore {
         Ok(Some(EntryBatch::from_ssz_bytes(&raw).map_err(Error::from)?))
     }

-    fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
-        let mut tx = self.kvdb.transaction();
-        for (batch_index, (root, subtree_depth)) in root_map {
-            tx.put(
-                COL_ENTRY_BATCH_ROOT,
-                &encode_batch_root_key(batch_index, subtree_depth),
-                root.as_bytes(),
-            );
-        }
-        Ok(self.kvdb.write(tx)?)
-    }
-
-    fn get_batch_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>> {
-        let mut range_root = None;
-        // A list of `BatchRoot` that can reconstruct the whole merkle tree structure.
-        let mut root_list = Vec::new();
-        // A list of leaf `(index, root_hash)` in the subtrees of some nodes in `root_list`,
-        // and they will be updated in the merkle tree with `fill_leaf` by the caller.
-        let mut leaf_list = Vec::new();
-        let mut expected_index = 0;
-
-        let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
-        let empty_root = Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
-
-        for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
-            let (index_bytes, root_bytes) = r?;
-            let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?;
-            let root = DataRoot::from_slice(root_bytes.as_ref());
-            debug!(
-                "load root depth={}, index expected={} get={} root={:?}",
-                subtree_depth, expected_index, batch_index, root,
-            );
-            if subtree_depth == 1 {
-                if range_root.is_none() {
-                    // This is expected to be the next leaf.
-                    if batch_index == expected_index {
-                        root_list.push((1, root));
-                        expected_index += 1;
-                    } else {
-                        bail!(
-                            "unexpected chunk leaf, expected={}, get={}",
-                            expected_index,
-                            batch_index
-                        );
-                    }
-                } else {
-                    match batch_index.cmp(&expected_index) {
-                        Ordering::Less => {
-                            // This leaf is within a subtree whose root is known.
-                            leaf_list.push((batch_index, root));
-                        }
-                        Ordering::Equal => {
-                            // A subtree range ends.
-                            range_root = None;
-                            root_list.push((1, root));
-                            expected_index += 1;
-                        }
-                        Ordering::Greater => {
-                            while batch_index > expected_index {
-                                // Fill the gap with empty leaves.
-                                root_list.push((1, empty_root));
-                                expected_index += 1;
-                            }
-                            range_root = None;
-                            root_list.push((1, root));
-                            expected_index += 1;
-                        }
-                    }
-                }
-            } else {
-                while batch_index > expected_index {
-                    // Fill the gap with empty leaves.
-                    root_list.push((1, empty_root));
-                    expected_index += 1;
-                }
-                range_root = Some(BatchRoot::Multiple((subtree_depth, root)));
-                root_list.push((subtree_depth, root));
-                expected_index += 1 << (subtree_depth - 1);
-            }
-        }
-        let extra_node_list = self.get_mpt_node_list()?;
-        Ok(MerkleTreeInitialData {
-            subtree_list: root_list,
-            known_leaves: leaf_list,
-            extra_mpt_nodes: extra_node_list,
-        })
-    }
-
     fn truncate(&self, start_index: u64, batch_size: usize) -> crate::error::Result<Vec<usize>> {
         let mut tx = self.kvdb.transaction();
         let mut start_batch_index = start_index / batch_size as u64;
@@ -564,38 +439,11 @@ impl FlowDBStore {
         };
         for batch_index in start_batch_index as usize..=end {
             tx.delete(COL_ENTRY_BATCH, &batch_index.to_be_bytes());
-            tx.delete_prefix(COL_ENTRY_BATCH_ROOT, &batch_index.to_be_bytes());
         }
         self.kvdb.write(tx)?;
         Ok(index_to_reseal)
     }

-    fn put_mpt_node_list(&self, mpt_node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
-        let mut tx = self.kvdb.transaction();
-        for (layer_index, position, data) in mpt_node_list {
-            tx.put(
-                COL_FLOW_MPT_NODES,
-                &encode_mpt_node_key(layer_index, position),
-                data.as_bytes(),
-            );
-        }
-        Ok(self.kvdb.write(tx)?)
-    }
-
-    fn get_mpt_node_list(&self) -> Result<Vec<(usize, usize, DataRoot)>> {
-        let mut node_list = Vec::new();
-        for r in self.kvdb.iter(COL_FLOW_MPT_NODES) {
-            let (index_bytes, node_bytes) = r?;
-            let (layer_index, position) = decode_mpt_node_key(index_bytes.as_ref())?;
-            node_list.push((
-                layer_index,
-                position,
-                DataRoot::from_slice(node_bytes.as_ref()),
-            ));
-        }
-        Ok(node_list)
-    }
-
     fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
         let mut tx = self.kvdb.transaction();
         for i in batch_list {
@@ -603,16 +451,6 @@ impl FlowDBStore {
         }
         Ok(self.kvdb.write(tx)?)
     }
-
-    fn get_batch_root(&self, batch_index: u64) -> Result<Option<DataRoot>> {
-        Ok(self
-            .kvdb
-            .get(
-                COL_ENTRY_BATCH_ROOT,
-                &encode_batch_root_key(batch_index as usize, 1),
-            )?
-            .map(|v| DataRoot::from_slice(&v)))
-    }
 }

 #[derive(DeriveEncode, DeriveDecode, Clone, Debug)]
@@ -658,37 +496,12 @@ fn decode_batch_index(data: &[u8]) -> Result<usize> {
     try_decode_usize(data)
 }

-/// For the same batch_index, we want to process the larger subtree_depth first in iteration.
-fn encode_batch_root_key(batch_index: usize, subtree_depth: usize) -> Vec<u8> {
-    let mut key = batch_index.to_be_bytes().to_vec();
-    key.extend_from_slice(&(usize::MAX - subtree_depth).to_be_bytes());
-    key
-}
-
-fn decode_batch_root_key(data: &[u8]) -> Result<(usize, usize)> {
-    if data.len() != mem::size_of::<usize>() * 2 {
-        bail!("invalid data length");
-    }
-    let batch_index = try_decode_usize(&data[..mem::size_of::<u64>()])?;
-    let subtree_depth = usize::MAX - try_decode_usize(&data[mem::size_of::<u64>()..])?;
-    Ok((batch_index, subtree_depth))
-}
-
 fn encode_mpt_node_key(layer_index: usize, position: usize) -> Vec<u8> {
     let mut key = layer_index.to_be_bytes().to_vec();
     key.extend_from_slice(&position.to_be_bytes());
     key
 }

-fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
-    if data.len() != mem::size_of::<usize>() * 2 {
-        bail!("invalid data length");
-    }
-    let layer_index = try_decode_usize(&data[..mem::size_of::<u64>()])?;
-    let position = try_decode_usize(&data[mem::size_of::<u64>()..])?;
-    Ok((layer_index, position))
-}
-
 fn layer_size_key(layer: usize) -> Vec<u8> {
     let mut key = "layer_size".as_bytes().to_vec();
     key.extend_from_slice(&layer.to_be_bytes());
@@ -20,7 +20,7 @@ use shared_types::{
     ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
 };
 use std::cmp::Ordering;
-use std::collections::BTreeMap;
 use std::path::Path;
 use std::sync::mpsc;
 use std::sync::Arc;

@@ -59,13 +59,12 @@ static PAD_SEGMENT_ROOT: Lazy<H256> = Lazy::new(|| {
     .root()
 });
 pub struct UpdateFlowMessage {
-    pub root_map: BTreeMap<usize, (H256, usize)>,
     pub pad_data: usize,
     pub tx_start_flow_index: u64,
 }

 pub struct LogManager {
-    pub(crate) db: Arc<dyn ZgsKeyValueDB>,
+    pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
     tx_store: TransactionStore,
     flow_store: Arc<FlowStore>,
     merkle: RwLock<MerkleManager>,
@@ -218,12 +217,11 @@ impl LogStoreChunkWrite for LogManager {
         self.append_entries(flow_entry_array, &mut merkle)?;

         if let Some(file_proof) = maybe_file_proof {
-            let updated_node_list = merkle.pora_chunks_merkle.fill_with_file_proof(
+            merkle.pora_chunks_merkle.fill_with_file_proof(
                 file_proof,
                 tx.merkle_nodes,
                 tx.start_entry_index,
             )?;
-            self.flow_store.put_mpt_node_list(updated_node_list)?;
         }
         metrics::PUT_CHUNKS.update_since(start_time);
         Ok(true)

@@ -394,10 +392,9 @@ impl LogStoreWrite for LogManager {
         // `merkle` is used in `validate_range_proof`.
         let mut merkle = self.merkle.write();
         if valid {
-            let updated_nodes = merkle
+            merkle
                 .pora_chunks_merkle
                 .fill_with_range_proof(data.proof.clone())?;
-            self.flow_store.put_mpt_node_list(updated_nodes)?;
         }
         Ok(valid)
     }
@@ -624,28 +621,33 @@ impl LogStoreRead for LogManager {
 impl LogManager {
     pub fn rocksdb(
         config: LogConfig,
-        path: impl AsRef<Path>,
+        flow_path: impl AsRef<Path>,
+        data_path: impl AsRef<Path>,
         executor: task_executor::TaskExecutor,
     ) -> Result<Self> {
         let mut db_config = DatabaseConfig::with_columns(COL_NUM);
         db_config.enable_statistics = true;
-        let db = Arc::new(Database::open(&db_config, path)?);
-        Self::new(db, config, executor)
+        let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?);
+        let data_db_source = Arc::new(Database::open(&db_config, data_path)?);
+        Self::new(flow_db_source, data_db_source, config, executor)
     }

     pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
-        let db = Arc::new(kvdb_memorydb::create(COL_NUM));
-        Self::new(db, config, executor)
+        let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM));
+        let data_db = Arc::new(kvdb_memorydb::create(COL_NUM));
+        Self::new(flow_db, data_db, config, executor)
     }

     fn new(
-        db: Arc<dyn ZgsKeyValueDB>,
+        flow_db_source: Arc<dyn ZgsKeyValueDB>,
+        data_db_source: Arc<dyn ZgsKeyValueDB>,
         config: LogConfig,
         executor: task_executor::TaskExecutor,
     ) -> Result<Self> {
-        let tx_store = TransactionStore::new(db.clone())?;
-        let flow_db = Arc::new(FlowDBStore::new(db.clone()));
-        let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
+        let tx_store = TransactionStore::new(flow_db_source.clone())?;
+        let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
+        let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
+        let flow_store = Arc::new(FlowStore::new(data_db.clone(), config.flow.clone()));
         // If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
         // first and call `put_tx` later.
         let next_tx_seq = tx_store.next_tx_seq();
@@ -749,7 +751,7 @@ impl LogManager {
         let (sender, receiver) = mpsc::channel();

         let mut log_manager = Self {
-            db,
+            flow_db: flow_db_source,
             tx_store,
             flow_store,
             merkle,

@@ -783,8 +785,6 @@ impl LogManager {
         loop {
             match rx.recv() {
                 std::result::Result::Ok(data) => {
-                    // Update the root index.
-                    flow_store.put_batch_root_list(data.root_map).unwrap();
                     // Update the flow database.
                     // This should be called before `complete_last_chunk_merkle` so that we do not save
                     // subtrees with data known.
@@ -857,21 +857,7 @@ impl LogManager {
                 .gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
             }
         };
-        let r = entry_proof(&top_proof, &sub_proof);
-        if r.is_err() {
-            let raw_batch = self.flow_store.get_raw_batch(seg_index as u64)?.unwrap();
-            let db_root = self.flow_store.get_batch_root(seg_index as u64)?;
-            error!(
-                ?r,
-                ?db_root,
-                ?seg_index,
-                "gen proof error: top_leaves={}, last={}, raw_batch={}",
-                merkle.pora_chunks_merkle.leaves(),
-                merkle.last_chunk_merkle.leaves(),
-                serde_json::to_string(&raw_batch).unwrap(),
-            );
-        }
-        r
+        entry_proof(&top_proof, &sub_proof)
     }

     #[instrument(skip(self, merkle))]
@@ -888,7 +874,6 @@ impl LogManager {

         self.pad_tx(tx_start_index, &mut *merkle)?;

-        let mut batch_root_map = BTreeMap::new();
         for (subtree_depth, subtree_root) in merkle_list {
             let subtree_size = 1 << (subtree_depth - 1);
             if merkle.last_chunk_merkle.leaves() + subtree_size <= PORA_CHUNK_SIZE {

@@ -906,10 +891,6 @@ impl LogManager {
                         .update_last(merkle.last_chunk_merkle.root());
                 }
                 if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
-                    batch_root_map.insert(
-                        merkle.pora_chunks_merkle.leaves() - 1,
-                        (merkle.last_chunk_merkle.root(), 1),
-                    );
                     self.complete_last_chunk_merkle(
                         merkle.pora_chunks_merkle.leaves() - 1,
                         &mut *merkle,

@@ -920,16 +901,11 @@ impl LogManager {
                 // the chunks boundary.
                 assert_eq!(merkle.last_chunk_merkle.leaves(), 0);
                 assert!(subtree_size >= PORA_CHUNK_SIZE);
-                batch_root_map.insert(
-                    merkle.pora_chunks_merkle.leaves(),
-                    (subtree_root, subtree_depth - log2_pow2(PORA_CHUNK_SIZE)),
-                );
                 merkle
                     .pora_chunks_merkle
                     .append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?;
             }
         }
-        self.flow_store.put_batch_root_list(batch_root_map)?;

         metrics::APPEND_SUBTREE_LIST.update_since(start_time);
         Ok(())
@@ -950,7 +926,6 @@ impl LogManager {
         if pad_size != 0 {
             for pad_data in Self::padding(pad_size as usize) {
                 let mut is_full_empty = true;
-                let mut root_map = BTreeMap::new();

                 // Update the in-memory merkle tree.
                 let last_chunk_pad = if merkle.last_chunk_merkle.leaves() == 0 {

@@ -978,10 +953,6 @@ impl LogManager {
                     merkle
                         .pora_chunks_merkle
                         .update_last(merkle.last_chunk_merkle.root());
-                    root_map.insert(
-                        merkle.pora_chunks_merkle.leaves() - 1,
-                        (merkle.last_chunk_merkle.root(), 1),
-                    );
                     completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1);
                 }

@@ -989,10 +960,6 @@ impl LogManager {
                 let mut start_index = last_chunk_pad / ENTRY_SIZE;
                 while pad_data.len() >= (start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE {
                     merkle.pora_chunks_merkle.append(*PAD_SEGMENT_ROOT);
-                    root_map.insert(
-                        merkle.pora_chunks_merkle.leaves() - 1,
-                        (*PAD_SEGMENT_ROOT, 1),
-                    );
                     start_index += PORA_CHUNK_SIZE;
                 }
                 assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);

@@ -1001,12 +968,10 @@ impl LogManager {
                 let data_size = pad_data.len() / ENTRY_SIZE;
                 if is_full_empty {
                     self.sender.send(UpdateFlowMessage {
-                        root_map,
                         pad_data: pad_data.len(),
                         tx_start_flow_index,
                     })?;
                 } else {
-                    self.flow_store.put_batch_root_list(root_map).unwrap();
                     // Update the flow database.
                     // This should be called before `complete_last_chunk_merkle` so that we do not save
                     // subtrees with data known.
@ -1,5 +1,5 @@
|
|||||||
use crate::config::ShardConfig;
|
use crate::config::ShardConfig;
|
||||||
use append_merkle::MerkleTreeInitialData;
|
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use shared_types::{
|
use shared_types::{
|
||||||
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
|
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
|
||||||
@ -212,8 +212,6 @@ pub trait FlowRead {
|
|||||||
/// For simplicity, `index_start` and `index_end` must be at the batch boundaries.
|
/// For simplicity, `index_start` and `index_end` must be at the batch boundaries.
|
||||||
fn get_available_entries(&self, index_start: u64, index_end: u64) -> Result<Vec<ChunkArray>>;
|
fn get_available_entries(&self, index_start: u64, index_end: u64) -> Result<Vec<ChunkArray>>;
|
||||||
|
|
||||||
fn get_chunk_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>>;
|
|
||||||
|
|
||||||
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
|
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
|
||||||
|
|
||||||
// An estimation of the number of entries in the flow db.
|
// An estimation of the number of entries in the flow db.
|
||||||
@ -3,6 +3,10 @@ from web3 import Web3
 ZGS_CONFIG = {
     "log_config_file": "log_config",
     "confirmation_block_count": 1,
+    "discv5_disable_ip_limit": True,
+    "network_peer_manager": {
+        "heartbeat_interval": "1s"
+    },
     "router": {
         "private_ip_enabled": True,
     },
@ -18,6 +22,8 @@ ZGS_CONFIG = {
     }
 }

+ZGS_NODEID = "16Uiu2HAmLkGFUbNFYdhuSbTQ5hmnPjFXx2zUDtwQ2uihHpN9YNNe"
+
 BSC_CONFIG = dict(
     NetworkId=1000,
     HTTPPort=8545,
|
74
tests/network_discovery_test.py
Normal file
74
tests/network_discovery_test.py
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
|
||||||
|
from config.node_config import ZGS_NODEID
|
||||||
|
from test_framework.test_framework import TestFramework
|
||||||
|
from utility.utils import p2p_port
|
||||||
|
|
||||||
|
class NetworkDiscoveryTest(TestFramework):
|
||||||
|
"""
|
||||||
|
This is to test whether community nodes could connect to each other via UDP discovery.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def setup_params(self):
|
||||||
|
# 1 bootnode and 2 community nodes
|
||||||
|
self.num_nodes = 3
|
||||||
|
|
||||||
|
# setup for node 0 as bootnode
|
||||||
|
tests_dir = os.path.dirname(__file__)
|
||||||
|
network_dir = os.path.join(tests_dir, "config", "zgs", "network")
|
||||||
|
bootnode_port = p2p_port(0)
|
||||||
|
self.zgs_node_configs[0] = {
|
||||||
|
# load pre-defined keypair
|
||||||
|
"network_dir": network_dir,
|
||||||
|
|
||||||
|
# enable UDP discovery relevant configs
|
||||||
|
"network_enr_address": "127.0.0.1",
|
||||||
|
"network_enr_tcp_port": bootnode_port,
|
||||||
|
"network_enr_udp_port": bootnode_port,
|
||||||
|
|
||||||
|
# disable trusted nodes
|
||||||
|
"network_libp2p_nodes": [],
|
||||||
|
}
|
||||||
|
|
||||||
|
# setup node 1 & 2 as community nodes
|
||||||
|
bootnodes = [f"/ip4/127.0.0.1/udp/{bootnode_port}/p2p/{ZGS_NODEID}"]
|
||||||
|
for i in range(1, self.num_nodes):
|
||||||
|
self.zgs_node_configs[i] = {
|
||||||
|
# enable UDP discovery relevant configs
|
||||||
|
"network_enr_address": "127.0.0.1",
|
||||||
|
"network_enr_tcp_port": p2p_port(i),
|
||||||
|
"network_enr_udp_port": p2p_port(i),
|
||||||
|
|
||||||
|
# disable trusted nodes and enable bootnodes
|
||||||
|
"network_libp2p_nodes": [],
|
||||||
|
"network_boot_nodes": bootnodes,
|
||||||
|
}
|
||||||
|
|
||||||
|
def run_test(self):
|
||||||
|
timeout_secs = 10
|
||||||
|
|
||||||
|
for iter in range(timeout_secs + 1):
|
||||||
|
assert iter < timeout_secs, "Timeout to discover nodes for peer connection"
|
||||||
|
time.sleep(1)
|
||||||
|
self.log.info("==================================== iter %s", iter)
|
||||||
|
|
||||||
|
total_connected = 0
|
||||||
|
for i in range(self.num_nodes):
|
||||||
|
info = self.nodes[i].rpc.admin_getNetworkInfo()
|
||||||
|
total_connected += info["connectedPeers"]
|
||||||
|
self.log.info(
|
||||||
|
"Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)",
|
||||||
|
i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"],
|
||||||
|
)
|
||||||
|
|
||||||
|
if total_connected >= self.num_nodes * (self.num_nodes - 1):
|
||||||
|
break
|
||||||
|
|
||||||
|
self.log.info("====================================")
|
||||||
|
self.log.info("All nodes connected to each other successfully")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
NetworkDiscoveryTest().main()
|
74  tests/network_discovery_upgrade_test.py  Normal file
@ -0,0 +1,74 @@
#!/usr/bin/env python3

import os
import time

from config.node_config import ZGS_NODEID
from test_framework.test_framework import TestFramework
from utility.utils import p2p_port


class NetworkDiscoveryUpgradeTest(TestFramework):
    """
    This is to test that low version community nodes could not connect to bootnodes.
    """

    def setup_params(self):
        # 1 bootnode and 1 community node
        self.num_nodes = 2

        # setup for node 0 as bootnode
        tests_dir = os.path.dirname(__file__)
        network_dir = os.path.join(tests_dir, "config", "zgs", "network")
        bootnode_port = p2p_port(0)
        self.zgs_node_configs[0] = {
            # load pre-defined keypair
            "network_dir": network_dir,

            # enable UDP discovery relevant configs
            "network_enr_address": "127.0.0.1",
            "network_enr_tcp_port": bootnode_port,
            "network_enr_udp_port": bootnode_port,

            # disable trusted nodes
            "network_libp2p_nodes": [],
        }

        # setup node 1 as community node
        bootnodes = [f"/ip4/127.0.0.1/udp/{bootnode_port}/p2p/{ZGS_NODEID}"]
        for i in range(1, self.num_nodes):
            self.zgs_node_configs[i] = {
                # enable UDP discovery relevant configs
                "network_enr_address": "127.0.0.1",
                "network_enr_tcp_port": p2p_port(i),
                "network_enr_udp_port": p2p_port(i),

                # disable trusted nodes and enable bootnodes
                "network_libp2p_nodes": [],
                "network_boot_nodes": bootnodes,

                # disable network identity in ENR
                "discv5_disable_enr_network_id": True,
            }

    def run_test(self):
        for iter in range(10):
            time.sleep(1)
            self.log.info("==================================== iter %s", iter)

            total_connected = 0
            for i in range(self.num_nodes):
                info = self.nodes[i].rpc.admin_getNetworkInfo()
                total_connected += info["connectedPeers"]
                self.log.info(
                    "Node[%s] peers: total = %s, banned = %s, disconnected = %s, connected = %s (in = %s, out = %s)",
                    i, info["totalPeers"], info["bannedPeers"], info["disconnectedPeers"], info["connectedPeers"], info["connectedIncomingPeers"], info["connectedOutgoingPeers"],
                )

            # ENR incompatible and should not discover each other for TCP connection
            assert total_connected == 0, "Nodes connected unexpectedly"

        self.log.info("====================================")
        self.log.info("ENR incompatible nodes do not connect to each other")


if __name__ == "__main__":
    NetworkDiscoveryUpgradeTest().main()
@ -2,7 +2,7 @@ use crate::{
     kbucket::MAX_NODES_PER_BUCKET, Enr, Executor, PermitBanList, RateLimiter, RateLimiterBuilder,
 };
 ///! A set of configuration parameters to tune the discovery protocol.
-use std::time::Duration;
+use std::{sync::Arc, time::Duration};

 /// Configuration parameters that define the performance of the gossipsub network.
 #[derive(Clone)]
@ -57,7 +57,7 @@ pub struct Discv5Config {

     /// A filter used to decide whether to insert nodes into our local routing table. Nodes can be
     /// excluded if they do not pass this filter. The default is to accept all nodes.
-    pub table_filter: fn(&Enr) -> bool,
+    pub table_filter: Arc<dyn Fn(&Enr) -> bool + Send + Sync>,

     /// The time between pings to ensure connectivity amongst connected nodes. Default: 300
     /// seconds.
@ -123,7 +123,7 @@ impl Default for Discv5Config {
     query_parallelism: 3,
     ip_limit: false,
     incoming_bucket_limit: MAX_NODES_PER_BUCKET,
-    table_filter: |_| true,
+    table_filter: Arc::new(|_| true),
     ping_interval: Duration::from_secs(300),
     report_discovered_peers: true,
     filter_rate_limiter,
@ -242,8 +242,8 @@ impl Discv5ConfigBuilder {

     /// A filter used to decide whether to insert nodes into our local routing table. Nodes can be
     /// excluded if they do not pass this filter.
-    pub fn table_filter(&mut self, filter: fn(&Enr) -> bool) -> &mut Self {
+    pub fn table_filter<F>(&mut self, filter: F) -> &mut Self where F: Fn(&Enr) -> bool + Send + Sync + 'static {
-        self.config.table_filter = filter;
+        self.config.table_filter = Arc::new(filter);
         self
     }
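
The hunks above (apparently in the vendored discv5 configuration) widen `table_filter` from a plain `fn` pointer to an `Arc<dyn Fn(&Enr) -> bool + Send + Sync>`. A function pointer cannot capture any runtime state, so a routing-table filter that depends on data known only at startup could not be expressed before; an `Arc`'d closure can capture that data and still be shared across threads, which is presumably what lets the node install a filter tied to its own network identity. A minimal, self-contained sketch of the difference follows; the `Enr` struct and its `network_id` field are stand-ins for illustration, not the crate's types.

use std::sync::Arc;

// Stand-in for discv5's Enr type, just enough for the example.
struct Enr {
    network_id: Option<u64>,
}

// The old style: a plain function pointer cannot capture any state.
fn accept_all(_enr: &Enr) -> bool {
    true
}

// The new style: an Arc'd closure can capture the expected network id.
fn make_filter(expected: u64) -> Arc<dyn Fn(&Enr) -> bool + Send + Sync> {
    Arc::new(move |enr: &Enr| enr.network_id == Some(expected))
}

fn main() {
    let old_filter: fn(&Enr) -> bool = accept_all;
    let new_filter = make_filter(42);

    let peer = Enr { network_id: Some(42) };
    assert!(old_filter(&peer));
    // Explicit deref: an Arc<dyn Fn> is not directly callable.
    assert!((*new_filter)(&peer));

    let stranger = Enr { network_id: Some(7) };
    assert!(!(*new_filter)(&stranger));
}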
@ -754,6 +754,7 @@ impl Handler {
     // failed.
     enr.node_id() == node_address.node_id
         && (enr.udp_socket().is_none() || enr.udp_socket() == Some(node_address.socket_addr))
+        && enr.get("network_identity").is_some()
 }

 /// Handle a message that contains an authentication header.
@ -1264,7 +1264,12 @@ impl Service {
     "Session established with Node: {}, direction: {}",
     node_id, direction
 );
-self.connection_updated(node_id, ConnectionStatus::Connected(enr, direction));
+// requires network identity in ENR, so as to refuse low version peers.
+match enr.get("network_identity") {
+    Some(_) => self.connection_updated(node_id, ConnectionStatus::Connected(enr, direction)),
+    None => debug!(ip=?enr.ip(), "No network identity in peer ENR"),
+}
 }

 /// A session could not be established or an RPC request timed-out (after a few retries, if
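
Both hunks above gate discv5 on an ENR key: the handler only accepts a session if the peer's record carries a "network_identity" entry, and the service only promotes the session to a connected peer in the `Some` branch of the same lookup, logging and dropping it otherwise. The sketch below reproduces that gating pattern with a stand-in `Enr` type; the struct, the `Decision` enum, and the `on_session_established` helper are illustrative only and not the crate's API, although the real `enr.get` also takes a key and returns an `Option` in a similar way.

use std::collections::HashMap;

// Stand-in for an ENR: a key/value record a node advertises about itself.
struct Enr {
    entries: HashMap<String, Vec<u8>>,
}

impl Enr {
    fn get(&self, key: &str) -> Option<&Vec<u8>> {
        self.entries.get(key)
    }
}

enum Decision {
    Connect,
    Ignore(&'static str),
}

// Mirror of the gating logic: only peers that advertise a network identity
// in their ENR are promoted to connected; peers without it are ignored.
fn on_session_established(enr: &Enr) -> Decision {
    match enr.get("network_identity") {
        Some(_) => Decision::Connect,
        None => Decision::Ignore("No network identity in peer ENR"),
    }
}

fn main() {
    let mut upgraded = Enr { entries: HashMap::new() };
    upgraded
        .entries
        .insert("network_identity".to_string(), vec![1, 2, 3]);
    let legacy = Enr { entries: HashMap::new() };

    assert!(matches!(on_session_established(&upgraded), Decision::Connect));
    assert!(matches!(on_session_established(&legacy), Decision::Ignore(_)));
}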