diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs
index a641dff..6369c02 100644
--- a/node/storage/src/log_store/flow_store.rs
+++ b/node/storage/src/log_store/flow_store.rs
@@ -1,4 +1,5 @@
 use super::load_chunk::EntryBatch;
+use super::seal_task_manager::SealTaskManager;
 use super::{MineLoadChunk, SealAnswer, SealTask};
 use crate::config::ShardConfig;
 use crate::error::Error;
@@ -24,11 +25,7 @@ use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
 
 pub struct FlowStore {
     db: FlowDBStore,
-    // TODO(kevin): This is an in-memory cache for recording which chunks are ready for sealing. It should be persisted on disk.
-    to_seal_set: RwLock<BTreeMap<usize, usize>>,
-    // Data sealing is an asynchronized process.
-    // The sealing service uses the version number to distinguish if revert happens during sealing.
-    to_seal_version: RwLock<usize>,
+    seal_manager: SealTaskManager,
     config: FlowConfig,
 }
 
@@ -36,8 +33,7 @@ impl FlowStore {
     pub fn new(db: Arc<dyn ZgsKeyValueDB>, config: FlowConfig) -> Self {
         Self {
             db: FlowDBStore::new(db),
-            to_seal_set: Default::default(),
-            to_seal_version: RwLock::new(0),
+            seal_manager: Default::default(),
             config,
         }
     }
@@ -80,14 +76,7 @@
     }
 
     pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
-        let mut to_seal_set = self.to_seal_set.write();
-        for batch_index in batch_list {
-            for seal_index in (*batch_index as usize) * SEALS_PER_LOAD
-                ..(*batch_index as usize + 1) * SEALS_PER_LOAD
-            {
-                to_seal_set.remove(&seal_index);
-            }
-        }
+        self.seal_manager.delete_batch_list(batch_list);
         self.db.delete_batch_list(batch_list)
     }
 
@@ -232,7 +221,7 @@ impl FlowWrite for FlowStore {
     /// Return the roots of completed chunks. The order is guaranteed to be increasing
     /// by chunk index.
     fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
-        let mut to_seal_set = self.to_seal_set.write();
+        let mut to_seal_set = self.seal_manager.to_seal_set.write();
         trace!("append_entries: {} {}", data.start_index, data.data.len());
         if data.data.len() % BYTES_PER_SECTOR != 0 {
             bail!("append_entries: invalid data size, len={}", data.data.len());
@@ -263,12 +252,14 @@
                 (chunk.start_index % self.config.batch_size as u64) as usize,
                 chunk.data,
             )?;
-            completed_seals.into_iter().for_each(|x| {
-                to_seal_set.insert(
-                    chunk_index as usize * SEALS_PER_LOAD + x as usize,
-                    *self.to_seal_version.read(),
-                );
-            });
+            if self.seal_manager.seal_worker_available() {
+                completed_seals.into_iter().for_each(|x| {
+                    to_seal_set.insert(
+                        chunk_index as usize * SEALS_PER_LOAD + x as usize,
+                        self.seal_manager.to_seal_version(),
+                    );
+                });
+            }
 
             batch_list.push((chunk_index, batch));
         }
@@ -276,15 +267,14 @@
     }
 
     fn truncate(&self, start_index: u64) -> crate::error::Result<()> {
-        let mut to_seal_set = self.to_seal_set.write();
-        let mut to_seal_version = self.to_seal_version.write();
+        let mut to_seal_set = self.seal_manager.to_seal_set.write();
         let to_reseal = self.db.truncate(start_index, self.config.batch_size)?;
 
         to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL));
-        *to_seal_version += 1;
+        let new_seal_version = self.seal_manager.inc_seal_version();
 
         to_reseal.into_iter().for_each(|x| {
-            to_seal_set.insert(x, *to_seal_version);
+            to_seal_set.insert(x, new_seal_version);
         });
         Ok(())
     }
@@ -296,7 +286,9 @@ impl FlowSeal for FlowStore {
     fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> {
-        let to_seal_set = self.to_seal_set.read();
+        let to_seal_set = self.seal_manager.to_seal_set.read();
+        self.seal_manager.update_pull_time();
+
         let mut to_seal_iter = to_seal_set.iter();
         let (&first_index, &first_version) = try_option!(to_seal_iter.next());
         if first_index >= seal_index_max {
@@ -330,7 +322,7 @@
     }
 
     fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
-        let mut to_seal_set = self.to_seal_set.write();
+        let mut to_seal_set = self.seal_manager.to_seal_set.write();
         let is_consistent = |answer: &SealAnswer| {
             to_seal_set
                 .get(&(answer.seal_index as usize))
diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs
index ae3b07e..5bd2b45 100644
--- a/node/storage/src/log_store/mod.rs
+++ b/node/storage/src/log_store/mod.rs
@@ -15,6 +15,7 @@ pub mod config;
 mod flow_store;
 mod load_chunk;
 pub mod log_manager;
+mod seal_task_manager;
 #[cfg(test)]
 mod tests;
 pub mod tx_store;
@@ -235,7 +236,7 @@ pub struct SealTask {
     /// The index (in seal) of chunks
     pub seal_index: u64,
     /// An ephemeral version number to distinguish if revert happending
-    pub version: usize,
+    pub version: u64,
     /// The data to be sealed
     pub non_sealed_data: [u8; BYTES_PER_SEAL],
 }
@@ -245,7 +246,7 @@ pub struct SealAnswer {
     /// The index (in seal) of chunks
     pub seal_index: u64,
     /// An ephemeral version number to distinguish if revert happending
-    pub version: usize,
+    pub version: u64,
     /// The data to be sealed
     pub sealed_data: [u8; BYTES_PER_SEAL],
     /// The miner Id
diff --git a/node/storage/src/log_store/seal_task_manager.rs b/node/storage/src/log_store/seal_task_manager.rs
new file mode 100644
index 0000000..35aadf6
--- /dev/null
+++ b/node/storage/src/log_store/seal_task_manager.rs
@@ -0,0 +1,69 @@
+use std::{
+    collections::BTreeMap,
+    sync::atomic::{AtomicU64, Ordering},
+    time::{SystemTime, UNIX_EPOCH},
+};
+
+use parking_lot::RwLock;
+use zgs_spec::SEALS_PER_LOAD;
+
+pub struct SealTaskManager {
+    // TODO(kevin): This is an in-memory cache for recording which chunks are ready for sealing. It should be persisted on disk.
+    pub to_seal_set: RwLock<BTreeMap<usize, u64>>,
+    // Data sealing is an asynchronous process.
+    // The sealing service uses the version number to detect whether a revert happens during sealing.
+    to_seal_version: AtomicU64,
+    last_pull_time: AtomicU64,
+}
+
+impl Default for SealTaskManager {
+    fn default() -> Self {
+        Self {
+            to_seal_set: Default::default(),
+            to_seal_version: Default::default(),
+            last_pull_time: AtomicU64::new(current_timestamp()),
+        }
+    }
+}
+
+fn current_timestamp() -> u64 {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("unexpected negative timestamp")
+        .as_secs()
+}
+
+const SEAL_TASK_PULL_TIMEOUT_SECONDS: u64 = 300;
+
+impl SealTaskManager {
+    pub fn delete_batch_list(&self, batch_list: &[u64]) {
+        let mut to_seal_set = self.to_seal_set.write();
+        for batch_index in batch_list {
+            for seal_index in (*batch_index as usize) * SEALS_PER_LOAD
+                ..(*batch_index as usize + 1) * SEALS_PER_LOAD
+            {
+                to_seal_set.remove(&seal_index);
+            }
+        }
+    }
+
+    /// Record the latest timestamp at which the miner thread pulled seal tasks.
+    pub fn update_pull_time(&self) {
+        // Only an approximate timestamp is needed here; a few seconds of error is tolerable, so Ordering::Relaxed suffices.
+        self.last_pull_time
+            .store(current_timestamp(), Ordering::Relaxed)
+    }
+
+    pub fn seal_worker_available(&self) -> bool {
+        let last_pull_time = self.last_pull_time.load(Ordering::Relaxed);
+        current_timestamp().saturating_sub(last_pull_time) < SEAL_TASK_PULL_TIMEOUT_SECONDS
+    }
+
+    pub fn to_seal_version(&self) -> u64 {
+        self.to_seal_version.load(Ordering::Acquire)
+    }
+
+    pub fn inc_seal_version(&self) -> u64 {
+        self.to_seal_version.fetch_add(1, Ordering::AcqRel) + 1
+    }
+}
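Note on the refactor above: the version tag is what lets `submit_seal_result` discard answers that were computed against pre-revert data. The sketch below models that consistency check in isolation; `Answer` and `accept_answers` are illustrative stand-ins for this illustration, not types from the crate.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for SealAnswer, reduced to the fields the check uses.
struct Answer {
    seal_index: usize,
    version: u64,
}

// `to_seal_set` maps seal_index -> the to_seal_version current when the task
// was queued. truncate() bumps the version and re-tags resealed entries, so an
// answer pulled before the revert no longer matches and is dropped.
fn accept_answers(to_seal_set: &mut BTreeMap<usize, u64>, answers: Vec<Answer>) {
    for answer in answers {
        let is_consistent = to_seal_set
            .get(&answer.seal_index)
            .map_or(false, |&version| version == answer.version);
        if is_consistent {
            // The sealed data would be persisted here; the task is complete.
            to_seal_set.remove(&answer.seal_index);
        }
    }
}

fn main() {
    let mut set = BTreeMap::new();
    set.insert(7, 1); // queued at version 1
    set.insert(8, 2); // re-tagged to version 2 by a revert

    accept_answers(
        &mut set,
        vec![
            Answer { seal_index: 7, version: 1 }, // consistent: accepted
            Answer { seal_index: 8, version: 1 }, // stale: dropped
        ],
    );
    assert!(!set.contains_key(&7));
    assert!(set.contains_key(&8)); // still pending under version 2
}
```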
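Two details of `SealTaskManager` are worth calling out. First, `seal_worker_available` makes `append_entries` skip queuing seal tasks once no miner has called `pull_seal_chunk` for `SEAL_TASK_PULL_TIMEOUT_SECONDS` (300 s), so the in-memory `to_seal_set` stops growing on nodes without an active sealing worker. Second, `inc_seal_version` returns `fetch_add(1, AcqRel) + 1` because `fetch_add` yields the value *before* the increment. A minimal, self-contained demonstration of the latter:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let to_seal_version = AtomicU64::new(0);

    // fetch_add returns the previous value, so the freshly bumped version
    // that truncate() tags reseal entries with is the return value plus one.
    let new_version = to_seal_version.fetch_add(1, Ordering::AcqRel) + 1;
    assert_eq!(new_version, 1);

    // Subsequent readers (to_seal_version() loads with Ordering::Acquire)
    // observe the incremented value.
    assert_eq!(to_seal_version.load(Ordering::Acquire), 1);
}
```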