Rpc enhancements (#15)

* Add admin RPC to terminate file or chunks sync

* Disallow changing the file sync goal when sync has failed

* Split admin RPC into separate endpoints

* Support launching public and private RPC on one port

* Use one RPC port for the Python tests

* Fix lint

* Restart file or chunks sync if the sync already completed or failed
Bo QIU 2024-01-30 16:50:35 +08:00 committed by GitHub
parent e7011b4657
commit c2b8b1cab3
11 changed files with 123 additions and 19 deletions

View File

@@ -20,6 +20,10 @@ pub trait Rpc {
end_index: u64, // exclusive
) -> RpcResult<()>;
/// Terminate file or chunks sync for specified tx_seq.
#[method(name = "terminateSync")]
async fn terminate_sync(&self, tx_seq: u64) -> RpcResult<()>;
#[method(name = "getSyncStatus")]
async fn get_sync_status(&self, tx_seq: u64) -> RpcResult<String>;
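
A minimal client-side sketch of the two new admin endpoints. This is illustrative only: it assumes the admin_ namespace prefix shown in the server log lines below, the default admin address 127.0.0.1:5679 introduced elsewhere in this commit, and a jsonrpsee release whose client API matches (it differs slightly across versions); tx_seq = 7 is an arbitrary example.

use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::rpc_params;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed private/admin endpoint; matches the default rpc_listen_address_admin.
    let client = HttpClientBuilder::default().build("http://127.0.0.1:5679")?;
    let tx_seq = 7u64; // example value

    // Terminate any ongoing file or chunks sync for this tx_seq.
    let _: () = client.request("admin_terminateSync", rpc_params![tx_seq]).await?;

    // Query the current sync status for the same tx_seq.
    let status: String = client.request("admin_getSyncStatus", rpc_params![tx_seq]).await?;
    println!("sync status: {status}");

    Ok(())
}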

View File

@@ -77,6 +77,24 @@ impl RpcServer for RpcServerImpl {
}
}
#[tracing::instrument(skip(self), err)]
async fn terminate_sync(&self, tx_seq: u64) -> RpcResult<()> {
info!("admin_terminateSync({tx_seq})");
let response = self
.ctx
.request_sync(SyncRequest::TerminateFileSync {
tx_seq,
is_reverted: false,
})
.await?;
match response {
SyncResponse::TerminateFileSync { .. } => Ok(()),
_ => Err(error::internal_error("unexpected response type")),
}
}
#[tracing::instrument(skip(self), err)]
async fn get_sync_status(&self, tx_seq: u64) -> RpcResult<String> {
info!("admin_getSyncStatus({tx_seq})");

View File

@@ -4,6 +4,7 @@ use std::net::SocketAddr;
pub struct Config {
pub enabled: bool,
pub listen_address: SocketAddr,
pub listen_address_admin: Option<SocketAddr>,
pub chunks_per_segment: usize,
pub max_cache_file_size: usize,
}
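
A sketch of the two configurations this optional field enables, as dispatched in run_server below. The rpc::Config path and the concrete values here are assumptions for illustration, not necessarily the node's defaults.

use std::net::SocketAddr;

fn example_configs() -> Result<(rpc::Config, rpc::Config), std::net::AddrParseError> {
    // All namespaces (zgs/admin/mine) served from a single public port.
    let single_port = rpc::Config {
        enabled: true,
        listen_address: "127.0.0.1:5678".parse::<SocketAddr>()?,
        listen_address_admin: None,
        chunks_per_segment: 1024,
        max_cache_file_size: 10 * 1024 * 1024,
    };

    // Public RPC on one port, admin (and mine) RPC on a separate private port.
    let split_ports = rpc::Config {
        listen_address_admin: Some("127.0.0.1:5679".parse::<SocketAddr>()?),
        ..single_port
    };

    Ok((single_port, split_ports))
}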

View File

@@ -19,6 +19,7 @@ use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
use network::NetworkGlobals;
use network::NetworkMessage;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;
use storage_async::Store;
use sync::{SyncRequest, SyncResponse, SyncSender};
@@ -60,23 +61,66 @@ impl Context {
}
}
pub async fn run_server(ctx: Context) -> Result<HttpServerHandle, Box<dyn Error>> {
let server = HttpServerBuilder::default()
.build(ctx.config.listen_address)
.await?;
pub async fn run_server(
ctx: Context,
) -> Result<(HttpServerHandle, Option<HttpServerHandle>), Box<dyn Error>> {
let handles = match ctx.config.listen_address_admin {
Some(listen_addr_private) => run_server_public_private(ctx, listen_addr_private).await?,
None => (run_server_all(ctx).await?, None),
};
info!("Server started");
Ok(handles)
}
/// Run a single RPC server for all namespace RPCs.
async fn run_server_all(ctx: Context) -> Result<HttpServerHandle, Box<dyn Error>> {
// public rpc
let mut zgs = (zgs::RpcServerImpl { ctx: ctx.clone() }).into_rpc();
// admin rpc
let admin = (admin::RpcServerImpl { ctx: ctx.clone() }).into_rpc();
zgs.merge(admin)?;
// mine rpc if configured
if ctx.mine_service_sender.is_some() {
let mine = (miner::RpcServerImpl { ctx }).into_rpc();
let mine = (miner::RpcServerImpl { ctx: ctx.clone() }).into_rpc();
zgs.merge(mine)?;
}
let addr = server.local_addr()?;
let handle = server.start(zgs)?;
info!("Server started http://{}", addr);
Ok(handle)
Ok(HttpServerBuilder::default()
.build(ctx.config.listen_address)
.await?
.start(zgs)?)
}
/// Run 2 RPC servers (public & private) for different namespace RPCs.
async fn run_server_public_private(
ctx: Context,
listen_addr_private: SocketAddr,
) -> Result<(HttpServerHandle, Option<HttpServerHandle>), Box<dyn Error>> {
// public rpc
let zgs = (zgs::RpcServerImpl { ctx: ctx.clone() }).into_rpc();
// admin rpc
let mut admin = (admin::RpcServerImpl { ctx: ctx.clone() }).into_rpc();
// mine rpc if configured
if ctx.mine_service_sender.is_some() {
let mine = (miner::RpcServerImpl { ctx: ctx.clone() }).into_rpc();
admin.merge(mine)?;
}
let handle_public = HttpServerBuilder::default()
.build(ctx.config.listen_address)
.await?
.start(zgs)?;
let handle_private = HttpServerBuilder::default()
.build(listen_addr_private)
.await?
.start(admin)?;
Ok((handle_public, Some(handle_private)))
}

View File

@@ -236,11 +236,14 @@ impl ClientBuilder {
mine_service_sender: mine_send,
};
let rpc_handle = rpc::run_server(ctx)
let (rpc_handle, maybe_admin_rpc_handle) = rpc::run_server(ctx.clone())
.await
.map_err(|e| format!("Unable to start HTTP RPC server: {:?}", e))?;
executor.spawn(rpc_handle, "rpc");
if let Some(admin_rpc_handle) = maybe_admin_rpc_handle {
executor.spawn(admin_rpc_handle, "rpc_admin");
}
executor.spawn(chunk_pool_handler.run(), "chunk_pool_handler");
executor.spawn(
MemoryChunkPool::monitor_log_entry(chunk_pool_clone, synced_tx_recv),

View File

@@ -63,9 +63,20 @@ impl ZgsConfig {
.parse::<std::net::SocketAddr>()
.map_err(|e| format!("Unable to parse rpc_listen_address: {:?}", e))?;
let listen_address_admin = if self.rpc_listen_address_admin.is_empty() {
None
} else {
Some(
self.rpc_listen_address_admin
.parse::<std::net::SocketAddr>()
.map_err(|e| format!("Unable to parse rpc_listen_address_admin: {:?}", e))?,
)
};
Ok(RPCConfig {
enabled: self.rpc_enabled,
listen_address,
listen_address_admin,
chunks_per_segment: self.rpc_chunks_per_segment,
max_cache_file_size: self.rpc_max_cache_file_size,
})

View File

@@ -37,6 +37,7 @@ build_config! {
// rpc
(rpc_enabled, (bool), true)
(rpc_listen_address, (String), "127.0.0.1:5678".to_string())
(rpc_listen_address_admin, (String), "127.0.0.1:5679".to_string())
(rpc_chunks_per_segment, (usize), 1024)
(rpc_max_cache_file_size, (usize), 10*1024*1024) //10MB

View File

@@ -126,9 +126,29 @@ impl SerialSyncController {
&self.state
}
pub fn is_completed_or_failed(&self) -> bool {
matches!(self.state, SyncState::Completed | SyncState::Failed { .. })
}
/// Resets the status to re-sync file when failed.
pub fn reset(&mut self) {
self.next_chunk = self.goal.index_start;
pub fn reset(&mut self, maybe_range: Option<(u64, u64)>) {
if let Some((start, end)) = maybe_range {
// Sync new chunks regardless of previously downloaded file or chunks.
// It's up to client to avoid duplicated chunks sync.
self.goal = FileSyncGoal::new(self.goal.num_chunks, start, end);
self.next_chunk = start;
} else if self.goal.is_all_chunks() {
// retry the failed file sync at break point
debug!(
"Continue to sync failed file, tx_seq = {}, next_chunk = {}",
self.tx_seq, self.next_chunk
);
} else {
// Ignore the failed chunks sync, and change to file sync.
self.goal = FileSyncGoal::new_file(self.goal.num_chunks);
self.next_chunk = 0;
}
self.failures = 0;
self.state = SyncState::Idle;
// remove disconnected peers
@@ -606,7 +626,7 @@ mod tests {
controller.state = SyncState::Completed;
assert_eq!(*controller.get_status(), SyncState::Completed);
controller.reset();
controller.reset(None);
assert_eq!(*controller.get_status(), SyncState::Idle);
}
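
The branching in reset above can be captured by a standalone sketch; the Goal type and reset_goal helper are simplified stand-ins for illustration, not the crate's FileSyncGoal.

#[derive(Clone, Copy, Debug, PartialEq)]
struct Goal {
    num_chunks: u64,
    start: u64,
    end: u64,
}

impl Goal {
    // Assumed meaning of "all chunks": the goal covers the whole file.
    fn is_all_chunks(&self) -> bool {
        self.start == 0 && self.end == self.num_chunks
    }
}

/// Returns the (goal, next_chunk) pair a reset should leave behind.
fn reset_goal(current: Goal, next_chunk: u64, maybe_range: Option<(u64, u64)>) -> (Goal, u64) {
    match maybe_range {
        // Explicit range requested: re-target exactly those chunks.
        Some((start, end)) => (Goal { start, end, ..current }, start),
        // Whole-file sync: resume from the break point.
        None if current.is_all_chunks() => (current, next_chunk),
        // A failed chunks sync falls back to a full file sync from chunk 0.
        None => (Goal { start: 0, end: current.num_chunks, ..current }, 0),
    }
}

fn main() {
    let file_goal = Goal { num_chunks: 1024, start: 0, end: 1024 };
    // A failed whole-file sync resumes at the break point.
    assert_eq!(reset_goal(file_goal, 512, None), (file_goal, 512));
    // A completed/failed sync can be re-targeted at a requested chunk range.
    let (goal, next) = reset_goal(file_goal, 512, Some((100, 200)));
    assert_eq!((goal.start, goal.end, next), (100, 200, 100));
}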

View File

@@ -539,7 +539,7 @@ impl SyncService {
Entry::Vacant(entry) => {
let tx = match self.store.get_tx_by_seq_number(tx_seq).await? {
Some(tx) => tx,
None => bail!("transaction not found"),
None => bail!("Transaction not found"),
};
let num_chunks = match usize::try_from(tx.size) {
@@ -561,7 +561,7 @@ impl SyncService {
};
if index_start >= index_end || index_end > num_chunks {
bail!("invalid chunk range");
bail!("Invalid chunk range");
}
entry.insert(SerialSyncController::new(
@@ -574,9 +574,9 @@ impl SyncService {
}
};
// trigger retry after failure
if let SyncState::Failed { .. } = controller.get_status() {
controller.reset();
// Trigger file or chunks sync again if completed or failed.
if controller.is_completed_or_failed() {
controller.reset(maybe_range);
}
if let Some((peer_id, addr)) = maybe_peer {
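
With this change a client restarts a completed or failed sync simply by issuing the sync request again. A hedged sketch follows; the admin_startSyncChunks method name and its (tx_seq, start_index, end_index) parameters are assumptions (the corresponding endpoint is truncated in the first hunk of this diff), as are the address and example values.

use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::rpc_params;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = HttpClientBuilder::default().build("http://127.0.0.1:5679")?;
    let tx_seq = 7u64; // example value

    // Re-issuing the request for a tx_seq whose sync already completed or failed
    // now resets the controller and syncs the requested chunk range again.
    let _: () = client
        .request("admin_startSyncChunks", rpc_params![tx_seq, 0u64, 1024u64])
        .await?;

    Ok(())
}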

View File

@@ -15,6 +15,7 @@ db_dir = "db"
rpc_enabled = true
rpc_listen_address = "0.0.0.0:5678"
# rpc_listen_address_admin = "127.0.0.1:5679"
log_config_file = "log_config"

View File

@@ -39,6 +39,7 @@ class ZgsNode(TestNode):
"network_libp2p_port": p2p_port(index),
"network_discovery_port": p2p_port(index),
"rpc_listen_address": f"127.0.0.1:{rpc_port(index)}",
"rpc_listen_address_admin": "",
"network_libp2p_nodes": libp2p_nodes,
"log_contract_address": log_contract_address,
"mine_contract_address": mine_contract_address,