diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs
index a30c997..f231fb8 100644
--- a/node/storage/src/log_store/flow_store.rs
+++ b/node/storage/src/log_store/flow_store.rs
@@ -21,7 +21,6 @@ use parking_lot::{Mutex, RwLock};
 use shared_types::{ChunkArray, DataRoot, FlowProof};
 use ssz::{Decode, Encode};
 use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
 
@@ -420,53 +419,25 @@ pub struct PadPair {
 pub struct FlowDBStore {
     kvdb: Arc<dyn ZgsKeyValueDB>,
-    // Per-batch mutexes to prevent race conditions on individual batches
-    batch_mutexes: RwLock<HashMap<u64, Arc<Mutex<()>>>>,
+    // Mutex to prevent race condition between put_entry_batch_list and put_entry_raw
+    write_mutex: Mutex<()>,
 }
 
 impl FlowDBStore {
     pub fn new(kvdb: Arc<dyn ZgsKeyValueDB>) -> Self {
         Self {
             kvdb,
-            batch_mutexes: RwLock::new(HashMap::new()),
+            write_mutex: Mutex::new(()),
         }
     }
 
-    fn get_batch_mutex(&self, batch_index: u64) -> Arc<Mutex<()>> {
-        let read_guard = self.batch_mutexes.read();
-        if let Some(mutex) = read_guard.get(&batch_index) {
-            return mutex.clone();
-        }
-        drop(read_guard);
-
-        let mut write_guard = self.batch_mutexes.write();
-        // Double-check pattern
-        if let Some(mutex) = write_guard.get(&batch_index) {
-            return mutex.clone();
-        }
-        let mutex = Arc::new(Mutex::new(()));
-        write_guard.insert(batch_index, mutex.clone());
-        mutex
-    }
-
     fn put_entry_batch_list(
         &self,
         batch_list: Vec<(u64, EntryBatch)>,
     ) -> Result<Vec<u64>> {
+        let _lock = self.write_mutex.lock();
         let start_time = Instant::now();
         let mut completed_batches = Vec::new();
-
-        // Collect all mutexes and locks first to avoid deadlocks
-        let batch_mutexes: Vec<_> = batch_list
-            .iter()
-            .map(|(batch_index, _)| (*batch_index, self.get_batch_mutex(*batch_index)))
-            .collect();
-
-        let _locks: Vec<_> = batch_mutexes
-            .iter()
-            .map(|(_, mutex)| mutex.lock())
-            .collect();
-
         let mut tx = self.kvdb.transaction();
 
         for (batch_index, batch) in batch_list {
@@ -481,23 +452,12 @@ impl FlowDBStore {
             }
         }
         self.kvdb.write(tx)?;
-        // Locks are dropped here
        metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time);
         Ok(completed_batches)
     }
 
     fn put_entry_raw(&self, batch_list: Vec<(u64, EntryBatch)>) -> Result<()> {
-        // Collect all mutexes and locks first to avoid deadlocks
-        let batch_mutexes: Vec<_> = batch_list
-            .iter()
-            .map(|(batch_index, _)| (*batch_index, self.get_batch_mutex(*batch_index)))
-            .collect();
-
-        let _locks: Vec<_> = batch_mutexes
-            .iter()
-            .map(|(_, mutex)| mutex.lock())
-            .collect();
-
+        let _lock = self.write_mutex.lock();
         let mut tx = self.kvdb.transaction();
         for (batch_index, batch) in batch_list {
             tx.put(
@@ -507,7 +467,6 @@ impl FlowDBStore {
             );
         }
        self.kvdb.write(tx)?;
-        // Locks are dropped here
         Ok(())
     }
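This change trades lock granularity for simplicity: instead of lazily growing a map of per-batch mutexes (with a double-checked read/write upgrade and guard-collection choreography to dodge deadlocks), both write paths now take one coarse lock before building their kvdb transaction, so the two paths can no longer interleave on the same batch, and there is only one lock, so no ordering discipline is needed at all. A minimal sketch of the pattern being adopted, with std::sync::Mutex and a toy in-memory map standing in for the parking_lot mutex and the kvdb transaction; ToyFlowStore and its methods are illustrative names, and the merge step is purely illustrative, not code from the patch:

    use std::collections::HashMap;
    use std::sync::Mutex;

    struct ToyFlowStore {
        // One coarse lock serializes every writer, mirroring `write_mutex: Mutex<()>`.
        write_mutex: Mutex<()>,
        // Toy stand-in for the kvdb column keyed by batch index.
        batches: Mutex<HashMap<u64, Vec<u8>>>,
    }

    impl ToyFlowStore {
        // Read-merge-write, analogous in shape to put_entry_batch_list.
        fn put_batch_list(&self, list: Vec<(u64, Vec<u8>)>) {
            let _lock = self.write_mutex.lock().unwrap();
            for (index, data) in list {
                let existing = self.batches.lock().unwrap().get(&index).cloned();
                let mut merged = existing.unwrap_or_default();
                merged.extend_from_slice(&data);
                // Without `_lock`, a put_raw could land between the read above
                // and this write, and its update would be silently clobbered.
                self.batches.lock().unwrap().insert(index, merged);
            }
        }

        // Unconditional overwrite, analogous to put_entry_raw, behind the same lock.
        fn put_raw(&self, list: Vec<(u64, Vec<u8>)>) {
            let _lock = self.write_mutex.lock().unwrap();
            let mut map = self.batches.lock().unwrap();
            for (index, data) in list {
                map.insert(index, data);
            }
        }
    }

The cost is that writes to unrelated batch indices are now serialized as well; since each call already funnels all of its writes into a single kvdb transaction, that extra serialization is presumably acceptable here.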
diff --git a/node/storage/src/log_store/load_chunk/mod.rs b/node/storage/src/log_store/load_chunk/mod.rs
index ae74d8e..2db8e8c 100644
--- a/node/storage/src/log_store/load_chunk/mod.rs
+++ b/node/storage/src/log_store/load_chunk/mod.rs
@@ -13,7 +13,7 @@ use crate::log_store::log_manager::data_to_merkle_leaves;
 use crate::try_option;
 use append_merkle::{Algorithm, MerkleTreeRead, Sha3Algorithm};
 use shared_types::{ChunkArray, DataRoot, Merkle};
-use tracing::trace;
+use tracing::{debug, trace};
 use zgs_spec::{
     BYTES_PER_LOAD, BYTES_PER_SEAL, BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD,
     SECTORS_PER_SEAL,
@@ -189,34 +189,49 @@ impl EntryBatch {
         match &self.data {
             EntryBatchData::Complete(_) => {
-                // If complete, iterate through all seals and unseal as necessary
+                // If complete, return the entire unsealed data as a single chunk
                 let mut result = UnsealedDataList::new();
+                let mut complete_data = Vec::with_capacity(BYTES_PER_LOAD);
+
                 for bit in 0..SEALS_PER_LOAD {
                     let start_byte = bit as usize * BYTES_PER_SEAL;
                     if self.seal.is_sealed(bit as u16) {
-                        // If sealed, get the slice, unseal it, and add as separate chunk
+                        // If sealed, get the slice, unseal it, and append
                         let mut data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?.to_vec();
                         self.seal.unseal(data_slice.as_mut_slice(), bit as u16);
-                        result.add_chunk(data_slice);
+                        complete_data.extend_from_slice(&data_slice);
                     } else {
-                        // If not sealed, directly copy the data as separate chunk
-                        let data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?.to_vec();
-                        result.add_chunk(data_slice);
+                        // If not sealed, directly copy the data
+                        let data_slice = self.data.get(start_byte, BYTES_PER_SEAL)?;
+                        complete_data.extend_from_slice(data_slice);
                     }
                 }
+
+                result.add_chunk(complete_data);
                 Some(result)
             }
             EntryBatchData::Incomplete(incomplete_data) => {
                 // If incomplete, iterate through known partial batches to build the unsealed data
+                debug!("Building unsealed data from incomplete known data");
                 let mut result = UnsealedDataList::new();
 
                 for partial_batch in &incomplete_data.known_data {
+                    let start_sector = partial_batch.start_sector;
                     let data_len = partial_batch.data.len();
+                    debug!(
+                        "Processing partial batch: start_sector={} data_len={}",
+                        start_sector,
+                        data_len
+                    );
+
                     if data_len == 0 {
                         continue;
                     }
 
+                    // Accumulate data for this partial batch
+                    let mut partial_batch_data = Vec::new();
+
                     let partial_start_byte = start_sector * BYTES_PER_SECTOR;
                     let partial_end_byte = partial_start_byte + data_len;
@@ -224,6 +239,18 @@
                     let start_seal_index = (start_sector / SECTORS_PER_SEAL) as u16;
                     let end_seal_index = ((partial_end_byte - 1) / BYTES_PER_SEAL) as u16;
 
+                    debug!(
+                        "Partial batch spans seals: start_seal_index={} end_seal_index={}",
+                        start_seal_index,
+                        end_seal_index
+                    );
+
+                    debug!(
+                        "Partial batch byte range: {} to {}",
+                        partial_start_byte,
+                        partial_end_byte
+                    );
+
                     // Iterate through each seal that this partial batch spans
                     for seal_index in start_seal_index..=end_seal_index {
                         let seal_start_byte = seal_index as usize * BYTES_PER_SEAL;
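The two debug! lines above report the seal-span arithmetic, which is worth pinning down with numbers: the batch's byte range is projected onto seal boundaries, and the `- 1` turns the exclusive end byte into the index of the last seal actually touched. A self-contained check of that arithmetic; the constant values are illustrative stand-ins, not quoted from zgs_spec, and seal_span is a hypothetical helper name:

    // Illustrative values; the real constants live in the zgs_spec crate.
    const BYTES_PER_SECTOR: usize = 256;
    const SECTORS_PER_SEAL: usize = 16;
    const BYTES_PER_SEAL: usize = BYTES_PER_SECTOR * SECTORS_PER_SEAL;

    /// Inclusive range of seals touched by a partial batch, mirroring the
    /// start_seal_index / end_seal_index computation in the hunk above.
    fn seal_span(start_sector: usize, data_len: usize) -> (u16, u16) {
        let partial_start_byte = start_sector * BYTES_PER_SECTOR;
        let partial_end_byte = partial_start_byte + data_len;
        let start_seal_index = (start_sector / SECTORS_PER_SEAL) as u16;
        // `- 1` maps the exclusive end byte to the last seal it falls in.
        let end_seal_index = ((partial_end_byte - 1) / BYTES_PER_SEAL) as u16;
        (start_seal_index, end_seal_index)
    }

    fn main() {
        // 5000 bytes starting at sector 20 cover bytes [5120, 10120),
        // which touch the 4096-byte seals 1 and 2.
        assert_eq!(seal_span(20, 5000), (1, 2));
    }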
@@ -233,25 +260,34 @@
                         let is_full_seal = partial_start_byte <= seal_start_byte
                             && partial_end_byte >= seal_end_byte;
 
+                        debug!(
+                            "Processing seal_start_byte={} seal_end_byte={} is_full_seal={}",
+                            seal_start_byte,
+                            seal_end_byte,
+                            is_full_seal
+                        );
+
                         if is_full_seal && self.seal.is_sealed(seal_index) {
-                            // Full seal and sealed -> unseal and add as separate chunk
+                            // Full seal and sealed -> unseal and append to partial batch data
                             let seal_data = self.data.get(seal_start_byte, BYTES_PER_SEAL)?;
                             let mut unsealed = seal_data.to_vec();
                             self.seal.unseal(unsealed.as_mut_slice(), seal_index);
-                            result.add_chunk(unsealed);
+                            partial_batch_data.extend_from_slice(&unsealed);
                         } else {
-                            // Either partial seal (definitely not sealed) or full but unsealed -> copy overlap as separate chunk
+                            // Either partial seal (definitely not sealed) or full but unsealed -> copy overlap and append
                             let overlap_start = std::cmp::max(partial_start_byte, seal_start_byte);
                             let overlap_end = std::cmp::min(partial_end_byte, seal_end_byte);
                             let offset_in_partial = overlap_start - partial_start_byte;
                             let overlap_len = overlap_end - overlap_start;
-                            let overlap_data = partial_batch.data
-                                [offset_in_partial..offset_in_partial + overlap_len]
-                                .to_vec();
-                            result.add_chunk(overlap_data);
+                            let overlap_data = &partial_batch.data
+                                [offset_in_partial..offset_in_partial + overlap_len];
+                            partial_batch_data.extend_from_slice(overlap_data);
                         }
                     }
+
+                    // Add the complete unsealed data for this partial batch as one chunk
+                    result.add_chunk(partial_batch_data);
                 }
                 Some(result)
             }
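Two things stand out in this hunk. First, the behavioral change: every seal-sized slice used to become its own chunk in UnsealedDataList, whereas now each partial batch (and, in the Complete arm above, the whole batch) is emitted as one contiguous chunk, so the bytes delivered are identical but the chunk granularity is coarser; switching overlap_data from .to_vec() to a borrowed slice also drops one copy per seal. Second, the else branch is plain interval intersection expressed as offsets into the partial batch's buffer. A standalone sketch of that step, where overlap_in_partial is a hypothetical name, not a function from this codebase:

    use std::ops::Range;

    /// Intersect a partial batch's byte range with one seal's byte range,
    /// returned as offsets into the partial batch's own buffer. Mirrors the
    /// overlap_start / overlap_end clamping in the hunk above.
    fn overlap_in_partial(partial: Range<usize>, seal: Range<usize>) -> Option<Range<usize>> {
        let overlap_start = partial.start.max(seal.start);
        let overlap_end = partial.end.min(seal.end);
        if overlap_start >= overlap_end {
            return None; // the seal does not intersect this partial batch
        }
        let offset = overlap_start - partial.start;
        Some(offset..offset + (overlap_end - overlap_start))
    }

    fn main() {
        // Batch bytes [5120, 10120) against the seal [4096, 8192): the first
        // 3072 bytes of the batch's buffer fall inside that seal.
        assert_eq!(overlap_in_partial(5120..10120, 4096..8192), Some(0..3072));
    }

In the loop itself the None case cannot arise, because the seal indices being iterated are derived from the batch's own byte range, so every visited seal overlaps the batch.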