Always init chunk pool (#275)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run

This commit is contained in:
Bo QIU 2024-11-18 16:08:56 +08:00 committed by GitHub
parent c4845f9103
commit cfe05b6f00
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 28 additions and 20 deletions

View File

@ -1,5 +1,5 @@
use super::{Client, RuntimeContext}; use super::{Client, RuntimeContext};
use chunk_pool::{ChunkPoolMessage, Config as ChunkPoolConfig, MemoryChunkPool}; use chunk_pool::{Config as ChunkPoolConfig, MemoryChunkPool};
use file_location_cache::FileLocationCache; use file_location_cache::FileLocationCache;
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager}; use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig}; use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
@ -54,7 +54,7 @@ struct PrunerComponents {
} }
struct ChunkPoolComponents { struct ChunkPoolComponents {
send: mpsc::UnboundedSender<ChunkPoolMessage>, chunk_pool: Arc<MemoryChunkPool>,
} }
/// Builds a `Client` instance. /// Builds a `Client` instance.
@ -230,7 +230,7 @@ impl ClientBuilder {
let executor = require!("router", self, runtime_context).clone().executor; let executor = require!("router", self, runtime_context).clone().executor;
let sync_send = require!("router", self, sync).send.clone(); // note: we can make this optional in the future let sync_send = require!("router", self, sync).send.clone(); // note: we can make this optional in the future
let miner_send = self.miner.as_ref().map(|x| x.send.clone()); let miner_send = self.miner.as_ref().map(|x| x.send.clone());
let chunk_pool_send = require!("router", self, chunk_pool).send.clone(); let chunk_pool_send = require!("router", self, chunk_pool).chunk_pool.sender();
let store = require!("router", self, store).clone(); let store = require!("router", self, store).clone();
let file_location_cache = require!("router", self, file_location_cache).clone(); let file_location_cache = require!("router", self, file_location_cache).clone();
@ -260,11 +260,7 @@ impl ClientBuilder {
Ok(self) Ok(self)
} }
pub async fn with_rpc( pub async fn with_rpc(self, rpc_config: RPCConfig) -> Result<Self, String> {
mut self,
rpc_config: RPCConfig,
chunk_pool_config: ChunkPoolConfig,
) -> Result<Self, String> {
if !rpc_config.enabled { if !rpc_config.enabled {
return Ok(self); return Ok(self);
} }
@ -273,16 +269,9 @@ impl ClientBuilder {
let async_store = require!("rpc", self, async_store).clone(); let async_store = require!("rpc", self, async_store).clone();
let network_send = require!("rpc", self, network).send.clone(); let network_send = require!("rpc", self, network).send.clone();
let mine_send = self.miner.as_ref().map(|x| x.send.clone()); let mine_send = self.miner.as_ref().map(|x| x.send.clone());
let synced_tx_recv = require!("rpc", self, log_sync).send.subscribe();
let file_location_cache = require!("rpc", self, file_location_cache).clone(); let file_location_cache = require!("rpc", self, file_location_cache).clone();
let chunk_pool = require!("rpc", self, chunk_pool).chunk_pool.clone();
let (chunk_pool, chunk_pool_handler) =
chunk_pool::unbounded(chunk_pool_config, async_store.clone(), network_send.clone());
let chunk_pool_components = ChunkPoolComponents {
send: chunk_pool.sender(),
};
let chunk_pool_clone = chunk_pool.clone();
let ctx = rpc::Context { let ctx = rpc::Context {
config: rpc_config, config: rpc_config,
file_location_cache, file_location_cache,
@ -295,7 +284,7 @@ impl ClientBuilder {
mine_service_sender: mine_send, mine_service_sender: mine_send,
}; };
let (rpc_handle, maybe_admin_rpc_handle) = rpc::run_server(ctx.clone()) let (rpc_handle, maybe_admin_rpc_handle) = rpc::run_server(ctx)
.await .await
.map_err(|e| format!("Unable to start HTTP RPC server: {:?}", e))?; .map_err(|e| format!("Unable to start HTTP RPC server: {:?}", e))?;
@ -303,13 +292,29 @@ impl ClientBuilder {
if let Some(admin_rpc_handle) = maybe_admin_rpc_handle { if let Some(admin_rpc_handle) = maybe_admin_rpc_handle {
executor.spawn(admin_rpc_handle, "rpc_admin"); executor.spawn(admin_rpc_handle, "rpc_admin");
} }
Ok(self)
}
pub async fn with_chunk_pool(
mut self,
chunk_pool_config: ChunkPoolConfig,
) -> Result<Self, String> {
let executor = require!("rpc", self, runtime_context).clone().executor;
let async_store = require!("rpc", self, async_store).clone();
let network_send = require!("rpc", self, network).send.clone();
let synced_tx_recv = require!("rpc", self, log_sync).send.subscribe();
let (chunk_pool, chunk_pool_handler) =
chunk_pool::unbounded(chunk_pool_config, async_store.clone(), network_send.clone());
executor.spawn(chunk_pool_handler.run(), "chunk_pool_handler"); executor.spawn(chunk_pool_handler.run(), "chunk_pool_handler");
executor.spawn( executor.spawn(
MemoryChunkPool::monitor_log_entry(chunk_pool_clone, synced_tx_recv), MemoryChunkPool::monitor_log_entry(chunk_pool.clone(), synced_tx_recv),
"chunk_pool_log_monitor", "chunk_pool_log_monitor",
); );
self.chunk_pool = Some(chunk_pool_components); self.chunk_pool = Some(ChunkPoolComponents { chunk_pool });
Ok(self) Ok(self)
} }

View File

@ -14,6 +14,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
let network_config = config.network_config().await?; let network_config = config.network_config().await?;
let storage_config = config.storage_config()?; let storage_config = config.storage_config()?;
let log_sync_config = config.log_sync_config()?; let log_sync_config = config.log_sync_config()?;
let chunk_pool_config = config.chunk_pool_config()?;
let miner_config = config.mine_config()?; let miner_config = config.mine_config()?;
let router_config = config.router_config(&network_config)?; let router_config = config.router_config(&network_config)?;
let pruner_config = config.pruner_config()?; let pruner_config = config.pruner_config()?;
@ -27,6 +28,8 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
.with_file_location_cache(config.file_location_cache) .with_file_location_cache(config.file_location_cache)
.with_network(&network_config) .with_network(&network_config)
.await? .await?
.with_chunk_pool(chunk_pool_config)
.await?
.with_sync(config.sync) .with_sync(config.sync)
.await? .await?
.with_miner(miner_config) .with_miner(miner_config)
@ -35,7 +38,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
.await? .await?
.with_pruner(pruner_config) .with_pruner(pruner_config)
.await? .await?
.with_rpc(config.rpc, config.chunk_pool_config()?) .with_rpc(config.rpc)
.await? .await?
.with_router(router_config)? .with_router(router_config)?
.build() .build()