mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Compare commits
3 Commits
6becd30a2c
...
33de1b5044
Author | SHA1 | Date | |
---|---|---|---|
![]() |
33de1b5044 | ||
![]() |
395aeabde7 | ||
![]() |
9593b51f5b |
@ -1,4 +1,5 @@
|
|||||||
use super::load_chunk::EntryBatch;
|
use super::load_chunk::EntryBatch;
|
||||||
|
use super::seal_task_manager::SealTaskManager;
|
||||||
use super::{MineLoadChunk, SealAnswer, SealTask};
|
use super::{MineLoadChunk, SealAnswer, SealTask};
|
||||||
use crate::config::ShardConfig;
|
use crate::config::ShardConfig;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
@ -24,11 +25,7 @@ use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_S
|
|||||||
|
|
||||||
pub struct FlowStore {
|
pub struct FlowStore {
|
||||||
db: FlowDBStore,
|
db: FlowDBStore,
|
||||||
// TODO(kevin): This is an in-memory cache for recording which chunks are ready for sealing. It should be persisted on disk.
|
seal_manager: SealTaskManager,
|
||||||
to_seal_set: RwLock<BTreeMap<usize, usize>>,
|
|
||||||
// Data sealing is an asynchronized process.
|
|
||||||
// The sealing service uses the version number to distinguish if revert happens during sealing.
|
|
||||||
to_seal_version: RwLock<usize>,
|
|
||||||
config: FlowConfig,
|
config: FlowConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -36,8 +33,7 @@ impl FlowStore {
|
|||||||
pub fn new(db: Arc<dyn ZgsKeyValueDB>, config: FlowConfig) -> Self {
|
pub fn new(db: Arc<dyn ZgsKeyValueDB>, config: FlowConfig) -> Self {
|
||||||
Self {
|
Self {
|
||||||
db: FlowDBStore::new(db),
|
db: FlowDBStore::new(db),
|
||||||
to_seal_set: Default::default(),
|
seal_manager: Default::default(),
|
||||||
to_seal_version: RwLock::new(0),
|
|
||||||
config,
|
config,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -80,14 +76,7 @@ impl FlowStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
|
pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
|
||||||
let mut to_seal_set = self.to_seal_set.write();
|
self.seal_manager.delete_batch_list(batch_list);
|
||||||
for batch_index in batch_list {
|
|
||||||
for seal_index in (*batch_index as usize) * SEALS_PER_LOAD
|
|
||||||
..(*batch_index as usize + 1) * SEALS_PER_LOAD
|
|
||||||
{
|
|
||||||
to_seal_set.remove(&seal_index);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.db.delete_batch_list(batch_list)
|
self.db.delete_batch_list(batch_list)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -232,7 +221,7 @@ impl FlowWrite for FlowStore {
|
|||||||
/// Return the roots of completed chunks. The order is guaranteed to be increasing
|
/// Return the roots of completed chunks. The order is guaranteed to be increasing
|
||||||
/// by chunk index.
|
/// by chunk index.
|
||||||
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
|
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
|
||||||
let mut to_seal_set = self.to_seal_set.write();
|
let mut to_seal_set = self.seal_manager.to_seal_set.write();
|
||||||
trace!("append_entries: {} {}", data.start_index, data.data.len());
|
trace!("append_entries: {} {}", data.start_index, data.data.len());
|
||||||
if data.data.len() % BYTES_PER_SECTOR != 0 {
|
if data.data.len() % BYTES_PER_SECTOR != 0 {
|
||||||
bail!("append_entries: invalid data size, len={}", data.data.len());
|
bail!("append_entries: invalid data size, len={}", data.data.len());
|
||||||
@ -263,12 +252,14 @@ impl FlowWrite for FlowStore {
|
|||||||
(chunk.start_index % self.config.batch_size as u64) as usize,
|
(chunk.start_index % self.config.batch_size as u64) as usize,
|
||||||
chunk.data,
|
chunk.data,
|
||||||
)?;
|
)?;
|
||||||
|
if self.seal_manager.seal_worker_available() {
|
||||||
completed_seals.into_iter().for_each(|x| {
|
completed_seals.into_iter().for_each(|x| {
|
||||||
to_seal_set.insert(
|
to_seal_set.insert(
|
||||||
chunk_index as usize * SEALS_PER_LOAD + x as usize,
|
chunk_index as usize * SEALS_PER_LOAD + x as usize,
|
||||||
*self.to_seal_version.read(),
|
self.seal_manager.to_seal_version(),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
|
||||||
batch_list.push((chunk_index, batch));
|
batch_list.push((chunk_index, batch));
|
||||||
}
|
}
|
||||||
@ -276,15 +267,14 @@ impl FlowWrite for FlowStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn truncate(&self, start_index: u64) -> crate::error::Result<()> {
|
fn truncate(&self, start_index: u64) -> crate::error::Result<()> {
|
||||||
let mut to_seal_set = self.to_seal_set.write();
|
let mut to_seal_set = self.seal_manager.to_seal_set.write();
|
||||||
let mut to_seal_version = self.to_seal_version.write();
|
|
||||||
let to_reseal = self.db.truncate(start_index, self.config.batch_size)?;
|
let to_reseal = self.db.truncate(start_index, self.config.batch_size)?;
|
||||||
|
|
||||||
to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL));
|
to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL));
|
||||||
*to_seal_version += 1;
|
let new_seal_version = self.seal_manager.inc_seal_version();
|
||||||
|
|
||||||
to_reseal.into_iter().for_each(|x| {
|
to_reseal.into_iter().for_each(|x| {
|
||||||
to_seal_set.insert(x, *to_seal_version);
|
to_seal_set.insert(x, new_seal_version);
|
||||||
});
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -296,7 +286,9 @@ impl FlowWrite for FlowStore {
|
|||||||
|
|
||||||
impl FlowSeal for FlowStore {
|
impl FlowSeal for FlowStore {
|
||||||
fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> {
|
fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> {
|
||||||
let to_seal_set = self.to_seal_set.read();
|
let to_seal_set = self.seal_manager.to_seal_set.read();
|
||||||
|
self.seal_manager.update_pull_time();
|
||||||
|
|
||||||
let mut to_seal_iter = to_seal_set.iter();
|
let mut to_seal_iter = to_seal_set.iter();
|
||||||
let (&first_index, &first_version) = try_option!(to_seal_iter.next());
|
let (&first_index, &first_version) = try_option!(to_seal_iter.next());
|
||||||
if first_index >= seal_index_max {
|
if first_index >= seal_index_max {
|
||||||
@ -330,7 +322,7 @@ impl FlowSeal for FlowStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
|
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
|
||||||
let mut to_seal_set = self.to_seal_set.write();
|
let mut to_seal_set = self.seal_manager.to_seal_set.write();
|
||||||
let is_consistent = |answer: &SealAnswer| {
|
let is_consistent = |answer: &SealAnswer| {
|
||||||
to_seal_set
|
to_seal_set
|
||||||
.get(&(answer.seal_index as usize))
|
.get(&(answer.seal_index as usize))
|
||||||
|
@ -269,8 +269,7 @@ impl LogStoreWrite for LogManager {
|
|||||||
|
|
||||||
if let Some(old_tx_seq) = maybe_same_data_tx_seq {
|
if let Some(old_tx_seq) = maybe_same_data_tx_seq {
|
||||||
if self.check_tx_completed(old_tx_seq)? {
|
if self.check_tx_completed(old_tx_seq)? {
|
||||||
self.copy_tx_data(old_tx_seq, vec![tx.seq])?;
|
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
|
||||||
self.tx_store.finalize_tx(tx.seq)?;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -293,7 +292,7 @@ impl LogStoreWrite for LogManager {
|
|||||||
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
||||||
// Check if there are other same-root transaction not finalized.
|
// Check if there are other same-root transaction not finalized.
|
||||||
if same_root_seq_list.first() == Some(&tx_seq) {
|
if same_root_seq_list.first() == Some(&tx_seq) {
|
||||||
self.copy_tx_data(tx_seq, same_root_seq_list[1..].to_vec())?;
|
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
|
||||||
}
|
}
|
||||||
self.tx_store.finalize_tx(tx_seq)?;
|
self.tx_store.finalize_tx(tx_seq)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -329,7 +328,7 @@ impl LogStoreWrite for LogManager {
|
|||||||
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
||||||
// Check if there are other same-root transaction not finalized.
|
// Check if there are other same-root transaction not finalized.
|
||||||
if same_root_seq_list.first() == Some(&tx_seq) {
|
if same_root_seq_list.first() == Some(&tx_seq) {
|
||||||
self.copy_tx_data(tx_seq, same_root_seq_list[1..].to_vec())?;
|
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
|
||||||
}
|
}
|
||||||
Ok(true)
|
Ok(true)
|
||||||
} else {
|
} else {
|
||||||
@ -1059,8 +1058,9 @@ impl LogManager {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn copy_tx_data(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
|
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
|
||||||
let mut merkle = self.merkle.write();
|
let mut merkle = self.merkle.write();
|
||||||
|
let shard_config = self.flow_store.get_shard_config();
|
||||||
// We have all the data need for this tx, so just copy them.
|
// We have all the data need for this tx, so just copy them.
|
||||||
let old_tx = self
|
let old_tx = self
|
||||||
.get_tx_by_seq_number(from_tx_seq)?
|
.get_tx_by_seq_number(from_tx_seq)?
|
||||||
@ -1074,6 +1074,12 @@ impl LogManager {
|
|||||||
let tx = self
|
let tx = self
|
||||||
.get_tx_by_seq_number(seq)?
|
.get_tx_by_seq_number(seq)?
|
||||||
.ok_or_else(|| anyhow!("to tx missing"))?;
|
.ok_or_else(|| anyhow!("to tx missing"))?;
|
||||||
|
// Data for `tx` is not available due to sharding.
|
||||||
|
if sector_to_segment(tx.start_entry_index) % shard_config.num_shard
|
||||||
|
!= sector_to_segment(old_tx.start_entry_index) % shard_config.num_shard
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
to_tx_offset_list.push((tx.seq, tx.start_entry_index - old_tx.start_entry_index));
|
to_tx_offset_list.push((tx.seq, tx.start_entry_index - old_tx.start_entry_index));
|
||||||
}
|
}
|
||||||
if to_tx_offset_list.is_empty() {
|
if to_tx_offset_list.is_empty() {
|
||||||
@ -1085,7 +1091,7 @@ impl LogManager {
|
|||||||
old_tx.start_entry_index,
|
old_tx.start_entry_index,
|
||||||
old_tx.start_entry_index + old_tx.num_entries() as u64,
|
old_tx.start_entry_index + old_tx.num_entries() as u64,
|
||||||
PORA_CHUNK_SIZE,
|
PORA_CHUNK_SIZE,
|
||||||
self.flow_store.get_shard_config(),
|
shard_config,
|
||||||
) {
|
) {
|
||||||
let batch_data = self
|
let batch_data = self
|
||||||
.get_chunk_by_flow_index(batch_start, batch_end - batch_start)?
|
.get_chunk_by_flow_index(batch_start, batch_end - batch_start)?
|
||||||
|
@ -15,6 +15,7 @@ pub mod config;
|
|||||||
mod flow_store;
|
mod flow_store;
|
||||||
mod load_chunk;
|
mod load_chunk;
|
||||||
pub mod log_manager;
|
pub mod log_manager;
|
||||||
|
mod seal_task_manager;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
pub mod tx_store;
|
pub mod tx_store;
|
||||||
@ -235,7 +236,7 @@ pub struct SealTask {
|
|||||||
/// The index (in seal) of chunks
|
/// The index (in seal) of chunks
|
||||||
pub seal_index: u64,
|
pub seal_index: u64,
|
||||||
/// An ephemeral version number to distinguish if revert happending
|
/// An ephemeral version number to distinguish if revert happending
|
||||||
pub version: usize,
|
pub version: u64,
|
||||||
/// The data to be sealed
|
/// The data to be sealed
|
||||||
pub non_sealed_data: [u8; BYTES_PER_SEAL],
|
pub non_sealed_data: [u8; BYTES_PER_SEAL],
|
||||||
}
|
}
|
||||||
@ -245,7 +246,7 @@ pub struct SealAnswer {
|
|||||||
/// The index (in seal) of chunks
|
/// The index (in seal) of chunks
|
||||||
pub seal_index: u64,
|
pub seal_index: u64,
|
||||||
/// An ephemeral version number to distinguish if revert happending
|
/// An ephemeral version number to distinguish if revert happending
|
||||||
pub version: usize,
|
pub version: u64,
|
||||||
/// The data to be sealed
|
/// The data to be sealed
|
||||||
pub sealed_data: [u8; BYTES_PER_SEAL],
|
pub sealed_data: [u8; BYTES_PER_SEAL],
|
||||||
/// The miner Id
|
/// The miner Id
|
||||||
|
69
node/storage/src/log_store/seal_task_manager.rs
Normal file
69
node/storage/src/log_store/seal_task_manager.rs
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
use std::{
|
||||||
|
collections::BTreeMap,
|
||||||
|
sync::atomic::{AtomicU64, Ordering},
|
||||||
|
time::{SystemTime, UNIX_EPOCH},
|
||||||
|
};
|
||||||
|
|
||||||
|
use parking_lot::RwLock;
|
||||||
|
use zgs_spec::SEALS_PER_LOAD;
|
||||||
|
|
||||||
|
pub struct SealTaskManager {
|
||||||
|
// TODO(kevin): This is an in-memory cache for recording which chunks are ready for sealing. It should be persisted on disk.
|
||||||
|
pub to_seal_set: RwLock<BTreeMap<usize, u64>>,
|
||||||
|
// Data sealing is an asynchronized process.
|
||||||
|
// The sealing service uses the version number to distinguish if revert happens during sealing.
|
||||||
|
to_seal_version: AtomicU64,
|
||||||
|
last_pull_time: AtomicU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for SealTaskManager {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
to_seal_set: Default::default(),
|
||||||
|
to_seal_version: Default::default(),
|
||||||
|
last_pull_time: AtomicU64::new(current_timestamp()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn current_timestamp() -> u64 {
|
||||||
|
SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.expect("unexpected negative timestamp")
|
||||||
|
.as_secs()
|
||||||
|
}
|
||||||
|
|
||||||
|
const SEAL_TASK_PULL_TIMEOUT_SECONDS: u64 = 300;
|
||||||
|
|
||||||
|
impl SealTaskManager {
|
||||||
|
pub fn delete_batch_list(&self, batch_list: &[u64]) {
|
||||||
|
let mut to_seal_set = self.to_seal_set.write();
|
||||||
|
for batch_index in batch_list {
|
||||||
|
for seal_index in (*batch_index as usize) * SEALS_PER_LOAD
|
||||||
|
..(*batch_index as usize + 1) * SEALS_PER_LOAD
|
||||||
|
{
|
||||||
|
to_seal_set.remove(&seal_index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record the latest timestamp that the miner thread pull seal tasks from the seal status.
|
||||||
|
pub fn update_pull_time(&self) {
|
||||||
|
// Here we only need an approximate timestamp and can tolerate a few seconds of error, so we used Ordering::Relaxed
|
||||||
|
self.last_pull_time
|
||||||
|
.store(current_timestamp(), Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn seal_worker_available(&self) -> bool {
|
||||||
|
let last_pull_time = self.last_pull_time.load(Ordering::Relaxed);
|
||||||
|
current_timestamp().saturating_sub(last_pull_time) < SEAL_TASK_PULL_TIMEOUT_SECONDS
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn to_seal_version(&self) -> u64 {
|
||||||
|
self.to_seal_version.load(Ordering::Acquire)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn inc_seal_version(&self) -> u64 {
|
||||||
|
self.to_seal_version.fetch_add(1, Ordering::AcqRel) + 1
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user