Compare commits

..

1 Commits

Author SHA1 Message Date
0g-peterzhb
2705167716
Merge b352184dc6 into c4845f9103 2024-11-15 18:43:35 +08:00
2 changed files with 20 additions and 28 deletions

View File

@ -1,5 +1,5 @@
use super::{Client, RuntimeContext};
use chunk_pool::{Config as ChunkPoolConfig, MemoryChunkPool};
use chunk_pool::{ChunkPoolMessage, Config as ChunkPoolConfig, MemoryChunkPool};
use file_location_cache::FileLocationCache;
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
@ -54,7 +54,7 @@ struct PrunerComponents {
}
struct ChunkPoolComponents {
chunk_pool: Arc<MemoryChunkPool>,
send: mpsc::UnboundedSender<ChunkPoolMessage>,
}
/// Builds a `Client` instance.
@ -230,7 +230,7 @@ impl ClientBuilder {
let executor = require!("router", self, runtime_context).clone().executor;
let sync_send = require!("router", self, sync).send.clone(); // note: we can make this optional in the future
let miner_send = self.miner.as_ref().map(|x| x.send.clone());
let chunk_pool_send = require!("router", self, chunk_pool).chunk_pool.sender();
let chunk_pool_send = require!("router", self, chunk_pool).send.clone();
let store = require!("router", self, store).clone();
let file_location_cache = require!("router", self, file_location_cache).clone();
@ -260,7 +260,11 @@ impl ClientBuilder {
Ok(self)
}
pub async fn with_rpc(self, rpc_config: RPCConfig) -> Result<Self, String> {
pub async fn with_rpc(
mut self,
rpc_config: RPCConfig,
chunk_pool_config: ChunkPoolConfig,
) -> Result<Self, String> {
if !rpc_config.enabled {
return Ok(self);
}
@ -269,9 +273,16 @@ impl ClientBuilder {
let async_store = require!("rpc", self, async_store).clone();
let network_send = require!("rpc", self, network).send.clone();
let mine_send = self.miner.as_ref().map(|x| x.send.clone());
let synced_tx_recv = require!("rpc", self, log_sync).send.subscribe();
let file_location_cache = require!("rpc", self, file_location_cache).clone();
let chunk_pool = require!("rpc", self, chunk_pool).chunk_pool.clone();
let (chunk_pool, chunk_pool_handler) =
chunk_pool::unbounded(chunk_pool_config, async_store.clone(), network_send.clone());
let chunk_pool_components = ChunkPoolComponents {
send: chunk_pool.sender(),
};
let chunk_pool_clone = chunk_pool.clone();
let ctx = rpc::Context {
config: rpc_config,
file_location_cache,
@ -284,7 +295,7 @@ impl ClientBuilder {
mine_service_sender: mine_send,
};
let (rpc_handle, maybe_admin_rpc_handle) = rpc::run_server(ctx)
let (rpc_handle, maybe_admin_rpc_handle) = rpc::run_server(ctx.clone())
.await
.map_err(|e| format!("Unable to start HTTP RPC server: {:?}", e))?;
@ -292,29 +303,13 @@ impl ClientBuilder {
if let Some(admin_rpc_handle) = maybe_admin_rpc_handle {
executor.spawn(admin_rpc_handle, "rpc_admin");
}
Ok(self)
}
pub async fn with_chunk_pool(
mut self,
chunk_pool_config: ChunkPoolConfig,
) -> Result<Self, String> {
let executor = require!("rpc", self, runtime_context).clone().executor;
let async_store = require!("rpc", self, async_store).clone();
let network_send = require!("rpc", self, network).send.clone();
let synced_tx_recv = require!("rpc", self, log_sync).send.subscribe();
let (chunk_pool, chunk_pool_handler) =
chunk_pool::unbounded(chunk_pool_config, async_store.clone(), network_send.clone());
executor.spawn(chunk_pool_handler.run(), "chunk_pool_handler");
executor.spawn(
MemoryChunkPool::monitor_log_entry(chunk_pool.clone(), synced_tx_recv),
MemoryChunkPool::monitor_log_entry(chunk_pool_clone, synced_tx_recv),
"chunk_pool_log_monitor",
);
self.chunk_pool = Some(ChunkPoolComponents { chunk_pool });
self.chunk_pool = Some(chunk_pool_components);
Ok(self)
}

View File

@ -14,7 +14,6 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
let network_config = config.network_config().await?;
let storage_config = config.storage_config()?;
let log_sync_config = config.log_sync_config()?;
let chunk_pool_config = config.chunk_pool_config()?;
let miner_config = config.mine_config()?;
let router_config = config.router_config(&network_config)?;
let pruner_config = config.pruner_config()?;
@ -28,8 +27,6 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
.with_file_location_cache(config.file_location_cache)
.with_network(&network_config)
.await?
.with_chunk_pool(chunk_pool_config)
.await?
.with_sync(config.sync)
.await?
.with_miner(miner_config)
@ -38,7 +35,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
.await?
.with_pruner(pruner_config)
.await?
.with_rpc(config.rpc)
.with_rpc(config.rpc, config.chunk_pool_config()?)
.await?
.with_router(router_config)?
.build()