From bc21520ec5c02e796c7720359e3f8c8c6c79ffd0 Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Sat, 22 Nov 2025 19:17:51 +0800 Subject: [PATCH] debug api --- node/rpc/src/zgs/api.rs | 4 +- node/rpc/src/zgs/impl.rs | 16 ++- node/storage-async/src/lib.rs | 2 +- node/storage/src/log_store/flow_store.rs | 6 + .../src/log_store/load_chunk/chunk_data.rs | 3 +- node/storage/src/log_store/load_chunk/mod.rs | 112 +++++++++++++++--- node/storage/src/log_store/log_manager.rs | 6 +- 7 files changed, 121 insertions(+), 28 deletions(-) diff --git a/node/rpc/src/zgs/api.rs b/node/rpc/src/zgs/api.rs index 247054e..e346fc2 100644 --- a/node/rpc/src/zgs/api.rs +++ b/node/rpc/src/zgs/api.rs @@ -2,7 +2,7 @@ use crate::types::{FileInfo, Segment, SegmentWithProof, Status}; use jsonrpsee::core::RpcResult; use jsonrpsee::proc_macros::rpc; use shared_types::{DataRoot, FlowProof, TxSeqOrRoot}; -use storage::{H256, config::ShardConfig}; +use storage::{config::ShardConfig, H256}; #[rpc(server, client, namespace = "zgs")] pub trait Rpc { @@ -60,7 +60,7 @@ pub trait Rpc { ) -> RpcResult>; #[method(name = "getDataByNodeIndex")] - async fn get_data_by_node_index(&self, node_index: u64) -> RpcResult>>; + async fn get_data_by_node_index(&self, node_index: u64) -> RpcResult>>>; #[method(name = "checkFileFinalized")] async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult>; diff --git a/node/rpc/src/zgs/impl.rs b/node/rpc/src/zgs/impl.rs index 30b026d..e25c97a 100644 --- a/node/rpc/src/zgs/impl.rs +++ b/node/rpc/src/zgs/impl.rs @@ -150,14 +150,22 @@ impl RpcServer for RpcServerImpl { self.get_segment_with_proof_by_tx(tx, index).await } - async fn get_data_by_node_index(&self, node_index: u64) -> RpcResult>> { + async fn get_data_by_node_index(&self, node_index: u64) -> RpcResult>>> { debug!(%node_index, "zgs_getDataByNodeIndex"); // Get the EntryBatch for the given segment/chunk index - let entry_batch = self.ctx.log_store.get_data_by_node_index(node_index).await?; - + let entry_batch = self + .ctx + .log_store + .get_data_by_node_index(node_index) + .await?; + // Convert to unsealed data using the to_unsealed_data() method - Ok(entry_batch.and_then(|batch| batch.to_unsealed_data())) + Ok(entry_batch.and_then(|batch| { + batch + .to_unsealed_data() + .map(|unsealed_list| unsealed_list.chunks) + })) } async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult> { diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs index 146bbdd..5cc7938 100644 --- a/node/storage-async/src/lib.rs +++ b/node/storage-async/src/lib.rs @@ -7,8 +7,8 @@ use shared_types::{ Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction, }; use ssz::{Decode, Encode}; -use storage::log_store::load_chunk::EntryBatch; use std::sync::Arc; +use storage::log_store::load_chunk::EntryBatch; use storage::{error, error::Result, log_store::Store as LogStore, H256}; use task_executor::TaskExecutor; use tokio::sync::oneshot; diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index 6b3964f..a492863 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -243,6 +243,9 @@ impl FlowWrite for FlowStore { "Entry batch data at 12288 before insert: {:?}", batch_data_12288 ); + if let Some(unsealed_data) = batch_data_12288.to_unsealed_data() { + debug!("Unsealed data before insert: {:?}", unsealed_data); + } let mut batch_list = Vec::new(); for (start_entry_index, end_entry_index) in batch_iter( @@ -300,6 +303,9 @@ impl FlowWrite for FlowStore { "Entry batch data at 12288 after insert: {:?}", batch_data_12288 ); + if let Some(unsealed_data) = batch_data_12288.to_unsealed_data() { + debug!("Unsealed data after insert: {:?}", unsealed_data); + } res } diff --git a/node/storage/src/log_store/load_chunk/chunk_data.rs b/node/storage/src/log_store/load_chunk/chunk_data.rs index 192293c..212ff38 100644 --- a/node/storage/src/log_store/load_chunk/chunk_data.rs +++ b/node/storage/src/log_store/load_chunk/chunk_data.rs @@ -76,10 +76,9 @@ impl Debug for PartialBatch { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "PartialBatch: start_offset={} data_len={} full_data={:?}", + "PartialBatch: start_offset={} data_len={}", self.start_sector, self.data.len(), - self.data ) } } diff --git a/node/storage/src/log_store/load_chunk/mod.rs b/node/storage/src/log_store/load_chunk/mod.rs index b6ce7a2..ae74d8e 100644 --- a/node/storage/src/log_store/load_chunk/mod.rs +++ b/node/storage/src/log_store/load_chunk/mod.rs @@ -19,6 +19,25 @@ use zgs_spec::{ SECTORS_PER_SEAL, }; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UnsealedDataList { + pub chunks: Vec>, +} + +impl UnsealedDataList { + pub fn new() -> Self { + Self { chunks: Vec::new() } + } + + pub fn add_chunk(&mut self, data: Vec) { + self.chunks.push(data); + } + + pub fn total_bytes(&self) -> usize { + self.chunks.iter().map(|chunk| chunk.len()).sum() + } +} + use super::SealAnswer; pub use chunk_data::EntryBatchData; use seal::SealInfo; @@ -158,26 +177,85 @@ impl EntryBatch { .collect() } - /// Convert the entire EntryBatch to unsealed Vec + /// Convert the entire EntryBatch to unsealed data list /// This iterates through all seals in the bitmap and unseals sealed data - /// Returns the complete raw (unsealed) data for this batch - pub fn to_unsealed_data(&self) -> Option> { - // Get all data for this batch - let mut res = Vec::with_capacity(BYTES_PER_LOAD); - for bit in 0..SEALS_PER_LOAD { - let start_byte = bit as usize * BYTES_PER_SEAL; - if self.seal.is_sealed(bit as u16) { - // If sealed, we need to unseal this part - let mut data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?.to_vec(); - self.seal.unseal(data_slice.as_mut_slice(), bit as u16); - res.extend_from_slice(&data_slice); - } else { - // If not sealed, we can directly copy the data - let data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?; - res.extend_from_slice(data_slice); + /// Returns a list of unsealed data chunks instead of concatenating them + pub fn to_unsealed_data(&self) -> Option { + // Get all known data for this batch + // if current batch is complete, return its data directly + // if current batch is incomplete, then it contains a vec of partial batches + // we need to iterate through all partial batches to know where the known data starts and ends + // then we should read the data from those ranges, unsealing as necessary + + match &self.data { + EntryBatchData::Complete(_) => { + // If complete, iterate through all seals and unseal as necessary + let mut result = UnsealedDataList::new(); + for bit in 0..SEALS_PER_LOAD { + let start_byte = bit as usize * BYTES_PER_SEAL; + if self.seal.is_sealed(bit as u16) { + // If sealed, get the slice, unseal it, and add as separate chunk + let mut data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?.to_vec(); + self.seal.unseal(data_slice.as_mut_slice(), bit as u16); + result.add_chunk(data_slice); + } else { + // If not sealed, directly copy the data as separate chunk + let data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?.to_vec(); + result.add_chunk(data_slice); + } + } + Some(result) + } + EntryBatchData::Incomplete(incomplete_data) => { + // If incomplete, iterate through known partial batches to build the unsealed data + let mut result = UnsealedDataList::new(); + for partial_batch in &incomplete_data.known_data { + let start_sector = partial_batch.start_sector; + let data_len = partial_batch.data.len(); + + if data_len == 0 { + continue; + } + + let partial_start_byte = start_sector * BYTES_PER_SECTOR; + let partial_end_byte = partial_start_byte + data_len; + + // Calculate which seal this partial batch starts and ends in + let start_seal_index = (start_sector / SECTORS_PER_SEAL) as u16; + let end_seal_index = ((partial_end_byte - 1) / BYTES_PER_SEAL) as u16; + + // Iterate through each seal that this partial batch spans + for seal_index in start_seal_index..=end_seal_index { + let seal_start_byte = seal_index as usize * BYTES_PER_SEAL; + let seal_end_byte = seal_start_byte + BYTES_PER_SEAL; + + // Check if this seal is full or partial within the known data + let is_full_seal = partial_start_byte <= seal_start_byte + && partial_end_byte >= seal_end_byte; + + if is_full_seal && self.seal.is_sealed(seal_index) { + // Full seal and sealed -> unseal and add as separate chunk + let seal_data = self.data.get(seal_start_byte, BYTES_PER_SEAL)?; + let mut unsealed = seal_data.to_vec(); + self.seal.unseal(unsealed.as_mut_slice(), seal_index); + result.add_chunk(unsealed); + } else { + // Either partial seal (definitely not sealed) or full but unsealed -> copy overlap as separate chunk + let overlap_start = std::cmp::max(partial_start_byte, seal_start_byte); + let overlap_end = std::cmp::min(partial_end_byte, seal_end_byte); + let offset_in_partial = overlap_start - partial_start_byte; + let overlap_len = overlap_end - overlap_start; + + let overlap_data = partial_batch.data + [offset_in_partial..offset_in_partial + overlap_len] + .to_vec(); + result.add_chunk(overlap_data); + } + } + } + Some(result) } } - Some(res) } fn truncate_seal(&mut self, truncated_sector: usize) -> Vec { diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 7611cdc..ecfa969 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -721,7 +721,6 @@ impl LogStoreRead for LogManager { } fn get_data_by_node_index(&self, start_index: u64) -> crate::error::Result> { - self.flow_store.load_raw_data(start_index, 1) } @@ -1131,7 +1130,10 @@ impl LogManager { let chunk_roots = self.flow_store.append_entries(flow_entry_array)?; debug!("fill leaf for pora_chunks_merkle"); for (chunk_index, chunk_root) in chunk_roots { - debug!("fill leaf: chunk_index={}, chunk_root={:?}", chunk_index, chunk_root); + debug!( + "fill leaf: chunk_index={}, chunk_root={:?}", + chunk_index, chunk_root + ); if chunk_index < merkle.pora_chunks_merkle.leaves() as u64 { merkle .pora_chunks_merkle