Avoid appending to_seal_set if sealer is not enabled

This commit is contained in:
Bruno Valente 2024-09-25 15:43:27 +08:00
parent 84c415e959
commit 9593b51f5b
3 changed files with 92 additions and 30 deletions

View File

@ -1,4 +1,5 @@
use super::load_chunk::EntryBatch; use super::load_chunk::EntryBatch;
use super::seal_task_manager::SealTaskManager;
use super::{MineLoadChunk, SealAnswer, SealTask}; use super::{MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig; use crate::config::ShardConfig;
use crate::error::Error; use crate::error::Error;
@ -24,11 +25,7 @@ use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_S
pub struct FlowStore { pub struct FlowStore {
db: FlowDBStore, db: FlowDBStore,
// TODO(kevin): This is an in-memory cache for recording which chunks are ready for sealing. It should be persisted on disk. seal_manager: SealTaskManager,
to_seal_set: RwLock<BTreeMap<usize, usize>>,
// Data sealing is an asynchronized process.
// The sealing service uses the version number to distinguish if revert happens during sealing.
to_seal_version: RwLock<usize>,
config: FlowConfig, config: FlowConfig,
} }
@ -36,8 +33,7 @@ impl FlowStore {
pub fn new(db: Arc<dyn ZgsKeyValueDB>, config: FlowConfig) -> Self { pub fn new(db: Arc<dyn ZgsKeyValueDB>, config: FlowConfig) -> Self {
Self { Self {
db: FlowDBStore::new(db), db: FlowDBStore::new(db),
to_seal_set: Default::default(), seal_manager: Default::default(),
to_seal_version: RwLock::new(0),
config, config,
} }
} }
@ -80,14 +76,7 @@ impl FlowStore {
} }
pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> { pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
let mut to_seal_set = self.to_seal_set.write(); self.seal_manager.delete_batch_list(batch_list);
for batch_index in batch_list {
for seal_index in (*batch_index as usize) * SEALS_PER_LOAD
..(*batch_index as usize + 1) * SEALS_PER_LOAD
{
to_seal_set.remove(&seal_index);
}
}
self.db.delete_batch_list(batch_list) self.db.delete_batch_list(batch_list)
} }
@ -232,7 +221,7 @@ impl FlowWrite for FlowStore {
/// Return the roots of completed chunks. The order is guaranteed to be increasing /// Return the roots of completed chunks. The order is guaranteed to be increasing
/// by chunk index. /// by chunk index.
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> { fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
let mut to_seal_set = self.to_seal_set.write(); let mut to_seal_set = self.seal_manager.to_seal_set.write();
trace!("append_entries: {} {}", data.start_index, data.data.len()); trace!("append_entries: {} {}", data.start_index, data.data.len());
if data.data.len() % BYTES_PER_SECTOR != 0 { if data.data.len() % BYTES_PER_SECTOR != 0 {
bail!("append_entries: invalid data size, len={}", data.data.len()); bail!("append_entries: invalid data size, len={}", data.data.len());
@ -263,12 +252,14 @@ impl FlowWrite for FlowStore {
(chunk.start_index % self.config.batch_size as u64) as usize, (chunk.start_index % self.config.batch_size as u64) as usize,
chunk.data, chunk.data,
)?; )?;
completed_seals.into_iter().for_each(|x| { if self.seal_manager.seal_worker_available() {
to_seal_set.insert( completed_seals.into_iter().for_each(|x| {
chunk_index as usize * SEALS_PER_LOAD + x as usize, to_seal_set.insert(
*self.to_seal_version.read(), chunk_index as usize * SEALS_PER_LOAD + x as usize,
); self.seal_manager.to_seal_version(),
}); );
});
}
batch_list.push((chunk_index, batch)); batch_list.push((chunk_index, batch));
} }
@ -276,15 +267,14 @@ impl FlowWrite for FlowStore {
} }
fn truncate(&self, start_index: u64) -> crate::error::Result<()> { fn truncate(&self, start_index: u64) -> crate::error::Result<()> {
let mut to_seal_set = self.to_seal_set.write(); let mut to_seal_set = self.seal_manager.to_seal_set.write();
let mut to_seal_version = self.to_seal_version.write();
let to_reseal = self.db.truncate(start_index, self.config.batch_size)?; let to_reseal = self.db.truncate(start_index, self.config.batch_size)?;
to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL)); to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL));
*to_seal_version += 1; let new_seal_version = self.seal_manager.inc_seal_version();
to_reseal.into_iter().for_each(|x| { to_reseal.into_iter().for_each(|x| {
to_seal_set.insert(x, *to_seal_version); to_seal_set.insert(x, new_seal_version);
}); });
Ok(()) Ok(())
} }
@ -296,7 +286,9 @@ impl FlowWrite for FlowStore {
impl FlowSeal for FlowStore { impl FlowSeal for FlowStore {
fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> { fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> {
let to_seal_set = self.to_seal_set.read(); let to_seal_set = self.seal_manager.to_seal_set.read();
self.seal_manager.update_pull_time();
let mut to_seal_iter = to_seal_set.iter(); let mut to_seal_iter = to_seal_set.iter();
let (&first_index, &first_version) = try_option!(to_seal_iter.next()); let (&first_index, &first_version) = try_option!(to_seal_iter.next());
if first_index >= seal_index_max { if first_index >= seal_index_max {
@ -330,7 +322,7 @@ impl FlowSeal for FlowStore {
} }
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> { fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
let mut to_seal_set = self.to_seal_set.write(); let mut to_seal_set = self.seal_manager.to_seal_set.write();
let is_consistent = |answer: &SealAnswer| { let is_consistent = |answer: &SealAnswer| {
to_seal_set to_seal_set
.get(&(answer.seal_index as usize)) .get(&(answer.seal_index as usize))

View File

@ -15,6 +15,7 @@ pub mod config;
mod flow_store; mod flow_store;
mod load_chunk; mod load_chunk;
pub mod log_manager; pub mod log_manager;
mod seal_task_manager;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
pub mod tx_store; pub mod tx_store;
@ -235,7 +236,7 @@ pub struct SealTask {
/// The index (in seal) of chunks /// The index (in seal) of chunks
pub seal_index: u64, pub seal_index: u64,
/// An ephemeral version number to distinguish if revert happending /// An ephemeral version number to distinguish if revert happending
pub version: usize, pub version: u64,
/// The data to be sealed /// The data to be sealed
pub non_sealed_data: [u8; BYTES_PER_SEAL], pub non_sealed_data: [u8; BYTES_PER_SEAL],
} }
@ -245,7 +246,7 @@ pub struct SealAnswer {
/// The index (in seal) of chunks /// The index (in seal) of chunks
pub seal_index: u64, pub seal_index: u64,
/// An ephemeral version number to distinguish if revert happending /// An ephemeral version number to distinguish if revert happending
pub version: usize, pub version: u64,
/// The data to be sealed /// The data to be sealed
pub sealed_data: [u8; BYTES_PER_SEAL], pub sealed_data: [u8; BYTES_PER_SEAL],
/// The miner Id /// The miner Id

View File

@ -0,0 +1,69 @@
use std::{
collections::BTreeMap,
sync::atomic::{AtomicU64, Ordering},
time::{SystemTime, UNIX_EPOCH},
};
use parking_lot::RwLock;
use zgs_spec::SEALS_PER_LOAD;
pub struct SealTaskManager {
// TODO(kevin): This is an in-memory cache for recording which chunks are ready for sealing. It should be persisted on disk.
pub to_seal_set: RwLock<BTreeMap<usize, u64>>,
// Data sealing is an asynchronized process.
// The sealing service uses the version number to distinguish if revert happens during sealing.
to_seal_version: AtomicU64,
last_pull_time: AtomicU64,
}
impl Default for SealTaskManager {
fn default() -> Self {
Self {
to_seal_set: Default::default(),
to_seal_version: Default::default(),
last_pull_time: AtomicU64::new(current_timestamp()),
}
}
}
fn current_timestamp() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("unexpected negative timestamp")
.as_secs()
}
const SEAL_TASK_PULL_TIMEOUT_SECONDS: u64 = 300;
impl SealTaskManager {
pub fn delete_batch_list(&self, batch_list: &[u64]) {
let mut to_seal_set = self.to_seal_set.write();
for batch_index in batch_list {
for seal_index in (*batch_index as usize) * SEALS_PER_LOAD
..(*batch_index as usize + 1) * SEALS_PER_LOAD
{
to_seal_set.remove(&seal_index);
}
}
}
/// Record the latest timestamp that the miner thread pull seal tasks from the seal status.
pub fn update_pull_time(&self) {
// Here we only need an approximate timestamp and can tolerate a few seconds of error, so we used Ordering::Relaxed
self.last_pull_time
.store(current_timestamp(), Ordering::Relaxed)
}
pub fn seal_worker_available(&self) -> bool {
let last_pull_time = self.last_pull_time.load(Ordering::Relaxed);
current_timestamp().saturating_sub(last_pull_time) < SEAL_TASK_PULL_TIMEOUT_SECONDS
}
pub fn to_seal_version(&self) -> u64 {
self.to_seal_version.load(Ordering::Acquire)
}
pub fn inc_seal_version(&self) -> u64 {
self.to_seal_version.fetch_add(1, Ordering::AcqRel) + 1
}
}