mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-11-29 20:37:26 +00:00
@peter/fix seal (#402)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
* fix seal issue
This commit is contained in:
parent
5a94554799
commit
90026a8cbe
@ -17,7 +17,7 @@ use append_merkle::{
|
|||||||
};
|
};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use kvdb::DBTransaction;
|
use kvdb::DBTransaction;
|
||||||
use parking_lot::{Mutex, RwLock};
|
use parking_lot::RwLock;
|
||||||
use shared_types::{ChunkArray, DataRoot, FlowProof};
|
use shared_types::{ChunkArray, DataRoot, FlowProof};
|
||||||
use ssz::{Decode, Encode};
|
use ssz::{Decode, Encode};
|
||||||
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
|
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
|
||||||
@ -263,12 +263,15 @@ impl FlowWrite for FlowStore {
|
|||||||
)?;
|
)?;
|
||||||
if self.seal_manager.seal_worker_available() && batch.is_complete() {
|
if self.seal_manager.seal_worker_available() && batch.is_complete() {
|
||||||
for seal_index in 0..SEALS_PER_LOAD {
|
for seal_index in 0..SEALS_PER_LOAD {
|
||||||
|
// Only add seals that are not already sealed
|
||||||
|
if !batch.is_seal_sealed(seal_index as u16) {
|
||||||
to_seal_set.insert(
|
to_seal_set.insert(
|
||||||
chunk_index as usize * SEALS_PER_LOAD + seal_index,
|
chunk_index as usize * SEALS_PER_LOAD + seal_index,
|
||||||
self.seal_manager.to_seal_version(),
|
self.seal_manager.to_seal_version(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
batch_list.push((chunk_index, batch));
|
batch_list.push((chunk_index, batch));
|
||||||
}
|
}
|
||||||
@ -391,23 +394,17 @@ pub struct PadPair {
|
|||||||
|
|
||||||
pub struct FlowDBStore {
|
pub struct FlowDBStore {
|
||||||
kvdb: Arc<dyn ZgsKeyValueDB>,
|
kvdb: Arc<dyn ZgsKeyValueDB>,
|
||||||
// Mutex to prevent race condition between put_entry_batch_list and put_entry_raw
|
|
||||||
write_mutex: Mutex<()>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FlowDBStore {
|
impl FlowDBStore {
|
||||||
pub fn new(kvdb: Arc<dyn ZgsKeyValueDB>) -> Self {
|
pub fn new(kvdb: Arc<dyn ZgsKeyValueDB>) -> Self {
|
||||||
Self {
|
Self { kvdb }
|
||||||
kvdb,
|
|
||||||
write_mutex: Mutex::new(()),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn put_entry_batch_list(
|
fn put_entry_batch_list(
|
||||||
&self,
|
&self,
|
||||||
batch_list: Vec<(u64, EntryBatch)>,
|
batch_list: Vec<(u64, EntryBatch)>,
|
||||||
) -> Result<Vec<(u64, DataRoot)>> {
|
) -> Result<Vec<(u64, DataRoot)>> {
|
||||||
let _lock = self.write_mutex.lock();
|
|
||||||
let start_time = Instant::now();
|
let start_time = Instant::now();
|
||||||
let mut completed_batches = Vec::new();
|
let mut completed_batches = Vec::new();
|
||||||
let mut tx = self.kvdb.transaction();
|
let mut tx = self.kvdb.transaction();
|
||||||
@ -429,7 +426,6 @@ impl FlowDBStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn put_entry_raw(&self, batch_list: Vec<(u64, EntryBatch)>) -> Result<()> {
|
fn put_entry_raw(&self, batch_list: Vec<(u64, EntryBatch)>) -> Result<()> {
|
||||||
let _lock = self.write_mutex.lock();
|
|
||||||
let mut tx = self.kvdb.transaction();
|
let mut tx = self.kvdb.transaction();
|
||||||
for (batch_index, batch) in batch_list {
|
for (batch_index, batch) in batch_list {
|
||||||
tx.put(
|
tx.put(
|
||||||
|
|||||||
@ -71,6 +71,11 @@ impl EntryBatch {
|
|||||||
pub fn is_complete(&self) -> bool {
|
pub fn is_complete(&self) -> bool {
|
||||||
matches!(self.data, EntryBatchData::Complete(_))
|
matches!(self.data, EntryBatchData::Complete(_))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if a specific seal is already sealed
|
||||||
|
pub fn is_seal_sealed(&self, seal_index: u16) -> bool {
|
||||||
|
self.seal.is_sealed(seal_index)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EntryBatch {
|
impl EntryBatch {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user