From ce7a32009899da6c0bb66eca05cba28a810d57c1 Mon Sep 17 00:00:00 2001
From: Peter Zhang
Date: Wed, 6 Nov 2024 18:43:41 +0800
Subject: [PATCH 1/2] do not pad tx during sync phase

---
 node/storage/src/log_store/config.rs      | 10 +++++-----
 node/storage/src/log_store/log_manager.rs |  4 +++-
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/node/storage/src/log_store/config.rs b/node/storage/src/log_store/config.rs
index ef9fc18..ef30c66 100644
--- a/node/storage/src/log_store/config.rs
+++ b/node/storage/src/log_store/config.rs
@@ -63,22 +63,22 @@ impl<T: ?Sized + Configurable> ConfigurableExt for T {}
 
 impl Configurable for LogManager {
     fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        Ok(self.flow_db.get(COL_MISC, key)?)
+        Ok(self.data_db.get(COL_MISC, key)?)
     }
 
     fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> {
-        self.flow_db.put(COL_MISC, key, value)?;
+        self.data_db.put(COL_MISC, key, value)?;
         Ok(())
     }
 
     fn remove_config(&self, key: &[u8]) -> Result<()> {
-        Ok(self.flow_db.delete(COL_MISC, key)?)
+        Ok(self.data_db.delete(COL_MISC, key)?)
     }
 
     fn exec_configs(&self, tx: ConfigTx) -> Result<()> {
-        let mut db_tx = self.flow_db.transaction();
+        let mut db_tx = self.data_db.transaction();
         db_tx.ops = tx.ops;
-        self.flow_db.write(db_tx)?;
+        self.data_db.write(db_tx)?;
 
         Ok(())
     }
diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs
index 5ef2fa9..fb2a122 100644
--- a/node/storage/src/log_store/log_manager.rs
+++ b/node/storage/src/log_store/log_manager.rs
@@ -62,6 +62,7 @@ pub struct UpdateFlowMessage {
 
 pub struct LogManager {
     pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
+    pub(crate) data_db: Arc<dyn ZgsKeyValueDB>,
     tx_store: TransactionStore,
     flow_store: Arc<FlowStore>,
     merkle: RwLock<MerkleManager>,
@@ -635,7 +636,7 @@ impl LogManager {
         config: LogConfig,
         executor: task_executor::TaskExecutor,
     ) -> Result<Self> {
-        let tx_store = TransactionStore::new(flow_db_source.clone())?;
+        let tx_store = TransactionStore::new(data_db_source.clone())?;
         let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
         let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
         let flow_store = Arc::new(FlowStore::new(data_db.clone(), config.flow.clone()));
@@ -743,6 +744,7 @@ impl LogManager {
         let mut log_manager = Self {
             flow_db: flow_db_source,
+            data_db: data_db_source,
             tx_store,
             flow_store,
             merkle,

From e946400b82cc93688004a2bea974d074124158c9 Mon Sep 17 00:00:00 2001
From: Peter Zhang
Date: Wed, 6 Nov 2024 21:53:53 +0800
Subject: [PATCH 2/2] add pad data store

---
 node/log_entry_sync/src/sync_manager/mod.rs |  2 +
 node/storage/src/log_store/flow_store.rs    | 73 +++++++++++++++++++++
 node/storage/src/log_store/log_manager.rs   | 15 +++--
 node/storage/src/log_store/mod.rs           |  1 +
 4 files changed, 84 insertions(+), 7 deletions(-)

diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs
index 8c07d8d..d665eee 100644
--- a/node/log_entry_sync/src/sync_manager/mod.rs
+++ b/node/log_entry_sync/src/sync_manager/mod.rs
@@ -277,6 +277,8 @@ impl LogSyncManager {
                         .remove_finalized_block_interval_minutes,
                 );
 
+                // start the pad data store
+
                 let (watch_progress_tx, watch_progress_rx) =
                     tokio::sync::mpsc::unbounded_channel();
                 let watch_rx = log_sync_manager.log_fetcher.start_watch(
diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs
index ed41963..386c76f 100644
--- a/node/storage/src/log_store/flow_store.rs
+++ b/node/storage/src/log_store/flow_store.rs
@@ -1,4 +1,5 @@
 use super::load_chunk::EntryBatch;
+use super::log_manager::{COL_PAD_DATA_LIST, COL_PAD_DATA_SYNC_HEIGH};
 use super::seal_task_manager::SealTaskManager;
 use super::{MineLoadChunk, SealAnswer, SealTask};
 use crate::config::ShardConfig;
@@ -343,6 +344,35 @@ impl FlowSeal for FlowStore {
     }
 }
 
+pub struct PadDataStore {
+    flow_db: Arc<FlowDBStore>,
+    data_db: Arc<FlowDBStore>,
+}
+
+impl PadDataStore {
+    pub fn new(flow_db: Arc<FlowDBStore>, data_db: Arc<FlowDBStore>) -> Self {
+        Self { flow_db, data_db }
+    }
+
+    pub fn put_pad_data(&self, data_size: usize, start_index: u64) -> Result<()> {
+        self.flow_db.put_pad_data(data_size, start_index)?;
+        Ok(())
+    }
+
+    pub fn put_pad_data_sync_height(&self, sync_index: u64) -> Result<()> {
+        self.data_db.put_pad_data_sync_height(sync_index)?;
+        Ok(())
+    }
+
+    pub fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
+        self.data_db.get_pad_data_sync_heigh()
+    }
+
+    pub fn get_pad_data(&self, start_index: u64) -> Result<Option<usize>> {
+        self.flow_db.get_pad_data(start_index)
+    }
+}
+
 pub struct FlowDBStore {
     kvdb: Arc<dyn ZgsKeyValueDB>,
 }
@@ -443,6 +473,49 @@ impl FlowDBStore {
         }
         Ok(self.kvdb.write(tx)?)
     }
+
+    fn put_pad_data(&self, data_size: usize, start_index: u64) -> Result<()> {
+        let mut tx = self.kvdb.transaction();
+        tx.put(
+            COL_PAD_DATA_LIST,
+            &start_index.to_be_bytes(),
+            &data_size.to_be_bytes(),
+        );
+        self.kvdb.write(tx)?;
+        Ok(())
+    }
+
+    fn put_pad_data_sync_height(&self, sync_index: u64) -> Result<()> {
+        let mut tx = self.kvdb.transaction();
+        tx.put(
+            COL_PAD_DATA_SYNC_HEIGH,
+            b"sync_height",
+            &sync_index.to_be_bytes(),
+        );
+        self.kvdb.write(tx)?;
+        Ok(())
+    }
+
+    fn get_pad_data_sync_heigh(&self) -> Result<Option<u64>> {
+        match self.kvdb.get(COL_PAD_DATA_SYNC_HEIGH, b"sync_height")? {
+            Some(v) => Ok(Some(u64::from_be_bytes(
+                v.try_into().map_err(|e| anyhow!("{:?}", e))?,
+            ))),
+            None => Ok(None),
+        }
+    }
+
+    fn get_pad_data(&self, start_index: u64) -> Result<Option<usize>> {
+        match self
+            .kvdb
+            .get(COL_PAD_DATA_LIST, &start_index.to_be_bytes())?
+        {
+            Some(v) => Ok(Some(usize::from_be_bytes(
+                v.try_into().map_err(|e| anyhow!("{:?}", e))?,
+            ))),
+            None => Ok(None),
+        }
+    }
 }
 
 #[derive(DeriveEncode, DeriveDecode, Clone, Debug)]
diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs
index fb2a122..7beb3aa 100644
--- a/node/storage/src/log_store/log_manager.rs
+++ b/node/storage/src/log_store/log_manager.rs
@@ -1,7 +1,9 @@
 use super::tx_store::BlockHashAndSubmissionIndex;
 use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
 use crate::config::ShardConfig;
-use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
+use crate::log_store::flow_store::{
+    batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadDataStore,
+};
 use crate::log_store::tx_store::TransactionStore;
 use crate::log_store::{
     FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
@@ -43,6 +45,8 @@ pub const COL_SEAL_CONTEXT: u32 = 6;
 pub const COL_FLOW_MPT_NODES: u32 = 7;
 pub const COL_BLOCK_PROGRESS: u32 = 8;
 pub const COL_NUM: u32 = 9;
+pub const COL_PAD_DATA_LIST: u32 = 10;
+pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 11;
 
 // Process at most 1M entries (256MB) pad data at a time.
 const PAD_MAX_SIZE: usize = 1 << 20;
@@ -61,7 +65,7 @@ pub struct UpdateFlowMessage {
 }
 
 pub struct LogManager {
-    pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
+    pub(crate) pad_db: Arc<PadDataStore>,
     pub(crate) data_db: Arc<dyn ZgsKeyValueDB>,
     tx_store: TransactionStore,
     flow_store: Arc<FlowStore>,
@@ -640,6 +644,7 @@ impl LogManager {
         let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
         let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
         let flow_store = Arc::new(FlowStore::new(data_db.clone(), config.flow.clone()));
+        let pad_db = Arc::new(PadDataStore::new(flow_db.clone(), data_db.clone()));
         // If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
         // first and call `put_tx` later.
         let next_tx_seq = tx_store.next_tx_seq();
@@ -743,7 +748,7 @@ impl LogManager {
         let (sender, receiver) = mpsc::channel();
         let mut log_manager = Self {
-            flow_db: flow_db_source,
+            pad_db,
             data_db: data_db_source,
             tx_store,
             flow_store,
             merkle,
@@ -956,10 +961,6 @@ impl LogManager {
             let data_size = pad_data.len() / ENTRY_SIZE;
             if is_full_empty {
-                self.sender.send(UpdateFlowMessage {
-                    pad_data: pad_data.len(),
-                    tx_start_flow_index,
-                })?;
             } else {
                 // Update the flow database.
                 // This should be called before `complete_last_chunk_merkle` so that we do not save
diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs
index 5f74a10..99389f1 100644
--- a/node/storage/src/log_store/mod.rs
+++ b/node/storage/src/log_store/mod.rs
@@ -15,6 +15,7 @@ pub mod config;
 mod flow_store;
 mod load_chunk;
 pub mod log_manager;
+mod pad_data_store;
 mod seal_task_manager;
 #[cfg(test)]
 mod tests;
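Taken together, the two patches stop padding the flow store inline during the sync phase: patch 1 routes the misc/config and transaction columns to the new data_db, and patch 2 records each pad range in COL_PAD_DATA_LIST (keyed by the big-endian start flow index, mapped to the pad size) plus a separately stored sync height, instead of sending an UpdateFlowMessage. Neither patch includes the task that later drains this backlog, so the following is only a minimal sketch of how the recorded ranges could be replayed. PadBacklog, its BTreeMap columns, and apply_pad_range are hypothetical stand-ins for PadDataStore and the flow store, and the assumption that the sync height advances to the end of the last applied range is mine, not the patch's.

use std::collections::BTreeMap;

// Hypothetical in-memory stand-in for COL_PAD_DATA_LIST and the stored sync height.
// In the patch these live in two kvdb columns; here a BTreeMap plays the same role.
struct PadBacklog {
    pad_ranges: BTreeMap<u64, usize>, // start flow index -> number of pad entries
    sync_height: u64,                 // first flow index not yet applied to the flow store
}

impl PadBacklog {
    fn new() -> Self {
        Self {
            pad_ranges: BTreeMap::new(),
            sync_height: 0,
        }
    }

    // Mirrors PadDataStore::put_pad_data: during sync the range is only recorded.
    fn put_pad_data(&mut self, data_size: usize, start_index: u64) {
        self.pad_ranges.insert(start_index, data_size);
    }

    // Drain every recorded range at or above the persisted sync height, apply it via
    // `apply_pad_range`, and advance the height (assumed to move to the end of the range).
    fn replay<F: FnMut(u64, usize)>(&mut self, mut apply_pad_range: F) {
        let pending: Vec<(u64, usize)> = self
            .pad_ranges
            .range(self.sync_height..)
            .map(|(&start, &len)| (start, len))
            .collect();
        for (start, len) in pending {
            apply_pad_range(start, len); // e.g. append zero entries to the flow store
            self.sync_height = start + len as u64; // what put_pad_data_sync_height would persist
        }
    }
}

fn main() {
    let mut backlog = PadBacklog::new();
    // During the sync phase, padding is only recorded, not written to the flow store.
    backlog.put_pad_data(1024, 0);
    backlog.put_pad_data(512, 4096);

    // Later, a background task replays the backlog in ascending start-index order.
    backlog.replay(|start, len| println!("padding {len} entries at flow index {start}"));
    assert_eq!(backlog.sync_height, 4608);
}

Encoding the keys with to_be_bytes keeps the kvdb's lexicographic key order equal to numeric order, so an iterator over COL_PAD_DATA_LIST visits pending pad ranges by ascending flow index; the BTreeMap::range call above models that ordering.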