mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-11-03 08:07:27 +00:00
@peter/add grpc (#377)
* add grpc for uploading segments * add reflection * add upload segments grpc function * format code * fix lint * fix test
This commit is contained in:
parent
28654efde1
commit
3ba369e9e5
6
.github/actions/setup-rust/action.yml
vendored
6
.github/actions/setup-rust/action.yml
vendored
@ -2,6 +2,12 @@ name: Setup Rust (cache & toolchain)
|
||||
runs:
|
||||
using: composite
|
||||
steps:
|
||||
- name: Install protoc compiler
|
||||
shell: bash
|
||||
run: |
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y protobuf-compiler
|
||||
|
||||
- name: Install toolchain 1.78.0
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@ -7,3 +7,5 @@ tests/tmp/**
|
||||
.vscode/*.json
|
||||
/0g-storage-contracts-dev
|
||||
/run/.env
|
||||
|
||||
**.bin
|
||||
|
||||
202
Cargo.lock
generated
202
Cargo.lock
generated
@ -551,6 +551,34 @@ version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
|
||||
|
||||
[[package]]
|
||||
name = "axum"
|
||||
version = "0.6.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"axum-core 0.3.4",
|
||||
"bitflags 1.3.2",
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http 0.2.12",
|
||||
"http-body 0.4.6",
|
||||
"hyper 0.14.29",
|
||||
"itoa",
|
||||
"matchit",
|
||||
"memchr",
|
||||
"mime",
|
||||
"percent-encoding",
|
||||
"pin-project-lite 0.2.14",
|
||||
"rustversion",
|
||||
"serde",
|
||||
"sync_wrapper 0.1.2",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum"
|
||||
version = "0.7.5"
|
||||
@ -558,7 +586,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"axum-core",
|
||||
"axum-core 0.4.5",
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http 1.2.0",
|
||||
@ -578,6 +606,23 @@ dependencies = [
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum-core"
|
||||
version = "0.3.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http 0.2.12",
|
||||
"http-body 0.4.6",
|
||||
"mime",
|
||||
"rustversion",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum-core"
|
||||
version = "0.4.5"
|
||||
@ -682,7 +727,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"lazycell",
|
||||
"peeking_take_while",
|
||||
"prettyplease",
|
||||
"prettyplease 0.2.20",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"regex",
|
||||
@ -1189,7 +1234,7 @@ dependencies = [
|
||||
"futures-core",
|
||||
"prost 0.13.4",
|
||||
"prost-types 0.13.4",
|
||||
"tonic",
|
||||
"tonic 0.12.3",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
@ -1213,7 +1258,7 @@ dependencies = [
|
||||
"thread_local",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic",
|
||||
"tonic 0.12.3",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-subscriber",
|
||||
@ -2293,7 +2338,7 @@ dependencies = [
|
||||
"ethers-core",
|
||||
"ethers-etherscan",
|
||||
"eyre",
|
||||
"prettyplease",
|
||||
"prettyplease 0.2.20",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"regex",
|
||||
@ -3435,6 +3480,18 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-timeout"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
|
||||
dependencies = [
|
||||
"hyper 0.14.29",
|
||||
"pin-project-lite 0.2.14",
|
||||
"tokio",
|
||||
"tokio-io-timeout",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-timeout"
|
||||
version = "0.5.2"
|
||||
@ -6104,6 +6161,16 @@ version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
|
||||
|
||||
[[package]]
|
||||
name = "prettyplease"
|
||||
version = "0.1.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prettyplease"
|
||||
version = "0.2.20"
|
||||
@ -6277,6 +6344,16 @@ dependencies = [
|
||||
"prost-derive 0.10.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.11.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive 0.11.9",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.13.4"
|
||||
@ -6329,6 +6406,28 @@ dependencies = [
|
||||
"which",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-build"
|
||||
version = "0.11.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"heck 0.4.1",
|
||||
"itertools 0.10.5",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"multimap",
|
||||
"petgraph",
|
||||
"prettyplease 0.1.25",
|
||||
"prost 0.11.9",
|
||||
"prost-types 0.11.9",
|
||||
"regex",
|
||||
"syn 1.0.109",
|
||||
"tempfile",
|
||||
"which",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-codec"
|
||||
version = "0.1.0"
|
||||
@ -6368,6 +6467,19 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-derive"
|
||||
version = "0.11.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools 0.10.5",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-derive"
|
||||
version = "0.13.4"
|
||||
@ -6401,6 +6513,15 @@ dependencies = [
|
||||
"prost 0.10.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-types"
|
||||
version = "0.11.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
|
||||
dependencies = [
|
||||
"prost 0.11.9",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-types"
|
||||
version = "0.13.4"
|
||||
@ -6866,11 +6987,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rpc"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"append_merkle",
|
||||
"base64 0.13.1",
|
||||
"chunk_pool",
|
||||
"ethereum-types 0.14.1",
|
||||
"file_location_cache",
|
||||
"futures",
|
||||
"futures-channel",
|
||||
@ -6881,6 +7003,9 @@ dependencies = [
|
||||
"miner",
|
||||
"network",
|
||||
"parking_lot 0.12.3",
|
||||
"prost 0.11.9",
|
||||
"prost-build 0.11.9",
|
||||
"prost-types 0.11.9",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"shared_types",
|
||||
@ -6889,6 +7014,9 @@ dependencies = [
|
||||
"sync",
|
||||
"task_executor",
|
||||
"tokio",
|
||||
"tonic 0.9.2",
|
||||
"tonic-build",
|
||||
"tonic-reflection",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
@ -8188,6 +8316,34 @@ dependencies = [
|
||||
"winnow 0.6.13",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.9.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"axum 0.6.20",
|
||||
"base64 0.21.7",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"h2 0.3.26",
|
||||
"http 0.2.12",
|
||||
"http-body 0.4.6",
|
||||
"hyper 0.14.29",
|
||||
"hyper-timeout 0.4.1",
|
||||
"percent-encoding",
|
||||
"pin-project 1.1.5",
|
||||
"prost 0.11.9",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.12.3"
|
||||
@ -8196,7 +8352,7 @@ checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"axum 0.7.5",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"h2 0.4.7",
|
||||
@ -8204,7 +8360,7 @@ dependencies = [
|
||||
"http-body 1.0.1",
|
||||
"http-body-util",
|
||||
"hyper 1.5.2",
|
||||
"hyper-timeout",
|
||||
"hyper-timeout 0.5.2",
|
||||
"hyper-util",
|
||||
"percent-encoding",
|
||||
"pin-project 1.1.5",
|
||||
@ -8218,6 +8374,32 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic-build"
|
||||
version = "0.9.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07"
|
||||
dependencies = [
|
||||
"prettyplease 0.1.25",
|
||||
"proc-macro2",
|
||||
"prost-build 0.11.9",
|
||||
"quote",
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic-reflection"
|
||||
version = "0.9.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0543d7092032041fbeac1f2c84304537553421a11a623c2301b12ef0264862c7"
|
||||
dependencies = [
|
||||
"prost 0.11.9",
|
||||
"prost-types 0.11.9",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic 0.9.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.4.13"
|
||||
@ -8246,9 +8428,9 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
|
||||
|
||||
[[package]]
|
||||
name = "tower-service"
|
||||
version = "0.3.2"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
|
||||
checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
[package]
|
||||
name = "rpc"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
edition = "2021"
|
||||
build = "build.rs"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
@ -28,3 +29,12 @@ merkle_tree = { path = "../../common/merkle_tree"}
|
||||
futures-channel = "^0.3"
|
||||
metrics = { workspace = true }
|
||||
parking_lot = "0.12.3"
|
||||
tonic = { version = "0.9.2", features = ["transport"] }
|
||||
prost = "0.11.9"
|
||||
prost-types = "0.11.9"
|
||||
tonic-reflection = "0.9.2"
|
||||
ethereum-types = "0.14"
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = "0.9.2"
|
||||
prost-build = "0.11.9"
|
||||
|
||||
7
node/rpc/build.rs
Normal file
7
node/rpc/build.rs
Normal file
@ -0,0 +1,7 @@
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Compile proto/my_service.proto
|
||||
tonic_build::configure()
|
||||
.file_descriptor_set_path("proto/zgs_grpc_descriptor.bin")
|
||||
.compile(&["proto/zgs_grpc.proto"], &["proto"])?;
|
||||
Ok(())
|
||||
}
|
||||
48
node/rpc/proto/zgs_grpc.proto
Normal file
48
node/rpc/proto/zgs_grpc.proto
Normal file
@ -0,0 +1,48 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package zgs_grpc;
|
||||
|
||||
option go_package = "github.com/0glabs/0g-storage-client/node/proto;zgs_grpc";
|
||||
|
||||
message Empty {}
|
||||
|
||||
/// 32-byte hash root
|
||||
message DataRoot {
|
||||
bytes value = 1;
|
||||
}
|
||||
|
||||
/// A proof over a file-segment Merkle tree
|
||||
message FileProof {
|
||||
/// sequence of 32-byte hashes
|
||||
repeated bytes lemma = 1;
|
||||
/// bit-paths (left=false, right=true) alongside the lemmas
|
||||
repeated bool path = 2;
|
||||
}
|
||||
|
||||
/// A file segment plus its Merkle proof
|
||||
message SegmentWithProof {
|
||||
DataRoot root = 1; // file Merkle root
|
||||
bytes data = 2; // raw segment bytes
|
||||
uint64 index = 3; // segment index
|
||||
FileProof proof = 4; // Merkle proof of this leaf
|
||||
uint64 file_size = 5; // total file length
|
||||
}
|
||||
|
||||
message UploadSegmentsByTxSeqRequest {
|
||||
repeated SegmentWithProof segments = 1;
|
||||
uint64 tx_seq = 2;
|
||||
}
|
||||
|
||||
message PingRequest {
|
||||
string message = 1;
|
||||
}
|
||||
|
||||
message PingReply {
|
||||
string message = 1;
|
||||
}
|
||||
|
||||
// A trivial ping service
|
||||
service ZgsGrpcService {
|
||||
rpc Ping (PingRequest) returns (PingReply);
|
||||
rpc UploadSegmentsByTxSeq(UploadSegmentsByTxSeqRequest) returns (Empty);
|
||||
}
|
||||
@ -8,6 +8,7 @@ pub struct Config {
|
||||
pub enabled: bool,
|
||||
pub listen_address: SocketAddr,
|
||||
pub listen_address_admin: SocketAddr,
|
||||
pub listen_address_grpc: SocketAddr,
|
||||
pub chunks_per_segment: usize,
|
||||
pub max_request_body_size: u32,
|
||||
pub max_cache_file_size: usize,
|
||||
@ -19,6 +20,7 @@ impl Default for Config {
|
||||
enabled: true,
|
||||
listen_address: SocketAddr::from_str("0.0.0.0:5678").unwrap(),
|
||||
listen_address_admin: SocketAddr::from_str("127.0.0.1:5679").unwrap(),
|
||||
listen_address_grpc: SocketAddr::from_str("0.0.0.0:50051").unwrap(),
|
||||
chunks_per_segment: 1024,
|
||||
max_request_body_size: 100 * 1024 * 1024, // 100MB
|
||||
max_cache_file_size: 10 * 1024 * 1024, // 10MB
|
||||
|
||||
@ -8,10 +8,15 @@ mod config;
|
||||
mod error;
|
||||
mod middleware;
|
||||
mod miner;
|
||||
mod rpc_helper;
|
||||
pub mod types;
|
||||
mod zgs;
|
||||
mod zgs_grpc;
|
||||
|
||||
use crate::miner::RpcServer as MinerRpcServer;
|
||||
use crate::types::SegmentWithProof;
|
||||
use crate::zgs_grpc::r#impl::ZgsGrpcServiceImpl;
|
||||
use crate::zgs_grpc_proto::zgs_grpc_service_server::ZgsGrpcServiceServer;
|
||||
use admin::RpcServer as AdminRpcServer;
|
||||
use chunk_pool::MemoryChunkPool;
|
||||
use file_location_cache::FileLocationCache;
|
||||
@ -20,11 +25,14 @@ use jsonrpsee::core::RpcResult;
|
||||
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
|
||||
use network::{NetworkGlobals, NetworkMessage, NetworkSender};
|
||||
use std::error::Error;
|
||||
use std::fmt::{Debug, Formatter, Result as FmtResult};
|
||||
use std::sync::Arc;
|
||||
use storage_async::Store;
|
||||
use sync::{SyncRequest, SyncResponse, SyncSender};
|
||||
use task_executor::ShutdownReason;
|
||||
use tokio::sync::broadcast;
|
||||
use tonic::transport::Server;
|
||||
use tonic_reflection::server::Builder as ReflectionBuilder;
|
||||
use zgs::RpcServer as ZgsRpcServer;
|
||||
use zgs_miner::MinerMessage;
|
||||
|
||||
@ -33,6 +41,12 @@ pub use config::Config as RPCConfig;
|
||||
pub use miner::RpcClient as ZgsMinerRpcClient;
|
||||
pub use zgs::RpcClient as ZgsRPCClient;
|
||||
|
||||
pub mod zgs_grpc_proto {
|
||||
tonic::include_proto!("zgs_grpc");
|
||||
}
|
||||
|
||||
const DESCRIPTOR_SET: &[u8] = include_bytes!("../proto/zgs_grpc_descriptor.bin");
|
||||
|
||||
/// A wrapper around all the items required to spawn the HTTP server.
|
||||
///
|
||||
/// The server will gracefully handle the case where any fields are `None`.
|
||||
@ -73,7 +87,7 @@ pub async fn run_server(
|
||||
(run_server_all(ctx).await?, None)
|
||||
};
|
||||
|
||||
info!("Server started");
|
||||
info!("Rpc Server started");
|
||||
|
||||
Ok(handles)
|
||||
}
|
||||
@ -133,3 +147,76 @@ async fn run_server_public_private(
|
||||
|
||||
Ok((handle_public, Some(handle_private)))
|
||||
}
|
||||
|
||||
pub async fn run_grpc_server(ctx: Context) -> Result<(), Box<dyn Error>> {
|
||||
let grpc_addr = ctx.config.listen_address_grpc;
|
||||
let reflection = ReflectionBuilder::configure()
|
||||
.register_encoded_file_descriptor_set(DESCRIPTOR_SET)
|
||||
.build()?;
|
||||
|
||||
let server = ZgsGrpcServiceServer::new(ZgsGrpcServiceImpl { ctx });
|
||||
Server::builder()
|
||||
.add_service(server)
|
||||
.add_service(reflection)
|
||||
.serve(grpc_addr)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
enum SegmentIndex {
|
||||
Single(usize),
|
||||
Range(usize, usize), // [start, end]
|
||||
}
|
||||
|
||||
impl Debug for SegmentIndex {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
|
||||
match self {
|
||||
Self::Single(val) => write!(f, "{}", val),
|
||||
Self::Range(start, end) => write!(f, "[{},{}]", start, end),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct SegmentIndexArray {
|
||||
items: Vec<SegmentIndex>,
|
||||
}
|
||||
|
||||
impl Debug for SegmentIndexArray {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
|
||||
match self.items.first() {
|
||||
None => write!(f, "NULL"),
|
||||
Some(first) if self.items.len() == 1 => write!(f, "{:?}", first),
|
||||
_ => write!(f, "{:?}", self.items),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentIndexArray {
|
||||
fn new(segments: &[SegmentWithProof]) -> Self {
|
||||
let mut items = Vec::new();
|
||||
|
||||
let mut current = match segments.first() {
|
||||
None => return SegmentIndexArray { items },
|
||||
Some(seg) => SegmentIndex::Single(seg.index),
|
||||
};
|
||||
|
||||
for index in segments.iter().skip(1).map(|seg| seg.index) {
|
||||
match current {
|
||||
SegmentIndex::Single(val) if val + 1 == index => {
|
||||
current = SegmentIndex::Range(val, index)
|
||||
}
|
||||
SegmentIndex::Range(start, end) if end + 1 == index => {
|
||||
current = SegmentIndex::Range(start, index)
|
||||
}
|
||||
_ => {
|
||||
items.push(current);
|
||||
current = SegmentIndex::Single(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
items.push(current);
|
||||
|
||||
SegmentIndexArray { items }
|
||||
}
|
||||
}
|
||||
|
||||
99
node/rpc/src/rpc_helper.rs
Normal file
99
node/rpc/src/rpc_helper.rs
Normal file
@ -0,0 +1,99 @@
|
||||
use crate::error;
|
||||
use crate::types::SegmentWithProof;
|
||||
use crate::Context;
|
||||
use chunk_pool::SegmentInfo;
|
||||
use jsonrpsee::core::RpcResult;
|
||||
use shared_types::Transaction;
|
||||
|
||||
/// Put a single segment (mirrors your old `put_segment`)
|
||||
pub async fn put_segment(ctx: &Context, segment: SegmentWithProof) -> RpcResult<()> {
|
||||
debug!(root = %segment.root, index = %segment.index, "putSegment");
|
||||
|
||||
// fetch optional tx
|
||||
let maybe_tx = ctx
|
||||
.log_store
|
||||
.get_tx_by_data_root(&segment.root, false)
|
||||
.await?;
|
||||
|
||||
put_segment_with_maybe_tx(ctx, segment, maybe_tx).await
|
||||
}
|
||||
|
||||
/// Put a segment, given an optional Transaction (mirrors `put_segment_with_maybe_tx`)
|
||||
pub async fn put_segment_with_maybe_tx(
|
||||
ctx: &Context,
|
||||
segment: SegmentWithProof,
|
||||
maybe_tx: Option<Transaction>,
|
||||
) -> RpcResult<()> {
|
||||
ctx.chunk_pool.validate_segment_size(&segment.data)?;
|
||||
|
||||
if let Some(tx) = &maybe_tx {
|
||||
if tx.data_merkle_root != segment.root {
|
||||
return Err(error::internal_error("data root and tx seq not match"));
|
||||
}
|
||||
}
|
||||
|
||||
// decide cache vs write
|
||||
let need_cache = if ctx.chunk_pool.check_already_has_cache(&segment.root).await {
|
||||
true
|
||||
} else {
|
||||
check_need_cache(ctx, &maybe_tx, segment.file_size).await?
|
||||
};
|
||||
|
||||
segment.validate(ctx.config.chunks_per_segment)?;
|
||||
|
||||
let seg_info = SegmentInfo {
|
||||
root: segment.root,
|
||||
seg_data: segment.data,
|
||||
seg_proof: segment.proof,
|
||||
seg_index: segment.index,
|
||||
chunks_per_segment: ctx.config.chunks_per_segment,
|
||||
};
|
||||
|
||||
if need_cache {
|
||||
ctx.chunk_pool.cache_chunks(seg_info).await?;
|
||||
} else {
|
||||
let file_id = chunk_pool::FileID {
|
||||
root: seg_info.root,
|
||||
tx_id: maybe_tx.unwrap().id(),
|
||||
};
|
||||
ctx.chunk_pool
|
||||
.write_chunks(seg_info, file_id, segment.file_size)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// The old `check_need_cache`
|
||||
pub async fn check_need_cache(
|
||||
ctx: &Context,
|
||||
maybe_tx: &Option<Transaction>,
|
||||
file_size: usize,
|
||||
) -> RpcResult<bool> {
|
||||
if let Some(tx) = maybe_tx {
|
||||
if tx.size != file_size as u64 {
|
||||
return Err(error::invalid_params(
|
||||
"file_size",
|
||||
"segment file size not matched with tx file size",
|
||||
));
|
||||
}
|
||||
if ctx.log_store.check_tx_completed(tx.seq).await? {
|
||||
return Err(error::invalid_params(
|
||||
"root",
|
||||
"already uploaded and finalized",
|
||||
));
|
||||
}
|
||||
if ctx.log_store.check_tx_pruned(tx.seq).await? {
|
||||
return Err(error::invalid_params("root", "already pruned"));
|
||||
}
|
||||
Ok(false)
|
||||
} else {
|
||||
if file_size > ctx.config.max_cache_file_size {
|
||||
return Err(error::invalid_params(
|
||||
"file_size",
|
||||
"caching of large file when tx is unavailable is not supported",
|
||||
));
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
@ -1,5 +1,6 @@
|
||||
use crate::error;
|
||||
use crate::{error, zgs_grpc_proto};
|
||||
use append_merkle::ZERO_HASHES;
|
||||
use ethereum_types::H256 as EthH256;
|
||||
use jsonrpsee::core::RpcResult;
|
||||
use merkle_light::hash::Algorithm;
|
||||
use merkle_light::merkle::{log2_pow2, next_pow2, MerkleTree};
|
||||
@ -11,12 +12,14 @@ use shared_types::{
|
||||
Transaction, CHUNK_SIZE,
|
||||
};
|
||||
use std::collections::HashSet;
|
||||
use std::convert::TryFrom;
|
||||
use std::hash::Hasher;
|
||||
use std::net::IpAddr;
|
||||
use std::time::Instant;
|
||||
use storage::config::ShardConfig;
|
||||
use storage::log_store::log_manager::bytes_to_entries;
|
||||
use storage::H256;
|
||||
use tonic::Status as GrpcStatus;
|
||||
|
||||
const ZERO_HASH: [u8; 32] = [
|
||||
0xd3, 0x97, 0xb3, 0xb0, 0x43, 0xd8, 0x7f, 0xcd, 0x6f, 0xad, 0x12, 0x91, 0xff, 0xb, 0xfd, 0x16,
|
||||
@ -76,6 +79,77 @@ pub struct SegmentWithProof {
|
||||
pub file_size: usize,
|
||||
}
|
||||
|
||||
/// Convert the proto DataRoot → your app’s DataRoot
|
||||
impl TryFrom<zgs_grpc_proto::DataRoot> for DataRoot {
|
||||
type Error = GrpcStatus;
|
||||
|
||||
fn try_from(value: zgs_grpc_proto::DataRoot) -> Result<Self, GrpcStatus> {
|
||||
let bytes = value.value;
|
||||
if bytes.len() != 32 {
|
||||
return Err(GrpcStatus::invalid_argument(format!(
|
||||
"Invalid hash length: got {}, want 32",
|
||||
bytes.len()
|
||||
)));
|
||||
}
|
||||
// assume AppDataRoot is a newtype around H256:
|
||||
let mut arr = [0u8; 32];
|
||||
arr.copy_from_slice(&bytes);
|
||||
Ok(EthH256(arr))
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert proto FileProof → your app’s FileProof
|
||||
impl TryFrom<zgs_grpc_proto::FileProof> for FileProof {
|
||||
type Error = GrpcStatus;
|
||||
|
||||
fn try_from(value: zgs_grpc_proto::FileProof) -> Result<Self, GrpcStatus> {
|
||||
// turn each `bytes` into an H256
|
||||
let mut lemma = Vec::with_capacity(value.lemma.len());
|
||||
for bin in value.lemma {
|
||||
if bin.len() != 32 {
|
||||
return Err(GrpcStatus::invalid_argument(format!(
|
||||
"Invalid hash length: got {}, want 32",
|
||||
bin.len()
|
||||
)));
|
||||
}
|
||||
let mut arr = [0u8; 32];
|
||||
arr.copy_from_slice(&bin);
|
||||
lemma.push(H256(arr));
|
||||
}
|
||||
|
||||
Ok(FileProof {
|
||||
lemma,
|
||||
path: value.path,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert the full SegmentWithProof
|
||||
impl TryFrom<zgs_grpc_proto::SegmentWithProof> for SegmentWithProof {
|
||||
type Error = GrpcStatus;
|
||||
|
||||
fn try_from(grpc_segment: zgs_grpc_proto::SegmentWithProof) -> Result<Self, GrpcStatus> {
|
||||
let root = grpc_segment.root.unwrap().try_into()?;
|
||||
let data = grpc_segment.data;
|
||||
// index is u64 in proto, usize in app
|
||||
let index = grpc_segment.index.try_into().map_err(|_| {
|
||||
GrpcStatus::invalid_argument(format!("Invalid segment index: {}", grpc_segment.index))
|
||||
})?;
|
||||
let proof = grpc_segment.proof.unwrap().try_into()?;
|
||||
let file_size = grpc_segment.file_size.try_into().map_err(|_| {
|
||||
GrpcStatus::invalid_argument(format!("Invalid file size: {}", grpc_segment.file_size))
|
||||
})?;
|
||||
|
||||
Ok(SegmentWithProof {
|
||||
root,
|
||||
data,
|
||||
index,
|
||||
proof,
|
||||
file_size,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentWithProof {
|
||||
/// Splits file into segments and returns the total number of segments and the last segment size.
|
||||
pub fn split_file_into_segments(
|
||||
|
||||
@ -1,12 +1,10 @@
|
||||
use super::api::RpcServer;
|
||||
use crate::error;
|
||||
use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
|
||||
use crate::Context;
|
||||
use chunk_pool::{FileID, SegmentInfo};
|
||||
use crate::{error, rpc_helper, SegmentIndexArray};
|
||||
use jsonrpsee::core::async_trait;
|
||||
use jsonrpsee::core::RpcResult;
|
||||
use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
|
||||
use std::fmt::{Debug, Formatter, Result};
|
||||
use storage::config::ShardConfig;
|
||||
use storage::log_store::tx_store::TxStatus;
|
||||
use storage::{try_option, H256};
|
||||
@ -39,7 +37,7 @@ impl RpcServer for RpcServerImpl {
|
||||
|
||||
async fn upload_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
|
||||
info!(root = %segment.root, index = %segment.index, "zgs_uploadSegment");
|
||||
self.put_segment(segment).await
|
||||
rpc_helper::put_segment(&self.ctx, segment).await
|
||||
}
|
||||
|
||||
async fn upload_segment_by_tx_seq(
|
||||
@ -49,7 +47,7 @@ impl RpcServer for RpcServerImpl {
|
||||
) -> RpcResult<()> {
|
||||
info!(tx_seq = %tx_seq, index = %segment.index, "zgs_uploadSegmentByTxSeq");
|
||||
let maybe_tx = self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?;
|
||||
self.put_segment_with_maybe_tx(segment, maybe_tx).await
|
||||
rpc_helper::put_segment_with_maybe_tx(&self.ctx, segment, maybe_tx).await
|
||||
}
|
||||
|
||||
async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()> {
|
||||
@ -61,7 +59,7 @@ impl RpcServer for RpcServerImpl {
|
||||
info!(%root, ?indices, "zgs_uploadSegments");
|
||||
|
||||
for segment in segments.into_iter() {
|
||||
self.put_segment(segment).await?;
|
||||
rpc_helper::put_segment(&self.ctx, segment).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -77,9 +75,7 @@ impl RpcServer for RpcServerImpl {
|
||||
|
||||
let maybe_tx = self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?;
|
||||
for segment in segments.into_iter() {
|
||||
match self
|
||||
.put_segment_with_maybe_tx(segment, maybe_tx.clone())
|
||||
.await
|
||||
match rpc_helper::put_segment_with_maybe_tx(&self.ctx, segment, maybe_tx.clone()).await
|
||||
{
|
||||
Ok(()) => {} // success
|
||||
Err(e)
|
||||
@ -235,44 +231,44 @@ impl RpcServer for RpcServerImpl {
|
||||
}
|
||||
|
||||
impl RpcServerImpl {
|
||||
async fn check_need_cache(
|
||||
&self,
|
||||
maybe_tx: &Option<Transaction>,
|
||||
file_size: usize,
|
||||
) -> RpcResult<bool> {
|
||||
if let Some(tx) = maybe_tx {
|
||||
if tx.size != file_size as u64 {
|
||||
return Err(error::invalid_params(
|
||||
"file_size",
|
||||
"segment file size not matched with tx file size",
|
||||
));
|
||||
}
|
||||
// async fn check_need_cache(
|
||||
// &self,
|
||||
// maybe_tx: &Option<Transaction>,
|
||||
// file_size: usize,
|
||||
// ) -> RpcResult<bool> {
|
||||
// if let Some(tx) = maybe_tx {
|
||||
// if tx.size != file_size as u64 {
|
||||
// return Err(error::invalid_params(
|
||||
// "file_size",
|
||||
// "segment file size not matched with tx file size",
|
||||
// ));
|
||||
// }
|
||||
|
||||
// Transaction already finalized for the specified file data root.
|
||||
if self.ctx.log_store.check_tx_completed(tx.seq).await? {
|
||||
return Err(error::invalid_params(
|
||||
"root",
|
||||
"already uploaded and finalized",
|
||||
));
|
||||
}
|
||||
// // Transaction already finalized for the specified file data root.
|
||||
// if self.ctx.log_store.check_tx_completed(tx.seq).await? {
|
||||
// return Err(error::invalid_params(
|
||||
// "root",
|
||||
// "already uploaded and finalized",
|
||||
// ));
|
||||
// }
|
||||
|
||||
if self.ctx.log_store.check_tx_pruned(tx.seq).await? {
|
||||
return Err(error::invalid_params("root", "already pruned"));
|
||||
}
|
||||
// if self.ctx.log_store.check_tx_pruned(tx.seq).await? {
|
||||
// return Err(error::invalid_params("root", "already pruned"));
|
||||
// }
|
||||
|
||||
Ok(false)
|
||||
} else {
|
||||
//Check whether file is small enough to cache in the system
|
||||
if file_size > self.ctx.config.max_cache_file_size {
|
||||
return Err(error::invalid_params(
|
||||
"file_size",
|
||||
"caching of large file when tx is unavailable is not supported",
|
||||
));
|
||||
}
|
||||
// Ok(false)
|
||||
// } else {
|
||||
// //Check whether file is small enough to cache in the system
|
||||
// if file_size > self.ctx.config.max_cache_file_size {
|
||||
// return Err(error::invalid_params(
|
||||
// "file_size",
|
||||
// "caching of large file when tx is unavailable is not supported",
|
||||
// ));
|
||||
// }
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
// Ok(true)
|
||||
// }
|
||||
// }
|
||||
|
||||
async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult<FileInfo> {
|
||||
let (finalized, pruned) = match self.ctx.log_store.get_store().get_tx_status(tx.seq)? {
|
||||
@ -312,69 +308,69 @@ impl RpcServerImpl {
|
||||
})
|
||||
}
|
||||
|
||||
async fn put_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
|
||||
debug!(root = %segment.root, index = %segment.index, "putSegment");
|
||||
// async fn put_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
|
||||
// debug!(root = %segment.root, index = %segment.index, "putSegment");
|
||||
|
||||
let maybe_tx = self
|
||||
.ctx
|
||||
.log_store
|
||||
.get_tx_by_data_root(&segment.root, false)
|
||||
.await?;
|
||||
// let maybe_tx = self
|
||||
// .ctx
|
||||
// .log_store
|
||||
// .get_tx_by_data_root(&segment.root, false)
|
||||
// .await?;
|
||||
|
||||
self.put_segment_with_maybe_tx(segment, maybe_tx).await
|
||||
}
|
||||
// self.put_segment_with_maybe_tx(segment, maybe_tx).await
|
||||
// }
|
||||
|
||||
async fn put_segment_with_maybe_tx(
|
||||
&self,
|
||||
segment: SegmentWithProof,
|
||||
maybe_tx: Option<Transaction>,
|
||||
) -> RpcResult<()> {
|
||||
self.ctx.chunk_pool.validate_segment_size(&segment.data)?;
|
||||
// async fn put_segment_with_maybe_tx(
|
||||
// &self,
|
||||
// segment: SegmentWithProof,
|
||||
// maybe_tx: Option<Transaction>,
|
||||
// ) -> RpcResult<()> {
|
||||
// self.ctx.chunk_pool.validate_segment_size(&segment.data)?;
|
||||
|
||||
if let Some(tx) = &maybe_tx {
|
||||
if tx.data_merkle_root != segment.root {
|
||||
return Err(error::internal_error("data root and tx seq not match"));
|
||||
}
|
||||
}
|
||||
// if let Some(tx) = &maybe_tx {
|
||||
// if tx.data_merkle_root != segment.root {
|
||||
// return Err(error::internal_error("data root and tx seq not match"));
|
||||
// }
|
||||
// }
|
||||
|
||||
let mut need_cache = false;
|
||||
if self
|
||||
.ctx
|
||||
.chunk_pool
|
||||
.check_already_has_cache(&segment.root)
|
||||
.await
|
||||
{
|
||||
need_cache = true;
|
||||
}
|
||||
// let mut need_cache = false;
|
||||
// if self
|
||||
// .ctx
|
||||
// .chunk_pool
|
||||
// .check_already_has_cache(&segment.root)
|
||||
// .await
|
||||
// {
|
||||
// need_cache = true;
|
||||
// }
|
||||
|
||||
if !need_cache {
|
||||
need_cache = self.check_need_cache(&maybe_tx, segment.file_size).await?;
|
||||
}
|
||||
// if !need_cache {
|
||||
// need_cache = self.check_need_cache(&maybe_tx, segment.file_size).await?;
|
||||
// }
|
||||
|
||||
segment.validate(self.ctx.config.chunks_per_segment)?;
|
||||
// segment.validate(self.ctx.config.chunks_per_segment)?;
|
||||
|
||||
let seg_info = SegmentInfo {
|
||||
root: segment.root,
|
||||
seg_data: segment.data,
|
||||
seg_proof: segment.proof,
|
||||
seg_index: segment.index,
|
||||
chunks_per_segment: self.ctx.config.chunks_per_segment,
|
||||
};
|
||||
// let seg_info = SegmentInfo {
|
||||
// root: segment.root,
|
||||
// seg_data: segment.data,
|
||||
// seg_proof: segment.proof,
|
||||
// seg_index: segment.index,
|
||||
// chunks_per_segment: self.ctx.config.chunks_per_segment,
|
||||
// };
|
||||
|
||||
if need_cache {
|
||||
self.ctx.chunk_pool.cache_chunks(seg_info).await?;
|
||||
} else {
|
||||
let file_id = FileID {
|
||||
root: seg_info.root,
|
||||
tx_id: maybe_tx.unwrap().id(),
|
||||
};
|
||||
self.ctx
|
||||
.chunk_pool
|
||||
.write_chunks(seg_info, file_id, segment.file_size)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
// if need_cache {
|
||||
// self.ctx.chunk_pool.cache_chunks(seg_info).await?;
|
||||
// } else {
|
||||
// let file_id = FileID {
|
||||
// root: seg_info.root,
|
||||
// tx_id: maybe_tx.unwrap().id(),
|
||||
// };
|
||||
// self.ctx
|
||||
// .chunk_pool
|
||||
// .write_chunks(seg_info, file_id, segment.file_size)
|
||||
// .await?;
|
||||
// }
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
async fn get_segment_by_tx_seq(
|
||||
&self,
|
||||
@ -447,61 +443,3 @@ impl RpcServerImpl {
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
enum SegmentIndex {
|
||||
Single(usize),
|
||||
Range(usize, usize), // [start, end]
|
||||
}
|
||||
|
||||
impl Debug for SegmentIndex {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
|
||||
match self {
|
||||
Self::Single(val) => write!(f, "{}", val),
|
||||
Self::Range(start, end) => write!(f, "[{},{}]", start, end),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct SegmentIndexArray {
|
||||
items: Vec<SegmentIndex>,
|
||||
}
|
||||
|
||||
impl Debug for SegmentIndexArray {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
|
||||
match self.items.first() {
|
||||
None => write!(f, "NULL"),
|
||||
Some(first) if self.items.len() == 1 => write!(f, "{:?}", first),
|
||||
_ => write!(f, "{:?}", self.items),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentIndexArray {
|
||||
fn new(segments: &[SegmentWithProof]) -> Self {
|
||||
let mut items = Vec::new();
|
||||
|
||||
let mut current = match segments.first() {
|
||||
None => return SegmentIndexArray { items },
|
||||
Some(seg) => SegmentIndex::Single(seg.index),
|
||||
};
|
||||
|
||||
for index in segments.iter().skip(1).map(|seg| seg.index) {
|
||||
match current {
|
||||
SegmentIndex::Single(val) if val + 1 == index => {
|
||||
current = SegmentIndex::Range(val, index)
|
||||
}
|
||||
SegmentIndex::Range(start, end) if end + 1 == index => {
|
||||
current = SegmentIndex::Range(start, index)
|
||||
}
|
||||
_ => {
|
||||
items.push(current);
|
||||
current = SegmentIndex::Single(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
items.push(current);
|
||||
|
||||
SegmentIndexArray { items }
|
||||
}
|
||||
}
|
||||
|
||||
62
node/rpc/src/zgs_grpc/impl.rs
Normal file
62
node/rpc/src/zgs_grpc/impl.rs
Normal file
@ -0,0 +1,62 @@
|
||||
use crate::types::SegmentWithProof as RpcSegment;
|
||||
use crate::zgs_grpc_proto::zgs_grpc_service_server::ZgsGrpcService;
|
||||
use crate::zgs_grpc_proto::{Empty, PingReply, PingRequest, UploadSegmentsByTxSeqRequest};
|
||||
use crate::{rpc_helper, Context, SegmentIndexArray};
|
||||
|
||||
pub struct ZgsGrpcServiceImpl {
|
||||
pub ctx: Context,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl ZgsGrpcService for ZgsGrpcServiceImpl {
|
||||
async fn ping(
|
||||
&self,
|
||||
request: tonic::Request<PingRequest>,
|
||||
) -> Result<tonic::Response<PingReply>, tonic::Status> {
|
||||
let msg = request.into_inner().message;
|
||||
let reply = PingReply {
|
||||
message: format!("Echo: {}", msg),
|
||||
};
|
||||
Ok(tonic::Response::new(reply))
|
||||
}
|
||||
|
||||
async fn upload_segments_by_tx_seq(
|
||||
&self,
|
||||
request: tonic::Request<UploadSegmentsByTxSeqRequest>,
|
||||
) -> Result<tonic::Response<Empty>, tonic::Status> {
|
||||
let req = request.into_inner();
|
||||
let segments = req.segments;
|
||||
let tx_seq = req.tx_seq;
|
||||
|
||||
let rpc_segments = segments
|
||||
.into_iter()
|
||||
.map(|s| {
|
||||
RpcSegment::try_from(s)
|
||||
.map_err(|e| tonic::Status::invalid_argument(format!("Invalid segment: {}", e)))
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
let indices = SegmentIndexArray::new(&rpc_segments);
|
||||
info!(%tx_seq, ?indices, "grpc_zgs_uploadSegmentsByTxSeq");
|
||||
|
||||
let maybe_tx = self
|
||||
.ctx
|
||||
.log_store
|
||||
.get_tx_by_seq_number(tx_seq)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tonic::Status::internal(format!(
|
||||
"Failed to get transaction by sequence number: {}",
|
||||
e
|
||||
))
|
||||
})?;
|
||||
for segment in rpc_segments.into_iter() {
|
||||
rpc_helper::put_segment_with_maybe_tx(&self.ctx, segment, maybe_tx.clone())
|
||||
.await
|
||||
.map_err(|e| tonic::Status::internal(format!("Failed to put segment: {}", e)))?;
|
||||
}
|
||||
|
||||
// Return an empty response
|
||||
Ok(tonic::Response::new(Empty {}))
|
||||
}
|
||||
}
|
||||
1
node/rpc/src/zgs_grpc/mod.rs
Normal file
1
node/rpc/src/zgs_grpc/mod.rs
Normal file
@ -0,0 +1 @@
|
||||
pub mod r#impl;
|
||||
@ -298,7 +298,7 @@ impl ClientBuilder {
|
||||
mine_service_sender: mine_send,
|
||||
};
|
||||
|
||||
let (rpc_handle, maybe_admin_rpc_handle) = rpc::run_server(ctx)
|
||||
let (rpc_handle, maybe_admin_rpc_handle) = rpc::run_server(ctx.clone())
|
||||
.await
|
||||
.map_err(|e| format!("Unable to start HTTP RPC server: {:?}", e))?;
|
||||
|
||||
@ -307,6 +307,15 @@ impl ClientBuilder {
|
||||
executor.spawn(admin_rpc_handle, "rpc_admin");
|
||||
}
|
||||
|
||||
executor.spawn(
|
||||
async move {
|
||||
rpc::run_grpc_server(ctx.clone())
|
||||
.await
|
||||
.expect("Failed to start gRPC server");
|
||||
},
|
||||
"grpc",
|
||||
);
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
|
||||
@ -305,6 +305,9 @@ auto_sync_enabled = true
|
||||
# HTTP server address to bind for admin and debug RPC.
|
||||
# listen_address_admin = "127.0.0.1:5679"
|
||||
|
||||
## Grpc server address to bind
|
||||
# listen_address_grpc = "0.0.0.0:50051"
|
||||
|
||||
# Number of chunks for a single segment.
|
||||
# chunks_per_segment = 1024
|
||||
|
||||
|
||||
@ -317,6 +317,9 @@ auto_sync_enabled = true
|
||||
# HTTP server address to bind for admin and debug RPC.
|
||||
# listen_address_admin = "127.0.0.1:5679"
|
||||
|
||||
## Grpc server address to bind
|
||||
# listen_address_grpc = "0.0.0.0:50051"
|
||||
|
||||
# Number of chunks for a single segment.
|
||||
# chunks_per_segment = 1024
|
||||
|
||||
|
||||
@ -319,6 +319,9 @@
|
||||
# HTTP server address to bind for admin and debug RPC.
|
||||
# listen_address_admin = "127.0.0.1:5679"
|
||||
|
||||
## Grpc server address to bind
|
||||
# listen_address_grpc = "0.0.0.0:50051"
|
||||
|
||||
# Number of chunks for a single segment.
|
||||
# chunks_per_segment = 1024
|
||||
|
||||
|
||||
@ -8,6 +8,7 @@ from utility.utils import (
|
||||
initialize_toml_config,
|
||||
p2p_port,
|
||||
rpc_port,
|
||||
grpc_port,
|
||||
blockchain_rpc_port,
|
||||
)
|
||||
|
||||
@ -37,6 +38,7 @@ class ZgsNode(TestNode):
|
||||
libp2p_nodes.append(f"/ip4/127.0.0.1/tcp/{p2p_port(i)}")
|
||||
|
||||
rpc_listen_address = f"127.0.0.1:{rpc_port(index)}"
|
||||
grpc_listen_address = f"127.0.0.1:{grpc_port(index)}"
|
||||
|
||||
indexed_config = {
|
||||
"network_libp2p_port": p2p_port(index),
|
||||
@ -44,6 +46,7 @@ class ZgsNode(TestNode):
|
||||
"rpc": {
|
||||
"listen_address": rpc_listen_address,
|
||||
"listen_address_admin": rpc_listen_address,
|
||||
"listen_address_grpc": grpc_listen_address,
|
||||
},
|
||||
"network_libp2p_nodes": libp2p_nodes,
|
||||
"log_contract_address": log_contract_address,
|
||||
|
||||
@ -23,30 +23,33 @@ def p2p_port(n):
|
||||
def rpc_port(n):
|
||||
return PortMin.n + MAX_NODES + n
|
||||
|
||||
def grpc_port(n):
|
||||
return PortMin.n + 2 * MAX_NODES + n
|
||||
|
||||
|
||||
def blockchain_p2p_port(n):
|
||||
assert n <= MAX_BLOCKCHAIN_NODES
|
||||
return PortMin.n + MAX_NODES + MAX_BLOCKCHAIN_NODES + n
|
||||
return PortMin.n + 3 * MAX_NODES + n
|
||||
|
||||
|
||||
def blockchain_rpc_port(n):
|
||||
return PortMin.n + MAX_NODES + 2 * MAX_BLOCKCHAIN_NODES + n
|
||||
return PortMin.n + 3 * MAX_NODES + MAX_BLOCKCHAIN_NODES + n
|
||||
|
||||
|
||||
def blockchain_rpc_port_core(n):
|
||||
return PortMin.n + MAX_NODES + 3 * MAX_BLOCKCHAIN_NODES + n
|
||||
return PortMin.n + 3 * MAX_NODES + 2 * MAX_BLOCKCHAIN_NODES + n
|
||||
|
||||
|
||||
def blockchain_ws_port(n):
|
||||
return PortMin.n + MAX_NODES + 4 * MAX_BLOCKCHAIN_NODES + n
|
||||
return PortMin.n + 3 * MAX_NODES + 3 * MAX_BLOCKCHAIN_NODES + n
|
||||
|
||||
|
||||
def blockchain_rpc_port_tendermint(n):
|
||||
return PortMin.n + MAX_NODES + 5 * MAX_BLOCKCHAIN_NODES + n
|
||||
return PortMin.n + 3 * MAX_NODES + 4 * MAX_BLOCKCHAIN_NODES + n
|
||||
|
||||
|
||||
def pprof_port(n):
|
||||
return PortMin.n + MAX_NODES + 6 * MAX_BLOCKCHAIN_NODES + n
|
||||
return PortMin.n + 3 * MAX_NODES + 5 * MAX_BLOCKCHAIN_NODES + n
|
||||
|
||||
|
||||
def wait_until(predicate, *, attempts=float("inf"), timeout=float("inf"), lock=None):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user