diff --git a/node/miner/src/miner_id.rs b/node/miner/src/miner_id.rs index bf638d5..16f692f 100644 --- a/node/miner/src/miner_id.rs +++ b/node/miner/src/miner_id.rs @@ -5,17 +5,20 @@ use ethereum_types::Address; use ethers::contract::ContractCall; use ethers::contract::EthEvent; use std::sync::Arc; +use storage::log_store::log_manager::DATA_DB_KEY; use storage::H256; use storage_async::Store; const MINER_ID: &str = "mine.miner_id"; pub async fn load_miner_id(store: &Store) -> storage::error::Result> { - store.get_config_decoded(&MINER_ID).await + store.get_config_decoded(&MINER_ID, DATA_DB_KEY).await } async fn set_miner_id(store: &Store, miner_id: &H256) -> storage::error::Result<()> { - store.set_config_encoded(&MINER_ID, miner_id).await + store + .set_config_encoded(&MINER_ID, miner_id, DATA_DB_KEY) + .await } pub(crate) async fn check_and_request_miner_id( diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs index 0a1e33a..cbcc187 100644 --- a/node/pruner/src/lib.rs +++ b/node/pruner/src/lib.rs @@ -11,7 +11,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use storage::config::{ShardConfig, SHARD_CONFIG_KEY}; -use storage::log_store::log_manager::PORA_CHUNK_SIZE; +use storage::log_store::log_manager::{DATA_DB_KEY, FLOW_DB_KEY, PORA_CHUNK_SIZE}; use storage_async::Store; use task_executor::TaskExecutor; use tokio::sync::{broadcast, mpsc}; @@ -252,7 +252,7 @@ impl Pruner { .update_shard_config(self.config.shard_config) .await; self.store - .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config) + .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config, DATA_DB_KEY) .await } @@ -265,17 +265,22 @@ impl Pruner { .set_config_encoded( &FIRST_REWARDABLE_CHUNK_KEY, &(new_first_rewardable_chunk, new_first_tx_seq), + FLOW_DB_KEY, ) .await } } async fn get_shard_config(store: &Store) -> Result> { - store.get_config_decoded(&SHARD_CONFIG_KEY).await + store + .get_config_decoded(&SHARD_CONFIG_KEY, DATA_DB_KEY) + .await } async fn get_first_rewardable_chunk(store: &Store) -> Result> { - store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await + store + .get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY, FLOW_DB_KEY) + .await } #[derive(Debug)] diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs index 95cf6b5..b564f42 100644 --- a/node/storage-async/src/lib.rs +++ b/node/storage-async/src/lib.rs @@ -74,9 +74,11 @@ impl Store { pub async fn get_config_decoded + Send + Sync, T: Decode + Send + 'static>( &self, key: &K, + dest: &str, ) -> Result> { let key = key.as_ref().to_vec(); - self.spawn(move |store| store.get_config_decoded(&key)) + let dest = dest.to_string(); + self.spawn(move |store| store.get_config_decoded(&key, &dest)) .await } @@ -84,10 +86,12 @@ impl Store { &self, key: &K, value: &T, + dest: &str, ) -> anyhow::Result<()> { let key = key.as_ref().to_vec(); let value = value.as_ssz_bytes(); - self.spawn(move |store| store.set_config(&key, &value)) + let dest = dest.to_string(); + self.spawn(move |store| store.set_config(&key, &value, &dest)) .await } diff --git a/node/storage/src/log_store/config.rs b/node/storage/src/log_store/config.rs index ef30c66..b47f47f 100644 --- a/node/storage/src/log_store/config.rs +++ b/node/storage/src/log_store/config.rs @@ -2,16 +2,55 @@ use anyhow::{anyhow, Result}; use kvdb::{DBKey, DBOp}; use ssz::{Decode, Encode}; +use crate::log_store::log_manager::{COL_MISC, DATA_DB_KEY, FLOW_DB_KEY}; use crate::LogManager; -use super::log_manager::COL_MISC; +macro_rules! db_operation { + ($self:expr, $dest:expr, get, $key:expr) => {{ + let db = match $dest { + DATA_DB_KEY => &$self.data_db, + FLOW_DB_KEY => &$self.flow_db, + _ => return Err(anyhow!("Invalid destination")), + }; + Ok(db.get(COL_MISC, $key)?) + }}; + + ($self:expr, $dest:expr, put, $key:expr, $value:expr) => {{ + let db = match $dest { + DATA_DB_KEY => &$self.data_db, + FLOW_DB_KEY => &$self.flow_db, + _ => return Err(anyhow!("Invalid destination")), + }; + Ok(db.put(COL_MISC, $key, $value)?) + }}; + + ($self:expr, $dest:expr, delete, $key:expr) => {{ + let db = match $dest { + DATA_DB_KEY => &$self.data_db, + FLOW_DB_KEY => &$self.flow_db, + _ => return Err(anyhow!("Invalid destination")), + }; + Ok(db.delete(COL_MISC, $key)?) + }}; + + ($self:expr, $dest:expr, transaction, $tx:expr) => {{ + let db = match $dest { + DATA_DB_KEY => &$self.data_db, + FLOW_DB_KEY => &$self.flow_db, + _ => return Err(anyhow!("Invalid destination")), + }; + let mut db_tx = db.transaction(); + db_tx.ops = $tx.ops; + Ok(db.write(db_tx)?) + }}; +} pub trait Configurable { - fn get_config(&self, key: &[u8]) -> Result>>; - fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()>; - fn remove_config(&self, key: &[u8]) -> Result<()>; + fn get_config(&self, key: &[u8], dest: &str) -> Result>>; + fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()>; + fn remove_config(&self, key: &[u8], dest: &str) -> Result<()>; - fn exec_configs(&self, tx: ConfigTx) -> Result<()>; + fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()>; } #[derive(Default)] @@ -41,8 +80,12 @@ impl ConfigTx { } pub trait ConfigurableExt: Configurable { - fn get_config_decoded, T: Decode>(&self, key: &K) -> Result> { - match self.get_config(key.as_ref())? { + fn get_config_decoded, T: Decode>( + &self, + key: &K, + dest: &str, + ) -> Result> { + match self.get_config(key.as_ref(), dest)? { Some(val) => Ok(Some( T::from_ssz_bytes(&val).map_err(|e| anyhow!("SSZ decode error: {:?}", e))?, )), @@ -50,36 +93,36 @@ pub trait ConfigurableExt: Configurable { } } - fn set_config_encoded, T: Encode>(&self, key: &K, value: &T) -> Result<()> { - self.set_config(key.as_ref(), &value.as_ssz_bytes()) + fn set_config_encoded, T: Encode>( + &self, + key: &K, + value: &T, + dest: &str, + ) -> Result<()> { + self.set_config(key.as_ref(), &value.as_ssz_bytes(), dest) } - fn remove_config_by_key>(&self, key: &K) -> Result<()> { - self.remove_config(key.as_ref()) + fn remove_config_by_key>(&self, key: &K, dest: &str) -> Result<()> { + self.remove_config(key.as_ref(), dest) } } impl ConfigurableExt for T {} impl Configurable for LogManager { - fn get_config(&self, key: &[u8]) -> Result>> { - Ok(self.data_db.get(COL_MISC, key)?) + fn get_config(&self, key: &[u8], dest: &str) -> Result>> { + db_operation!(self, dest, get, key) } - fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> { - self.data_db.put(COL_MISC, key, value)?; - Ok(()) + fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()> { + db_operation!(self, dest, put, key, value) } - fn remove_config(&self, key: &[u8]) -> Result<()> { - Ok(self.data_db.delete(COL_MISC, key)?) + fn remove_config(&self, key: &[u8], dest: &str) -> Result<()> { + db_operation!(self, dest, delete, key) } - fn exec_configs(&self, tx: ConfigTx) -> Result<()> { - let mut db_tx = self.data_db.transaction(); - db_tx.ops = tx.ops; - self.data_db.write(db_tx)?; - - Ok(()) + fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()> { + db_operation!(self, dest, transaction, tx) } } diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 6d165a7..929b912 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -44,6 +44,9 @@ pub const COL_PAD_DATA_LIST: u32 = 7; // flow db pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 8; // data db pub const COL_NUM: u32 = 9; +pub const DATA_DB_KEY: &str = "data_db"; +pub const FLOW_DB_KEY: &str = "flow_db"; + // Process at most 1M entries (256MB) pad data at a time. const PAD_MAX_SIZE: usize = 1 << 20; @@ -61,6 +64,7 @@ pub struct UpdateFlowMessage { } pub struct LogManager { + pub(crate) flow_db: Arc, pub(crate) data_db: Arc, tx_store: TransactionStore, flow_store: Arc, @@ -781,6 +785,7 @@ impl LogManager { }); let log_manager = Self { + flow_db: flow_db_source, data_db: data_db_source, tx_store, flow_store, diff --git a/node/sync/src/auto_sync/sync_store.rs b/node/sync/src/auto_sync/sync_store.rs index 825b858..0dcb5d5 100644 --- a/node/sync/src/auto_sync/sync_store.rs +++ b/node/sync/src/auto_sync/sync_store.rs @@ -1,7 +1,10 @@ use super::tx_store::TxStore; use anyhow::Result; use std::sync::Arc; -use storage::log_store::config::{ConfigTx, ConfigurableExt}; +use storage::log_store::{ + config::{ConfigTx, ConfigurableExt}, + log_manager::DATA_DB_KEY, +}; use storage_async::Store; use tokio::sync::RwLock; @@ -66,10 +69,10 @@ impl SyncStore { let store = async_store.get_store(); // load next_tx_seq - let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ)?; + let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ, DATA_DB_KEY)?; // load max_tx_seq - let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ)?; + let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ, DATA_DB_KEY)?; Ok((next_tx_seq, max_tx_seq)) } @@ -77,13 +80,13 @@ impl SyncStore { pub async fn set_next_tx_seq(&self, tx_seq: u64) -> Result<()> { let async_store = self.store.write().await; let store = async_store.get_store(); - store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq) + store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq, DATA_DB_KEY) } pub async fn set_max_tx_seq(&self, tx_seq: u64) -> Result<()> { let async_store = self.store.write().await; let store = async_store.get_store(); - store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq) + store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq, DATA_DB_KEY) } pub async fn contains(&self, tx_seq: u64) -> Result> { @@ -114,7 +117,7 @@ impl SyncStore { } let removed = self.pending_txs.remove(store, Some(&mut tx), tx_seq)?; - store.exec_configs(tx)?; + store.exec_configs(tx, DATA_DB_KEY)?; if removed { Ok(InsertResult::Upgraded) @@ -128,7 +131,7 @@ impl SyncStore { } let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?; - store.exec_configs(tx)?; + store.exec_configs(tx, DATA_DB_KEY)?; if removed { Ok(InsertResult::Downgraded) @@ -151,7 +154,7 @@ impl SyncStore { let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?; - store.exec_configs(tx)?; + store.exec_configs(tx, DATA_DB_KEY)?; Ok(added) } diff --git a/node/sync/src/auto_sync/tx_store.rs b/node/sync/src/auto_sync/tx_store.rs index 3c0de45..9c2ac27 100644 --- a/node/sync/src/auto_sync/tx_store.rs +++ b/node/sync/src/auto_sync/tx_store.rs @@ -1,6 +1,7 @@ use anyhow::Result; use rand::Rng; use storage::log_store::config::{ConfigTx, ConfigurableExt}; +use storage::log_store::log_manager::DATA_DB_KEY; use storage::log_store::Store; /// TxStore is used to store pending transactions that to be synchronized in advance. @@ -32,11 +33,11 @@ impl TxStore { } fn index_of(&self, store: &dyn Store, tx_seq: u64) -> Result> { - store.get_config_decoded(&self.key_seq_to_index(tx_seq)) + store.get_config_decoded(&self.key_seq_to_index(tx_seq), DATA_DB_KEY) } fn at(&self, store: &dyn Store, index: usize) -> Result> { - store.get_config_decoded(&self.key_index_to_seq(index)) + store.get_config_decoded(&self.key_index_to_seq(index), DATA_DB_KEY) } pub fn has(&self, store: &dyn Store, tx_seq: u64) -> Result { @@ -45,7 +46,7 @@ impl TxStore { pub fn count(&self, store: &dyn Store) -> Result { store - .get_config_decoded(&self.key_count) + .get_config_decoded(&self.key_count, DATA_DB_KEY) .map(|x| x.unwrap_or(0)) } @@ -70,7 +71,7 @@ impl TxStore { if let Some(db_tx) = db_tx { db_tx.append(&mut tx); } else { - store.exec_configs(tx)?; + store.exec_configs(tx, DATA_DB_KEY)?; } Ok(true) @@ -130,7 +131,7 @@ impl TxStore { if let Some(db_tx) = db_tx { db_tx.append(&mut tx); } else { - store.exec_configs(tx)?; + store.exec_configs(tx, DATA_DB_KEY)?; } Ok(true)