From 7e27b8cf1a3c2910b85b24f5622d1f6701ec5801 Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Sat, 22 Nov 2025 14:34:46 +0800 Subject: [PATCH] debug api --- node/rpc/src/zgs/api.rs | 7 +++-- node/rpc/src/zgs/impl.rs | 31 ++++++++++--------- node/storage-async/src/lib.rs | 2 ++ node/storage/src/log_store/flow_store.rs | 5 +++ .../src/log_store/load_chunk/chunk_data.rs | 5 +-- node/storage/src/log_store/load_chunk/mod.rs | 22 +++++++++++++ node/storage/src/log_store/log_manager.rs | 6 ++++ node/storage/src/log_store/mod.rs | 6 +++- 8 files changed, 64 insertions(+), 20 deletions(-) diff --git a/node/rpc/src/zgs/api.rs b/node/rpc/src/zgs/api.rs index a3b0ac1..247054e 100644 --- a/node/rpc/src/zgs/api.rs +++ b/node/rpc/src/zgs/api.rs @@ -2,7 +2,7 @@ use crate::types::{FileInfo, Segment, SegmentWithProof, Status}; use jsonrpsee::core::RpcResult; use jsonrpsee::proc_macros::rpc; use shared_types::{DataRoot, FlowProof, TxSeqOrRoot}; -use storage::{config::ShardConfig, H256}; +use storage::{H256, config::ShardConfig}; #[rpc(server, client, namespace = "zgs")] pub trait Rpc { @@ -59,6 +59,9 @@ pub trait Rpc { index: usize, ) -> RpcResult>; + #[method(name = "getDataByNodeIndex")] + async fn get_data_by_node_index(&self, node_index: u64) -> RpcResult>>; + #[method(name = "checkFileFinalized")] async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult>; @@ -89,5 +92,5 @@ pub trait Rpc { async fn get_flow_context(&self) -> RpcResult<(H256, u64)>; #[method(name = "getChunkByIndex")] - async fn get_chunk_by_index(&self, chunk_index: u64) -> RpcResult>>; + async fn get_chunk_by_node_index(&self, node_index: u64) -> RpcResult>>; } diff --git a/node/rpc/src/zgs/impl.rs b/node/rpc/src/zgs/impl.rs index 60b5eab..30b026d 100644 --- a/node/rpc/src/zgs/impl.rs +++ b/node/rpc/src/zgs/impl.rs @@ -150,6 +150,16 @@ impl RpcServer for RpcServerImpl { self.get_segment_with_proof_by_tx(tx, index).await } + async fn get_data_by_node_index(&self, node_index: u64) -> RpcResult>> { + debug!(%node_index, "zgs_getDataByNodeIndex"); + + // Get the EntryBatch for the given segment/chunk index + let entry_batch = self.ctx.log_store.get_data_by_node_index(node_index).await?; + + // Convert to unsealed data using the to_unsealed_data() method + Ok(entry_batch.and_then(|batch| batch.to_unsealed_data())) + } + async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult> { debug!(?tx_seq_or_root, "zgs_checkFileFinalized"); @@ -239,23 +249,14 @@ impl RpcServer for RpcServerImpl { Ok(self.ctx.log_store.get_context().await?) } - async fn get_chunk_by_index(&self, chunk_index: u64) -> RpcResult>> { - debug!(%chunk_index, "zgs_getChunkByIndex"); - match self + async fn get_chunk_by_node_index(&self, node_index: u64) -> RpcResult>> { + debug!(%node_index, "zgs_getChunkByNodeIndex"); + let chunk = self .ctx .log_store - .get_chunk_by_flow_index(chunk_index, 1) - .await? - { - Some(chunk_array) => { - if chunk_array.data.len() > 0 { - Ok(Some(chunk_array.data)) - } else { - Ok(None) - } - } - None => Ok(None), - } + .get_chunk_by_flow_index(node_index, 1) + .await?; + Ok(chunk.map(|c| c.data)) } } diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs index 989b748..146bbdd 100644 --- a/node/storage-async/src/lib.rs +++ b/node/storage-async/src/lib.rs @@ -7,6 +7,7 @@ use shared_types::{ Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction, }; use ssz::{Decode, Encode}; +use storage::log_store::load_chunk::EntryBatch; use std::sync::Arc; use storage::{error, error::Result, log_store::Store as LogStore, H256}; use task_executor::TaskExecutor; @@ -59,6 +60,7 @@ impl Store { delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result); delegate!(fn get_proof_at_root(root: Option, index: u64, length: u64) -> Result); delegate!(fn get_node_hash_by_index(index: u64) -> Result); + delegate!(fn get_data_by_node_index(index: u64) -> Result>); delegate!(fn get_context() -> Result<(DataRoot, u64)>); pub async fn get_tx_seq_by_data_root( diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index cb1b248..6b3964f 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -196,6 +196,11 @@ impl FlowRead for FlowStore { Ok(Some(mine_chunk)) } + fn load_raw_data(&self, chunk_index: u64, _length: u64) -> Result> { + let batch = try_option!(self.data_db.get_entry_batch(chunk_index)?); + Ok(Some(batch)) + } + fn get_num_entries(&self) -> Result { // This is an over-estimation as it assumes each batch is full. self.data_db diff --git a/node/storage/src/log_store/load_chunk/chunk_data.rs b/node/storage/src/log_store/load_chunk/chunk_data.rs index 86b22eb..192293c 100644 --- a/node/storage/src/log_store/load_chunk/chunk_data.rs +++ b/node/storage/src/log_store/load_chunk/chunk_data.rs @@ -76,9 +76,10 @@ impl Debug for PartialBatch { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "PartialBatch: start_offset={} data_len={}", + "PartialBatch: start_offset={} data_len={} full_data={:?}", self.start_sector, - self.data.len() + self.data.len(), + self.data ) } } diff --git a/node/storage/src/log_store/load_chunk/mod.rs b/node/storage/src/log_store/load_chunk/mod.rs index 95c002c..b6ce7a2 100644 --- a/node/storage/src/log_store/load_chunk/mod.rs +++ b/node/storage/src/log_store/load_chunk/mod.rs @@ -158,6 +158,28 @@ impl EntryBatch { .collect() } + /// Convert the entire EntryBatch to unsealed Vec + /// This iterates through all seals in the bitmap and unseals sealed data + /// Returns the complete raw (unsealed) data for this batch + pub fn to_unsealed_data(&self) -> Option> { + // Get all data for this batch + let mut res = Vec::with_capacity(BYTES_PER_LOAD); + for bit in 0..SEALS_PER_LOAD { + let start_byte = bit as usize * BYTES_PER_SEAL; + if self.seal.is_sealed(bit as u16) { + // If sealed, we need to unseal this part + let mut data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?.to_vec(); + self.seal.unseal(data_slice.as_mut_slice(), bit as u16); + res.extend_from_slice(&data_slice); + } else { + // If not sealed, we can directly copy the data + let data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?; + res.extend_from_slice(data_slice); + } + } + Some(res) + } + fn truncate_seal(&mut self, truncated_sector: usize) -> Vec { let reverted_seal_index = (truncated_sector / SECTORS_PER_SEAL) as u16; diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 02058b9..7611cdc 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -2,6 +2,7 @@ use crate::config::ShardConfig; use crate::log_store::flow_store::{ batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair, }; +use crate::log_store::load_chunk::EntryBatch; use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore, TxStatus}; use crate::log_store::{ FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, @@ -719,6 +720,11 @@ impl LogStoreRead for LogManager { self.flow_store.load_sealed_data(chunk_index) } + fn get_data_by_node_index(&self, start_index: u64) -> crate::error::Result> { + + self.flow_store.load_raw_data(start_index, 1) + } + fn get_shard_config(&self) -> ShardConfig { self.flow_store.get_shard_config() } diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index 4ac1b77..06b24b8 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -1,4 +1,4 @@ -use crate::config::ShardConfig; +use crate::{config::ShardConfig, log_store::load_chunk::EntryBatch}; use append_merkle::OptionalHash; use ethereum_types::H256; @@ -103,6 +103,8 @@ pub trait LogStoreRead: LogStoreChunkRead { fn load_sealed_data(&self, chunk_index: u64) -> Result>; + fn get_data_by_node_index(&self, start_index: u64) -> Result>; + fn get_shard_config(&self) -> ShardConfig; } @@ -234,6 +236,8 @@ pub trait FlowRead { fn load_sealed_data(&self, chunk_index: u64) -> Result>; + fn load_raw_data(&self, chunk_index: u64, length: u64) -> Result>; + // An estimation of the number of entries in the flow db. fn get_num_entries(&self) -> Result;