mirror of
				https://github.com/0glabs/0g-storage-node.git
				synced 2025-11-04 08:37:27 +00:00 
			
		
		
		
	Avoid appending to_seal_set if sealer is not enabled (#212)
This commit is contained in:
		
							parent
							
								
									20266e0a6c
								
							
						
					
					
						commit
						27db2d6496
					
				@ -1,4 +1,5 @@
 | 
				
			|||||||
use super::load_chunk::EntryBatch;
 | 
					use super::load_chunk::EntryBatch;
 | 
				
			||||||
 | 
					use super::seal_task_manager::SealTaskManager;
 | 
				
			||||||
use super::{MineLoadChunk, SealAnswer, SealTask};
 | 
					use super::{MineLoadChunk, SealAnswer, SealTask};
 | 
				
			||||||
use crate::config::ShardConfig;
 | 
					use crate::config::ShardConfig;
 | 
				
			||||||
use crate::error::Error;
 | 
					use crate::error::Error;
 | 
				
			||||||
@ -25,11 +26,7 @@ use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_S
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
pub struct FlowStore {
 | 
					pub struct FlowStore {
 | 
				
			||||||
    db: FlowDBStore,
 | 
					    db: FlowDBStore,
 | 
				
			||||||
    // TODO(kevin): This is an in-memory cache for recording which chunks are ready for sealing. It should be persisted on disk.
 | 
					    seal_manager: SealTaskManager,
 | 
				
			||||||
    to_seal_set: RwLock<BTreeMap<usize, usize>>,
 | 
					 | 
				
			||||||
    // Data sealing is an asynchronized process.
 | 
					 | 
				
			||||||
    // The sealing service uses the version number to distinguish if revert happens during sealing.
 | 
					 | 
				
			||||||
    to_seal_version: RwLock<usize>,
 | 
					 | 
				
			||||||
    config: FlowConfig,
 | 
					    config: FlowConfig,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -37,8 +34,7 @@ impl FlowStore {
 | 
				
			|||||||
    pub fn new(db: Arc<dyn ZgsKeyValueDB>, config: FlowConfig) -> Self {
 | 
					    pub fn new(db: Arc<dyn ZgsKeyValueDB>, config: FlowConfig) -> Self {
 | 
				
			||||||
        Self {
 | 
					        Self {
 | 
				
			||||||
            db: FlowDBStore::new(db),
 | 
					            db: FlowDBStore::new(db),
 | 
				
			||||||
            to_seal_set: Default::default(),
 | 
					            seal_manager: Default::default(),
 | 
				
			||||||
            to_seal_version: RwLock::new(0),
 | 
					 | 
				
			||||||
            config,
 | 
					            config,
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -81,14 +77,7 @@ impl FlowStore {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
 | 
					    pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
 | 
				
			||||||
        let mut to_seal_set = self.to_seal_set.write();
 | 
					        self.seal_manager.delete_batch_list(batch_list);
 | 
				
			||||||
        for batch_index in batch_list {
 | 
					 | 
				
			||||||
            for seal_index in (*batch_index as usize) * SEALS_PER_LOAD
 | 
					 | 
				
			||||||
                ..(*batch_index as usize + 1) * SEALS_PER_LOAD
 | 
					 | 
				
			||||||
            {
 | 
					 | 
				
			||||||
                to_seal_set.remove(&seal_index);
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        self.db.delete_batch_list(batch_list)
 | 
					        self.db.delete_batch_list(batch_list)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -233,7 +222,7 @@ impl FlowWrite for FlowStore {
 | 
				
			|||||||
    /// Return the roots of completed chunks. The order is guaranteed to be increasing
 | 
					    /// Return the roots of completed chunks. The order is guaranteed to be increasing
 | 
				
			||||||
    /// by chunk index.
 | 
					    /// by chunk index.
 | 
				
			||||||
    fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
 | 
					    fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
 | 
				
			||||||
        let mut to_seal_set = self.to_seal_set.write();
 | 
					        let mut to_seal_set = self.seal_manager.to_seal_set.write();
 | 
				
			||||||
        trace!("append_entries: {} {}", data.start_index, data.data.len());
 | 
					        trace!("append_entries: {} {}", data.start_index, data.data.len());
 | 
				
			||||||
        if data.data.len() % BYTES_PER_SECTOR != 0 {
 | 
					        if data.data.len() % BYTES_PER_SECTOR != 0 {
 | 
				
			||||||
            bail!("append_entries: invalid data size, len={}", data.data.len());
 | 
					            bail!("append_entries: invalid data size, len={}", data.data.len());
 | 
				
			||||||
@ -264,12 +253,14 @@ impl FlowWrite for FlowStore {
 | 
				
			|||||||
                (chunk.start_index % self.config.batch_size as u64) as usize,
 | 
					                (chunk.start_index % self.config.batch_size as u64) as usize,
 | 
				
			||||||
                chunk.data,
 | 
					                chunk.data,
 | 
				
			||||||
            )?;
 | 
					            )?;
 | 
				
			||||||
 | 
					            if self.seal_manager.seal_worker_available() {
 | 
				
			||||||
                completed_seals.into_iter().for_each(|x| {
 | 
					                completed_seals.into_iter().for_each(|x| {
 | 
				
			||||||
                    to_seal_set.insert(
 | 
					                    to_seal_set.insert(
 | 
				
			||||||
                        chunk_index as usize * SEALS_PER_LOAD + x as usize,
 | 
					                        chunk_index as usize * SEALS_PER_LOAD + x as usize,
 | 
				
			||||||
                    *self.to_seal_version.read(),
 | 
					                        self.seal_manager.to_seal_version(),
 | 
				
			||||||
                    );
 | 
					                    );
 | 
				
			||||||
                });
 | 
					                });
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            batch_list.push((chunk_index, batch));
 | 
					            batch_list.push((chunk_index, batch));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@ -277,15 +268,14 @@ impl FlowWrite for FlowStore {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn truncate(&self, start_index: u64) -> crate::error::Result<()> {
 | 
					    fn truncate(&self, start_index: u64) -> crate::error::Result<()> {
 | 
				
			||||||
        let mut to_seal_set = self.to_seal_set.write();
 | 
					        let mut to_seal_set = self.seal_manager.to_seal_set.write();
 | 
				
			||||||
        let mut to_seal_version = self.to_seal_version.write();
 | 
					 | 
				
			||||||
        let to_reseal = self.db.truncate(start_index, self.config.batch_size)?;
 | 
					        let to_reseal = self.db.truncate(start_index, self.config.batch_size)?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL));
 | 
					        to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL));
 | 
				
			||||||
        *to_seal_version += 1;
 | 
					        let new_seal_version = self.seal_manager.inc_seal_version();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        to_reseal.into_iter().for_each(|x| {
 | 
					        to_reseal.into_iter().for_each(|x| {
 | 
				
			||||||
            to_seal_set.insert(x, *to_seal_version);
 | 
					            to_seal_set.insert(x, new_seal_version);
 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
        Ok(())
 | 
					        Ok(())
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -297,7 +287,9 @@ impl FlowWrite for FlowStore {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
impl FlowSeal for FlowStore {
 | 
					impl FlowSeal for FlowStore {
 | 
				
			||||||
    fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> {
 | 
					    fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> {
 | 
				
			||||||
        let to_seal_set = self.to_seal_set.read();
 | 
					        let to_seal_set = self.seal_manager.to_seal_set.read();
 | 
				
			||||||
 | 
					        self.seal_manager.update_pull_time();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let mut to_seal_iter = to_seal_set.iter();
 | 
					        let mut to_seal_iter = to_seal_set.iter();
 | 
				
			||||||
        let (&first_index, &first_version) = try_option!(to_seal_iter.next());
 | 
					        let (&first_index, &first_version) = try_option!(to_seal_iter.next());
 | 
				
			||||||
        if first_index >= seal_index_max {
 | 
					        if first_index >= seal_index_max {
 | 
				
			||||||
@ -331,7 +323,7 @@ impl FlowSeal for FlowStore {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
 | 
					    fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
 | 
				
			||||||
        let mut to_seal_set = self.to_seal_set.write();
 | 
					        let mut to_seal_set = self.seal_manager.to_seal_set.write();
 | 
				
			||||||
        let is_consistent = |answer: &SealAnswer| {
 | 
					        let is_consistent = |answer: &SealAnswer| {
 | 
				
			||||||
            to_seal_set
 | 
					            to_seal_set
 | 
				
			||||||
                .get(&(answer.seal_index as usize))
 | 
					                .get(&(answer.seal_index as usize))
 | 
				
			||||||
 | 
				
			|||||||
@ -15,6 +15,7 @@ pub mod config;
 | 
				
			|||||||
mod flow_store;
 | 
					mod flow_store;
 | 
				
			||||||
mod load_chunk;
 | 
					mod load_chunk;
 | 
				
			||||||
pub mod log_manager;
 | 
					pub mod log_manager;
 | 
				
			||||||
 | 
					mod seal_task_manager;
 | 
				
			||||||
#[cfg(test)]
 | 
					#[cfg(test)]
 | 
				
			||||||
mod tests;
 | 
					mod tests;
 | 
				
			||||||
pub mod tx_store;
 | 
					pub mod tx_store;
 | 
				
			||||||
@ -238,7 +239,7 @@ pub struct SealTask {
 | 
				
			|||||||
    /// The index (in seal) of chunks
 | 
					    /// The index (in seal) of chunks
 | 
				
			||||||
    pub seal_index: u64,
 | 
					    pub seal_index: u64,
 | 
				
			||||||
    /// An ephemeral version number to distinguish if revert happending
 | 
					    /// An ephemeral version number to distinguish if revert happending
 | 
				
			||||||
    pub version: usize,
 | 
					    pub version: u64,
 | 
				
			||||||
    /// The data to be sealed
 | 
					    /// The data to be sealed
 | 
				
			||||||
    pub non_sealed_data: [u8; BYTES_PER_SEAL],
 | 
					    pub non_sealed_data: [u8; BYTES_PER_SEAL],
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@ -248,7 +249,7 @@ pub struct SealAnswer {
 | 
				
			|||||||
    /// The index (in seal) of chunks
 | 
					    /// The index (in seal) of chunks
 | 
				
			||||||
    pub seal_index: u64,
 | 
					    pub seal_index: u64,
 | 
				
			||||||
    /// An ephemeral version number to distinguish if revert happending
 | 
					    /// An ephemeral version number to distinguish if revert happending
 | 
				
			||||||
    pub version: usize,
 | 
					    pub version: u64,
 | 
				
			||||||
    /// The data to be sealed
 | 
					    /// The data to be sealed
 | 
				
			||||||
    pub sealed_data: [u8; BYTES_PER_SEAL],
 | 
					    pub sealed_data: [u8; BYTES_PER_SEAL],
 | 
				
			||||||
    /// The miner Id
 | 
					    /// The miner Id
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										69
									
								
								node/storage/src/log_store/seal_task_manager.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								node/storage/src/log_store/seal_task_manager.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,69 @@
 | 
				
			|||||||
 | 
					use std::{
 | 
				
			||||||
 | 
					    collections::BTreeMap,
 | 
				
			||||||
 | 
					    sync::atomic::{AtomicU64, Ordering},
 | 
				
			||||||
 | 
					    time::{SystemTime, UNIX_EPOCH},
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use parking_lot::RwLock;
 | 
				
			||||||
 | 
					use zgs_spec::SEALS_PER_LOAD;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub struct SealTaskManager {
 | 
				
			||||||
 | 
					    // TODO(kevin): This is an in-memory cache for recording which chunks are ready for sealing. It should be persisted on disk.
 | 
				
			||||||
 | 
					    pub to_seal_set: RwLock<BTreeMap<usize, u64>>,
 | 
				
			||||||
 | 
					    // Data sealing is an asynchronized process.
 | 
				
			||||||
 | 
					    // The sealing service uses the version number to distinguish if revert happens during sealing.
 | 
				
			||||||
 | 
					    to_seal_version: AtomicU64,
 | 
				
			||||||
 | 
					    last_pull_time: AtomicU64,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl Default for SealTaskManager {
 | 
				
			||||||
 | 
					    fn default() -> Self {
 | 
				
			||||||
 | 
					        Self {
 | 
				
			||||||
 | 
					            to_seal_set: Default::default(),
 | 
				
			||||||
 | 
					            to_seal_version: Default::default(),
 | 
				
			||||||
 | 
					            last_pull_time: AtomicU64::new(current_timestamp()),
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					fn current_timestamp() -> u64 {
 | 
				
			||||||
 | 
					    SystemTime::now()
 | 
				
			||||||
 | 
					        .duration_since(UNIX_EPOCH)
 | 
				
			||||||
 | 
					        .expect("unexpected negative timestamp")
 | 
				
			||||||
 | 
					        .as_secs()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const SEAL_TASK_PULL_TIMEOUT_SECONDS: u64 = 300;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl SealTaskManager {
 | 
				
			||||||
 | 
					    pub fn delete_batch_list(&self, batch_list: &[u64]) {
 | 
				
			||||||
 | 
					        let mut to_seal_set = self.to_seal_set.write();
 | 
				
			||||||
 | 
					        for batch_index in batch_list {
 | 
				
			||||||
 | 
					            for seal_index in (*batch_index as usize) * SEALS_PER_LOAD
 | 
				
			||||||
 | 
					                ..(*batch_index as usize + 1) * SEALS_PER_LOAD
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                to_seal_set.remove(&seal_index);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /// Record the latest timestamp that the miner thread pull seal tasks from the seal status.
 | 
				
			||||||
 | 
					    pub fn update_pull_time(&self) {
 | 
				
			||||||
 | 
					        // Here we only need an approximate timestamp and can tolerate a few seconds of error, so we used Ordering::Relaxed
 | 
				
			||||||
 | 
					        self.last_pull_time
 | 
				
			||||||
 | 
					            .store(current_timestamp(), Ordering::Relaxed)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pub fn seal_worker_available(&self) -> bool {
 | 
				
			||||||
 | 
					        let last_pull_time = self.last_pull_time.load(Ordering::Relaxed);
 | 
				
			||||||
 | 
					        current_timestamp().saturating_sub(last_pull_time) < SEAL_TASK_PULL_TIMEOUT_SECONDS
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pub fn to_seal_version(&self) -> u64 {
 | 
				
			||||||
 | 
					        self.to_seal_version.load(Ordering::Acquire)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pub fn inc_seal_version(&self) -> u64 {
 | 
				
			||||||
 | 
					        self.to_seal_version.fetch_add(1, Ordering::AcqRel) + 1
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Loading…
	
		Reference in New Issue
	
	Block a user