diff --git a/node/src/client/builder.rs b/node/src/client/builder.rs
index ff7a52e..3c00c69 100644
--- a/node/src/client/builder.rs
+++ b/node/src/client/builder.rs
@@ -112,8 +112,13 @@ impl ClientBuilder {
     pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
         let executor = require!("sync", self, runtime_context).clone().executor;
         let store = Arc::new(
-            LogManager::rocksdb(config.log_config.clone(), &config.db_dir, executor)
-                .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
+            LogManager::rocksdb(
+                config.log_config.clone(),
+                &config.db_dir.join("flow_db"),
+                &config.db_dir.join("data_db"),
+                executor,
+            )
+            .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
         );

         self.store = Some(store.clone());
diff --git a/node/storage/benches/benchmark.rs b/node/storage/benches/benchmark.rs
index 43fce03..4516c9b 100644
--- a/node/storage/benches/benchmark.rs
+++ b/node/storage/benches/benchmark.rs
@@ -25,9 +25,14 @@ fn write_performance(c: &mut Criterion) {
     let executor = runtime.task_executor.clone();

     let store: Arc<RwLock<LogManager>> = Arc::new(RwLock::new(
-        LogManager::rocksdb(LogConfig::default(), "db_write", executor)
-            .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
-            .unwrap(),
+        LogManager::rocksdb(
+            LogConfig::default(),
+            "db_flow_write",
+            "db_data_write",
+            executor,
+        )
+        .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
+        .unwrap(),
    ));

     let chunk_count = 2048;
@@ -114,9 +119,14 @@ fn read_performance(c: &mut Criterion) {
     let executor = runtime.task_executor.clone();

     let store: Arc<RwLock<LogManager>> = Arc::new(RwLock::new(
-        LogManager::rocksdb(LogConfig::default(), "db_read", executor)
-            .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
-            .unwrap(),
+        LogManager::rocksdb(
+            LogConfig::default(),
+            "db_flow_read",
+            "db_data_read",
+            executor,
+        )
+        .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
+        .unwrap(),
    ));

     let tx_size = 1000;
diff --git a/node/storage/src/log_store/config.rs b/node/storage/src/log_store/config.rs
index ace314a..ef9fc18 100644
--- a/node/storage/src/log_store/config.rs
+++ b/node/storage/src/log_store/config.rs
@@ -63,22 +63,22 @@ impl<T: ?Sized + Configurable> ConfigurableExt for T {}

 impl Configurable for LogManager {
     fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        Ok(self.db.get(COL_MISC, key)?)
+        Ok(self.flow_db.get(COL_MISC, key)?)
     }

     fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> {
-        self.db.put(COL_MISC, key, value)?;
+        self.flow_db.put(COL_MISC, key, value)?;
         Ok(())
     }

     fn remove_config(&self, key: &[u8]) -> Result<()> {
-        Ok(self.db.delete(COL_MISC, key)?)
+        Ok(self.flow_db.delete(COL_MISC, key)?)
     }

     fn exec_configs(&self, tx: ConfigTx) -> Result<()> {
-        let mut db_tx = self.db.transaction();
+        let mut db_tx = self.flow_db.transaction();
         db_tx.ops = tx.ops;
-        self.db.write(db_tx)?;
+        self.flow_db.write(db_tx)?;

         Ok(())
     }
diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs
index 88b11e9..ce98540 100644
--- a/node/storage/src/log_store/flow_store.rs
+++ b/node/storage/src/log_store/flow_store.rs
@@ -27,22 +27,24 @@ use tracing::{debug, error, trace};
 use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};

 pub struct FlowStore {
-    db: Arc<FlowDBStore>,
+    flow_db: Arc<FlowDBStore>,
+    data_db: Arc<FlowDBStore>,
     seal_manager: SealTaskManager,
     config: FlowConfig,
 }

 impl FlowStore {
-    pub fn new(db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
+    pub fn new(flow_db: Arc<FlowDBStore>, data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
         Self {
-            db,
+            flow_db,
+            data_db,
             seal_manager: Default::default(),
             config,
         }
     }

     pub fn put_batch_root_list(&self, root_map: BTreeMap<usize, DataRoot>) -> Result<()> {
-        self.db.put_batch_root_list(root_map)
+        self.data_db.put_batch_root_list(root_map)
     }

     pub fn insert_subtree_list_for_batch(
@@ -51,18 +53,19 @@ impl FlowStore {
         subtree_list: Vec<(usize, usize, DataRoot)>,
     ) -> Result<()> {
         let mut batch = self
-            .db
+            .data_db
             .get_entry_batch(batch_index as u64)?
             .unwrap_or_else(|| EntryBatch::new(batch_index as u64));
         batch.set_subtree_list(subtree_list);
-        self.db.put_entry_raw(vec![(batch_index as u64, batch)])?;
+        self.data_db
+            .put_entry_raw(vec![(batch_index as u64, batch)])?;

         Ok(())
     }

     pub fn gen_proof_in_batch(&self, batch_index: usize, sector_index: usize) -> Result<FlowProof> {
         let batch = self
-            .db
+            .data_db
             .get_entry_batch(batch_index as u64)?
             .ok_or_else(|| anyhow!("batch missing, index={}", batch_index))?;
         let merkle = batch.to_merkle_tree(batch_index == 0)?.ok_or_else(|| {
@@ -75,20 +78,20 @@ impl FlowStore {
     }

     pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
-        self.db.put_mpt_node_list(node_list)
+        self.flow_db.put_mpt_node_list(node_list)
     }

     pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
         self.seal_manager.delete_batch_list(batch_list);
-        self.db.delete_batch_list(batch_list)
+        self.data_db.delete_batch_list(batch_list)
     }

     pub fn get_raw_batch(&self, batch_index: u64) -> Result<Option<EntryBatch>> {
-        self.db.get_entry_batch(batch_index)
+        self.data_db.get_entry_batch(batch_index)
     }

     pub fn get_batch_root(&self, batch_index: u64) -> Result<Option<DataRoot>> {
-        self.db.get_batch_root(batch_index)
+        self.data_db.get_batch_root(batch_index)
     }
 }

@@ -134,7 +137,7 @@ impl FlowRead for FlowStore {
             length -= 1;
         }

-        let entry_batch = try_option!(self.db.get_entry_batch(chunk_index)?);
+        let entry_batch = try_option!(self.data_db.get_entry_batch(chunk_index)?);
         let mut entry_batch_data =
             try_option!(entry_batch.get_unsealed_data(offset as usize, length as usize));
         data.append(&mut entry_batch_data);
@@ -163,7 +166,7 @@ impl FlowRead for FlowStore {
         let chunk_index = start_entry_index / self.config.batch_size as u64;

         if let Some(mut data_list) = self
-            .db
+            .data_db
             .get_entry_batch(chunk_index)?
             .map(|b| b.into_data_list(start_entry_index))
         {
@@ -189,11 +192,11 @@ impl FlowRead for FlowStore {

     /// Return the list of all stored chunk roots.
     fn get_chunk_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>> {
-        self.db.get_batch_root_list()
+        self.data_db.get_batch_root_list()
     }

     fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
-        let batch = try_option!(self.db.get_entry_batch(chunk_index)?);
+        let batch = try_option!(self.data_db.get_entry_batch(chunk_index)?);
         let mut mine_chunk = MineLoadChunk::default();
         for (seal_index, (sealed, validity)) in mine_chunk
             .loaded_chunk
@@ -211,7 +214,7 @@ impl FlowRead for FlowStore {

     fn get_num_entries(&self) -> Result<u64> {
         // This is an over-estimation as it assumes each batch is full.
-        self.db
+        self.data_db
             .kvdb
             .num_keys(COL_ENTRY_BATCH)
             .map(|num_batches| num_batches * PORA_CHUNK_SIZE as u64)
@@ -251,7 +254,7 @@ impl FlowWrite for FlowStore {

         // TODO: Try to avoid loading from db if possible.
         let mut batch = self
-            .db
+            .data_db
             .get_entry_batch(chunk_index)?
             .unwrap_or_else(|| EntryBatch::new(chunk_index));
         let completed_seals = batch.insert_data(
@@ -269,12 +272,12 @@ impl FlowWrite for FlowStore {
             batch_list.push((chunk_index, batch));
         }

-        self.db.put_entry_batch_list(batch_list)
+        self.data_db.put_entry_batch_list(batch_list)
     }

     fn truncate(&self, start_index: u64) -> crate::error::Result<()> {
         let mut to_seal_set = self.seal_manager.to_seal_set.write();
-        let to_reseal = self.db.truncate(start_index, self.config.batch_size)?;
+        let to_reseal = self.data_db.truncate(start_index, self.config.batch_size)?;

         to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL));
         let new_seal_version = self.seal_manager.inc_seal_version();
@@ -304,7 +307,7 @@ impl FlowSeal for FlowStore {
         let mut tasks = Vec::with_capacity(SEALS_PER_LOAD);

         let batch_data = self
-            .db
+            .data_db
             .get_entry_batch((first_index / SEALS_PER_LOAD) as u64)?
             .expect("Lost data chunk in to_seal_set");

@@ -343,7 +346,7 @@ impl FlowSeal for FlowStore {
         .chunk_by(|answer| answer.seal_index / SEALS_PER_LOAD as u64)
         {
             let mut batch_chunk = self
-                .db
+                .data_db
                 .get_entry_batch(load_index)?
                 .expect("Can not find chunk data");
             for answer in answers_in_chunk {
@@ -359,7 +362,7 @@ impl FlowSeal for FlowStore {
             to_seal_set.remove(&idx);
         }

-        self.db.put_entry_raw(updated_chunk)?;
+        self.data_db.put_entry_raw(updated_chunk)?;

         Ok(())
     }
diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs
index 118bedb..15d0f10 100644
--- a/node/storage/src/log_store/log_manager.rs
+++ b/node/storage/src/log_store/log_manager.rs
@@ -62,7 +62,7 @@ pub struct UpdateFlowMessage {
 }

 pub struct LogManager {
-    pub(crate) db: Arc<dyn ZgsKeyValueDB>,
+    pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
     tx_store: TransactionStore,
     flow_store: Arc<FlowStore>,
     merkle: RwLock<MerkleManager>,
@@ -615,28 +615,37 @@ impl LogStoreRead for LogManager {
 impl LogManager {
     pub fn rocksdb(
         config: LogConfig,
-        path: impl AsRef<Path>,
+        flow_path: impl AsRef<Path>,
+        data_path: impl AsRef<Path>,
         executor: task_executor::TaskExecutor,
     ) -> Result<Self> {
         let mut db_config = DatabaseConfig::with_columns(COL_NUM);
         db_config.enable_statistics = true;
-        let db = Arc::new(Database::open(&db_config, path)?);
-        Self::new(db, config, executor)
+        let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?);
+        let data_db_source = Arc::new(Database::open(&db_config, data_path)?);
+        Self::new(flow_db_source, data_db_source, config, executor)
     }

     pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
-        let db = Arc::new(kvdb_memorydb::create(COL_NUM));
-        Self::new(db, config, executor)
+        let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM));
+        let data_db = Arc::new(kvdb_memorydb::create(COL_NUM));
+        Self::new(flow_db, data_db, config, executor)
     }

     fn new(
-        db: Arc<dyn ZgsKeyValueDB>,
+        flow_db_source: Arc<dyn ZgsKeyValueDB>,
+        data_db_source: Arc<dyn ZgsKeyValueDB>,
         config: LogConfig,
         executor: task_executor::TaskExecutor,
     ) -> Result<Self> {
-        let tx_store = TransactionStore::new(db.clone())?;
-        let flow_db = Arc::new(FlowDBStore::new(db.clone()));
-        let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
+        let tx_store = TransactionStore::new(flow_db_source.clone())?;
+        let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
+        let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
+        let flow_store = Arc::new(FlowStore::new(
+            flow_db.clone(),
+            data_db.clone(),
+            config.flow.clone(),
+        ));
         // If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
         // first and call `put_tx` later.
         let next_tx_seq = tx_store.next_tx_seq();
@@ -740,7 +749,7 @@ impl LogManager {
         let (sender, receiver) = mpsc::channel();

         let mut log_manager = Self {
-            db,
+            flow_db: flow_db_source,
             tx_store,
             flow_store,
             merkle,