From ca2d8213ba153b2ca3822d58cab34113eabf80ea Mon Sep 17 00:00:00 2001 From: Peter Zhang <peter@0g.ai> Date: Wed, 12 Mar 2025 20:26:13 +0800 Subject: [PATCH] fix shard config init --- node/pruner/src/lib.rs | 2 +- node/src/client/builder.rs | 10 +++++++--- node/src/main.rs | 4 ++-- node/storage/src/log_store/log_manager.rs | 1 + 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs index 142c998..e078924 100644 --- a/node/pruner/src/lib.rs +++ b/node/pruner/src/lib.rs @@ -276,7 +276,7 @@ impl Pruner { } } -async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> { +pub async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> { store .get_config_decoded(&SHARD_CONFIG_KEY, DATA_DB_KEY) .await diff --git a/node/src/client/builder.rs b/node/src/client/builder.rs index 0298083..b5ec148 100644 --- a/node/src/client/builder.rs +++ b/node/src/client/builder.rs @@ -7,7 +7,7 @@ use network::{ self, new_network_channel, Keypair, NetworkConfig, NetworkGlobals, NetworkReceiver, NetworkSender, RequestId, Service as LibP2PService, }; -use pruner::{Pruner, PrunerConfig, PrunerMessage}; +use pruner::{get_shard_config, Pruner, PrunerConfig, PrunerMessage}; use router::RouterService; use rpc::RPCConfig; use std::sync::Arc; @@ -203,7 +203,7 @@ impl ClientBuilder { if let Some(config) = config { let executor = require!("miner", self, runtime_context).clone().executor; let network_send = require!("miner", self, network).send.clone(); - let store = self.async_store.as_ref().unwrap().clone(); + let store = require!("miner", self, async_store).clone(); let send = MineService::spawn(executor, network_send, config, store).await?; self.miner = Some(MinerComponents { send }); @@ -225,7 +225,11 @@ impl ClientBuilder { Ok(self) } - pub async fn with_shard(self, config: ShardConfig) -> Result<Self, String> { + pub async fn with_shard(self, mut config: ShardConfig) -> Result<Self, String> { + let store = require!("shard", self, async_store).clone(); + if let Some(stored_config) = get_shard_config(store.as_ref()).await.unwrap_or(None) { + config = stored_config; + } self.async_store .as_ref() .unwrap() diff --git a/node/src/main.rs b/node/src/main.rs index 9c42046..ba21fa9 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -23,6 +23,8 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client ClientBuilder::default() .with_runtime_context(context) .with_rocksdb_store(&storage_config)? + .with_shard(shard_config) + .await? .with_log_sync(log_sync_config) .await? .with_file_location_cache(config.file_location_cache) @@ -34,8 +36,6 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client .await? .with_miner(miner_config) .await? - .with_shard(shard_config) - .await? .with_pruner(pruner_config) .await? .with_rpc(config.rpc) diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 516092e..6a024e2 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -1157,6 +1157,7 @@ impl LogManager { .get_tx_by_seq_number(from_tx_seq)? .ok_or_else(|| anyhow!("from tx missing"))?; let mut to_tx_offset_list = Vec::with_capacity(to_tx_seq_list.len()); + for seq in to_tx_seq_list { // No need to copy data for completed tx. if self.check_tx_completed(seq)? {