split misc

Peter Zhang 2024-11-07 20:09:58 +08:00
parent 3b4ed1502b
commit b708ff2f34
7 changed files with 109 additions and 45 deletions
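In short: the misc/config column is split across the node's two key-value databases. Every config accessor now takes a destination key (DATA_DB_KEY or FLOW_DB_KEY, added in log_manager.rs below) that selects whether the entry lives in the data DB or the flow DB. A minimal caller-side sketch of the new API, mirroring the pruner's get_shard_config helper in the hunks below (illustrative only; the storage, storage_async, and anyhow crates are the repo's own):

    use storage::config::{ShardConfig, SHARD_CONFIG_KEY};
    use storage::log_store::log_manager::DATA_DB_KEY;
    use storage_async::Store;

    // Read the shard config through the async wrapper; the second argument
    // names which underlying database (the data DB here) holds the misc entry.
    async fn read_shard_config(store: &Store) -> anyhow::Result<Option<ShardConfig>> {
        store
            .get_config_decoded(&SHARD_CONFIG_KEY, DATA_DB_KEY)
            .await
    }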

View File

@@ -5,17 +5,20 @@ use ethereum_types::Address;
 use ethers::contract::ContractCall;
 use ethers::contract::EthEvent;
 use std::sync::Arc;
+use storage::log_store::log_manager::DATA_DB_KEY;
 use storage::H256;
 use storage_async::Store;
 
 const MINER_ID: &str = "mine.miner_id";
 
 pub async fn load_miner_id(store: &Store) -> storage::error::Result<Option<H256>> {
-    store.get_config_decoded(&MINER_ID).await
+    store.get_config_decoded(&MINER_ID, DATA_DB_KEY).await
 }
 
 async fn set_miner_id(store: &Store, miner_id: &H256) -> storage::error::Result<()> {
-    store.set_config_encoded(&MINER_ID, miner_id).await
+    store
+        .set_config_encoded(&MINER_ID, miner_id, DATA_DB_KEY)
+        .await
 }
 
 pub(crate) async fn check_and_request_miner_id(

View File

@@ -11,7 +11,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
 use storage::config::{ShardConfig, SHARD_CONFIG_KEY};
-use storage::log_store::log_manager::PORA_CHUNK_SIZE;
+use storage::log_store::log_manager::{DATA_DB_KEY, FLOW_DB_KEY, PORA_CHUNK_SIZE};
 use storage_async::Store;
 use task_executor::TaskExecutor;
 use tokio::sync::{broadcast, mpsc};
@@ -252,7 +252,7 @@ impl Pruner {
             .update_shard_config(self.config.shard_config)
             .await;
         self.store
-            .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
+            .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config, DATA_DB_KEY)
             .await
     }
@@ -265,17 +265,22 @@ impl Pruner {
             .set_config_encoded(
                 &FIRST_REWARDABLE_CHUNK_KEY,
                 &(new_first_rewardable_chunk, new_first_tx_seq),
+                FLOW_DB_KEY,
             )
             .await
     }
 }
 
 async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
-    store.get_config_decoded(&SHARD_CONFIG_KEY).await
+    store
+        .get_config_decoded(&SHARD_CONFIG_KEY, DATA_DB_KEY)
+        .await
 }
 
 async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> {
-    store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
+    store
+        .get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY, FLOW_DB_KEY)
+        .await
 }
 
 #[derive(Debug)]

View File

@@ -74,9 +74,11 @@ impl Store {
     pub async fn get_config_decoded<K: AsRef<[u8]> + Send + Sync, T: Decode + Send + 'static>(
         &self,
         key: &K,
+        dest: &str,
     ) -> Result<Option<T>> {
         let key = key.as_ref().to_vec();
-        self.spawn(move |store| store.get_config_decoded(&key))
+        let dest = dest.to_string();
+        self.spawn(move |store| store.get_config_decoded(&key, &dest))
             .await
     }
@@ -84,10 +86,12 @@ impl Store {
         &self,
         key: &K,
         value: &T,
+        dest: &str,
     ) -> anyhow::Result<()> {
         let key = key.as_ref().to_vec();
         let value = value.as_ssz_bytes();
-        self.spawn(move |store| store.set_config(&key, &value))
+        let dest = dest.to_string();
+        self.spawn(move |store| store.set_config(&key, &value, &dest))
             .await
     }
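The extra `let dest = dest.to_string();` (like the existing `key.as_ref().to_vec()`) is there because the closure handed to `spawn` is moved to another task and, typically, has to own everything it captures: a borrowed `&str` cannot travel with a `'static` closure, so it is copied into an owned `String` first. A stand-alone analogue of the pattern, using tokio's `spawn_blocking` (the `demo` function is hypothetical; the repo's own `spawn` is not shown in this diff):

    use tokio::task::{spawn_blocking, JoinHandle};

    // Copy the borrowed &str into an owned String so the moved closure can own it.
    // (Must be called from within a tokio runtime.)
    fn demo(dest: &str) -> JoinHandle<usize> {
        let dest = dest.to_string();
        spawn_blocking(move || dest.len())
    }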

View File

@@ -2,16 +2,55 @@ use anyhow::{anyhow, Result};
 use kvdb::{DBKey, DBOp};
 use ssz::{Decode, Encode};
 
+use crate::log_store::log_manager::{COL_MISC, DATA_DB_KEY, FLOW_DB_KEY};
 use crate::LogManager;
 
-use super::log_manager::COL_MISC;
+macro_rules! db_operation {
+    ($self:expr, $dest:expr, get, $key:expr) => {{
+        let db = match $dest {
+            DATA_DB_KEY => &$self.data_db,
+            FLOW_DB_KEY => &$self.flow_db,
+            _ => return Err(anyhow!("Invalid destination")),
+        };
+        Ok(db.get(COL_MISC, $key)?)
+    }};
+    ($self:expr, $dest:expr, put, $key:expr, $value:expr) => {{
+        let db = match $dest {
+            DATA_DB_KEY => &$self.data_db,
+            FLOW_DB_KEY => &$self.flow_db,
+            _ => return Err(anyhow!("Invalid destination")),
+        };
+        Ok(db.put(COL_MISC, $key, $value)?)
+    }};
+    ($self:expr, $dest:expr, delete, $key:expr) => {{
+        let db = match $dest {
+            DATA_DB_KEY => &$self.data_db,
+            FLOW_DB_KEY => &$self.flow_db,
+            _ => return Err(anyhow!("Invalid destination")),
+        };
+        Ok(db.delete(COL_MISC, $key)?)
+    }};
+    ($self:expr, $dest:expr, transaction, $tx:expr) => {{
+        let db = match $dest {
+            DATA_DB_KEY => &$self.data_db,
+            FLOW_DB_KEY => &$self.flow_db,
+            _ => return Err(anyhow!("Invalid destination")),
+        };
+        let mut db_tx = db.transaction();
+        db_tx.ops = $tx.ops;
+        Ok(db.write(db_tx)?)
+    }};
+}
 
 pub trait Configurable {
-    fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
-    fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()>;
-    fn remove_config(&self, key: &[u8]) -> Result<()>;
-    fn exec_configs(&self, tx: ConfigTx) -> Result<()>;
+    fn get_config(&self, key: &[u8], dest: &str) -> Result<Option<Vec<u8>>>;
+    fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()>;
+    fn remove_config(&self, key: &[u8], dest: &str) -> Result<()>;
+    fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()>;
 }
 
 #[derive(Default)]
@@ -41,8 +80,12 @@ impl ConfigTx {
 }
 
 pub trait ConfigurableExt: Configurable {
-    fn get_config_decoded<K: AsRef<[u8]>, T: Decode>(&self, key: &K) -> Result<Option<T>> {
-        match self.get_config(key.as_ref())? {
+    fn get_config_decoded<K: AsRef<[u8]>, T: Decode>(
+        &self,
+        key: &K,
+        dest: &str,
+    ) -> Result<Option<T>> {
+        match self.get_config(key.as_ref(), dest)? {
             Some(val) => Ok(Some(
                 T::from_ssz_bytes(&val).map_err(|e| anyhow!("SSZ decode error: {:?}", e))?,
             )),
@@ -50,36 +93,36 @@ pub trait ConfigurableExt: Configurable {
         }
     }
 
-    fn set_config_encoded<K: AsRef<[u8]>, T: Encode>(&self, key: &K, value: &T) -> Result<()> {
-        self.set_config(key.as_ref(), &value.as_ssz_bytes())
+    fn set_config_encoded<K: AsRef<[u8]>, T: Encode>(
+        &self,
+        key: &K,
+        value: &T,
+        dest: &str,
+    ) -> Result<()> {
+        self.set_config(key.as_ref(), &value.as_ssz_bytes(), dest)
     }
 
-    fn remove_config_by_key<K: AsRef<[u8]>>(&self, key: &K) -> Result<()> {
-        self.remove_config(key.as_ref())
+    fn remove_config_by_key<K: AsRef<[u8]>>(&self, key: &K, dest: &str) -> Result<()> {
+        self.remove_config(key.as_ref(), dest)
     }
 }
 
 impl<T: ?Sized + Configurable> ConfigurableExt for T {}
 
 impl Configurable for LogManager {
-    fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        Ok(self.data_db.get(COL_MISC, key)?)
+    fn get_config(&self, key: &[u8], dest: &str) -> Result<Option<Vec<u8>>> {
+        db_operation!(self, dest, get, key)
     }
 
-    fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> {
-        self.data_db.put(COL_MISC, key, value)?;
-        Ok(())
+    fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()> {
+        db_operation!(self, dest, put, key, value)
    }
 
-    fn remove_config(&self, key: &[u8]) -> Result<()> {
-        Ok(self.data_db.delete(COL_MISC, key)?)
+    fn remove_config(&self, key: &[u8], dest: &str) -> Result<()> {
+        db_operation!(self, dest, delete, key)
    }
 
-    fn exec_configs(&self, tx: ConfigTx) -> Result<()> {
-        let mut db_tx = self.data_db.transaction();
-        db_tx.ops = tx.ops;
-        self.data_db.write(db_tx)?;
-        Ok(())
+    fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()> {
+        db_operation!(self, dest, transaction, tx)
     }
 }
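The db_operation! macro stamps one idea into each of the four Configurable methods: the &str destination tag is matched against the two const keys to pick which kvdb handle serves COL_MISC, instead of always going to data_db as before, and an unknown tag is rejected. A self-contained sketch of that dispatch (HashMaps stand in for the two kvdb instances; TwoDbs and its fields are hypothetical stand-ins, only the match shape mirrors the macro):

    use anyhow::{anyhow, Result};
    use std::collections::HashMap;

    const DATA_DB_KEY: &str = "data_db";
    const FLOW_DB_KEY: &str = "flow_db";

    struct TwoDbs {
        data_db: HashMap<Vec<u8>, Vec<u8>>,
        flow_db: HashMap<Vec<u8>, Vec<u8>>,
    }

    impl TwoDbs {
        // Same routing shape as the macro's `get` arm: pick a backend by tag, then read.
        fn get_config(&self, key: &[u8], dest: &str) -> Result<Option<Vec<u8>>> {
            let db = match dest {
                DATA_DB_KEY => &self.data_db,
                FLOW_DB_KEY => &self.flow_db,
                _ => return Err(anyhow!("Invalid destination")),
            };
            Ok(db.get(key).cloned())
        }
    }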

View File

@@ -44,6 +44,9 @@ pub const COL_PAD_DATA_LIST: u32 = 7; // flow db
 pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 8; // data db
 pub const COL_NUM: u32 = 9;
 
+pub const DATA_DB_KEY: &str = "data_db";
+pub const FLOW_DB_KEY: &str = "flow_db";
+
 // Process at most 1M entries (256MB) pad data at a time.
 const PAD_MAX_SIZE: usize = 1 << 20;
@@ -61,6 +64,7 @@ pub struct UpdateFlowMessage {
 }
 
 pub struct LogManager {
+    pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
     pub(crate) data_db: Arc<dyn ZgsKeyValueDB>,
     tx_store: TransactionStore,
     flow_store: Arc<FlowStore>,
@@ -781,6 +785,7 @@ impl LogManager {
         });
 
         let log_manager = Self {
+            flow_db: flow_db_source,
            data_db: data_db_source,
            tx_store,
            flow_store,

View File

@@ -1,7 +1,10 @@
 use super::tx_store::TxStore;
 use anyhow::Result;
 use std::sync::Arc;
-use storage::log_store::config::{ConfigTx, ConfigurableExt};
+use storage::log_store::{
+    config::{ConfigTx, ConfigurableExt},
+    log_manager::DATA_DB_KEY,
+};
 use storage_async::Store;
 use tokio::sync::RwLock;
@@ -66,10 +69,10 @@ impl SyncStore {
         let store = async_store.get_store();
 
         // load next_tx_seq
-        let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ)?;
+        let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ, DATA_DB_KEY)?;
 
         // load max_tx_seq
-        let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ)?;
+        let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ, DATA_DB_KEY)?;
 
         Ok((next_tx_seq, max_tx_seq))
     }
@@ -77,13 +80,13 @@ impl SyncStore {
     pub async fn set_next_tx_seq(&self, tx_seq: u64) -> Result<()> {
         let async_store = self.store.write().await;
         let store = async_store.get_store();
-        store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq)
+        store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq, DATA_DB_KEY)
     }
 
     pub async fn set_max_tx_seq(&self, tx_seq: u64) -> Result<()> {
         let async_store = self.store.write().await;
         let store = async_store.get_store();
-        store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq)
+        store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq, DATA_DB_KEY)
     }
 
     pub async fn contains(&self, tx_seq: u64) -> Result<Option<Queue>> {
@@ -114,7 +117,7 @@ impl SyncStore {
         }
 
         let removed = self.pending_txs.remove(store, Some(&mut tx), tx_seq)?;
-        store.exec_configs(tx)?;
+        store.exec_configs(tx, DATA_DB_KEY)?;
 
         if removed {
             Ok(InsertResult::Upgraded)
@@ -128,7 +131,7 @@ impl SyncStore {
         }
 
         let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?;
-        store.exec_configs(tx)?;
+        store.exec_configs(tx, DATA_DB_KEY)?;
 
         if removed {
             Ok(InsertResult::Downgraded)
@@ -151,7 +154,7 @@ impl SyncStore {
         let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?;
 
-        store.exec_configs(tx)?;
+        store.exec_configs(tx, DATA_DB_KEY)?;
 
         Ok(added)
     }

View File

@@ -1,6 +1,7 @@
 use anyhow::Result;
 use rand::Rng;
 use storage::log_store::config::{ConfigTx, ConfigurableExt};
+use storage::log_store::log_manager::DATA_DB_KEY;
 use storage::log_store::Store;
 
 /// TxStore is used to store pending transactions that to be synchronized in advance.
@@ -32,11 +33,11 @@ impl TxStore {
     }
 
     fn index_of(&self, store: &dyn Store, tx_seq: u64) -> Result<Option<usize>> {
-        store.get_config_decoded(&self.key_seq_to_index(tx_seq))
+        store.get_config_decoded(&self.key_seq_to_index(tx_seq), DATA_DB_KEY)
     }
 
     fn at(&self, store: &dyn Store, index: usize) -> Result<Option<u64>> {
-        store.get_config_decoded(&self.key_index_to_seq(index))
+        store.get_config_decoded(&self.key_index_to_seq(index), DATA_DB_KEY)
     }
 
     pub fn has(&self, store: &dyn Store, tx_seq: u64) -> Result<bool> {
@@ -45,7 +46,7 @@ impl TxStore {
     pub fn count(&self, store: &dyn Store) -> Result<usize> {
         store
-            .get_config_decoded(&self.key_count)
+            .get_config_decoded(&self.key_count, DATA_DB_KEY)
             .map(|x| x.unwrap_or(0))
     }
@@ -70,7 +71,7 @@ impl TxStore {
         if let Some(db_tx) = db_tx {
             db_tx.append(&mut tx);
         } else {
-            store.exec_configs(tx)?;
+            store.exec_configs(tx, DATA_DB_KEY)?;
         }
 
         Ok(true)
@@ -130,7 +131,7 @@ impl TxStore {
         if let Some(db_tx) = db_tx {
             db_tx.append(&mut tx);
         } else {
-            store.exec_configs(tx)?;
+            store.exec_configs(tx, DATA_DB_KEY)?;
         }
 
         Ok(true)