From 5a94554799c83de98450c801b7b20cb883664b9d Mon Sep 17 00:00:00 2001 From: 0g-peterzhb <158457852+0g-peterzhb@users.noreply.github.com> Date: Wed, 26 Nov 2025 16:01:58 +0800 Subject: [PATCH] @peter/fix pad rear data (#400) * debug log on the padding index * add node hash rpc * debug api * fix data race on seal vs batch * lint --- Cargo.lock | 1 + node/rpc/src/zgs/api.rs | 9 ++ node/rpc/src/zgs/impl.rs | 42 +++++ node/storage-async/Cargo.toml | 1 + node/storage-async/src/lib.rs | 4 + node/storage/src/log_store/flow_store.rs | 34 ++++- .../src/log_store/load_chunk/chunk_data.rs | 2 +- node/storage/src/log_store/load_chunk/mod.rs | 143 +++++++++++++++++- node/storage/src/log_store/log_manager.rs | 47 +++++- node/storage/src/log_store/mod.rs | 9 +- 10 files changed, 276 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 35a912e..e4ac131 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7747,6 +7747,7 @@ name = "storage-async" version = "0.1.0" dependencies = [ "anyhow", + "append_merkle", "backtrace", "eth2_ssz", "shared_types", diff --git a/node/rpc/src/zgs/api.rs b/node/rpc/src/zgs/api.rs index a922da5..aa13320 100644 --- a/node/rpc/src/zgs/api.rs +++ b/node/rpc/src/zgs/api.rs @@ -59,6 +59,12 @@ pub trait Rpc { index: usize, ) -> RpcResult>; + #[method(name = "getDataByNodeIndex")] + async fn get_data_by_node_index(&self, node_index: u64) -> RpcResult>>>; + + #[method(name = "getMetaDataByNodeIndex")] + async fn get_meta_data_by_node_index(&self, node_index: u64) -> RpcResult>; + #[method(name = "checkFileFinalized")] async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult>; @@ -82,6 +88,9 @@ pub trait Rpc { flow_root: Option, ) -> RpcResult; + #[method(name = "getHashAtNodeIndex")] + async fn get_hash_at_node_index(&self, node_index: u64) -> RpcResult>; + #[method(name = "getFlowContext")] async fn get_flow_context(&self) -> RpcResult<(H256, u64)>; } diff --git a/node/rpc/src/zgs/impl.rs b/node/rpc/src/zgs/impl.rs index d1440a5..cbeae2e 100644 --- a/node/rpc/src/zgs/impl.rs +++ b/node/rpc/src/zgs/impl.rs @@ -150,6 +150,38 @@ impl RpcServer for RpcServerImpl { self.get_segment_with_proof_by_tx(tx, index).await } + async fn get_data_by_node_index(&self, node_index: u64) -> RpcResult>>> { + debug!(%node_index, "zgs_getDataByNodeIndex"); + + // Get the EntryBatch for the given segment/chunk index + let entry_batch = self + .ctx + .log_store + .get_data_by_node_index(node_index) + .await?; + + // Convert to unsealed data using the to_unsealed_data() method + Ok(entry_batch.and_then(|batch| { + batch + .to_unsealed_data() + .map(|unsealed_list| unsealed_list.chunks) + })) + } + + async fn get_meta_data_by_node_index(&self, node_index: u64) -> RpcResult> { + debug!(%node_index, "zgs_getMetaDataByNodeIndex"); + + // Get the EntryBatch for the given segment/chunk index + let entry_batch = self + .ctx + .log_store + .get_data_by_node_index(node_index) + .await?; + + let res = format!("{:?}", entry_batch); + Ok(Some(res)) + } + async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult> { debug!(?tx_seq_or_root, "zgs_checkFileFinalized"); @@ -225,6 +257,16 @@ impl RpcServer for RpcServerImpl { Ok(proof.right_proof) } + async fn get_hash_at_node_index(&self, node_index: u64) -> RpcResult> { + debug!(%node_index, "zgs_getHashAtNodeIndex"); + let hash = self + .ctx + .log_store + .get_node_hash_by_index(node_index) + .await?; + Ok(hash.0) + } + async fn get_flow_context(&self) -> RpcResult<(H256, u64)> { Ok(self.ctx.log_store.get_context().await?) } diff --git a/node/storage-async/Cargo.toml b/node/storage-async/Cargo.toml index 01a693f..8341b97 100644 --- a/node/storage-async/Cargo.toml +++ b/node/storage-async/Cargo.toml @@ -8,6 +8,7 @@ anyhow = { version = "1.0.58", features = ["backtrace"] } shared_types = { path = "../shared_types" } storage = { path = "../storage" } task_executor = { path = "../../common/task_executor" } +append_merkle = { path = "../../common/append_merkle" } tokio = { version = "1.19.2", features = ["sync"] } tracing = "0.1.35" eth2_ssz = "0.4.0" diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs index 57de0e2..5cc7938 100644 --- a/node/storage-async/src/lib.rs +++ b/node/storage-async/src/lib.rs @@ -2,11 +2,13 @@ extern crate tracing; use anyhow::bail; +use append_merkle::OptionalHash; use shared_types::{ Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction, }; use ssz::{Decode, Encode}; use std::sync::Arc; +use storage::log_store::load_chunk::EntryBatch; use storage::{error, error::Result, log_store::Store as LogStore, H256}; use task_executor::TaskExecutor; use tokio::sync::oneshot; @@ -57,6 +59,8 @@ impl Store { delegate!(fn prune_tx(tx_seq: u64) -> Result<()>); delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result); delegate!(fn get_proof_at_root(root: Option, index: u64, length: u64) -> Result); + delegate!(fn get_node_hash_by_index(index: u64) -> Result); + delegate!(fn get_data_by_node_index(index: u64) -> Result>); delegate!(fn get_context() -> Result<(DataRoot, u64)>); pub async fn get_tx_seq_by_data_root( diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index 3d862fa..c82ddd1 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -17,7 +17,7 @@ use append_merkle::{ }; use itertools::Itertools; use kvdb::DBTransaction; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use shared_types::{ChunkArray, DataRoot, FlowProof}; use ssz::{Decode, Encode}; use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode}; @@ -196,6 +196,11 @@ impl FlowRead for FlowStore { Ok(Some(mine_chunk)) } + fn load_raw_data(&self, chunk_index: u64, _length: u64) -> Result> { + let batch = try_option!(self.data_db.get_entry_batch(chunk_index)?); + Ok(Some(batch)) + } + fn get_num_entries(&self) -> Result { // This is an over-estimation as it assumes each batch is full. self.data_db @@ -228,6 +233,7 @@ impl FlowWrite for FlowStore { if data.data.len() % BYTES_PER_SECTOR != 0 { bail!("append_entries: invalid data size, len={}", data.data.len()); } + let mut batch_list = Vec::new(); for (start_entry_index, end_entry_index) in batch_iter( data.start_index, @@ -250,22 +256,28 @@ impl FlowWrite for FlowStore { .data_db .get_entry_batch(chunk_index)? .unwrap_or_else(|| EntryBatch::new(chunk_index)); - let completed_seals = batch.insert_data( + + let _ = batch.insert_data( (chunk.start_index % self.config.batch_size as u64) as usize, chunk.data, )?; - if self.seal_manager.seal_worker_available() { - completed_seals.into_iter().for_each(|x| { + if self.seal_manager.seal_worker_available() && batch.is_complete() { + for seal_index in 0..SEALS_PER_LOAD { to_seal_set.insert( - chunk_index as usize * SEALS_PER_LOAD + x as usize, + chunk_index as usize * SEALS_PER_LOAD + seal_index, self.seal_manager.to_seal_version(), ); - }); + } } batch_list.push((chunk_index, batch)); } + // print which indexes are being pushed to batch_list + for (chunk_index, _) in &batch_list { + debug!("Preparing to insert chunk at index: {}", chunk_index); + } + metrics::APPEND_ENTRIES.update_since(start_time); self.data_db.put_entry_batch_list(batch_list) } @@ -379,20 +391,27 @@ pub struct PadPair { pub struct FlowDBStore { kvdb: Arc, + // Mutex to prevent race condition between put_entry_batch_list and put_entry_raw + write_mutex: Mutex<()>, } impl FlowDBStore { pub fn new(kvdb: Arc) -> Self { - Self { kvdb } + Self { + kvdb, + write_mutex: Mutex::new(()), + } } fn put_entry_batch_list( &self, batch_list: Vec<(u64, EntryBatch)>, ) -> Result> { + let _lock = self.write_mutex.lock(); let start_time = Instant::now(); let mut completed_batches = Vec::new(); let mut tx = self.kvdb.transaction(); + for (batch_index, batch) in batch_list { tx.put( COL_ENTRY_BATCH, @@ -410,6 +429,7 @@ impl FlowDBStore { } fn put_entry_raw(&self, batch_list: Vec<(u64, EntryBatch)>) -> Result<()> { + let _lock = self.write_mutex.lock(); let mut tx = self.kvdb.transaction(); for (batch_index, batch) in batch_list { tx.put( diff --git a/node/storage/src/log_store/load_chunk/chunk_data.rs b/node/storage/src/log_store/load_chunk/chunk_data.rs index 86b22eb..212ff38 100644 --- a/node/storage/src/log_store/load_chunk/chunk_data.rs +++ b/node/storage/src/log_store/load_chunk/chunk_data.rs @@ -78,7 +78,7 @@ impl Debug for PartialBatch { f, "PartialBatch: start_offset={} data_len={}", self.start_sector, - self.data.len() + self.data.len(), ) } } diff --git a/node/storage/src/log_store/load_chunk/mod.rs b/node/storage/src/log_store/load_chunk/mod.rs index 95c002c..d3ad197 100644 --- a/node/storage/src/log_store/load_chunk/mod.rs +++ b/node/storage/src/log_store/load_chunk/mod.rs @@ -13,12 +13,37 @@ use crate::log_store::log_manager::data_to_merkle_leaves; use crate::try_option; use append_merkle::{Algorithm, MerkleTreeRead, Sha3Algorithm}; use shared_types::{ChunkArray, DataRoot, Merkle}; -use tracing::trace; +use tracing::{debug, trace}; use zgs_spec::{ BYTES_PER_LOAD, BYTES_PER_SEAL, BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL, }; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UnsealedDataList { + pub chunks: Vec>, +} + +impl UnsealedDataList { + pub fn new() -> Self { + Self { chunks: Vec::new() } + } + + pub fn add_chunk(&mut self, data: Vec) { + self.chunks.push(data); + } + + pub fn total_bytes(&self) -> usize { + self.chunks.iter().map(|chunk| chunk.len()).sum() + } +} + +impl Default for UnsealedDataList { + fn default() -> Self { + Self::new() + } +} + use super::SealAnswer; pub use chunk_data::EntryBatchData; use seal::SealInfo; @@ -41,6 +66,11 @@ impl EntryBatch { pub fn is_empty(&self) -> bool { self.data.is_empty() } + + /// Check if the batch data is complete (all data has been filled) + pub fn is_complete(&self) -> bool { + matches!(self.data, EntryBatchData::Complete(_)) + } } impl EntryBatch { @@ -158,6 +188,117 @@ impl EntryBatch { .collect() } + /// Convert the entire EntryBatch to unsealed data list + /// This iterates through all seals in the bitmap and unseals sealed data + /// Returns a list of unsealed data chunks instead of concatenating them + pub fn to_unsealed_data(&self) -> Option { + // Get all known data for this batch + // if current batch is complete, return its data directly + // if current batch is incomplete, then it contains a vec of partial batches + // we need to iterate through all partial batches to know where the known data starts and ends + // then we should read the data from those ranges, unsealing as necessary + + match &self.data { + EntryBatchData::Complete(_) => { + // If complete, return the entire unsealed data as a single chunk + let mut result = UnsealedDataList::new(); + let mut complete_data = Vec::with_capacity(BYTES_PER_LOAD); + + for bit in 0..SEALS_PER_LOAD { + let start_byte = bit * BYTES_PER_SEAL; + if self.seal.is_sealed(bit as u16) { + // If sealed, get the slice, unseal it, and append + let mut data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?.to_vec(); + self.seal.unseal(data_slice.as_mut_slice(), bit as u16); + complete_data.extend_from_slice(&data_slice); + } else { + // If not sealed, directly copy the data + let data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?; + complete_data.extend_from_slice(data_slice); + } + } + + result.add_chunk(complete_data); + Some(result) + } + EntryBatchData::Incomplete(incomplete_data) => { + // If incomplete, iterate through known partial batches to build the unsealed data + debug!("Building unsealed data from incomplete known data"); + let mut result = UnsealedDataList::new(); + for partial_batch in &incomplete_data.known_data { + let start_sector = partial_batch.start_sector; + let data_len = partial_batch.data.len(); + + debug!( + "Processing partial batch: start_sector={} data_len={}", + start_sector, data_len + ); + + if data_len == 0 { + continue; + } + + // Accumulate data for this partial batch + let mut partial_batch_data = Vec::new(); + + let partial_start_byte = start_sector * BYTES_PER_SECTOR; + let partial_end_byte = partial_start_byte + data_len; + + // Calculate which seal this partial batch starts and ends in + let start_seal_index = (start_sector / SECTORS_PER_SEAL) as u16; + let end_seal_index = ((partial_end_byte - 1) / BYTES_PER_SEAL) as u16; + + debug!( + "Partial batch spans seals: start_seal_index={} end_seal_index={}", + start_seal_index, end_seal_index + ); + + debug!( + "Partial batch byte range: {} to {}", + partial_start_byte, partial_end_byte + ); + + // Iterate through each seal that this partial batch spans + for seal_index in start_seal_index..=end_seal_index { + let seal_start_byte = seal_index as usize * BYTES_PER_SEAL; + let seal_end_byte = seal_start_byte + BYTES_PER_SEAL; + + // Check if this seal is full or partial within the known data + let is_full_seal = partial_start_byte <= seal_start_byte + && partial_end_byte >= seal_end_byte; + + debug!( + "Processing seal_start_byte={} seal_end_byte={} is_full_seal={}", + seal_start_byte, seal_end_byte, is_full_seal + ); + + if is_full_seal && self.seal.is_sealed(seal_index) { + // Full seal and sealed -> unseal and append to partial batch data + let seal_data = self.data.get(seal_start_byte, BYTES_PER_SEAL)?; + let mut unsealed = seal_data.to_vec(); + self.seal.unseal(unsealed.as_mut_slice(), seal_index); + partial_batch_data.extend_from_slice(&unsealed); + } else { + // Either partial seal (definitely not sealed) or full but unsealed -> copy overlap and append + let overlap_start = std::cmp::max(partial_start_byte, seal_start_byte); + let overlap_end = std::cmp::min(partial_end_byte, seal_end_byte); + let offset_in_partial = overlap_start - partial_start_byte; + let overlap_len = overlap_end - overlap_start; + + let overlap_data = &partial_batch.data + [offset_in_partial..offset_in_partial + overlap_len]; + partial_batch_data.extend_from_slice(overlap_data); + } + } + + // Add the complete unsealed data for this partial batch as one chunk + result.add_chunk(partial_batch_data); + } + Some(result) + } + } + } + fn truncate_seal(&mut self, truncated_sector: usize) -> Vec { let reverted_seal_index = (truncated_sector / SECTORS_PER_SEAL) as u16; diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 921819a..ecfa969 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -2,6 +2,7 @@ use crate::config::ShardConfig; use crate::log_store::flow_store::{ batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair, }; +use crate::log_store::load_chunk::EntryBatch; use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore, TxStatus}; use crate::log_store::{ FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, @@ -564,6 +565,12 @@ impl LogStoreRead for LogManager { self.tx_store.get_tx_by_seq_number(seq) } + fn get_node_hash_by_index(&self, index: u64) -> crate::error::Result { + let merkle = self.merkle.read(); + let opt = merkle.pora_chunks_merkle.leaf_at(index as usize)?; + opt.ok_or_else(|| anyhow!("node hash not found at index {}", index)) + } + fn get_tx_seq_by_data_root( &self, data_root: &DataRoot, @@ -713,6 +720,10 @@ impl LogStoreRead for LogManager { self.flow_store.load_sealed_data(chunk_index) } + fn get_data_by_node_index(&self, start_index: u64) -> crate::error::Result> { + self.flow_store.load_raw_data(start_index, 1) + } + fn get_shard_config(&self) -> ShardConfig { self.flow_store.get_shard_config() } @@ -1115,8 +1126,14 @@ impl LogManager { .pora_chunks_merkle .update_last(merkle.last_chunk_merkle.root()); } + let chunk_roots = self.flow_store.append_entries(flow_entry_array)?; + debug!("fill leaf for pora_chunks_merkle"); for (chunk_index, chunk_root) in chunk_roots { + debug!( + "fill leaf: chunk_index={}, chunk_root={:?}", + chunk_index, chunk_root + ); if chunk_index < merkle.pora_chunks_merkle.leaves() as u64 { merkle .pora_chunks_merkle @@ -1157,8 +1174,8 @@ impl LogManager { let (segments_for_proof, last_segment_size_for_proof) = compute_segment_size(chunks, PORA_CHUNK_SIZE); debug!( - "segments_for_proof: {}, last_segment_size_for_proof: {}", - segments_for_proof, last_segment_size_for_proof + "tx seq: {}, segments_for_proof: {}, last_segment_size_for_proof: {}", + tx.seq, segments_for_proof, last_segment_size_for_proof ); let chunks_for_file = bytes_to_entries(tx.size) as usize; @@ -1169,6 +1186,16 @@ impl LogManager { segments_for_file, last_segment_size_for_file ); + // Padding should only start after real data ends + let real_data_end_index = + (segments_for_file - 1) * PORA_CHUNK_SIZE + last_segment_size_for_file; + debug!( + "Padding: real_data_end_index={}, padding_start_segment={}, padding_start_offset={}", + real_data_end_index, + segments_for_file - 1, + last_segment_size_for_file + ); + while segments_for_file <= segments_for_proof { let padding_size = if segments_for_file == segments_for_proof { (last_segment_size_for_proof - last_segment_size_for_file) * ENTRY_SIZE @@ -1176,7 +1203,17 @@ impl LogManager { (PORA_CHUNK_SIZE - last_segment_size_for_file) * ENTRY_SIZE }; - debug!("Padding size: {}", padding_size); + let padding_start_index = + ((segments_for_file - 1) * PORA_CHUNK_SIZE + last_segment_size_for_file) as u64; + + debug!( + "Padding iteration: segment={}, offset={}, padding_size={}, start_index={}", + segments_for_file - 1, + last_segment_size_for_file, + padding_size, + padding_start_index + ); + if padding_size > 0 { // This tx hash is guaranteed to be consistent. self.put_chunks_with_tx_hash( @@ -1184,9 +1221,7 @@ impl LogManager { tx.hash(), ChunkArray { data: vec![0u8; padding_size], - start_index: ((segments_for_file - 1) * PORA_CHUNK_SIZE - + last_segment_size_for_file) - as u64, + start_index: padding_start_index, }, None, )?; diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index 593f6b3..06b24b8 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -1,5 +1,6 @@ -use crate::config::ShardConfig; +use crate::{config::ShardConfig, log_store::load_chunk::EntryBatch}; +use append_merkle::OptionalHash; use ethereum_types::H256; use flow_store::PadPair; use shared_types::{ @@ -30,6 +31,8 @@ pub trait LogStoreRead: LogStoreChunkRead { /// Get a transaction by its global log sequence number. fn get_tx_by_seq_number(&self, seq: u64) -> Result>; + fn get_node_hash_by_index(&self, index: u64) -> Result; + /// Get a transaction by the data root of its data. /// If all txs are not finalized, return the first one if need available is false. /// Otherwise, return the first finalized tx. @@ -100,6 +103,8 @@ pub trait LogStoreRead: LogStoreChunkRead { fn load_sealed_data(&self, chunk_index: u64) -> Result>; + fn get_data_by_node_index(&self, start_index: u64) -> Result>; + fn get_shard_config(&self) -> ShardConfig; } @@ -231,6 +236,8 @@ pub trait FlowRead { fn load_sealed_data(&self, chunk_index: u64) -> Result>; + fn load_raw_data(&self, chunk_index: u64, length: u64) -> Result>; + // An estimation of the number of entries in the flow db. fn get_num_entries(&self) -> Result;