debug api

This commit is contained in:
Peter Zhang 2025-11-22 14:34:46 +08:00
parent 3ceda31452
commit 7e27b8cf1a
8 changed files with 64 additions and 20 deletions

View File

@ -2,7 +2,7 @@ use crate::types::{FileInfo, Segment, SegmentWithProof, Status};
use jsonrpsee::core::RpcResult;
use jsonrpsee::proc_macros::rpc;
use shared_types::{DataRoot, FlowProof, TxSeqOrRoot};
use storage::{config::ShardConfig, H256};
use storage::{H256, config::ShardConfig};
#[rpc(server, client, namespace = "zgs")]
pub trait Rpc {
@ -59,6 +59,9 @@ pub trait Rpc {
index: usize,
) -> RpcResult<Option<SegmentWithProof>>;
#[method(name = "getDataByNodeIndex")]
async fn get_data_by_node_index(&self, node_index: u64) -> RpcResult<Option<Vec<u8>>>;
#[method(name = "checkFileFinalized")]
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>>;
@ -89,5 +92,5 @@ pub trait Rpc {
async fn get_flow_context(&self) -> RpcResult<(H256, u64)>;
#[method(name = "getChunkByIndex")]
async fn get_chunk_by_index(&self, chunk_index: u64) -> RpcResult<Option<Vec<u8>>>;
async fn get_chunk_by_node_index(&self, node_index: u64) -> RpcResult<Option<Vec<u8>>>;
}

View File

@ -150,6 +150,16 @@ impl RpcServer for RpcServerImpl {
self.get_segment_with_proof_by_tx(tx, index).await
}
async fn get_data_by_node_index(&self, node_index: u64) -> RpcResult<Option<Vec<u8>>> {
debug!(%node_index, "zgs_getDataByNodeIndex");
// Get the EntryBatch for the given segment/chunk index
let entry_batch = self.ctx.log_store.get_data_by_node_index(node_index).await?;
// Convert to unsealed data using the to_unsealed_data() method
Ok(entry_batch.and_then(|batch| batch.to_unsealed_data()))
}
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>> {
debug!(?tx_seq_or_root, "zgs_checkFileFinalized");
@ -239,23 +249,14 @@ impl RpcServer for RpcServerImpl {
Ok(self.ctx.log_store.get_context().await?)
}
async fn get_chunk_by_index(&self, chunk_index: u64) -> RpcResult<Option<Vec<u8>>> {
debug!(%chunk_index, "zgs_getChunkByIndex");
match self
async fn get_chunk_by_node_index(&self, node_index: u64) -> RpcResult<Option<Vec<u8>>> {
debug!(%node_index, "zgs_getChunkByNodeIndex");
let chunk = self
.ctx
.log_store
.get_chunk_by_flow_index(chunk_index, 1)
.await?
{
Some(chunk_array) => {
if chunk_array.data.len() > 0 {
Ok(Some(chunk_array.data))
} else {
Ok(None)
}
}
None => Ok(None),
}
.get_chunk_by_flow_index(node_index, 1)
.await?;
Ok(chunk.map(|c| c.data))
}
}

View File

@ -7,6 +7,7 @@ use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
};
use ssz::{Decode, Encode};
use storage::log_store::load_chunk::EntryBatch;
use std::sync::Arc;
use storage::{error, error::Result, log_store::Store as LogStore, H256};
use task_executor::TaskExecutor;
@ -59,6 +60,7 @@ impl Store {
delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result<bool>);
delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowRangeProof>);
delegate!(fn get_node_hash_by_index(index: u64) -> Result<OptionalHash>);
delegate!(fn get_data_by_node_index(index: u64) -> Result<Option<EntryBatch>>);
delegate!(fn get_context() -> Result<(DataRoot, u64)>);
pub async fn get_tx_seq_by_data_root(

View File

@ -196,6 +196,11 @@ impl FlowRead for FlowStore {
Ok(Some(mine_chunk))
}
fn load_raw_data(&self, chunk_index: u64, _length: u64) -> Result<Option<EntryBatch>> {
let batch = try_option!(self.data_db.get_entry_batch(chunk_index)?);
Ok(Some(batch))
}
fn get_num_entries(&self) -> Result<u64> {
// This is an over-estimation as it assumes each batch is full.
self.data_db

View File

@ -76,9 +76,10 @@ impl Debug for PartialBatch {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"PartialBatch: start_offset={} data_len={}",
"PartialBatch: start_offset={} data_len={} full_data={:?}",
self.start_sector,
self.data.len()
self.data.len(),
self.data
)
}
}

View File

@ -158,6 +158,28 @@ impl EntryBatch {
.collect()
}
/// Convert the entire EntryBatch to unsealed Vec<u8>
/// This iterates through all seals in the bitmap and unseals sealed data
/// Returns the complete raw (unsealed) data for this batch
pub fn to_unsealed_data(&self) -> Option<Vec<u8>> {
// Get all data for this batch
let mut res = Vec::with_capacity(BYTES_PER_LOAD);
for bit in 0..SEALS_PER_LOAD {
let start_byte = bit as usize * BYTES_PER_SEAL;
if self.seal.is_sealed(bit as u16) {
// If sealed, we need to unseal this part
let mut data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?.to_vec();
self.seal.unseal(data_slice.as_mut_slice(), bit as u16);
res.extend_from_slice(&data_slice);
} else {
// If not sealed, we can directly copy the data
let data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?;
res.extend_from_slice(data_slice);
}
}
Some(res)
}
fn truncate_seal(&mut self, truncated_sector: usize) -> Vec<u16> {
let reverted_seal_index = (truncated_sector / SECTORS_PER_SEAL) as u16;

View File

@ -2,6 +2,7 @@ use crate::config::ShardConfig;
use crate::log_store::flow_store::{
batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair,
};
use crate::log_store::load_chunk::EntryBatch;
use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore, TxStatus};
use crate::log_store::{
FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead,
@ -719,6 +720,11 @@ impl LogStoreRead for LogManager {
self.flow_store.load_sealed_data(chunk_index)
}
fn get_data_by_node_index(&self, start_index: u64) -> crate::error::Result<Option<EntryBatch>> {
self.flow_store.load_raw_data(start_index, 1)
}
fn get_shard_config(&self) -> ShardConfig {
self.flow_store.get_shard_config()
}

View File

@ -1,4 +1,4 @@
use crate::config::ShardConfig;
use crate::{config::ShardConfig, log_store::load_chunk::EntryBatch};
use append_merkle::OptionalHash;
use ethereum_types::H256;
@ -103,6 +103,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
fn get_data_by_node_index(&self, start_index: u64) -> Result<Option<EntryBatch>>;
fn get_shard_config(&self) -> ShardConfig;
}
@ -234,6 +236,8 @@ pub trait FlowRead {
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
fn load_raw_data(&self, chunk_index: u64, length: u64) -> Result<Option<EntryBatch>>;
// An estimation of the number of entries in the flow db.
fn get_num_entries(&self) -> Result<u64>;