enhance rpc logs (#73)

This commit is contained in:
Bo QIU 2024-05-29 10:16:28 +08:00 committed by GitHub
parent b17fd117fd
commit 0852b2355c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 74 additions and 6 deletions

View File

@ -3,7 +3,7 @@ use crate::mem_pool::FileID;
use anyhow::Result; use anyhow::Result;
use network::NetworkMessage; use network::NetworkMessage;
use shared_types::{ChunkArray, FileProof}; use shared_types::{ChunkArray, FileProof};
use std::sync::Arc; use std::{sync::Arc, time::SystemTime};
use storage_async::Store; use storage_async::Store;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
@ -62,6 +62,7 @@ impl ChunkPoolHandler {
} }
} }
let start = SystemTime::now();
if !self if !self
.log_store .log_store
.finalize_tx_with_hash(id.tx_id.seq, id.tx_id.hash) .finalize_tx_with_hash(id.tx_id.seq, id.tx_id.hash)
@ -70,7 +71,8 @@ impl ChunkPoolHandler {
return Ok(false); return Ok(false);
} }
debug!(?id, "Transaction finalized"); let elapsed = start.elapsed()?;
debug!(?id, ?elapsed, "Transaction finalized");
// always remove file from pool after transaction finalized // always remove file from pool after transaction finalized
self.mem_pool.remove_file(&id.root).await; self.mem_pool.remove_file(&id.root).await;

View File

@ -6,6 +6,7 @@ use chunk_pool::{FileID, SegmentInfo};
use jsonrpsee::core::async_trait; use jsonrpsee::core::async_trait;
use jsonrpsee::core::RpcResult; use jsonrpsee::core::RpcResult;
use shared_types::{DataRoot, Transaction, CHUNK_SIZE}; use shared_types::{DataRoot, Transaction, CHUNK_SIZE};
use std::fmt::{Debug, Formatter, Result};
use storage::try_option; use storage::try_option;
pub struct RpcServerImpl { pub struct RpcServerImpl {
@ -34,15 +35,22 @@ impl RpcServer for RpcServerImpl {
} }
async fn upload_segment(&self, segment: SegmentWithProof) -> RpcResult<()> { async fn upload_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
debug!(root = %segment.root, index = %segment.index, "zgs_uploadSegment"); info!(root = %segment.root, index = %segment.index, "zgs_uploadSegment");
self.put_segment(segment).await self.put_segment(segment).await
} }
async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()> { async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()> {
debug!("zgs_uploadSegments()"); let root = match segments.first() {
None => return Ok(()),
Some(seg) => seg.root,
};
let indices = SegmentIndexArray::new(&segments);
info!(%root, ?indices, "zgs_uploadSegments");
for segment in segments.into_iter() { for segment in segments.into_iter() {
self.put_segment(segment).await?; self.put_segment(segment).await?;
} }
Ok(()) Ok(())
} }
@ -52,7 +60,7 @@ impl RpcServer for RpcServerImpl {
start_index: usize, start_index: usize,
end_index: usize, end_index: usize,
) -> RpcResult<Option<Segment>> { ) -> RpcResult<Option<Segment>> {
debug!(%data_root, %start_index, %end_index, "zgs_downloadSegment"); info!(%data_root, %start_index, %end_index, "zgs_downloadSegment");
if start_index >= end_index { if start_index >= end_index {
return Err(error::invalid_params("end_index", "invalid chunk index")); return Err(error::invalid_params("end_index", "invalid chunk index"));
@ -89,7 +97,7 @@ impl RpcServer for RpcServerImpl {
data_root: DataRoot, data_root: DataRoot,
index: usize, index: usize,
) -> RpcResult<Option<SegmentWithProof>> { ) -> RpcResult<Option<SegmentWithProof>> {
debug!(%data_root, %index, "zgs_downloadSegmentWithProof"); info!(%data_root, %index, "zgs_downloadSegmentWithProof");
let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?); let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?);
@ -264,3 +272,61 @@ impl RpcServerImpl {
Ok(()) Ok(())
} }
} }
enum SegmentIndex {
Single(usize),
Range(usize, usize), // [start, end]
}
impl Debug for SegmentIndex {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
match self {
Self::Single(val) => write!(f, "{}", val),
Self::Range(start, end) => write!(f, "[{},{}]", start, end),
}
}
}
struct SegmentIndexArray {
items: Vec<SegmentIndex>,
}
impl Debug for SegmentIndexArray {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
match self.items.first() {
None => write!(f, "NULL"),
Some(first) if self.items.len() == 1 => write!(f, "{:?}", first),
_ => write!(f, "{:?}", self.items),
}
}
}
impl SegmentIndexArray {
fn new(segments: &[SegmentWithProof]) -> Self {
let mut items = Vec::new();
let mut current = match segments.first() {
None => return SegmentIndexArray { items },
Some(seg) => SegmentIndex::Single(seg.index),
};
for index in segments.iter().skip(1).map(|seg| seg.index) {
match current {
SegmentIndex::Single(val) if val + 1 == index => {
current = SegmentIndex::Range(val, index)
}
SegmentIndex::Range(start, end) if end + 1 == index => {
current = SegmentIndex::Range(start, index)
}
_ => {
items.push(current);
current = SegmentIndex::Single(index);
}
}
}
items.push(current);
SegmentIndexArray { items }
}
}