From cfe05b6f00ad4c95a330cfa40dabab9131fcedbf Mon Sep 17 00:00:00 2001 From: Bo QIU <35757521+boqiu@users.noreply.github.com> Date: Mon, 18 Nov 2024 16:08:56 +0800 Subject: [PATCH] Always init chunk pool (#275) --- node/src/client/builder.rs | 43 +++++++++++++++++++++----------------- node/src/main.rs | 5 ++++- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/node/src/client/builder.rs b/node/src/client/builder.rs index d05fa86..7d345c8 100644 --- a/node/src/client/builder.rs +++ b/node/src/client/builder.rs @@ -1,5 +1,5 @@ use super::{Client, RuntimeContext}; -use chunk_pool::{ChunkPoolMessage, Config as ChunkPoolConfig, MemoryChunkPool}; +use chunk_pool::{Config as ChunkPoolConfig, MemoryChunkPool}; use file_location_cache::FileLocationCache; use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager}; use miner::{MineService, MinerConfig, MinerMessage, ShardConfig}; @@ -54,7 +54,7 @@ struct PrunerComponents { } struct ChunkPoolComponents { - send: mpsc::UnboundedSender, + chunk_pool: Arc, } /// Builds a `Client` instance. @@ -230,7 +230,7 @@ impl ClientBuilder { let executor = require!("router", self, runtime_context).clone().executor; let sync_send = require!("router", self, sync).send.clone(); // note: we can make this optional in the future let miner_send = self.miner.as_ref().map(|x| x.send.clone()); - let chunk_pool_send = require!("router", self, chunk_pool).send.clone(); + let chunk_pool_send = require!("router", self, chunk_pool).chunk_pool.sender(); let store = require!("router", self, store).clone(); let file_location_cache = require!("router", self, file_location_cache).clone(); @@ -260,11 +260,7 @@ impl ClientBuilder { Ok(self) } - pub async fn with_rpc( - mut self, - rpc_config: RPCConfig, - chunk_pool_config: ChunkPoolConfig, - ) -> Result { + pub async fn with_rpc(self, rpc_config: RPCConfig) -> Result { if !rpc_config.enabled { return Ok(self); } @@ -273,16 +269,9 @@ impl ClientBuilder { let async_store = require!("rpc", self, async_store).clone(); let network_send = require!("rpc", self, network).send.clone(); let mine_send = self.miner.as_ref().map(|x| x.send.clone()); - let synced_tx_recv = require!("rpc", self, log_sync).send.subscribe(); let file_location_cache = require!("rpc", self, file_location_cache).clone(); + let chunk_pool = require!("rpc", self, chunk_pool).chunk_pool.clone(); - let (chunk_pool, chunk_pool_handler) = - chunk_pool::unbounded(chunk_pool_config, async_store.clone(), network_send.clone()); - let chunk_pool_components = ChunkPoolComponents { - send: chunk_pool.sender(), - }; - - let chunk_pool_clone = chunk_pool.clone(); let ctx = rpc::Context { config: rpc_config, file_location_cache, @@ -295,7 +284,7 @@ impl ClientBuilder { mine_service_sender: mine_send, }; - let (rpc_handle, maybe_admin_rpc_handle) = rpc::run_server(ctx.clone()) + let (rpc_handle, maybe_admin_rpc_handle) = rpc::run_server(ctx) .await .map_err(|e| format!("Unable to start HTTP RPC server: {:?}", e))?; @@ -303,13 +292,29 @@ impl ClientBuilder { if let Some(admin_rpc_handle) = maybe_admin_rpc_handle { executor.spawn(admin_rpc_handle, "rpc_admin"); } + + Ok(self) + } + + pub async fn with_chunk_pool( + mut self, + chunk_pool_config: ChunkPoolConfig, + ) -> Result { + let executor = require!("rpc", self, runtime_context).clone().executor; + let async_store = require!("rpc", self, async_store).clone(); + let network_send = require!("rpc", self, network).send.clone(); + let synced_tx_recv = require!("rpc", self, log_sync).send.subscribe(); + + let (chunk_pool, chunk_pool_handler) = + chunk_pool::unbounded(chunk_pool_config, async_store.clone(), network_send.clone()); + executor.spawn(chunk_pool_handler.run(), "chunk_pool_handler"); executor.spawn( - MemoryChunkPool::monitor_log_entry(chunk_pool_clone, synced_tx_recv), + MemoryChunkPool::monitor_log_entry(chunk_pool.clone(), synced_tx_recv), "chunk_pool_log_monitor", ); - self.chunk_pool = Some(chunk_pool_components); + self.chunk_pool = Some(ChunkPoolComponents { chunk_pool }); Ok(self) } diff --git a/node/src/main.rs b/node/src/main.rs index a73b26c..29db29e 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -14,6 +14,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result Result Result