fix shard config init

This commit is contained in:
Peter Zhang 2025-03-12 20:26:13 +08:00
parent 898350e271
commit ca2d8213ba
4 changed files with 11 additions and 6 deletions

View File

@ -276,7 +276,7 @@ impl Pruner {
} }
} }
async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> { pub async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
store store
.get_config_decoded(&SHARD_CONFIG_KEY, DATA_DB_KEY) .get_config_decoded(&SHARD_CONFIG_KEY, DATA_DB_KEY)
.await .await

View File

@ -7,7 +7,7 @@ use network::{
self, new_network_channel, Keypair, NetworkConfig, NetworkGlobals, NetworkReceiver, self, new_network_channel, Keypair, NetworkConfig, NetworkGlobals, NetworkReceiver,
NetworkSender, RequestId, Service as LibP2PService, NetworkSender, RequestId, Service as LibP2PService,
}; };
use pruner::{Pruner, PrunerConfig, PrunerMessage}; use pruner::{get_shard_config, Pruner, PrunerConfig, PrunerMessage};
use router::RouterService; use router::RouterService;
use rpc::RPCConfig; use rpc::RPCConfig;
use std::sync::Arc; use std::sync::Arc;
@ -203,7 +203,7 @@ impl ClientBuilder {
if let Some(config) = config { if let Some(config) = config {
let executor = require!("miner", self, runtime_context).clone().executor; let executor = require!("miner", self, runtime_context).clone().executor;
let network_send = require!("miner", self, network).send.clone(); let network_send = require!("miner", self, network).send.clone();
let store = self.async_store.as_ref().unwrap().clone(); let store = require!("miner", self, async_store).clone();
let send = MineService::spawn(executor, network_send, config, store).await?; let send = MineService::spawn(executor, network_send, config, store).await?;
self.miner = Some(MinerComponents { send }); self.miner = Some(MinerComponents { send });
@ -225,7 +225,11 @@ impl ClientBuilder {
Ok(self) Ok(self)
} }
pub async fn with_shard(self, config: ShardConfig) -> Result<Self, String> { pub async fn with_shard(self, mut config: ShardConfig) -> Result<Self, String> {
let store = require!("shard", self, async_store).clone();
if let Some(stored_config) = get_shard_config(store.as_ref()).await.unwrap_or(None) {
config = stored_config;
}
self.async_store self.async_store
.as_ref() .as_ref()
.unwrap() .unwrap()

View File

@ -23,6 +23,8 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
ClientBuilder::default() ClientBuilder::default()
.with_runtime_context(context) .with_runtime_context(context)
.with_rocksdb_store(&storage_config)? .with_rocksdb_store(&storage_config)?
.with_shard(shard_config)
.await?
.with_log_sync(log_sync_config) .with_log_sync(log_sync_config)
.await? .await?
.with_file_location_cache(config.file_location_cache) .with_file_location_cache(config.file_location_cache)
@ -34,8 +36,6 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
.await? .await?
.with_miner(miner_config) .with_miner(miner_config)
.await? .await?
.with_shard(shard_config)
.await?
.with_pruner(pruner_config) .with_pruner(pruner_config)
.await? .await?
.with_rpc(config.rpc) .with_rpc(config.rpc)

View File

@ -1157,6 +1157,7 @@ impl LogManager {
.get_tx_by_seq_number(from_tx_seq)? .get_tx_by_seq_number(from_tx_seq)?
.ok_or_else(|| anyhow!("from tx missing"))?; .ok_or_else(|| anyhow!("from tx missing"))?;
let mut to_tx_offset_list = Vec::with_capacity(to_tx_seq_list.len()); let mut to_tx_offset_list = Vec::with_capacity(to_tx_seq_list.len());
for seq in to_tx_seq_list { for seq in to_tx_seq_list {
// No need to copy data for completed tx. // No need to copy data for completed tx.
if self.check_tx_completed(seq)? { if self.check_tx_completed(seq)? {