Mirror of https://github.com/0glabs/0g-storage-node.git (synced 2025-11-03 08:07:27 +00:00)
add upload segments grpc function
This commit is contained in:
parent cb18ffd6c3 · commit 4102755a14
1 Cargo.lock generated
@@ -6992,6 +6992,7 @@ dependencies = [
  "append_merkle",
  "base64 0.13.1",
  "chunk_pool",
+ "ethereum-types 0.14.1",
  "file_location_cache",
  "futures",
  "futures-channel",
node/rpc/Cargo.toml

@@ -33,6 +33,7 @@ tonic = { version = "0.9.2", features = ["transport"] }
 prost = "0.11.9"
 prost-types = "0.11.9"
 tonic-reflection = "0.9.2"
+ethereum-types = "0.14"
 
 [build-dependencies]
 tonic-build = "0.9.2"
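Note: the `tonic-build` build-dependency pairs with a build script that emits the descriptor set embedded later in node/rpc/src/lib.rs. The build script itself is not part of this diff; a minimal sketch of what it plausibly looks like (the paths are assumptions):

    // build.rs (illustrative sketch; not shown in this commit)
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        tonic_build::configure()
            // emit the encoded descriptor set consumed by the reflection service
            .file_descriptor_set_path("proto/zgs_grpc_descriptor.bin")
            // compile the service and message definitions from zgs_grpc.proto
            .compile(&["proto/zgs_grpc.proto"], &["proto"])?;
        Ok(())
    }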
node/rpc/proto/zgs_grpc.proto

@@ -2,9 +2,35 @@ syntax = "proto3";
 
 package zgs_grpc;
 
-// A trivial ping service
-service ZgsGrpcService {
-  rpc Ping (PingRequest) returns (PingReply);
-}
-
 option go_package = "github.com/0glabs/0g-storage-client/node/proto;zgs_grpc";
 
+message Empty {}
+
+/// 32-byte hash root
+message DataRoot {
+  bytes value = 1;
+}
+
+/// A proof over a file-segment Merkle tree
+message FileProof {
+  /// sequence of 32-byte hashes
+  repeated bytes lemma = 1;
+  /// bit-paths (left=false, right=true) alongside the lemmas
+  repeated bool path = 2;
+}
+
+/// A file segment plus its Merkle proof
+message SegmentWithProof {
+  DataRoot root = 1;    // file Merkle root
+  bytes data = 2;       // raw segment bytes
+  uint64 index = 3;     // segment index
+  FileProof proof = 4;  // Merkle proof of this leaf
+  uint64 file_size = 5; // total file length
+}
+
+message UploadSegmentsByTxSeqRequest {
+  repeated SegmentWithProof segments = 1;
+  uint64 tx_seq = 2;
+}
+
 message PingRequest {

@@ -14,3 +40,9 @@ message PingRequest {
 message PingReply {
   string message = 1;
 }
+
+// A trivial ping service
+service ZgsGrpcService {
+  rpc Ping (PingRequest) returns (PingReply);
+  rpc UploadSegmentsByTxSeq(UploadSegmentsByTxSeqRequest) returns (Empty);
+}
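With the proto in place, a client can batch-upload proved segments for a known transaction sequence number in a single rpc. A hedged client-side sketch, assuming the tonic-generated stubs and a node serving gRPC at 127.0.0.1:50051 (the address, segment size, and field values are placeholders, not part of this commit):

    pub mod zgs_grpc {
        tonic::include_proto!("zgs_grpc");
    }

    use zgs_grpc::zgs_grpc_service_client::ZgsGrpcServiceClient;
    use zgs_grpc::{DataRoot, FileProof, SegmentWithProof, UploadSegmentsByTxSeqRequest};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let mut client = ZgsGrpcServiceClient::connect("http://127.0.0.1:50051").await?;
        let segment = SegmentWithProof {
            root: Some(DataRoot { value: vec![0u8; 32] }), // placeholder file root
            data: vec![0u8; 256 * 1024],                   // raw segment bytes
            index: 0,
            proof: Some(FileProof { lemma: vec![vec![0u8; 32]], path: vec![false] }),
            file_size: 256 * 1024,
        };
        let request = UploadSegmentsByTxSeqRequest { segments: vec![segment], tx_seq: 42 };
        client.upload_segments_by_tx_seq(request).await?;
        Ok(())
    }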
node/rpc/src/lib.rs

@@ -10,8 +10,11 @@ mod middleware;
 mod miner;
 pub mod types;
 mod zgs;
+mod zgs_grpc;
+mod rpc_helper;
 
 use crate::miner::RpcServer as MinerRpcServer;
+use crate::types::SegmentWithProof;
 use crate::zgs_grpc::zgs_grpc::ZgsGrpcServiceImpl;
 use crate::zgs_grpc_proto::zgs_grpc_service_server::ZgsGrpcServiceServer;
 use admin::RpcServer as AdminRpcServer;
@@ -22,6 +25,7 @@ use jsonrpsee::core::RpcResult;
 use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
 use network::{NetworkGlobals, NetworkMessage, NetworkSender};
 use std::error::Error;
+use std::fmt::{Debug, Formatter, Result as FmtResult};
 use std::sync::Arc;
 use storage_async::Store;
 use sync::{SyncRequest, SyncResponse, SyncSender};
@@ -29,23 +33,18 @@ use task_executor::ShutdownReason;
 use tokio::sync::broadcast;
 use zgs::RpcServer as ZgsRpcServer;
 use zgs_miner::MinerMessage;
+use tonic_reflection::server::Builder as ReflectionBuilder;
+use tonic::transport::Server;
 
 pub use admin::RpcClient as ZgsAdminRpcClient;
 pub use config::Config as RPCConfig;
 pub use miner::RpcClient as ZgsMinerRpcClient;
 pub use zgs::RpcClient as ZgsRPCClient;
-// bring in the reflection-builder
-use tonic_reflection::server::Builder as ReflectionBuilder;
-
 
 pub mod zgs_grpc_proto {
     tonic::include_proto!("zgs_grpc");
 }
 
-mod zgs_grpc;
-
-use tonic::transport::Server;
-
 const DESCRIPTOR_SET: &[u8] = include_bytes!("../proto/zgs_grpc_descriptor.bin");
 
 /// A wrapper around all the items required to spawn the HTTP server.
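The embedded descriptor set feeds gRPC server reflection, which lets tools such as grpcurl discover the service at runtime. The body of `run_grpc_server` is not shown in this diff; a sketch of the usual wiring under that assumption (the bind address is a placeholder):

    async fn serve_grpc(ctx: Context) -> Result<(), Box<dyn std::error::Error>> {
        // reflection service built from zgs_grpc_descriptor.bin
        let reflection = ReflectionBuilder::configure()
            .register_encoded_file_descriptor_set(DESCRIPTOR_SET)
            .build()?;

        Server::builder()
            // the service implemented in node/rpc/src/zgs_grpc/zgs_grpc.rs
            .add_service(ZgsGrpcServiceServer::new(ZgsGrpcServiceImpl { ctx }))
            .add_service(reflection)
            .serve("0.0.0.0:50051".parse()?)
            .await?;
        Ok(())
    }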
@@ -164,3 +163,60 @@ pub async fn run_grpc_server(ctx: Context) -> Result<(), Box<dyn Error>> {
     Ok(())
 }
+
+enum SegmentIndex {
+    Single(usize),
+    Range(usize, usize), // [start, end]
+}
+
+impl Debug for SegmentIndex {
+    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
+        match self {
+            Self::Single(val) => write!(f, "{}", val),
+            Self::Range(start, end) => write!(f, "[{},{}]", start, end),
+        }
+    }
+}
+
+struct SegmentIndexArray {
+    items: Vec<SegmentIndex>,
+}
+
+impl Debug for SegmentIndexArray {
+    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
+        match self.items.first() {
+            None => write!(f, "NULL"),
+            Some(first) if self.items.len() == 1 => write!(f, "{:?}", first),
+            _ => write!(f, "{:?}", self.items),
+        }
+    }
+}
+
+impl SegmentIndexArray {
+    fn new(segments: &[SegmentWithProof]) -> Self {
+        let mut items = Vec::new();
+
+        let mut current = match segments.first() {
+            None => return SegmentIndexArray { items },
+            Some(seg) => SegmentIndex::Single(seg.index),
+        };
+
+        for index in segments.iter().skip(1).map(|seg| seg.index) {
+            match current {
+                SegmentIndex::Single(val) if val + 1 == index => {
+                    current = SegmentIndex::Range(val, index)
+                }
+                SegmentIndex::Range(start, end) if end + 1 == index => {
+                    current = SegmentIndex::Range(start, index)
+                }
+                _ => {
+                    items.push(current);
+                    current = SegmentIndex::Single(index);
+                }
+            }
+        }
+
+        items.push(current);
+
+        SegmentIndexArray { items }
+    }
+}
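`SegmentIndexArray` exists purely to keep upload logs compact: runs of consecutive segment indices collapse into `[start,end]` ranges. A standalone illustration of the same grouping over plain `usize` values (adapted for brevity; constructing real `SegmentWithProof` values is omitted):

    #[derive(Clone, Copy)]
    enum Idx {
        Single(usize),
        Range(usize, usize), // inclusive [start, end]
    }

    impl std::fmt::Debug for Idx {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            match self {
                Self::Single(v) => write!(f, "{}", v),
                Self::Range(s, e) => write!(f, "[{},{}]", s, e),
            }
        }
    }

    fn group(indices: &[usize]) -> Vec<Idx> {
        let mut items = Vec::new();
        let mut current = match indices.first() {
            None => return items,
            Some(&i) => Idx::Single(i),
        };
        for &i in &indices[1..] {
            current = match current {
                Idx::Single(v) if v + 1 == i => Idx::Range(v, i),
                Idx::Range(s, e) if e + 1 == i => Idx::Range(s, i),
                _ => {
                    items.push(current);
                    Idx::Single(i)
                }
            };
        }
        items.push(current);
        items
    }

    fn main() {
        // consecutive runs collapse into ranges, matching the log output above
        println!("{:?}", group(&[0, 1, 2, 5, 7, 8])); // prints [[0,2], 5, [7,8]]
    }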
102 node/rpc/src/rpc_helper.rs Normal file
@@ -0,0 +1,102 @@
+use crate::Context;
+use crate::types::SegmentWithProof;
+use crate::error;
+use chunk_pool::SegmentInfo;
+use jsonrpsee::core::RpcResult;
+use shared_types::Transaction;
+
+/// Put a single segment (mirrors the old `put_segment`)
+pub async fn put_segment(
+    ctx: &Context,
+    segment: SegmentWithProof,
+) -> RpcResult<()> {
+    debug!(root = %segment.root, index = %segment.index, "putSegment");
+
+    // fetch optional tx
+    let maybe_tx = ctx
+        .log_store
+        .get_tx_by_data_root(&segment.root, false)
+        .await?;
+
+    put_segment_with_maybe_tx(ctx, segment, maybe_tx).await
+}
+
+/// Put a segment, given an optional Transaction (mirrors `put_segment_with_maybe_tx`)
+pub async fn put_segment_with_maybe_tx(
+    ctx: &Context,
+    segment: SegmentWithProof,
+    maybe_tx: Option<Transaction>,
+) -> RpcResult<()> {
+    ctx.chunk_pool.validate_segment_size(&segment.data)?;
+
+    if let Some(tx) = &maybe_tx {
+        if tx.data_merkle_root != segment.root {
+            return Err(error::internal_error("data root and tx seq not match"));
+        }
+    }
+
+    // decide cache vs write
+    let need_cache = if ctx.chunk_pool.check_already_has_cache(&segment.root).await {
+        true
+    } else {
+        check_need_cache(ctx, &maybe_tx, segment.file_size).await?
+    };
+
+    segment.validate(ctx.config.chunks_per_segment)?;
+
+    let seg_info = SegmentInfo {
+        root: segment.root,
+        seg_data: segment.data,
+        seg_proof: segment.proof,
+        seg_index: segment.index,
+        chunks_per_segment: ctx.config.chunks_per_segment,
+    };
+
+    if need_cache {
+        ctx.chunk_pool.cache_chunks(seg_info).await?;
+    } else {
+        let file_id = chunk_pool::FileID {
+            root: seg_info.root,
+            tx_id: maybe_tx.unwrap().id(),
+        };
+        ctx.chunk_pool
+            .write_chunks(seg_info, file_id, segment.file_size)
+            .await?;
+    }
+
+    Ok(())
+}
+
+/// The old `check_need_cache`
+pub async fn check_need_cache(
+    ctx: &Context,
+    maybe_tx: &Option<Transaction>,
+    file_size: usize,
+) -> RpcResult<bool> {
+    if let Some(tx) = maybe_tx {
+        if tx.size != file_size as u64 {
+            return Err(error::invalid_params(
+                "file_size",
+                "segment file size not matched with tx file size",
+            ));
+        }
+        if ctx.log_store.check_tx_completed(tx.seq).await? {
+            return Err(error::invalid_params(
+                "root",
+                "already uploaded and finalized",
+            ));
+        }
+        if ctx.log_store.check_tx_pruned(tx.seq).await? {
+            return Err(error::invalid_params("root", "already pruned"));
+        }
+        Ok(false)
+    } else {
+        if file_size > ctx.config.max_cache_file_size {
+            return Err(error::invalid_params(
+                "file_size",
+                "caching of large file when tx is unavailable is not supported",
+            ));
+        }
+        Ok(true)
+    }
+}
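`check_need_cache` encodes the cache-versus-direct-write policy: segments whose transaction is already known are written straight to the chunk pool, while segments arriving before their transaction are cached only if the file is small enough. A condensed restatement of that policy (illustrative only; the real functions also reject finalized and pruned transactions, as shown above):

    /// Hypothetical distillation of the caching decision in rpc_helper.rs.
    fn need_cache(
        already_cached: bool, // chunk pool already holds a cache entry for this root
        tx_known: bool,       // the log store returned a Transaction
        file_size: usize,
        max_cache_file_size: usize,
    ) -> Result<bool, &'static str> {
        if already_cached {
            return Ok(true); // keep filling the existing cache entry
        }
        if tx_known {
            return Ok(false); // tx metadata available: write chunks directly
        }
        if file_size > max_cache_file_size {
            return Err("caching of large file when tx is unavailable is not supported");
        }
        Ok(true) // tx not seen yet and the file is small: cache until it arrives
    }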
node/rpc/src/types.rs

@@ -1,4 +1,4 @@
-use crate::error;
+use crate::{error, zgs_grpc_proto};
 use append_merkle::ZERO_HASHES;
 use jsonrpsee::core::RpcResult;
 use merkle_light::hash::Algorithm;

@@ -11,12 +11,15 @@ use shared_types::{
     Transaction, CHUNK_SIZE,
 };
 use std::collections::HashSet;
+use std::convert::TryFrom;
 use std::hash::Hasher;
 use std::net::IpAddr;
 use std::time::Instant;
 use storage::config::ShardConfig;
 use storage::log_store::log_manager::bytes_to_entries;
 use storage::H256;
+use tonic::Status as GrpcStatus;
+use ethereum_types::H256 as EthH256;
 
 const ZERO_HASH: [u8; 32] = [
     0xd3, 0x97, 0xb3, 0xb0, 0x43, 0xd8, 0x7f, 0xcd, 0x6f, 0xad, 0x12, 0x91, 0xff, 0xb, 0xfd, 0x16,

@@ -76,6 +79,79 @@ pub struct SegmentWithProof {
     pub file_size: usize,
 }
 
+/// Convert the proto DataRoot → the app's DataRoot
+impl TryFrom<zgs_grpc_proto::DataRoot> for DataRoot {
+    type Error = GrpcStatus;
+
+    fn try_from(value: zgs_grpc_proto::DataRoot) -> Result<Self, GrpcStatus> {
+        let bytes = value.value;
+        if bytes.len() != 32 {
+            return Err(GrpcStatus::invalid_argument(format!(
+                "Invalid hash length: got {}, want 32",
+                bytes.len()
+            )));
+        }
+        // DataRoot is an alias of the 32-byte H256 type
+        let mut arr = [0u8; 32];
+        arr.copy_from_slice(&bytes);
+        Ok(EthH256(arr))
+    }
+}
+
+/// Convert proto FileProof → the app's FileProof
+impl TryFrom<zgs_grpc_proto::FileProof> for FileProof {
+    type Error = GrpcStatus;
+
+    fn try_from(value: zgs_grpc_proto::FileProof) -> Result<Self, GrpcStatus> {
+        // turn each `bytes` into an H256
+        let mut lemma = Vec::with_capacity(value.lemma.len());
+        for bin in value.lemma {
+            if bin.len() != 32 {
+                return Err(GrpcStatus::invalid_argument(format!(
+                    "Invalid hash length: got {}, want 32",
+                    bin.len()
+                )));
+            }
+            let mut arr = [0u8; 32];
+            arr.copy_from_slice(&bin);
+            lemma.push(H256(arr));
+        }
+
+        Ok(FileProof {
+            lemma,
+            path: value.path,
+        })
+    }
+}
+
+/// Convert the full SegmentWithProof
+impl TryFrom<zgs_grpc_proto::SegmentWithProof> for SegmentWithProof {
+    type Error = GrpcStatus;
+
+    fn try_from(grpc_segment: zgs_grpc_proto::SegmentWithProof) -> Result<Self, GrpcStatus> {
+        let root = grpc_segment
+            .root
+            .ok_or_else(|| GrpcStatus::invalid_argument("missing root"))?
+            .try_into()?;
+        let data = grpc_segment.data;
+        // index is u64 in proto, usize in app
+        let index = grpc_segment.index.try_into().map_err(|_| {
+            GrpcStatus::invalid_argument(format!("Invalid segment index: {}", grpc_segment.index))
+        })?;
+        let proof = grpc_segment
+            .proof
+            .ok_or_else(|| GrpcStatus::invalid_argument("missing proof"))?
+            .try_into()?;
+        let file_size = grpc_segment.file_size.try_into().map_err(|_| {
+            GrpcStatus::invalid_argument(format!("Invalid file size: {}", grpc_segment.file_size))
+        })?;
+
+        Ok(SegmentWithProof {
+            root,
+            data,
+            index,
+            proof,
+            file_size,
+        })
+    }
+}
+
 impl SegmentWithProof {
     /// Splits file into segments and returns the total number of segments and the last segment size.
     pub fn split_file_into_segments(
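These conversions validate hash lengths at the gRPC boundary, so malformed requests are rejected with `invalid_argument` before any store access. A hedged round-trip test sketch (assumes `DataRoot` is the 32-byte `H256` alias used by `shared_types`):

    #[test]
    fn proto_data_root_round_trip() {
        use std::convert::TryFrom;

        // a well-formed 32-byte root converts
        let ok = zgs_grpc_proto::DataRoot { value: vec![0xab; 32] };
        let root = DataRoot::try_from(ok).expect("32-byte roots convert");
        assert_eq!(root.as_bytes(), &[0xab; 32][..]);

        // any other length is rejected with an invalid_argument status
        let bad = zgs_grpc_proto::DataRoot { value: vec![0xab; 31] };
        assert!(DataRoot::try_from(bad).is_err());
    }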
node/rpc/src/zgs/impl.rs

@@ -1,12 +1,10 @@
 use super::api::RpcServer;
-use crate::error;
+use crate::{error, rpc_helper, SegmentIndexArray};
 use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
 use crate::Context;
-use chunk_pool::{FileID, SegmentInfo};
 use jsonrpsee::core::async_trait;
 use jsonrpsee::core::RpcResult;
 use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
-use std::fmt::{Debug, Formatter, Result};
 use storage::config::ShardConfig;
 use storage::log_store::tx_store::TxStatus;
 use storage::{try_option, H256};
@@ -39,7 +37,7 @@ impl RpcServer for RpcServerImpl {
 
     async fn upload_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
         info!(root = %segment.root, index = %segment.index, "zgs_uploadSegment");
-        self.put_segment(segment).await
+        rpc_helper::put_segment(&self.ctx, segment).await
     }
 
     async fn upload_segment_by_tx_seq(

@@ -49,7 +47,7 @@
     ) -> RpcResult<()> {
         info!(tx_seq = %tx_seq, index = %segment.index, "zgs_uploadSegmentByTxSeq");
         let maybe_tx = self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?;
-        self.put_segment_with_maybe_tx(segment, maybe_tx).await
+        rpc_helper::put_segment_with_maybe_tx(&self.ctx, segment, maybe_tx).await
     }
 
     async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()> {

@@ -61,7 +59,7 @@
         info!(%root, ?indices, "zgs_uploadSegments");
 
         for segment in segments.into_iter() {
-            self.put_segment(segment).await?;
+            rpc_helper::put_segment(&self.ctx, segment).await?;
         }
 
         Ok(())

@@ -77,8 +75,7 @@
 
         let maybe_tx = self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?;
         for segment in segments.into_iter() {
-            match self
-                .put_segment_with_maybe_tx(segment, maybe_tx.clone())
+            match rpc_helper::put_segment_with_maybe_tx(&self.ctx, segment, maybe_tx.clone())
                 .await
             {
                 Ok(()) => {} // success
@@ -235,44 +232,44 @@ impl RpcServer for RpcServerImpl {
 }
 
 impl RpcServerImpl {
-    async fn check_need_cache(
-        &self,
-        maybe_tx: &Option<Transaction>,
-        file_size: usize,
-    ) -> RpcResult<bool> {
-        if let Some(tx) = maybe_tx {
-            if tx.size != file_size as u64 {
-                return Err(error::invalid_params(
-                    "file_size",
-                    "segment file size not matched with tx file size",
-                ));
-            }
+    // async fn check_need_cache(
+    //     &self,
+    //     maybe_tx: &Option<Transaction>,
+    //     file_size: usize,
+    // ) -> RpcResult<bool> {
+    //     if let Some(tx) = maybe_tx {
+    //         if tx.size != file_size as u64 {
+    //             return Err(error::invalid_params(
+    //                 "file_size",
+    //                 "segment file size not matched with tx file size",
+    //             ));
+    //         }
 
-            // Transaction already finalized for the specified file data root.
-            if self.ctx.log_store.check_tx_completed(tx.seq).await? {
-                return Err(error::invalid_params(
-                    "root",
-                    "already uploaded and finalized",
-                ));
-            }
+    //         // Transaction already finalized for the specified file data root.
+    //         if self.ctx.log_store.check_tx_completed(tx.seq).await? {
+    //             return Err(error::invalid_params(
+    //                 "root",
+    //                 "already uploaded and finalized",
+    //             ));
+    //         }
 
-            if self.ctx.log_store.check_tx_pruned(tx.seq).await? {
-                return Err(error::invalid_params("root", "already pruned"));
-            }
+    //         if self.ctx.log_store.check_tx_pruned(tx.seq).await? {
+    //             return Err(error::invalid_params("root", "already pruned"));
+    //         }
 
-            Ok(false)
-        } else {
-            //Check whether file is small enough to cache in the system
-            if file_size > self.ctx.config.max_cache_file_size {
-                return Err(error::invalid_params(
-                    "file_size",
-                    "caching of large file when tx is unavailable is not supported",
-                ));
-            }
+    //         Ok(false)
+    //     } else {
+    //         //Check whether file is small enough to cache in the system
+    //         if file_size > self.ctx.config.max_cache_file_size {
+    //             return Err(error::invalid_params(
+    //                 "file_size",
+    //                 "caching of large file when tx is unavailable is not supported",
+    //             ));
+    //         }
 
-            Ok(true)
-        }
-    }
+    //         Ok(true)
+    //     }
+    // }
 
     async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult<FileInfo> {
         let (finalized, pruned) = match self.ctx.log_store.get_store().get_tx_status(tx.seq)? {
@@ -312,69 +309,69 @@ impl RpcServerImpl {
         })
     }
 
-    async fn put_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
-        debug!(root = %segment.root, index = %segment.index, "putSegment");
+    // async fn put_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
+    //     debug!(root = %segment.root, index = %segment.index, "putSegment");
 
-        let maybe_tx = self
-            .ctx
-            .log_store
-            .get_tx_by_data_root(&segment.root, false)
-            .await?;
+    //     let maybe_tx = self
+    //         .ctx
+    //         .log_store
+    //         .get_tx_by_data_root(&segment.root, false)
+    //         .await?;
 
-        self.put_segment_with_maybe_tx(segment, maybe_tx).await
-    }
+    //     self.put_segment_with_maybe_tx(segment, maybe_tx).await
+    // }
 
-    async fn put_segment_with_maybe_tx(
-        &self,
-        segment: SegmentWithProof,
-        maybe_tx: Option<Transaction>,
-    ) -> RpcResult<()> {
-        self.ctx.chunk_pool.validate_segment_size(&segment.data)?;
+    // async fn put_segment_with_maybe_tx(
+    //     &self,
+    //     segment: SegmentWithProof,
+    //     maybe_tx: Option<Transaction>,
+    // ) -> RpcResult<()> {
+    //     self.ctx.chunk_pool.validate_segment_size(&segment.data)?;
 
-        if let Some(tx) = &maybe_tx {
-            if tx.data_merkle_root != segment.root {
-                return Err(error::internal_error("data root and tx seq not match"));
-            }
-        }
+    //     if let Some(tx) = &maybe_tx {
+    //         if tx.data_merkle_root != segment.root {
+    //             return Err(error::internal_error("data root and tx seq not match"));
+    //         }
+    //     }
 
-        let mut need_cache = false;
-        if self
-            .ctx
-            .chunk_pool
-            .check_already_has_cache(&segment.root)
-            .await
-        {
-            need_cache = true;
-        }
+    //     let mut need_cache = false;
+    //     if self
+    //         .ctx
+    //         .chunk_pool
+    //         .check_already_has_cache(&segment.root)
+    //         .await
+    //     {
+    //         need_cache = true;
+    //     }
 
-        if !need_cache {
-            need_cache = self.check_need_cache(&maybe_tx, segment.file_size).await?;
-        }
+    //     if !need_cache {
+    //         need_cache = self.check_need_cache(&maybe_tx, segment.file_size).await?;
+    //     }
 
-        segment.validate(self.ctx.config.chunks_per_segment)?;
+    //     segment.validate(self.ctx.config.chunks_per_segment)?;
 
-        let seg_info = SegmentInfo {
-            root: segment.root,
-            seg_data: segment.data,
-            seg_proof: segment.proof,
-            seg_index: segment.index,
-            chunks_per_segment: self.ctx.config.chunks_per_segment,
-        };
+    //     let seg_info = SegmentInfo {
+    //         root: segment.root,
+    //         seg_data: segment.data,
+    //         seg_proof: segment.proof,
+    //         seg_index: segment.index,
+    //         chunks_per_segment: self.ctx.config.chunks_per_segment,
+    //     };
 
-        if need_cache {
-            self.ctx.chunk_pool.cache_chunks(seg_info).await?;
-        } else {
-            let file_id = FileID {
-                root: seg_info.root,
-                tx_id: maybe_tx.unwrap().id(),
-            };
-            self.ctx
-                .chunk_pool
-                .write_chunks(seg_info, file_id, segment.file_size)
-                .await?;
-        }
-        Ok(())
-    }
+    //     if need_cache {
+    //         self.ctx.chunk_pool.cache_chunks(seg_info).await?;
+    //     } else {
+    //         let file_id = FileID {
+    //             root: seg_info.root,
+    //             tx_id: maybe_tx.unwrap().id(),
+    //         };
+    //         self.ctx
+    //             .chunk_pool
+    //             .write_chunks(seg_info, file_id, segment.file_size)
+    //             .await?;
+    //     }
+    //     Ok(())
+    // }
 
     async fn get_segment_by_tx_seq(
         &self,
@@ -447,61 +444,3 @@ impl RpcServerImpl {
         }))
     }
 }
-
-enum SegmentIndex {
-    Single(usize),
-    Range(usize, usize), // [start, end]
-}
-
-impl Debug for SegmentIndex {
-    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
-        match self {
-            Self::Single(val) => write!(f, "{}", val),
-            Self::Range(start, end) => write!(f, "[{},{}]", start, end),
-        }
-    }
-}
-
-struct SegmentIndexArray {
-    items: Vec<SegmentIndex>,
-}
-
-impl Debug for SegmentIndexArray {
-    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
-        match self.items.first() {
-            None => write!(f, "NULL"),
-            Some(first) if self.items.len() == 1 => write!(f, "{:?}", first),
-            _ => write!(f, "{:?}", self.items),
-        }
-    }
-}
-
-impl SegmentIndexArray {
-    fn new(segments: &[SegmentWithProof]) -> Self {
-        let mut items = Vec::new();
-
-        let mut current = match segments.first() {
-            None => return SegmentIndexArray { items },
-            Some(seg) => SegmentIndex::Single(seg.index),
-        };
-
-        for index in segments.iter().skip(1).map(|seg| seg.index) {
-            match current {
-                SegmentIndex::Single(val) if val + 1 == index => {
-                    current = SegmentIndex::Range(val, index)
-                }
-                SegmentIndex::Range(start, end) if end + 1 == index => {
-                    current = SegmentIndex::Range(start, index)
-                }
-                _ => {
-                    items.push(current);
-                    current = SegmentIndex::Single(index);
-                }
-            }
-        }
-
-        items.push(current);
-
-        SegmentIndexArray { items }
-    }
-}
node/rpc/src/zgs_grpc/zgs_grpc.rs

@@ -1,6 +1,7 @@
-use crate::zgs_grpc_proto::{PingRequest, PingReply};
+use crate::zgs_grpc_proto::{PingRequest, PingReply, UploadSegmentsByTxSeqRequest, Empty};
 use crate::zgs_grpc_proto::zgs_grpc_service_server::ZgsGrpcService;
-use crate::Context;
+use crate::{rpc_helper, Context, SegmentIndexArray};
+use crate::types::SegmentWithProof as RpcSegment;
 
 pub struct ZgsGrpcServiceImpl {
     pub ctx: Context,
@@ -16,4 +17,34 @@ impl ZgsGrpcService for ZgsGrpcServiceImpl {
         let reply = PingReply { message: format!("Echo: {}", msg) };
         Ok(tonic::Response::new(reply))
     }
+
+    async fn upload_segments_by_tx_seq(
+        &self,
+        request: tonic::Request<UploadSegmentsByTxSeqRequest>,
+    ) -> Result<tonic::Response<Empty>, tonic::Status> {
+        let req = request.into_inner();
+        let segments = req.segments;
+        let tx_seq = req.tx_seq;
+
+        let rpc_segments: Vec<RpcSegment> = segments
+            .into_iter()
+            .map(RpcSegment::try_from)
+            .collect::<Result<_, _>>()?;
+
+        let indices = SegmentIndexArray::new(&rpc_segments);
+        info!(%tx_seq, ?indices, "grpc_zgs_uploadSegmentsByTxSeq");
+
+        let maybe_tx = self
+            .ctx
+            .log_store
+            .get_tx_by_seq_number(tx_seq)
+            .await
+            .map_err(|e| {
+                tonic::Status::internal(format!(
+                    "Failed to get transaction by sequence number: {}",
+                    e
+                ))
+            })?;
+        for segment in rpc_segments.into_iter() {
+            rpc_helper::put_segment_with_maybe_tx(&self.ctx, segment, maybe_tx.clone())
+                .await
+                .map_err(|e| tonic::Status::internal(format!("Failed to put segment: {}", e)))?;
+        }
+
+        // Return an empty response
+        Ok(tonic::Response::new(Empty {}))
+    }
 }