debug api

This commit is contained in:
Peter Zhang 2025-11-23 17:09:01 +08:00
parent de620eee09
commit 1117cd65e9
2 changed files with 55 additions and 60 deletions

View File

@ -21,7 +21,6 @@ use parking_lot::{Mutex, RwLock};
use shared_types::{ChunkArray, DataRoot, FlowProof};
use ssz::{Decode, Encode};
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
@ -420,53 +419,25 @@ pub struct PadPair {
pub struct FlowDBStore {
kvdb: Arc<dyn ZgsKeyValueDB>,
// Per-batch mutexes to prevent race conditions on individual batches
batch_mutexes: RwLock<HashMap<u64, Arc<Mutex<()>>>>,
// Mutex to prevent race condition between put_entry_batch_list and put_entry_raw
write_mutex: Mutex<()>,
}
impl FlowDBStore {
pub fn new(kvdb: Arc<dyn ZgsKeyValueDB>) -> Self {
Self {
kvdb,
batch_mutexes: RwLock::new(HashMap::new()),
write_mutex: Mutex::new(()),
}
}
fn get_batch_mutex(&self, batch_index: u64) -> Arc<Mutex<()>> {
let read_guard = self.batch_mutexes.read();
if let Some(mutex) = read_guard.get(&batch_index) {
return mutex.clone();
}
drop(read_guard);
let mut write_guard = self.batch_mutexes.write();
// Double-check pattern
if let Some(mutex) = write_guard.get(&batch_index) {
return mutex.clone();
}
let mutex = Arc::new(Mutex::new(()));
write_guard.insert(batch_index, mutex.clone());
mutex
}
fn put_entry_batch_list(
&self,
batch_list: Vec<(u64, EntryBatch)>,
) -> Result<Vec<(u64, DataRoot)>> {
let _lock = self.write_mutex.lock();
let start_time = Instant::now();
let mut completed_batches = Vec::new();
// Collect all mutexes and locks first to avoid deadlocks
let batch_mutexes: Vec<_> = batch_list
.iter()
.map(|(batch_index, _)| (*batch_index, self.get_batch_mutex(*batch_index)))
.collect();
let _locks: Vec<_> = batch_mutexes
.iter()
.map(|(_, mutex)| mutex.lock())
.collect();
let mut tx = self.kvdb.transaction();
for (batch_index, batch) in batch_list {
@ -481,23 +452,12 @@ impl FlowDBStore {
}
}
self.kvdb.write(tx)?;
// Locks are dropped here
metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time);
Ok(completed_batches)
}
fn put_entry_raw(&self, batch_list: Vec<(u64, EntryBatch)>) -> Result<()> {
// Collect all mutexes and locks first to avoid deadlocks
let batch_mutexes: Vec<_> = batch_list
.iter()
.map(|(batch_index, _)| (*batch_index, self.get_batch_mutex(*batch_index)))
.collect();
let _locks: Vec<_> = batch_mutexes
.iter()
.map(|(_, mutex)| mutex.lock())
.collect();
let _lock = self.write_mutex.lock();
let mut tx = self.kvdb.transaction();
for (batch_index, batch) in batch_list {
tx.put(
@ -507,7 +467,6 @@ impl FlowDBStore {
);
}
self.kvdb.write(tx)?;
// Locks are dropped here
Ok(())
}

View File

@ -13,7 +13,7 @@ use crate::log_store::log_manager::data_to_merkle_leaves;
use crate::try_option;
use append_merkle::{Algorithm, MerkleTreeRead, Sha3Algorithm};
use shared_types::{ChunkArray, DataRoot, Merkle};
use tracing::trace;
use tracing::{debug, trace};
use zgs_spec::{
BYTES_PER_LOAD, BYTES_PER_SEAL, BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD,
SECTORS_PER_SEAL,
@ -189,34 +189,49 @@ impl EntryBatch {
match &self.data {
EntryBatchData::Complete(_) => {
// If complete, iterate through all seals and unseal as necessary
// If complete, return the entire unsealed data as a single chunk
let mut result = UnsealedDataList::new();
let mut complete_data = Vec::with_capacity(BYTES_PER_LOAD);
for bit in 0..SEALS_PER_LOAD {
let start_byte = bit as usize * BYTES_PER_SEAL;
if self.seal.is_sealed(bit as u16) {
// If sealed, get the slice, unseal it, and add as separate chunk
// If sealed, get the slice, unseal it, and append
let mut data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?.to_vec();
self.seal.unseal(data_slice.as_mut_slice(), bit as u16);
result.add_chunk(data_slice);
complete_data.extend_from_slice(&data_slice);
} else {
// If not sealed, directly copy the data as separate chunk
let data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?.to_vec();
result.add_chunk(data_slice);
// If not sealed, directly copy the data
let data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?;
complete_data.extend_from_slice(data_slice);
}
}
result.add_chunk(complete_data);
Some(result)
}
EntryBatchData::Incomplete(incomplete_data) => {
// If incomplete, iterate through known partial batches to build the unsealed data
debug!("Building unsealed data from incomplete known data");
let mut result = UnsealedDataList::new();
for partial_batch in &incomplete_data.known_data {
let start_sector = partial_batch.start_sector;
let data_len = partial_batch.data.len();
debug!(
"Processing partial batch: start_sector={} data_len={}",
start_sector,
data_len
);
if data_len == 0 {
continue;
}
// Accumulate data for this partial batch
let mut partial_batch_data = Vec::new();
let partial_start_byte = start_sector * BYTES_PER_SECTOR;
let partial_end_byte = partial_start_byte + data_len;
@ -224,6 +239,18 @@ impl EntryBatch {
let start_seal_index = (start_sector / SECTORS_PER_SEAL) as u16;
let end_seal_index = ((partial_end_byte - 1) / BYTES_PER_SEAL) as u16;
debug!(
"Partial batch spans seals: start_seal_index={} end_seal_index={}",
start_seal_index,
end_seal_index
);
debug!(
"Partial batch byte range: {} to {}",
partial_start_byte,
partial_end_byte
);
// Iterate through each seal that this partial batch spans
for seal_index in start_seal_index..=end_seal_index {
let seal_start_byte = seal_index as usize * BYTES_PER_SEAL;
@ -233,25 +260,34 @@ impl EntryBatch {
let is_full_seal = partial_start_byte <= seal_start_byte
&& partial_end_byte >= seal_end_byte;
debug!(
"Processing seal_start_byte={} seal_end_byte={} is_full_seal={}",
seal_start_byte,
seal_end_byte,
is_full_seal
);
if is_full_seal && self.seal.is_sealed(seal_index) {
// Full seal and sealed -> unseal and add as separate chunk
// Full seal and sealed -> unseal and append to partial batch data
let seal_data = self.data.get(seal_start_byte, BYTES_PER_SEAL)?;
let mut unsealed = seal_data.to_vec();
self.seal.unseal(unsealed.as_mut_slice(), seal_index);
result.add_chunk(unsealed);
partial_batch_data.extend_from_slice(&unsealed);
} else {
// Either partial seal (definitely not sealed) or full but unsealed -> copy overlap as separate chunk
// Either partial seal (definitely not sealed) or full but unsealed -> copy overlap and append
let overlap_start = std::cmp::max(partial_start_byte, seal_start_byte);
let overlap_end = std::cmp::min(partial_end_byte, seal_end_byte);
let offset_in_partial = overlap_start - partial_start_byte;
let overlap_len = overlap_end - overlap_start;
let overlap_data = partial_batch.data
[offset_in_partial..offset_in_partial + overlap_len]
.to_vec();
result.add_chunk(overlap_data);
let overlap_data = &partial_batch.data
[offset_in_partial..offset_in_partial + overlap_len];
partial_batch_data.extend_from_slice(overlap_data);
}
}
// Add the complete unsealed data for this partial batch as one chunk
result.add_chunk(partial_batch_data);
}
Some(result)
}