add pad data store

This commit is contained in:
Peter Zhang 2024-11-06 21:53:53 +08:00
parent ce7a320098
commit e946400b82
4 changed files with 84 additions and 7 deletions

View File

@ -277,6 +277,8 @@ impl LogSyncManager {
.remove_finalized_block_interval_minutes,
);
// start the pad data store
let (watch_progress_tx, watch_progress_rx) =
tokio::sync::mpsc::unbounded_channel();
let watch_rx = log_sync_manager.log_fetcher.start_watch(

View File

@ -1,4 +1,5 @@
use super::load_chunk::EntryBatch;
use super::log_manager::{COL_PAD_DATA_LIST, COL_PAD_DATA_SYNC_HEIGH};
use super::seal_task_manager::SealTaskManager;
use super::{MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig;
@ -343,6 +344,35 @@ impl FlowSeal for FlowStore {
}
}
/// Routes pad-data bookkeeping across the two flow databases:
/// pad segment records live in `flow_db`, the sync-height cursor in `data_db`.
pub struct PadDataStore {
    flow_db: Arc<FlowDBStore>,
    data_db: Arc<FlowDBStore>,
}

impl PadDataStore {
    /// Wrap the two database handles; no I/O happens here.
    pub fn new(flow_db: Arc<FlowDBStore>, data_db: Arc<FlowDBStore>) -> Self {
        Self { flow_db, data_db }
    }

    /// Record a pad segment of `data_size` entries starting at `start_index`.
    pub fn put_pad_data(&self, data_size: usize, start_index: u64) -> Result<()> {
        self.flow_db.put_pad_data(data_size, start_index)
    }

    /// Persist how far pad data has been processed.
    pub fn put_pad_data_sync_height(&self, sync_index: u64) -> Result<()> {
        self.data_db.put_pad_data_sync_height(sync_index)
    }

    /// Fetch the last recorded sync height, or `None` if never written.
    // NOTE(review): the callee name "heigh" is a typo that lives in FlowDBStore;
    // renaming it must be done together with this call site.
    pub fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
        self.data_db.get_pad_data_sync_heigh()
    }

    /// Look up the pad segment size recorded at `start_index`, if any.
    pub fn get_pad_data(&self, start_index: u64) -> Result<Option<usize>> {
        self.flow_db.get_pad_data(start_index)
    }
}
/// Key-value-backed store for flow data; the pad-data helper methods in its
/// `impl` block (below) persist pad segments and the sync-height cursor here.
pub struct FlowDBStore {
// Shared handle to the underlying column-oriented key-value database.
kvdb: Arc<dyn ZgsKeyValueDB>,
}
@ -443,6 +473,49 @@ impl FlowDBStore {
}
Ok(self.kvdb.write(tx)?)
}
/// Record a pad segment: maps the big-endian `start_index` key to the segment
/// size in `COL_PAD_DATA_LIST`.
///
/// The size is stored as an 8-byte big-endian `u64` (not raw `usize` bytes) so
/// the on-disk format does not depend on the platform's pointer width; the
/// matching decode in `get_pad_data` expects exactly 8 bytes.
fn put_pad_data(&self, data_size: usize, start_index: u64) -> Result<()> {
    let mut tx = self.kvdb.transaction();
    tx.put(
        COL_PAD_DATA_LIST,
        &start_index.to_be_bytes(),
        // usize -> u64 is lossless on all supported platforms.
        &(data_size as u64).to_be_bytes(),
    );
    self.kvdb.write(tx)?;
    Ok(())
}
/// Persist the pad-data sync height under the fixed key `b"sync_height"`
/// in `COL_PAD_DATA_SYNC_HEIGH`, overwriting any previous value.
fn put_pad_data_sync_height(&self, sync_index: u64) -> Result<()> {
    let mut tx = self.kvdb.transaction();
    tx.put(COL_PAD_DATA_SYNC_HEIGH, b"sync_height", &sync_index.to_be_bytes());
    Ok(self.kvdb.write(tx)?)
}
/// Read back the pad-data sync height; `None` if it was never written.
// NOTE(review): "heigh" is a typo for "height"; kept here because the public
// caller in PadDataStore uses this exact name — rename both together.
fn get_pad_data_sync_heigh(&self) -> Result<Option<u64>> {
    self.kvdb
        .get(COL_PAD_DATA_SYNC_HEIGH, b"sync_height")?
        .map(|v| Ok(u64::from_be_bytes(v.try_into().map_err(|e| anyhow!("{:?}", e))?)))
        .transpose()
}
/// Look up the pad segment size recorded at `start_index`, or `None` if absent.
///
/// Values are decoded as 8-byte big-endian `u64` (the canonical format written
/// by `put_pad_data`) and then checked into `usize`, so a value written on a
/// 64-bit node errors instead of being silently truncated on a narrower one.
fn get_pad_data(&self, start_index: u64) -> Result<Option<usize>> {
    match self
        .kvdb
        .get(COL_PAD_DATA_LIST, &start_index.to_be_bytes())?
    {
        Some(v) => {
            let raw: [u8; 8] = v.try_into().map_err(|e| anyhow!("{:?}", e))?;
            let size = usize::try_from(u64::from_be_bytes(raw)).map_err(|e| anyhow!("{:?}", e))?;
            Ok(Some(size))
        }
        None => Ok(None),
    }
}
}
#[derive(DeriveEncode, DeriveDecode, Clone, Debug)]

View File

@ -1,7 +1,9 @@
use super::tx_store::BlockHashAndSubmissionIndex;
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig;
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
use crate::log_store::flow_store::{
batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadDataStore,
};
use crate::log_store::tx_store::TransactionStore;
use crate::log_store::{
FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
@ -43,6 +45,8 @@ pub const COL_SEAL_CONTEXT: u32 = 6;
pub const COL_FLOW_MPT_NODES: u32 = 7;
pub const COL_BLOCK_PROGRESS: u32 = 8;
pub const COL_PAD_DATA_LIST: u32 = 9;
// NOTE(review): "HEIGH" is a typo for "HEIGHT"; renaming touches every use.
pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 10;
// Total column count; MUST be one past the highest column index above.
// The originally committed values (10 and 11, above COL_NUM = 9) meant the
// database was opened without the new pad-data columns at all.
pub const COL_NUM: u32 = 11;
// Process at most 1M entries (256MB) pad data at a time.
const PAD_MAX_SIZE: usize = 1 << 20;
@ -61,7 +65,7 @@ pub struct UpdateFlowMessage {
}
pub struct LogManager {
pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
pub(crate) pad_db: Arc<PadDataStore>,
pub(crate) data_db: Arc<dyn ZgsKeyValueDB>,
tx_store: TransactionStore,
flow_store: Arc<FlowStore>,
@ -640,6 +644,7 @@ impl LogManager {
let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
let flow_store = Arc::new(FlowStore::new(data_db.clone(), config.flow.clone()));
let pad_db = Arc::new(PadDataStore::new(flow_db.clone(), data_db.clone()));
// If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
// first and call `put_tx` later.
let next_tx_seq = tx_store.next_tx_seq();
@ -743,7 +748,7 @@ impl LogManager {
let (sender, receiver) = mpsc::channel();
let mut log_manager = Self {
flow_db: flow_db_source,
pad_db,
data_db: data_db_source,
tx_store,
flow_store,
@ -956,10 +961,6 @@ impl LogManager {
let data_size = pad_data.len() / ENTRY_SIZE;
if is_full_empty {
self.sender.send(UpdateFlowMessage {
pad_data: pad_data.len(),
tx_start_flow_index,
})?;
} else {
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save

View File

@ -15,6 +15,7 @@ pub mod config;
mod flow_store;
mod load_chunk;
pub mod log_manager;
mod pad_data_store;
mod seal_task_manager;
#[cfg(test)]
mod tests;