feat: upload_segments (#21)

* feat: upload_segments

* feat: max request body
This commit is contained in:
MiniFrenchBread 2024-02-06 17:51:31 +08:00 committed by GitHub
parent 8f328a9eab
commit 01c2dd1135
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 79 additions and 49 deletions

View File

@ -213,6 +213,10 @@ impl MemoryChunkPool {
/// Updates the cached file info when log entry retrieved from blockchain. /// Updates the cached file info when log entry retrieved from blockchain.
pub async fn update_file_info(&self, tx: &Transaction) -> Result<bool> { pub async fn update_file_info(&self, tx: &Transaction) -> Result<bool> {
info!(
"start to flush cached segments to log store. data root: {}, tx_seq:{}",
tx.data_merkle_root, tx.seq
);
let maybe_file = self let maybe_file = self
.inner .inner
.lock() .lock()
@ -225,7 +229,7 @@ impl MemoryChunkPool {
self.write_chunks( self.write_chunks(
SegmentInfo { SegmentInfo {
root: tx.data_merkle_root, root: tx.data_merkle_root,
seg_data: seg.data.clone(), seg_data: seg.data,
seg_proof: proof, seg_proof: proof,
seg_index, seg_index,
chunks_per_segment: file.chunks_per_segment, chunks_per_segment: file.chunks_per_segment,
@ -236,6 +240,10 @@ impl MemoryChunkPool {
.await? .await?
} }
} }
info!(
"cached segments flushed to log store. data root: {}, tx_seq:{}",
tx.data_merkle_root, tx.seq
);
Ok(true) Ok(true)
} }

View File

@ -6,5 +6,6 @@ pub struct Config {
pub listen_address: SocketAddr, pub listen_address: SocketAddr,
pub listen_address_admin: Option<SocketAddr>, pub listen_address_admin: Option<SocketAddr>,
pub chunks_per_segment: usize, pub chunks_per_segment: usize,
pub max_request_body_size: u32,
pub max_cache_file_size: usize, pub max_cache_file_size: usize,
} }

View File

@ -74,6 +74,10 @@ pub async fn run_server(
Ok(handles) Ok(handles)
} }
fn server_builder(ctx: Context) -> HttpServerBuilder {
HttpServerBuilder::default().max_request_body_size(ctx.config.max_request_body_size)
}
/// Run a single RPC server for all namespace RPCs. /// Run a single RPC server for all namespace RPCs.
async fn run_server_all(ctx: Context) -> Result<HttpServerHandle, Box<dyn Error>> { async fn run_server_all(ctx: Context) -> Result<HttpServerHandle, Box<dyn Error>> {
// public rpc // public rpc
@ -89,7 +93,7 @@ async fn run_server_all(ctx: Context) -> Result<HttpServerHandle, Box<dyn Error>
zgs.merge(mine)?; zgs.merge(mine)?;
} }
Ok(HttpServerBuilder::default() Ok(server_builder(ctx.clone())
.build(ctx.config.listen_address) .build(ctx.config.listen_address)
.await? .await?
.start(zgs)?) .start(zgs)?)
@ -112,12 +116,12 @@ async fn run_server_public_private(
admin.merge(mine)?; admin.merge(mine)?;
} }
let handle_public = HttpServerBuilder::default() let handle_public = server_builder(ctx.clone())
.build(ctx.config.listen_address) .build(ctx.config.listen_address)
.await? .await?
.start(zgs)?; .start(zgs)?;
let handle_private = HttpServerBuilder::default() let handle_private = server_builder(ctx.clone())
.build(listen_addr_private) .build(listen_addr_private)
.await? .await?
.start(admin)?; .start(admin)?;

View File

@ -11,6 +11,9 @@ pub trait Rpc {
#[method(name = "uploadSegment")] #[method(name = "uploadSegment")]
async fn upload_segment(&self, segment: SegmentWithProof) -> RpcResult<()>; async fn upload_segment(&self, segment: SegmentWithProof) -> RpcResult<()>;
#[method(name = "uploadSegments")]
async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()>;
#[method(name = "downloadSegment")] #[method(name = "downloadSegment")]
async fn download_segment( async fn download_segment(
&self, &self,

View File

@ -35,52 +35,14 @@ impl RpcServer for RpcServerImpl {
async fn upload_segment(&self, segment: SegmentWithProof) -> RpcResult<()> { async fn upload_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
debug!(root = %segment.root, index = %segment.index, "zgs_uploadSegment"); debug!(root = %segment.root, index = %segment.index, "zgs_uploadSegment");
self.put_segment(segment).await
}
let _ = self.ctx.chunk_pool.validate_segment_size(&segment.data)?; async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()> {
debug!("zgs_uploadSegments()");
let maybe_tx = self for segment in segments.into_iter() {
.ctx self.put_segment(segment).await?;
.log_store
.get_tx_by_data_root(&segment.root)
.await?;
let mut need_cache = false;
if self
.ctx
.chunk_pool
.check_already_has_cache(&segment.root)
.await
{
need_cache = true;
} }
if !need_cache {
need_cache = self.check_need_cache(&maybe_tx, segment.file_size).await?;
}
segment.validate(self.ctx.config.chunks_per_segment)?;
let seg_info = SegmentInfo {
root: segment.root,
seg_data: segment.data,
seg_proof: segment.proof,
seg_index: segment.index,
chunks_per_segment: self.ctx.config.chunks_per_segment,
};
if need_cache {
self.ctx.chunk_pool.cache_chunks(seg_info).await?;
} else {
let file_id = FileID {
root: seg_info.root,
tx_id: maybe_tx.unwrap().id(),
};
self.ctx
.chunk_pool
.write_chunks(seg_info, file_id, segment.file_size)
.await?;
}
Ok(()) Ok(())
} }
@ -236,4 +198,54 @@ impl RpcServerImpl {
uploaded_seg_num, uploaded_seg_num,
}) })
} }
async fn put_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
debug!(root = %segment.root, index = %segment.index, "putSegment");
self.ctx.chunk_pool.validate_segment_size(&segment.data)?;
let maybe_tx = self
.ctx
.log_store
.get_tx_by_data_root(&segment.root)
.await?;
let mut need_cache = false;
if self
.ctx
.chunk_pool
.check_already_has_cache(&segment.root)
.await
{
need_cache = true;
}
if !need_cache {
need_cache = self.check_need_cache(&maybe_tx, segment.file_size).await?;
}
segment.validate(self.ctx.config.chunks_per_segment)?;
let seg_info = SegmentInfo {
root: segment.root,
seg_data: segment.data,
seg_proof: segment.proof,
seg_index: segment.index,
chunks_per_segment: self.ctx.config.chunks_per_segment,
};
if need_cache {
self.ctx.chunk_pool.cache_chunks(seg_info).await?;
} else {
let file_id = FileID {
root: seg_info.root,
tx_id: maybe_tx.unwrap().id(),
};
self.ctx
.chunk_pool
.write_chunks(seg_info, file_id, segment.file_size)
.await?;
}
Ok(())
}
} }

View File

@ -219,7 +219,7 @@ impl ClientBuilder {
let async_store = require!("rpc", self, async_store).clone(); let async_store = require!("rpc", self, async_store).clone();
let network_send = require!("rpc", self, network).send.clone(); let network_send = require!("rpc", self, network).send.clone();
let mine_send = self.miner.as_ref().map(|x| x.send.clone()); let mine_send = self.miner.as_ref().map(|x| x.send.clone());
let synced_tx_recv = require!("sync", self, log_sync).send.subscribe(); let synced_tx_recv = require!("rpc", self, log_sync).send.subscribe();
let (chunk_pool, chunk_pool_handler) = let (chunk_pool, chunk_pool_handler) =
chunk_pool::unbounded(chunk_pool_config, async_store.clone(), network_send.clone()); chunk_pool::unbounded(chunk_pool_config, async_store.clone(), network_send.clone());

View File

@ -77,6 +77,7 @@ impl ZgsConfig {
enabled: self.rpc_enabled, enabled: self.rpc_enabled,
listen_address, listen_address,
listen_address_admin, listen_address_admin,
max_request_body_size: self.max_request_body_size,
chunks_per_segment: self.rpc_chunks_per_segment, chunks_per_segment: self.rpc_chunks_per_segment,
max_cache_file_size: self.rpc_max_cache_file_size, max_cache_file_size: self.rpc_max_cache_file_size,
}) })

View File

@ -38,6 +38,7 @@ build_config! {
(rpc_enabled, (bool), true) (rpc_enabled, (bool), true)
(rpc_listen_address, (String), "127.0.0.1:5678".to_string()) (rpc_listen_address, (String), "127.0.0.1:5678".to_string())
(rpc_listen_address_admin, (String), "127.0.0.1:5679".to_string()) (rpc_listen_address_admin, (String), "127.0.0.1:5679".to_string())
(max_request_body_size, (u32), 100*1024*1024) // 100MB
(rpc_chunks_per_segment, (usize), 1024) (rpc_chunks_per_segment, (usize), 1024)
(rpc_max_cache_file_size, (usize), 10*1024*1024) //10MB (rpc_max_cache_file_size, (usize), 10*1024*1024) //10MB