0g-storage-node/node/rpc/src/zgs_grpc/zgs_grpc.rs

63 lines
2.1 KiB
Rust
Raw Normal View History

2025-07-01 13:22:54 +00:00
use crate::types::SegmentWithProof as RpcSegment;
use crate::zgs_grpc_proto::zgs_grpc_service_server::ZgsGrpcService;
2025-07-01 13:22:54 +00:00
use crate::zgs_grpc_proto::{Empty, PingReply, PingRequest, UploadSegmentsByTxSeqRequest};
2025-07-01 13:20:09 +00:00
use crate::{rpc_helper, Context, SegmentIndexArray};
pub struct ZgsGrpcServiceImpl {
pub ctx: Context,
}
#[tonic::async_trait]
impl ZgsGrpcService for ZgsGrpcServiceImpl {
async fn ping(
&self,
request: tonic::Request<PingRequest>,
) -> Result<tonic::Response<PingReply>, tonic::Status> {
let msg = request.into_inner().message;
2025-07-01 13:22:54 +00:00
let reply = PingReply {
message: format!("Echo: {}", msg),
};
Ok(tonic::Response::new(reply))
}
2025-07-01 13:20:09 +00:00
async fn upload_segments_by_tx_seq(
&self,
request: tonic::Request<UploadSegmentsByTxSeqRequest>,
) -> Result<tonic::Response<Empty>, tonic::Status> {
let req = request.into_inner();
let segments = req.segments;
let tx_seq = req.tx_seq;
2025-07-01 13:22:54 +00:00
let rpc_segments = segments
2025-07-01 13:20:09 +00:00
.into_iter()
2025-07-01 13:22:54 +00:00
.map(|s| {
RpcSegment::try_from(s)
.map_err(|e| tonic::Status::invalid_argument(format!("Invalid segment: {}", e)))
})
.collect::<Result<Vec<_>, _>>()?;
2025-07-01 13:20:09 +00:00
let indices = SegmentIndexArray::new(&rpc_segments);
info!(%tx_seq, ?indices, "grpc_zgs_uploadSegmentsByTxSeq");
2025-07-01 13:22:54 +00:00
let maybe_tx = self
.ctx
.log_store
.get_tx_by_seq_number(tx_seq)
.await
.map_err(|e| {
tonic::Status::internal(format!(
"Failed to get transaction by sequence number: {}",
e
))
})?;
2025-07-01 13:20:09 +00:00
for segment in rpc_segments.into_iter() {
rpc_helper::put_segment_with_maybe_tx(&self.ctx, segment, maybe_tx.clone())
2025-07-01 13:22:54 +00:00
.await
.map_err(|e| tonic::Status::internal(format!("Failed to put segment: {}", e)))?;
2025-07-01 13:20:09 +00:00
}
// Return an empty response
Ok(tonic::Response::new(Empty {}))
}
}