From 90026a8cbeaaa5777dee89bcdd9ec104142a9efc Mon Sep 17 00:00:00 2001 From: 0g-peterzhb <158457852+0g-peterzhb@users.noreply.github.com> Date: Thu, 27 Nov 2025 22:49:17 +0800 Subject: [PATCH] @peter/fix seal (#402) * fix seal issue --- node/storage/src/log_store/flow_store.rs | 22 ++++++++------------ node/storage/src/log_store/load_chunk/mod.rs | 5 +++++ 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index c82ddd1..c74bd38 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -17,7 +17,7 @@ use append_merkle::{ }; use itertools::Itertools; use kvdb::DBTransaction; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use shared_types::{ChunkArray, DataRoot, FlowProof}; use ssz::{Decode, Encode}; use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode}; @@ -263,10 +263,13 @@ impl FlowWrite for FlowStore { )?; if self.seal_manager.seal_worker_available() && batch.is_complete() { for seal_index in 0..SEALS_PER_LOAD { - to_seal_set.insert( - chunk_index as usize * SEALS_PER_LOAD + seal_index, - self.seal_manager.to_seal_version(), - ); + // Only add seals that are not already sealed + if !batch.is_seal_sealed(seal_index as u16) { + to_seal_set.insert( + chunk_index as usize * SEALS_PER_LOAD + seal_index, + self.seal_manager.to_seal_version(), + ); + } } } @@ -391,23 +394,17 @@ pub struct PadPair { pub struct FlowDBStore { kvdb: Arc, - // Mutex to prevent race condition between put_entry_batch_list and put_entry_raw - write_mutex: Mutex<()>, } impl FlowDBStore { pub fn new(kvdb: Arc) -> Self { - Self { - kvdb, - write_mutex: Mutex::new(()), - } + Self { kvdb } } fn put_entry_batch_list( &self, batch_list: Vec<(u64, EntryBatch)>, ) -> Result> { - let _lock = self.write_mutex.lock(); let start_time = Instant::now(); let mut completed_batches = Vec::new(); let mut tx = self.kvdb.transaction(); @@ -429,7 +426,6 @@ impl FlowDBStore { } fn put_entry_raw(&self, batch_list: Vec<(u64, EntryBatch)>) -> Result<()> { - let _lock = self.write_mutex.lock(); let mut tx = self.kvdb.transaction(); for (batch_index, batch) in batch_list { tx.put( diff --git a/node/storage/src/log_store/load_chunk/mod.rs b/node/storage/src/log_store/load_chunk/mod.rs index d3ad197..0ce8582 100644 --- a/node/storage/src/log_store/load_chunk/mod.rs +++ b/node/storage/src/log_store/load_chunk/mod.rs @@ -71,6 +71,11 @@ impl EntryBatch { pub fn is_complete(&self) -> bool { matches!(self.data, EntryBatchData::Complete(_)) } + + /// Check if a specific seal is already sealed + pub fn is_seal_sealed(&self, seal_index: u16) -> bool { + self.seal.is_sealed(seal_index) + } } impl EntryBatch {