mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-18 11:05:18 +00:00
feat: tx_seq rpc (#226)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
This commit is contained in:
parent
b131dc532f
commit
79d960d4ea
@ -12,9 +12,23 @@ pub trait Rpc {
|
||||
#[method(name = "uploadSegment")]
|
||||
async fn upload_segment(&self, segment: SegmentWithProof) -> RpcResult<()>;
|
||||
|
||||
#[method(name = "uploadSegmentByTxSeq")]
|
||||
async fn upload_segment_by_tx_seq(
|
||||
&self,
|
||||
segment: SegmentWithProof,
|
||||
tx_seq: u64,
|
||||
) -> RpcResult<()>;
|
||||
|
||||
#[method(name = "uploadSegments")]
|
||||
async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()>;
|
||||
|
||||
#[method(name = "uploadSegmentsByTxSeq")]
|
||||
async fn upload_segments_by_tx_seq(
|
||||
&self,
|
||||
segments: Vec<SegmentWithProof>,
|
||||
tx_seq: u64,
|
||||
) -> RpcResult<()>;
|
||||
|
||||
#[method(name = "downloadSegment")]
|
||||
async fn download_segment(
|
||||
&self,
|
||||
@ -23,6 +37,14 @@ pub trait Rpc {
|
||||
end_index: usize,
|
||||
) -> RpcResult<Option<Segment>>;
|
||||
|
||||
#[method(name = "downloadSegmentByTxSeq")]
|
||||
async fn download_segment_by_tx_seq(
|
||||
&self,
|
||||
tx_seq: u64,
|
||||
start_index: usize,
|
||||
end_index: usize,
|
||||
) -> RpcResult<Option<Segment>>;
|
||||
|
||||
#[method(name = "downloadSegmentWithProof")]
|
||||
async fn download_segment_with_proof(
|
||||
&self,
|
||||
@ -30,6 +52,13 @@ pub trait Rpc {
|
||||
index: usize,
|
||||
) -> RpcResult<Option<SegmentWithProof>>;
|
||||
|
||||
#[method(name = "downloadSegmentWithProofByTxSeq")]
|
||||
async fn download_segment_with_proof_by_tx_seq(
|
||||
&self,
|
||||
tx_seq: u64,
|
||||
index: usize,
|
||||
) -> RpcResult<Option<SegmentWithProof>>;
|
||||
|
||||
#[method(name = "checkFileFinalized")]
|
||||
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>>;
|
||||
|
||||
|
@ -42,6 +42,16 @@ impl RpcServer for RpcServerImpl {
|
||||
self.put_segment(segment).await
|
||||
}
|
||||
|
||||
async fn upload_segment_by_tx_seq(
|
||||
&self,
|
||||
segment: SegmentWithProof,
|
||||
tx_seq: u64,
|
||||
) -> RpcResult<()> {
|
||||
info!(tx_seq = %tx_seq, index = %segment.index, "zgs_uploadSegmentByTxSeq");
|
||||
let maybe_tx = self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?;
|
||||
self.put_segment_with_maybe_tx(segment, maybe_tx).await
|
||||
}
|
||||
|
||||
async fn upload_segments(&self, segments: Vec<SegmentWithProof>) -> RpcResult<()> {
|
||||
let root = match segments.first() {
|
||||
None => return Ok(()),
|
||||
@ -57,6 +67,23 @@ impl RpcServer for RpcServerImpl {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn upload_segments_by_tx_seq(
|
||||
&self,
|
||||
segments: Vec<SegmentWithProof>,
|
||||
tx_seq: u64,
|
||||
) -> RpcResult<()> {
|
||||
let indices = SegmentIndexArray::new(&segments);
|
||||
info!(%tx_seq, ?indices, "zgs_uploadSegmentsByTxSeq");
|
||||
|
||||
let maybe_tx = self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?;
|
||||
for segment in segments.into_iter() {
|
||||
self.put_segment_with_maybe_tx(segment, maybe_tx.clone())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn download_segment(
|
||||
&self,
|
||||
data_root: DataRoot,
|
||||
@ -65,34 +92,26 @@ impl RpcServer for RpcServerImpl {
|
||||
) -> RpcResult<Option<Segment>> {
|
||||
info!(%data_root, %start_index, %end_index, "zgs_downloadSegment");
|
||||
|
||||
if start_index >= end_index {
|
||||
return Err(error::invalid_params("end_index", "invalid chunk index"));
|
||||
}
|
||||
|
||||
if end_index - start_index > self.ctx.config.chunks_per_segment {
|
||||
return Err(error::invalid_params(
|
||||
"end_index",
|
||||
format!(
|
||||
"exceeds maximum chunks {}",
|
||||
self.ctx.config.chunks_per_segment
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
let tx_seq = try_option!(
|
||||
self.ctx
|
||||
.log_store
|
||||
.get_tx_seq_by_data_root(&data_root)
|
||||
.await?
|
||||
);
|
||||
let segment = try_option!(
|
||||
self.ctx
|
||||
.log_store
|
||||
.get_chunks_by_tx_and_index_range(tx_seq, start_index, end_index)
|
||||
.await?
|
||||
);
|
||||
|
||||
Ok(Some(Segment(segment.data)))
|
||||
self.get_segment_by_tx_seq(tx_seq, start_index, end_index)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn download_segment_by_tx_seq(
|
||||
&self,
|
||||
tx_seq: u64,
|
||||
start_index: usize,
|
||||
end_index: usize,
|
||||
) -> RpcResult<Option<Segment>> {
|
||||
info!(%tx_seq, %start_index, %end_index, "zgs_downloadSegmentByTxSeq");
|
||||
self.get_segment_by_tx_seq(tx_seq, start_index, end_index)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn download_segment_with_proof(
|
||||
@ -104,40 +123,19 @@ impl RpcServer for RpcServerImpl {
|
||||
|
||||
let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?);
|
||||
|
||||
// validate index
|
||||
let chunks_per_segment = self.ctx.config.chunks_per_segment;
|
||||
let (num_segments, last_segment_size) =
|
||||
SegmentWithProof::split_file_into_segments(tx.size as usize, chunks_per_segment)?;
|
||||
self.get_segment_with_proof_by_tx(tx, index).await
|
||||
}
|
||||
|
||||
if index >= num_segments {
|
||||
return Err(error::invalid_params("index", "index out of bound"));
|
||||
}
|
||||
async fn download_segment_with_proof_by_tx_seq(
|
||||
&self,
|
||||
tx_seq: u64,
|
||||
index: usize,
|
||||
) -> RpcResult<Option<SegmentWithProof>> {
|
||||
info!(%tx_seq, %index, "zgs_downloadSegmentWithProofByTxSeq");
|
||||
|
||||
// calculate chunk start and end index
|
||||
let start_index = index * chunks_per_segment;
|
||||
let end_index = if index == num_segments - 1 {
|
||||
// last segment without padding chunks by flow
|
||||
start_index + last_segment_size / CHUNK_SIZE
|
||||
} else {
|
||||
start_index + chunks_per_segment
|
||||
};
|
||||
let tx = try_option!(self.ctx.log_store.get_tx_by_seq_number(tx_seq).await?);
|
||||
|
||||
let segment = try_option!(
|
||||
self.ctx
|
||||
.log_store
|
||||
.get_chunks_with_proof_by_tx_and_index_range(tx.seq, start_index, end_index, None)
|
||||
.await?
|
||||
);
|
||||
|
||||
let proof = tx.compute_segment_proof(&segment, chunks_per_segment)?;
|
||||
|
||||
Ok(Some(SegmentWithProof {
|
||||
root: data_root,
|
||||
data: segment.chunks.data,
|
||||
index,
|
||||
proof,
|
||||
file_size: tx.size as usize,
|
||||
}))
|
||||
self.get_segment_with_proof_by_tx(tx, index).await
|
||||
}
|
||||
|
||||
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>> {
|
||||
@ -277,15 +275,29 @@ impl RpcServerImpl {
|
||||
async fn put_segment(&self, segment: SegmentWithProof) -> RpcResult<()> {
|
||||
debug!(root = %segment.root, index = %segment.index, "putSegment");
|
||||
|
||||
self.ctx.chunk_pool.validate_segment_size(&segment.data)?;
|
||||
|
||||
let maybe_tx = self
|
||||
.ctx
|
||||
.log_store
|
||||
.get_tx_by_data_root(&segment.root)
|
||||
.await?;
|
||||
let mut need_cache = false;
|
||||
|
||||
self.put_segment_with_maybe_tx(segment, maybe_tx).await
|
||||
}
|
||||
|
||||
async fn put_segment_with_maybe_tx(
|
||||
&self,
|
||||
segment: SegmentWithProof,
|
||||
maybe_tx: Option<Transaction>,
|
||||
) -> RpcResult<()> {
|
||||
self.ctx.chunk_pool.validate_segment_size(&segment.data)?;
|
||||
|
||||
if let Some(tx) = &maybe_tx {
|
||||
if tx.data_merkle_root != segment.root {
|
||||
return Err(error::internal_error("data root and tx seq not match"));
|
||||
}
|
||||
}
|
||||
|
||||
let mut need_cache = false;
|
||||
if self
|
||||
.ctx
|
||||
.chunk_pool
|
||||
@ -323,6 +335,77 @@ impl RpcServerImpl {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_segment_by_tx_seq(
|
||||
&self,
|
||||
tx_seq: u64,
|
||||
start_index: usize,
|
||||
end_index: usize,
|
||||
) -> RpcResult<Option<Segment>> {
|
||||
if start_index >= end_index {
|
||||
return Err(error::invalid_params("end_index", "invalid chunk index"));
|
||||
}
|
||||
|
||||
if end_index - start_index > self.ctx.config.chunks_per_segment {
|
||||
return Err(error::invalid_params(
|
||||
"end_index",
|
||||
format!(
|
||||
"exceeds maximum chunks {}",
|
||||
self.ctx.config.chunks_per_segment
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
let segment = try_option!(
|
||||
self.ctx
|
||||
.log_store
|
||||
.get_chunks_by_tx_and_index_range(tx_seq, start_index, end_index)
|
||||
.await?
|
||||
);
|
||||
|
||||
Ok(Some(Segment(segment.data)))
|
||||
}
|
||||
|
||||
async fn get_segment_with_proof_by_tx(
|
||||
&self,
|
||||
tx: Transaction,
|
||||
index: usize,
|
||||
) -> RpcResult<Option<SegmentWithProof>> {
|
||||
// validate index
|
||||
let chunks_per_segment = self.ctx.config.chunks_per_segment;
|
||||
let (num_segments, last_segment_size) =
|
||||
SegmentWithProof::split_file_into_segments(tx.size as usize, chunks_per_segment)?;
|
||||
|
||||
if index >= num_segments {
|
||||
return Err(error::invalid_params("index", "index out of bound"));
|
||||
}
|
||||
|
||||
// calculate chunk start and end index
|
||||
let start_index = index * chunks_per_segment;
|
||||
let end_index = if index == num_segments - 1 {
|
||||
// last segment without padding chunks by flow
|
||||
start_index + last_segment_size / CHUNK_SIZE
|
||||
} else {
|
||||
start_index + chunks_per_segment
|
||||
};
|
||||
|
||||
let segment = try_option!(
|
||||
self.ctx
|
||||
.log_store
|
||||
.get_chunks_with_proof_by_tx_and_index_range(tx.seq, start_index, end_index, None)
|
||||
.await?
|
||||
);
|
||||
|
||||
let proof = tx.compute_segment_proof(&segment, chunks_per_segment)?;
|
||||
|
||||
Ok(Some(SegmentWithProof {
|
||||
root: tx.data_merkle_root,
|
||||
data: segment.chunks.data,
|
||||
index,
|
||||
proof,
|
||||
file_size: tx.size as usize,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
enum SegmentIndex {
|
||||
|
Loading…
Reference in New Issue
Block a user