mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-18 02:55:17 +00:00
separate data db from flow db (#252)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
* separate data db from flow db
This commit is contained in:
parent
bb6e1457b7
commit
9eea71e97d
3
.gitignore
vendored
3
.gitignore
vendored
@ -4,5 +4,6 @@
|
||||
/.idea
|
||||
tests/**/__pycache__
|
||||
tests/tmp/**
|
||||
tests/config/zgs
|
||||
.vscode/*.json
|
||||
/0g-storage-contracts-dev
|
||||
/0g-storage-contracts-dev
|
||||
|
@ -112,8 +112,13 @@ impl ClientBuilder {
|
||||
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
|
||||
let executor = require!("sync", self, runtime_context).clone().executor;
|
||||
let store = Arc::new(
|
||||
LogManager::rocksdb(config.log_config.clone(), &config.db_dir, executor)
|
||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
|
||||
LogManager::rocksdb(
|
||||
config.log_config.clone(),
|
||||
config.db_dir.join("flow_db"),
|
||||
config.db_dir.join("data_db"),
|
||||
executor,
|
||||
)
|
||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
|
||||
);
|
||||
|
||||
self.store = Some(store.clone());
|
||||
|
@ -25,9 +25,14 @@ fn write_performance(c: &mut Criterion) {
|
||||
let executor = runtime.task_executor.clone();
|
||||
|
||||
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
|
||||
LogManager::rocksdb(LogConfig::default(), "db_write", executor)
|
||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
|
||||
.unwrap(),
|
||||
LogManager::rocksdb(
|
||||
LogConfig::default(),
|
||||
"db_flow_write",
|
||||
"db_data_write",
|
||||
executor,
|
||||
)
|
||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
|
||||
.unwrap(),
|
||||
));
|
||||
|
||||
let chunk_count = 2048;
|
||||
@ -114,9 +119,14 @@ fn read_performance(c: &mut Criterion) {
|
||||
let executor = runtime.task_executor.clone();
|
||||
|
||||
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
|
||||
LogManager::rocksdb(LogConfig::default(), "db_read", executor)
|
||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
|
||||
.unwrap(),
|
||||
LogManager::rocksdb(
|
||||
LogConfig::default(),
|
||||
"db_flow_read",
|
||||
"db_data_read",
|
||||
executor,
|
||||
)
|
||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
|
||||
.unwrap(),
|
||||
));
|
||||
|
||||
let tx_size = 1000;
|
||||
|
@ -63,22 +63,22 @@ impl<T: ?Sized + Configurable> ConfigurableExt for T {}
|
||||
|
||||
impl Configurable for LogManager {
|
||||
fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
||||
Ok(self.db.get(COL_MISC, key)?)
|
||||
Ok(self.flow_db.get(COL_MISC, key)?)
|
||||
}
|
||||
|
||||
fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> {
|
||||
self.db.put(COL_MISC, key, value)?;
|
||||
self.flow_db.put(COL_MISC, key, value)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_config(&self, key: &[u8]) -> Result<()> {
|
||||
Ok(self.db.delete(COL_MISC, key)?)
|
||||
Ok(self.flow_db.delete(COL_MISC, key)?)
|
||||
}
|
||||
|
||||
fn exec_configs(&self, tx: ConfigTx) -> Result<()> {
|
||||
let mut db_tx = self.db.transaction();
|
||||
let mut db_tx = self.flow_db.transaction();
|
||||
db_tx.ops = tx.ops;
|
||||
self.db.write(db_tx)?;
|
||||
self.flow_db.write(db_tx)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -25,15 +25,15 @@ use tracing::{debug, error, trace};
|
||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
||||
|
||||
pub struct FlowStore {
|
||||
db: Arc<FlowDBStore>,
|
||||
data_db: Arc<FlowDBStore>,
|
||||
seal_manager: SealTaskManager,
|
||||
config: FlowConfig,
|
||||
}
|
||||
|
||||
impl FlowStore {
|
||||
pub fn new(db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
|
||||
pub fn new(data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
|
||||
Self {
|
||||
db,
|
||||
data_db,
|
||||
seal_manager: Default::default(),
|
||||
config,
|
||||
}
|
||||
@ -45,18 +45,19 @@ impl FlowStore {
|
||||
subtree_list: Vec<(usize, usize, DataRoot)>,
|
||||
) -> Result<()> {
|
||||
let mut batch = self
|
||||
.db
|
||||
.data_db
|
||||
.get_entry_batch(batch_index as u64)?
|
||||
.unwrap_or_else(|| EntryBatch::new(batch_index as u64));
|
||||
batch.set_subtree_list(subtree_list);
|
||||
self.db.put_entry_raw(vec![(batch_index as u64, batch)])?;
|
||||
self.data_db
|
||||
.put_entry_raw(vec![(batch_index as u64, batch)])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn gen_proof_in_batch(&self, batch_index: usize, sector_index: usize) -> Result<FlowProof> {
|
||||
let batch = self
|
||||
.db
|
||||
.data_db
|
||||
.get_entry_batch(batch_index as u64)?
|
||||
.ok_or_else(|| anyhow!("batch missing, index={}", batch_index))?;
|
||||
let merkle = batch.to_merkle_tree(batch_index == 0)?.ok_or_else(|| {
|
||||
@ -70,7 +71,7 @@ impl FlowStore {
|
||||
|
||||
pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
|
||||
self.seal_manager.delete_batch_list(batch_list);
|
||||
self.db.delete_batch_list(batch_list)
|
||||
self.data_db.delete_batch_list(batch_list)
|
||||
}
|
||||
}
|
||||
|
||||
@ -116,7 +117,7 @@ impl FlowRead for FlowStore {
|
||||
length -= 1;
|
||||
}
|
||||
|
||||
let entry_batch = try_option!(self.db.get_entry_batch(chunk_index)?);
|
||||
let entry_batch = try_option!(self.data_db.get_entry_batch(chunk_index)?);
|
||||
let mut entry_batch_data =
|
||||
try_option!(entry_batch.get_unsealed_data(offset as usize, length as usize));
|
||||
data.append(&mut entry_batch_data);
|
||||
@ -145,7 +146,7 @@ impl FlowRead for FlowStore {
|
||||
let chunk_index = start_entry_index / self.config.batch_size as u64;
|
||||
|
||||
if let Some(mut data_list) = self
|
||||
.db
|
||||
.data_db
|
||||
.get_entry_batch(chunk_index)?
|
||||
.map(|b| b.into_data_list(start_entry_index))
|
||||
{
|
||||
@ -170,7 +171,7 @@ impl FlowRead for FlowStore {
|
||||
}
|
||||
|
||||
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
|
||||
let batch = try_option!(self.db.get_entry_batch(chunk_index)?);
|
||||
let batch = try_option!(self.data_db.get_entry_batch(chunk_index)?);
|
||||
let mut mine_chunk = MineLoadChunk::default();
|
||||
for (seal_index, (sealed, validity)) in mine_chunk
|
||||
.loaded_chunk
|
||||
@ -188,7 +189,7 @@ impl FlowRead for FlowStore {
|
||||
|
||||
fn get_num_entries(&self) -> Result<u64> {
|
||||
// This is an over-estimation as it assumes each batch is full.
|
||||
self.db
|
||||
self.data_db
|
||||
.kvdb
|
||||
.num_keys(COL_ENTRY_BATCH)
|
||||
.map(|num_batches| num_batches * PORA_CHUNK_SIZE as u64)
|
||||
@ -228,7 +229,7 @@ impl FlowWrite for FlowStore {
|
||||
|
||||
// TODO: Try to avoid loading from db if possible.
|
||||
let mut batch = self
|
||||
.db
|
||||
.data_db
|
||||
.get_entry_batch(chunk_index)?
|
||||
.unwrap_or_else(|| EntryBatch::new(chunk_index));
|
||||
let completed_seals = batch.insert_data(
|
||||
@ -246,12 +247,12 @@ impl FlowWrite for FlowStore {
|
||||
|
||||
batch_list.push((chunk_index, batch));
|
||||
}
|
||||
self.db.put_entry_batch_list(batch_list)
|
||||
self.data_db.put_entry_batch_list(batch_list)
|
||||
}
|
||||
|
||||
fn truncate(&self, start_index: u64) -> crate::error::Result<()> {
|
||||
let mut to_seal_set = self.seal_manager.to_seal_set.write();
|
||||
let to_reseal = self.db.truncate(start_index, self.config.batch_size)?;
|
||||
let to_reseal = self.data_db.truncate(start_index, self.config.batch_size)?;
|
||||
|
||||
to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL));
|
||||
let new_seal_version = self.seal_manager.inc_seal_version();
|
||||
@ -281,7 +282,7 @@ impl FlowSeal for FlowStore {
|
||||
let mut tasks = Vec::with_capacity(SEALS_PER_LOAD);
|
||||
|
||||
let batch_data = self
|
||||
.db
|
||||
.data_db
|
||||
.get_entry_batch((first_index / SEALS_PER_LOAD) as u64)?
|
||||
.expect("Lost data chunk in to_seal_set");
|
||||
|
||||
@ -320,7 +321,7 @@ impl FlowSeal for FlowStore {
|
||||
.chunk_by(|answer| answer.seal_index / SEALS_PER_LOAD as u64)
|
||||
{
|
||||
let mut batch_chunk = self
|
||||
.db
|
||||
.data_db
|
||||
.get_entry_batch(load_index)?
|
||||
.expect("Can not find chunk data");
|
||||
for answer in answers_in_chunk {
|
||||
@ -336,7 +337,7 @@ impl FlowSeal for FlowStore {
|
||||
to_seal_set.remove(&idx);
|
||||
}
|
||||
|
||||
self.db.put_entry_raw(updated_chunk)?;
|
||||
self.data_db.put_entry_raw(updated_chunk)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -61,7 +61,7 @@ pub struct UpdateFlowMessage {
|
||||
}
|
||||
|
||||
pub struct LogManager {
|
||||
pub(crate) db: Arc<dyn ZgsKeyValueDB>,
|
||||
pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
|
||||
tx_store: TransactionStore,
|
||||
flow_store: Arc<FlowStore>,
|
||||
merkle: RwLock<MerkleManager>,
|
||||
@ -612,28 +612,33 @@ impl LogStoreRead for LogManager {
|
||||
impl LogManager {
|
||||
pub fn rocksdb(
|
||||
config: LogConfig,
|
||||
path: impl AsRef<Path>,
|
||||
flow_path: impl AsRef<Path>,
|
||||
data_path: impl AsRef<Path>,
|
||||
executor: task_executor::TaskExecutor,
|
||||
) -> Result<Self> {
|
||||
let mut db_config = DatabaseConfig::with_columns(COL_NUM);
|
||||
db_config.enable_statistics = true;
|
||||
let db = Arc::new(Database::open(&db_config, path)?);
|
||||
Self::new(db, config, executor)
|
||||
let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?);
|
||||
let data_db_source = Arc::new(Database::open(&db_config, data_path)?);
|
||||
Self::new(flow_db_source, data_db_source, config, executor)
|
||||
}
|
||||
|
||||
pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
|
||||
let db = Arc::new(kvdb_memorydb::create(COL_NUM));
|
||||
Self::new(db, config, executor)
|
||||
let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM));
|
||||
let data_db = Arc::new(kvdb_memorydb::create(COL_NUM));
|
||||
Self::new(flow_db, data_db, config, executor)
|
||||
}
|
||||
|
||||
fn new(
|
||||
db: Arc<dyn ZgsKeyValueDB>,
|
||||
flow_db_source: Arc<dyn ZgsKeyValueDB>,
|
||||
data_db_source: Arc<dyn ZgsKeyValueDB>,
|
||||
config: LogConfig,
|
||||
executor: task_executor::TaskExecutor,
|
||||
) -> Result<Self> {
|
||||
let tx_store = TransactionStore::new(db.clone())?;
|
||||
let flow_db = Arc::new(FlowDBStore::new(db.clone()));
|
||||
let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
|
||||
let tx_store = TransactionStore::new(flow_db_source.clone())?;
|
||||
let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
|
||||
let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
|
||||
let flow_store = Arc::new(FlowStore::new(data_db.clone(), config.flow.clone()));
|
||||
// If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
|
||||
// first and call `put_tx` later.
|
||||
let next_tx_seq = tx_store.next_tx_seq();
|
||||
@ -737,7 +742,7 @@ impl LogManager {
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
|
||||
let mut log_manager = Self {
|
||||
db,
|
||||
flow_db: flow_db_source,
|
||||
tx_store,
|
||||
flow_store,
|
||||
merkle,
|
||||
|
@ -1 +0,0 @@
|
||||
enr:-Ly4QJZwz9htAorBIx_otqoaRFPohX7NQJ31iBB6mcEhBiuPWsOnigc1ABQsg6tLU1OirQdLR6aEvv8SlkkfIbV72T8CgmlkgnY0gmlwhH8AAAGQbmV0d29ya19pZGVudGl0eZ8oIwAAAAAAADPyz8cpvYcPpUtQMmYOBrTPKn-UAAIAiXNlY3AyNTZrMaEDeDdgnDgLPkxNxB39jKb9f1Na30t6R9vVolpTk5zu-hODdGNwgir4g3VkcIIq-A
|
@ -1 +0,0 @@
|
||||
Y<13><><02><><EFBFBD>Ң<>-
<0A><>r<>7<EFBFBD><37>jq<6A>p<>}<7D>
|
Loading…
Reference in New Issue
Block a user