Compare commits

...

3 Commits

Author SHA1 Message Date
Artem
2b2d8aa35b
Merge 112e083ace into d43a616b56 2025-03-12 22:29:13 +08:00
0g-peterzhb
d43a616b56
fix shard config init (#354)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
2025-03-12 22:27:34 +08:00
Artem
112e083ace
adddefault_executor.rs 2025-02-08 12:31:18 +02:00
5 changed files with 85 additions and 6 deletions

View File

@ -276,7 +276,7 @@ impl Pruner {
}
}
async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
pub async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
store
.get_config_decoded(&SHARD_CONFIG_KEY, DATA_DB_KEY)
.await

View File

@ -7,7 +7,7 @@ use network::{
self, new_network_channel, Keypair, NetworkConfig, NetworkGlobals, NetworkReceiver,
NetworkSender, RequestId, Service as LibP2PService,
};
use pruner::{Pruner, PrunerConfig, PrunerMessage};
use pruner::{get_shard_config, Pruner, PrunerConfig, PrunerMessage};
use router::RouterService;
use rpc::RPCConfig;
use std::sync::Arc;
@ -203,7 +203,7 @@ impl ClientBuilder {
if let Some(config) = config {
let executor = require!("miner", self, runtime_context).clone().executor;
let network_send = require!("miner", self, network).send.clone();
let store = self.async_store.as_ref().unwrap().clone();
let store = require!("miner", self, async_store).clone();
let send = MineService::spawn(executor, network_send, config, store).await?;
self.miner = Some(MinerComponents { send });
@ -225,7 +225,11 @@ impl ClientBuilder {
Ok(self)
}
pub async fn with_shard(self, config: ShardConfig) -> Result<Self, String> {
pub async fn with_shard(self, mut config: ShardConfig) -> Result<Self, String> {
let store = require!("shard", self, async_store).clone();
if let Some(stored_config) = get_shard_config(store.as_ref()).await.unwrap_or(None) {
config = stored_config;
}
self.async_store
.as_ref()
.unwrap()

View File

@ -23,6 +23,8 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
ClientBuilder::default()
.with_runtime_context(context)
.with_rocksdb_store(&storage_config)?
.with_shard(shard_config)
.await?
.with_log_sync(log_sync_config)
.await?
.with_file_location_cache(config.file_location_cache)
@ -34,8 +36,6 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
.await?
.with_miner(miner_config)
.await?
.with_shard(shard_config)
.await?
.with_pruner(pruner_config)
.await?
.with_rpc(config.rpc)

View File

@ -1157,6 +1157,7 @@ impl LogManager {
.get_tx_by_seq_number(from_tx_seq)?
.ok_or_else(|| anyhow!("from tx missing"))?;
let mut to_tx_offset_list = Vec::with_capacity(to_tx_seq_list.len());
for seq in to_tx_seq_list {
// No need to copy data for completed tx.
if self.check_tx_completed(seq)? {

View File

@ -0,0 +1,74 @@
//! Demonstrates how to run a basic Discovery v5 Service with the default Tokio executor.
//!
//! Discv5 requires a Tokio executor with all features. If none is passed, it will use the current
//! runtime that built the `Discv5` struct.
//!
//! To run this example simply run:
//! ```
//! $ cargo run --example default_executor <BASE64ENR>
//! ```
use discv5::{enr, enr::CombinedKey, Discv5, Discv5ConfigBuilder, Discv5Event};
use std::net::SocketAddr;
use tokio::runtime::Runtime;
#[tokio::main]
async fn main() {
// allows detailed logging with the RUST_LOG env variable
let filter_layer = tracing_subscriber::EnvFilter::try_from_default_env()
.or_else(|_| tracing_subscriber::EnvFilter::try_new("info"))
.unwrap();
let _ = tracing_subscriber::fmt()
.with_env_filter(filter_layer)
.try_init();
// listening address and port
let listen_addr = "0.0.0.0:9000".parse::<SocketAddr>().unwrap();
let enr_key = CombinedKey::generate_secp256k1();
// construct a local ENR
let enr = enr::EnrBuilder::new("v4").build(&enr_key).unwrap();
// default configuration - uses the current executor
let config = Discv5ConfigBuilder::new().build();
// construct the discv5 server
let mut discv5 = Discv5::new(enr, enr_key, config).unwrap();
// if we know of another peer's ENR, add it known peers
if let Some(base64_enr) = std::env::args().nth(1) {
match base64_enr.parse::<enr::Enr<enr::CombinedKey>>() {
Ok(enr) => {
println!(
"ENR Read. ip: {:?}, udp_port {:?}, tcp_port: {:?}",
enr.ip(),
enr.udp(),
enr.tcp()
);
if let Err(e) = discv5.add_enr(enr) {
println!("ENR was not added: {}", e);
}
}
Err(e) => panic!("Decoding ENR failed: {}", e),
}
}
// start the discv5 service
discv5.start(listen_addr).await.unwrap();
println!("Server started");
// get an event stream
let mut event_stream = discv5.event_stream().await.unwrap();
loop {
match event_stream.recv().await {
Some(Discv5Event::SocketUpdated(addr)) => {
println!("Nodes ENR socket address has been updated to: {:?}", addr);
}
Some(Discv5Event::Discovered(enr)) => {
println!("A peer has been discovered: {}", enr.node_id());
}
_ => {}
}
}
}