@peter/fix pad rear data (#400)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled

* debug log on the padding index

* add node hash rpc

* debug api

* fix data race on seal vs batch

* lint
This commit is contained in:
0g-peterzhb 2025-11-26 16:01:58 +08:00 committed by GitHub
parent b857728660
commit 5a94554799
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 276 additions and 16 deletions

1
Cargo.lock generated
View File

@ -7747,6 +7747,7 @@ name = "storage-async"
version = "0.1.0"
dependencies = [
"anyhow",
"append_merkle",
"backtrace",
"eth2_ssz",
"shared_types",

View File

@ -59,6 +59,12 @@ pub trait Rpc {
index: usize,
) -> RpcResult<Option<SegmentWithProof>>;
/// Returns the unsealed data of the entry batch looked up at `node_index`,
/// as a list of byte chunks. Yields `None` when no batch is found or when
/// the batch cannot produce unsealed data.
#[method(name = "getDataByNodeIndex")]
async fn get_data_by_node_index(&self, node_index: u64) -> RpcResult<Option<Vec<Vec<u8>>>>;
/// Debug helper: returns a `Debug`-formatted dump of the entry batch lookup
/// performed for `node_index`.
#[method(name = "getMetaDataByNodeIndex")]
async fn get_meta_data_by_node_index(&self, node_index: u64) -> RpcResult<Option<String>>;
#[method(name = "checkFileFinalized")]
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>>;
@ -82,6 +88,9 @@ pub trait Rpc {
flow_root: Option<DataRoot>,
) -> RpcResult<FlowProof>;
/// Returns the hash the log store reports for `node_index`, or `None` when
/// the stored optional hash carries no value.
#[method(name = "getHashAtNodeIndex")]
async fn get_hash_at_node_index(&self, node_index: u64) -> RpcResult<Option<H256>>;
/// Returns the log store's current context as a `(H256, u64)` pair.
// NOTE(review): presumably (flow root, flow length) — confirm against the store.
#[method(name = "getFlowContext")]
async fn get_flow_context(&self) -> RpcResult<(H256, u64)>;
}

View File

@ -150,6 +150,38 @@ impl RpcServer for RpcServerImpl {
self.get_segment_with_proof_by_tx(tx, index).await
}
/// Serves `zgs_getDataByNodeIndex`: fetches the entry batch for `node_index`
/// from the log store and returns its unsealed chunks.
///
/// Returns `Ok(None)` when no batch is stored or the batch yields no
/// unsealed data; store errors are propagated.
async fn get_data_by_node_index(&self, node_index: u64) -> RpcResult<Option<Vec<Vec<u8>>>> {
    debug!(%node_index, "zgs_getDataByNodeIndex");

    // Look up the stored batch first; nothing stored means nothing to return.
    let stored_batch = self
        .ctx
        .log_store
        .get_data_by_node_index(node_index)
        .await?;
    let batch = match stored_batch {
        Some(batch) => batch,
        None => return Ok(None),
    };

    // Only the raw chunk list of the unsealed view is exposed to the client.
    Ok(batch.to_unsealed_data().map(|unsealed| unsealed.chunks))
}
/// Serves `zgs_getMetaDataByNodeIndex`: returns a `Debug`-formatted dump of
/// the entry batch stored for `node_index`.
///
/// Returns `Ok(None)` when the log store holds no batch at that index, so
/// clients can tell "absent" apart from an actual dump; store errors are
/// propagated.
async fn get_meta_data_by_node_index(&self, node_index: u64) -> RpcResult<Option<String>> {
    debug!(%node_index, "zgs_getMetaDataByNodeIndex");

    // Get the EntryBatch for the given segment/chunk index
    let entry_batch = self
        .ctx
        .log_store
        .get_data_by_node_index(node_index)
        .await?;

    // Format only a batch that exists: a missing batch must surface as `None`,
    // not as `Some("None")`.
    Ok(entry_batch.map(|batch| format!("{:?}", batch)))
}
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>> {
debug!(?tx_seq_or_root, "zgs_checkFileFinalized");
@ -225,6 +257,16 @@ impl RpcServer for RpcServerImpl {
Ok(proof.right_proof)
}
/// Serves `zgs_getHashAtNodeIndex`: asks the log store for the node hash at
/// `node_index` and unwraps the optional-hash wrapper into `Option<H256>`.
async fn get_hash_at_node_index(&self, node_index: u64) -> RpcResult<Option<H256>> {
    debug!(%node_index, "zgs_getHashAtNodeIndex");

    let store = &self.ctx.log_store;
    let optional_hash = store.get_node_hash_by_index(node_index).await?;

    // Field `.0` is the inner `Option<H256>` of the wrapper.
    Ok(optional_hash.0)
}
/// Serves `getFlowContext`: forwards the log store's context pair unchanged.
async fn get_flow_context(&self) -> RpcResult<(H256, u64)> {
    let context = self.ctx.log_store.get_context().await?;
    Ok(context)
}

View File

@ -8,6 +8,7 @@ anyhow = { version = "1.0.58", features = ["backtrace"] }
shared_types = { path = "../shared_types" }
storage = { path = "../storage" }
task_executor = { path = "../../common/task_executor" }
append_merkle = { path = "../../common/append_merkle" }
tokio = { version = "1.19.2", features = ["sync"] }
tracing = "0.1.35"
eth2_ssz = "0.4.0"

View File

@ -2,11 +2,13 @@
extern crate tracing;
use anyhow::bail;
use append_merkle::OptionalHash;
use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
};
use ssz::{Decode, Encode};
use std::sync::Arc;
use storage::log_store::load_chunk::EntryBatch;
use storage::{error, error::Result, log_store::Store as LogStore, H256};
use task_executor::TaskExecutor;
use tokio::sync::oneshot;
@ -57,6 +59,8 @@ impl Store {
delegate!(fn prune_tx(tx_seq: u64) -> Result<()>);
delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result<bool>);
delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowRangeProof>);
delegate!(fn get_node_hash_by_index(index: u64) -> Result<OptionalHash>);
delegate!(fn get_data_by_node_index(index: u64) -> Result<Option<EntryBatch>>);
delegate!(fn get_context() -> Result<(DataRoot, u64)>);
pub async fn get_tx_seq_by_data_root(

View File

@ -17,7 +17,7 @@ use append_merkle::{
};
use itertools::Itertools;
use kvdb::DBTransaction;
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};
use shared_types::{ChunkArray, DataRoot, FlowProof};
use ssz::{Decode, Encode};
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
@ -196,6 +196,11 @@ impl FlowRead for FlowStore {
Ok(Some(mine_chunk))
}
/// Loads the entry batch stored under `chunk_index` exactly as the data DB
/// holds it. `_length` is accepted for interface compatibility and ignored.
///
/// Returns `Ok(None)` when no batch exists at that index.
fn load_raw_data(&self, chunk_index: u64, _length: u64) -> Result<Option<EntryBatch>> {
    let stored_batch = self.data_db.get_entry_batch(chunk_index)?;
    Ok(stored_batch)
}
fn get_num_entries(&self) -> Result<u64> {
// This is an over-estimation as it assumes each batch is full.
self.data_db
@ -228,6 +233,7 @@ impl FlowWrite for FlowStore {
if data.data.len() % BYTES_PER_SECTOR != 0 {
bail!("append_entries: invalid data size, len={}", data.data.len());
}
let mut batch_list = Vec::new();
for (start_entry_index, end_entry_index) in batch_iter(
data.start_index,
@ -250,22 +256,28 @@ impl FlowWrite for FlowStore {
.data_db
.get_entry_batch(chunk_index)?
.unwrap_or_else(|| EntryBatch::new(chunk_index));
let completed_seals = batch.insert_data(
let _ = batch.insert_data(
(chunk.start_index % self.config.batch_size as u64) as usize,
chunk.data,
)?;
if self.seal_manager.seal_worker_available() {
completed_seals.into_iter().for_each(|x| {
if self.seal_manager.seal_worker_available() && batch.is_complete() {
for seal_index in 0..SEALS_PER_LOAD {
to_seal_set.insert(
chunk_index as usize * SEALS_PER_LOAD + x as usize,
chunk_index as usize * SEALS_PER_LOAD + seal_index,
self.seal_manager.to_seal_version(),
);
});
}
}
batch_list.push((chunk_index, batch));
}
// print which indexes are being pushed to batch_list
for (chunk_index, _) in &batch_list {
debug!("Preparing to insert chunk at index: {}", chunk_index);
}
metrics::APPEND_ENTRIES.update_since(start_time);
self.data_db.put_entry_batch_list(batch_list)
}
@ -379,20 +391,27 @@ pub struct PadPair {
/// Flow storage backed by a key-value database handle, with a mutex that
/// serializes the batch-writing paths.
pub struct FlowDBStore {
// Shared handle to the underlying key-value database.
kvdb: Arc<dyn ZgsKeyValueDB>,
// Mutex to prevent race condition between put_entry_batch_list and put_entry_raw
write_mutex: Mutex<()>,
}
impl FlowDBStore {
pub fn new(kvdb: Arc<dyn ZgsKeyValueDB>) -> Self {
Self { kvdb }
Self {
kvdb,
write_mutex: Mutex::new(()),
}
}
fn put_entry_batch_list(
&self,
batch_list: Vec<(u64, EntryBatch)>,
) -> Result<Vec<(u64, DataRoot)>> {
let _lock = self.write_mutex.lock();
let start_time = Instant::now();
let mut completed_batches = Vec::new();
let mut tx = self.kvdb.transaction();
for (batch_index, batch) in batch_list {
tx.put(
COL_ENTRY_BATCH,
@ -410,6 +429,7 @@ impl FlowDBStore {
}
fn put_entry_raw(&self, batch_list: Vec<(u64, EntryBatch)>) -> Result<()> {
let _lock = self.write_mutex.lock();
let mut tx = self.kvdb.transaction();
for (batch_index, batch) in batch_list {
tx.put(

View File

@ -78,7 +78,7 @@ impl Debug for PartialBatch {
f,
"PartialBatch: start_offset={} data_len={}",
self.start_sector,
self.data.len()
self.data.len(),
)
}
}

View File

@ -13,12 +13,37 @@ use crate::log_store::log_manager::data_to_merkle_leaves;
use crate::try_option;
use append_merkle::{Algorithm, MerkleTreeRead, Sha3Algorithm};
use shared_types::{ChunkArray, DataRoot, Merkle};
use tracing::trace;
use tracing::{debug, trace};
use zgs_spec::{
BYTES_PER_LOAD, BYTES_PER_SEAL, BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD,
SECTORS_PER_SEAL,
};
/// Ordered collection of unsealed byte chunks extracted from an entry batch.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct UnsealedDataList {
    /// The chunks, in the order they were added.
    pub chunks: Vec<Vec<u8>>,
}

impl UnsealedDataList {
    /// Creates a list that holds no chunks.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `data` as the last chunk of the list.
    pub fn add_chunk(&mut self, data: Vec<u8>) {
        self.chunks.push(data);
    }

    /// Returns the combined length, in bytes, of every chunk in the list.
    pub fn total_bytes(&self) -> usize {
        self.chunks.iter().map(Vec::len).sum()
    }
}
use super::SealAnswer;
pub use chunk_data::EntryBatchData;
use seal::SealInfo;
@ -41,6 +66,11 @@ impl EntryBatch {
/// Returns `true` when the underlying batch data reports itself as empty.
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
/// Check if the batch data is complete (all data has been filled).
///
/// This is `true` exactly when the data is held as `EntryBatchData::Complete`.
pub fn is_complete(&self) -> bool {
matches!(self.data, EntryBatchData::Complete(_))
}
}
impl EntryBatch {
@ -158,6 +188,117 @@ impl EntryBatch {
.collect()
}
/// Produces the unsealed view of this batch as a list of byte chunks.
///
/// * A complete batch yields exactly one chunk of `BYTES_PER_LOAD` bytes:
///   each of the `SEALS_PER_LOAD` seal slots is read and, when its seal bit
///   is set, unsealed.
/// * An incomplete batch yields one chunk per non-empty known partial batch.
///   Within a partial batch, a seal slot that is fully covered and sealed is
///   read from the batch data and unsealed; any other slot contributes the
///   overlapping bytes copied straight from the partial batch.
///
/// Returns `None` as soon as a required read from the batch data fails.
pub fn to_unsealed_data(&self) -> Option<UnsealedDataList> {
    let mut unsealed = UnsealedDataList::new();

    match &self.data {
        EntryBatchData::Complete(_) => {
            let mut load_bytes = Vec::with_capacity(BYTES_PER_LOAD);
            for seal_no in 0..SEALS_PER_LOAD {
                let seal_id = seal_no as u16;
                let sealed = self.seal.is_sealed(seal_id);
                let stored = self.data.get(seal_no * BYTES_PER_SEAL, BYTES_PER_SEAL)?;

                // Append the stored bytes, then unseal them in place if needed.
                let tail = load_bytes.len();
                load_bytes.extend_from_slice(stored);
                if sealed {
                    self.seal.unseal(&mut load_bytes[tail..], seal_id);
                }
            }
            unsealed.add_chunk(load_bytes);
        }
        EntryBatchData::Incomplete(incomplete) => {
            debug!("Building unsealed data from incomplete known data");

            for known in &incomplete.known_data {
                let first_sector = known.start_sector;
                let known_len = known.data.len();
                debug!(
                    "Processing partial batch: start_sector={} data_len={}",
                    first_sector, known_len
                );
                if known_len == 0 {
                    continue;
                }

                // Byte range [known_begin, known_end) covered by this partial batch.
                let known_begin = first_sector * BYTES_PER_SECTOR;
                let known_end = known_begin + known_len;

                // First and last seal slots touched by that byte range.
                let first_seal = (first_sector / SECTORS_PER_SEAL) as u16;
                let last_seal = ((known_end - 1) / BYTES_PER_SEAL) as u16;
                debug!(
                    "Partial batch spans seals: start_seal_index={} end_seal_index={}",
                    first_seal, last_seal
                );
                debug!(
                    "Partial batch byte range: {} to {}",
                    known_begin, known_end
                );

                let mut plain = Vec::new();
                for seal_id in first_seal..=last_seal {
                    let seal_begin = seal_id as usize * BYTES_PER_SEAL;
                    let seal_end = seal_begin + BYTES_PER_SEAL;
                    let covers_whole_seal = known_begin <= seal_begin && seal_end <= known_end;
                    debug!(
                        "Processing seal_start_byte={} seal_end_byte={} is_full_seal={}",
                        seal_begin, seal_end, covers_whole_seal
                    );

                    if covers_whole_seal && self.seal.is_sealed(seal_id) {
                        // Fully covered and sealed: read the slot and unseal it in place.
                        let stored = self.data.get(seal_begin, BYTES_PER_SEAL)?;
                        let tail = plain.len();
                        plain.extend_from_slice(stored);
                        self.seal.unseal(&mut plain[tail..], seal_id);
                    } else {
                        // Partially covered or not sealed: copy the overlapping bytes
                        // directly, as offsets relative to the partial batch start.
                        let from = known_begin.max(seal_begin) - known_begin;
                        let to = known_end.min(seal_end) - known_begin;
                        plain.extend_from_slice(&known.data[from..to]);
                    }
                }

                // One output chunk per partial batch.
                unsealed.add_chunk(plain);
            }
        }
    }

    Some(unsealed)
}
fn truncate_seal(&mut self, truncated_sector: usize) -> Vec<u16> {
let reverted_seal_index = (truncated_sector / SECTORS_PER_SEAL) as u16;

View File

@ -2,6 +2,7 @@ use crate::config::ShardConfig;
use crate::log_store::flow_store::{
batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair,
};
use crate::log_store::load_chunk::EntryBatch;
use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore, TxStatus};
use crate::log_store::{
FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead,
@ -564,6 +565,12 @@ impl LogStoreRead for LogManager {
self.tx_store.get_tx_by_seq_number(seq)
}
/// Looks up the leaf of the PoRA chunks merkle tree at `index`.
///
/// Fails when the lookup itself errors or when no leaf is present there.
fn get_node_hash_by_index(&self, index: u64) -> crate::error::Result<OptionalHash> {
    let tree = self.merkle.read();
    match tree.pora_chunks_merkle.leaf_at(index as usize)? {
        Some(node_hash) => Ok(node_hash),
        None => Err(anyhow!("node hash not found at index {}", index)),
    }
}
fn get_tx_seq_by_data_root(
&self,
data_root: &DataRoot,
@ -713,6 +720,10 @@ impl LogStoreRead for LogManager {
self.flow_store.load_sealed_data(chunk_index)
}
/// Loads the entry batch at `start_index` by delegating to the flow store's
/// `load_raw_data`, always with a length of 1.
fn get_data_by_node_index(&self, start_index: u64) -> crate::error::Result<Option<EntryBatch>> {
self.flow_store.load_raw_data(start_index, 1)
}
/// Returns the shard configuration reported by the flow store.
fn get_shard_config(&self) -> ShardConfig {
self.flow_store.get_shard_config()
}
@ -1115,8 +1126,14 @@ impl LogManager {
.pora_chunks_merkle
.update_last(merkle.last_chunk_merkle.root());
}
let chunk_roots = self.flow_store.append_entries(flow_entry_array)?;
debug!("fill leaf for pora_chunks_merkle");
for (chunk_index, chunk_root) in chunk_roots {
debug!(
"fill leaf: chunk_index={}, chunk_root={:?}",
chunk_index, chunk_root
);
if chunk_index < merkle.pora_chunks_merkle.leaves() as u64 {
merkle
.pora_chunks_merkle
@ -1157,8 +1174,8 @@ impl LogManager {
let (segments_for_proof, last_segment_size_for_proof) =
compute_segment_size(chunks, PORA_CHUNK_SIZE);
debug!(
"segments_for_proof: {}, last_segment_size_for_proof: {}",
segments_for_proof, last_segment_size_for_proof
"tx seq: {}, segments_for_proof: {}, last_segment_size_for_proof: {}",
tx.seq, segments_for_proof, last_segment_size_for_proof
);
let chunks_for_file = bytes_to_entries(tx.size) as usize;
@ -1169,6 +1186,16 @@ impl LogManager {
segments_for_file, last_segment_size_for_file
);
// Padding should only start after real data ends
let real_data_end_index =
(segments_for_file - 1) * PORA_CHUNK_SIZE + last_segment_size_for_file;
debug!(
"Padding: real_data_end_index={}, padding_start_segment={}, padding_start_offset={}",
real_data_end_index,
segments_for_file - 1,
last_segment_size_for_file
);
while segments_for_file <= segments_for_proof {
let padding_size = if segments_for_file == segments_for_proof {
(last_segment_size_for_proof - last_segment_size_for_file) * ENTRY_SIZE
@ -1176,7 +1203,17 @@ impl LogManager {
(PORA_CHUNK_SIZE - last_segment_size_for_file) * ENTRY_SIZE
};
debug!("Padding size: {}", padding_size);
let padding_start_index =
((segments_for_file - 1) * PORA_CHUNK_SIZE + last_segment_size_for_file) as u64;
debug!(
"Padding iteration: segment={}, offset={}, padding_size={}, start_index={}",
segments_for_file - 1,
last_segment_size_for_file,
padding_size,
padding_start_index
);
if padding_size > 0 {
// This tx hash is guaranteed to be consistent.
self.put_chunks_with_tx_hash(
@ -1184,9 +1221,7 @@ impl LogManager {
tx.hash(),
ChunkArray {
data: vec![0u8; padding_size],
start_index: ((segments_for_file - 1) * PORA_CHUNK_SIZE
+ last_segment_size_for_file)
as u64,
start_index: padding_start_index,
},
None,
)?;

View File

@ -1,5 +1,6 @@
use crate::config::ShardConfig;
use crate::{config::ShardConfig, log_store::load_chunk::EntryBatch};
use append_merkle::OptionalHash;
use ethereum_types::H256;
use flow_store::PadPair;
use shared_types::{
@ -30,6 +31,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
/// Get a transaction by its global log sequence number.
fn get_tx_by_seq_number(&self, seq: u64) -> Result<Option<Transaction>>;
/// Get the node hash at `index`.
/// The `LogManager` implementation reads the leaf of the PoRA chunks merkle
/// tree and returns an error when no leaf exists at that index.
fn get_node_hash_by_index(&self, index: u64) -> Result<OptionalHash>;
/// Get a transaction by the data root of its data.
/// If all txs are not finalized, return the first one if need available is false.
/// Otherwise, return the first finalized tx.
@ -100,6 +103,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
/// Get the entry batch stored at `start_index`, or `None` if there is none.
fn get_data_by_node_index(&self, start_index: u64) -> Result<Option<EntryBatch>>;
fn get_shard_config(&self) -> ShardConfig;
}
@ -231,6 +236,8 @@ pub trait FlowRead {
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
/// Load the entry batch stored at `chunk_index` as it is held in the flow DB,
/// or `None` if there is none.
// NOTE(review): the `FlowStore` implementation ignores `length` — confirm intended use.
fn load_raw_data(&self, chunk_index: u64, length: u64) -> Result<Option<EntryBatch>>;
// An estimation of the number of entries in the flow db.
fn get_num_entries(&self) -> Result<u64>;