Compare commits

...

3 Commits

Author SHA1 Message Date
0xroy
df8a6b32ed
Merge fd9c033176 into fae2d5efb6 2024-11-08 22:07:13 +08:00
0g-peterzhb
fae2d5efb6
@peter/db split (#262)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* do not pad tx during sync phase

* add pad data store

* support async padding after sync phase

* split misc

* add sleep for the next loop
2024-11-08 22:06:45 +08:00
Roy Lu
fd9c033176 Updated README 2024-10-23 08:52:56 -07:00
19 changed files with 327 additions and 235 deletions

View File

@ -2,69 +2,32 @@
## Overview
0G Storage is the storage layer for the ZeroGravity data availability (DA) system. The 0G Storage layer holds three important features:
0G Storage is a decentralized data storage system designed to address the challenges of high-throughput and low-latency data storage and retrieval, in areas such as AI and gaming.
* Buit-in - It is natively built into the ZeroGravity DA system for data storage and retrieval.
* General purpose - It is designed to support atomic transactions, mutable kv stores as well as archive log systems to enable wide range of applications with various data types.
* Incentive - Instead of being just a decentralized database, 0G Storage introduces PoRA mining algorithm to incentivize storage network participants.
In addition, it forms the storage layer for the 0G data availability (DA) system, with the cross-layer integration abstracted away from Rollup and AppChain builders.
To dive deep into the technical details, continue reading [0G Storage Spec.](docs/)
## System Architecture
## Integration
0G Storage consists of two main components:
We provide a [SDK](https://github.com/0glabs/0g-js-storage-sdk) for users to easily integrate 0G Storage in their applications with the following features:
1. **Data Publishing Lane**: Ensures quick data availability and verification through the 0G Consensus network.
2. **Data Storage Lane**: Manages large data transfers and storage using an erasure-coding mechanism for redundancy and reliability.
* File Merkle Tree Class
* Flow Contract Types
* RPC methods support
* File upload
* Support browser environment
* Tests for different environments (In Progress)
* File download (In Progress)
Across the two lanes, 0G Storage supports the following features:
## Deployment
* **General Purpose Design**: Supports atomic transactions, mutable key-value stores, and archive log systems, enabling a wide range of applications with various data types.
* **Incentivized Participation**: Utilizes the PoRA (Proof of Random Access) mining algorithm to incentivize storage network participants.
Please refer to [Deployment](docs/run.md) page for detailed steps to compile and start a 0G Storage node.
For in-depth technical details about 0G Storage, please read our [Intro to 0G Storage](https://docs.0g.ai/og-storage).
## Test
## Documentation
### Prerequisites
- If you want to run a node, please refer to the [Running a Node](https://docs.0g.ai/run-a-node/storage-node) guide.
- If you want build a project using 0G storage, please refer to the [0G Storage SDK](https://docs.0g.ai/build-with-0g/storage-sdk) guide.
* Required python version: 3.8, 3.9, 3.10, higher version is not guaranteed (e.g. failed to install `pysha3`).
* Install dependencies under root folder: `pip3 install -r requirements.txt`
## Support and Additional Resources
We want to do everything we can to help you be successful while working on your contribution and projects. Here you'll find various resources and communities that may help you complete a project or contribute to 0G.
### Dependencies
Python test framework will launch blockchain fullnodes at local for storage node to interact with. There are 2 kinds of fullnodes supported:
* Conflux eSpace node (by default).
* BSC node (geth).
For Conflux eSpace node, the test framework will automatically compile the binary at runtime, and copy the binary to `tests/tmp` folder. For BSC node, the test framework will automatically download the latest version binary from [github](https://github.com/bnb-chain/bsc/releases) to `tests/tmp` folder.
Alternatively, you could also manually copy specific version binaries (conflux or geth) to the `tests/tmp` folder. Note, do **NOT** copy released conflux binary on github, since block height of some CIPs are hardcoded.
For testing, it's also dependent on the following repos:
* [0G Storage Contract](https://github.com/0glabs/0g-storage-contracts): It essentially provides two abi interfaces for 0G Storage Node to interact with the on-chain contracts.
* ZgsFlow: It contains apis to submit chunk data.
* PoraMine: It contains apis to submit PoRA answers.
* [0G Storage Client](https://github.com/0glabs/0g-storage-client): It is used to interact with certain 0G Storage Nodes to upload/download files.
### Run Tests
Go to the `tests` folder and run the following command to run all tests:
```
python test_all.py
```
or, run any single test, e.g.
```
python sync_test.py
```
## Contributing
To make contributions to the project, please follow the guidelines [here](contributing.md).
### Communities
- [0G Telegram](https://t.me/web3_0glabs)
- [0G Discord](https://discord.com/invite/0glabs)

View File

@ -15,19 +15,13 @@ abigen!(
);
#[cfg(feature = "dev")]
abigen!(
ZgsFlow,
"../../0g-storage-contracts-dev/artifacts/contracts/dataFlow/Flow.sol/Flow.json"
);
abigen!(ZgsFlow, "../../storage-contracts-abis/Flow.json");
#[cfg(feature = "dev")]
abigen!(
PoraMine,
"../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json"
);
abigen!(PoraMine, "../../storage-contracts-abis/PoraMine.json");
#[cfg(feature = "dev")]
abigen!(
ChunkLinearReward,
"../../0g-storage-contracts-dev/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json"
"../../storage-contracts-abis/ChunkLinearReward.json"
);

View File

@ -277,6 +277,9 @@ impl LogSyncManager {
.remove_finalized_block_interval_minutes,
);
// start the pad data store
log_sync_manager.store.start_padding(&executor_clone);
let (watch_progress_tx, watch_progress_rx) =
tokio::sync::mpsc::unbounded_channel();
let watch_rx = log_sync_manager.log_fetcher.start_watch(
@ -509,6 +512,7 @@ impl LogSyncManager {
}
}
self.data_cache.garbage_collect(self.next_tx_seq);
self.next_tx_seq += 1;
// Check if the computed data root matches on-chain state.

View File

@ -5,17 +5,20 @@ use ethereum_types::Address;
use ethers::contract::ContractCall;
use ethers::contract::EthEvent;
use std::sync::Arc;
use storage::log_store::log_manager::DATA_DB_KEY;
use storage::H256;
use storage_async::Store;
const MINER_ID: &str = "mine.miner_id";
pub async fn load_miner_id(store: &Store) -> storage::error::Result<Option<H256>> {
store.get_config_decoded(&MINER_ID).await
store.get_config_decoded(&MINER_ID, DATA_DB_KEY).await
}
async fn set_miner_id(store: &Store, miner_id: &H256) -> storage::error::Result<()> {
store.set_config_encoded(&MINER_ID, miner_id).await
store
.set_config_encoded(&MINER_ID, miner_id, DATA_DB_KEY)
.await
}
pub(crate) async fn check_and_request_miner_id(

View File

@ -11,7 +11,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use storage::config::{ShardConfig, SHARD_CONFIG_KEY};
use storage::log_store::log_manager::PORA_CHUNK_SIZE;
use storage::log_store::log_manager::{DATA_DB_KEY, FLOW_DB_KEY, PORA_CHUNK_SIZE};
use storage_async::Store;
use task_executor::TaskExecutor;
use tokio::sync::{broadcast, mpsc};
@ -252,7 +252,7 @@ impl Pruner {
.update_shard_config(self.config.shard_config)
.await;
self.store
.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config, DATA_DB_KEY)
.await
}
@ -265,17 +265,22 @@ impl Pruner {
.set_config_encoded(
&FIRST_REWARDABLE_CHUNK_KEY,
&(new_first_rewardable_chunk, new_first_tx_seq),
FLOW_DB_KEY,
)
.await
}
}
async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
store.get_config_decoded(&SHARD_CONFIG_KEY).await
store
.get_config_decoded(&SHARD_CONFIG_KEY, DATA_DB_KEY)
.await
}
async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> {
store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
store
.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY, FLOW_DB_KEY)
.await
}
#[derive(Debug)]

View File

@ -1054,8 +1054,7 @@ mod tests {
let (sync_send, sync_recv) = channel::Channel::unbounded("test");
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
let executor = runtime.task_executor.clone();
let store = LogManager::memorydb(LogConfig::default(), executor).unwrap();
let store = LogManager::memorydb(LogConfig::default()).unwrap();
Self {
runtime,
network_globals: Arc::new(network_globals),

View File

@ -89,10 +89,9 @@ impl ClientBuilder {
/// Initializes in-memory storage.
pub fn with_memory_store(mut self) -> Result<Self, String> {
let executor = require!("sync", self, runtime_context).clone().executor;
// TODO(zz): Set config.
let store = Arc::new(
LogManager::memorydb(LogConfig::default(), executor)
LogManager::memorydb(LogConfig::default())
.map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
);
@ -110,13 +109,11 @@ impl ClientBuilder {
/// Initializes RocksDB storage.
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
let executor = require!("sync", self, runtime_context).clone().executor;
let store = Arc::new(
LogManager::rocksdb(
config.log_config.clone(),
config.db_dir.join("flow_db"),
config.db_dir.join("data_db"),
executor,
)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
);

View File

@ -74,9 +74,11 @@ impl Store {
pub async fn get_config_decoded<K: AsRef<[u8]> + Send + Sync, T: Decode + Send + 'static>(
&self,
key: &K,
dest: &str,
) -> Result<Option<T>> {
let key = key.as_ref().to_vec();
self.spawn(move |store| store.get_config_decoded(&key))
let dest = dest.to_string();
self.spawn(move |store| store.get_config_decoded(&key, &dest))
.await
}
@ -84,10 +86,12 @@ impl Store {
&self,
key: &K,
value: &T,
dest: &str,
) -> anyhow::Result<()> {
let key = key.as_ref().to_vec();
let value = value.as_ssz_bytes();
self.spawn(move |store| store.set_config(&key, &value))
let dest = dest.to_string();
self.spawn(move |store| store.set_config(&key, &value, &dest))
.await
}

View File

@ -14,23 +14,14 @@ use storage::{
},
LogManager,
};
use task_executor::test_utils::TestRuntime;
fn write_performance(c: &mut Criterion) {
if Path::new("db_write").exists() {
fs::remove_dir_all("db_write").unwrap();
}
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
LogManager::rocksdb(
LogConfig::default(),
"db_flow_write",
"db_data_write",
executor,
)
LogManager::rocksdb(LogConfig::default(), "db_flow_write", "db_data_write")
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
));
@ -114,17 +105,8 @@ fn read_performance(c: &mut Criterion) {
fs::remove_dir_all("db_read").unwrap();
}
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
LogManager::rocksdb(
LogConfig::default(),
"db_flow_read",
"db_data_read",
executor,
)
LogManager::rocksdb(LogConfig::default(), "db_flow_read", "db_data_read")
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
));

View File

@ -2,16 +2,55 @@ use anyhow::{anyhow, Result};
use kvdb::{DBKey, DBOp};
use ssz::{Decode, Encode};
use crate::log_store::log_manager::{COL_MISC, DATA_DB_KEY, FLOW_DB_KEY};
use crate::LogManager;
use super::log_manager::COL_MISC;
macro_rules! db_operation {
($self:expr, $dest:expr, get, $key:expr) => {{
let db = match $dest {
DATA_DB_KEY => &$self.data_db,
FLOW_DB_KEY => &$self.flow_db,
_ => return Err(anyhow!("Invalid destination")),
};
Ok(db.get(COL_MISC, $key)?)
}};
($self:expr, $dest:expr, put, $key:expr, $value:expr) => {{
let db = match $dest {
DATA_DB_KEY => &$self.data_db,
FLOW_DB_KEY => &$self.flow_db,
_ => return Err(anyhow!("Invalid destination")),
};
Ok(db.put(COL_MISC, $key, $value)?)
}};
($self:expr, $dest:expr, delete, $key:expr) => {{
let db = match $dest {
DATA_DB_KEY => &$self.data_db,
FLOW_DB_KEY => &$self.flow_db,
_ => return Err(anyhow!("Invalid destination")),
};
Ok(db.delete(COL_MISC, $key)?)
}};
($self:expr, $dest:expr, transaction, $tx:expr) => {{
let db = match $dest {
DATA_DB_KEY => &$self.data_db,
FLOW_DB_KEY => &$self.flow_db,
_ => return Err(anyhow!("Invalid destination")),
};
let mut db_tx = db.transaction();
db_tx.ops = $tx.ops;
Ok(db.write(db_tx)?)
}};
}
pub trait Configurable {
fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()>;
fn remove_config(&self, key: &[u8]) -> Result<()>;
fn get_config(&self, key: &[u8], dest: &str) -> Result<Option<Vec<u8>>>;
fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()>;
fn remove_config(&self, key: &[u8], dest: &str) -> Result<()>;
fn exec_configs(&self, tx: ConfigTx) -> Result<()>;
fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()>;
}
#[derive(Default)]
@ -41,8 +80,12 @@ impl ConfigTx {
}
pub trait ConfigurableExt: Configurable {
fn get_config_decoded<K: AsRef<[u8]>, T: Decode>(&self, key: &K) -> Result<Option<T>> {
match self.get_config(key.as_ref())? {
fn get_config_decoded<K: AsRef<[u8]>, T: Decode>(
&self,
key: &K,
dest: &str,
) -> Result<Option<T>> {
match self.get_config(key.as_ref(), dest)? {
Some(val) => Ok(Some(
T::from_ssz_bytes(&val).map_err(|e| anyhow!("SSZ decode error: {:?}", e))?,
)),
@ -50,36 +93,36 @@ pub trait ConfigurableExt: Configurable {
}
}
fn set_config_encoded<K: AsRef<[u8]>, T: Encode>(&self, key: &K, value: &T) -> Result<()> {
self.set_config(key.as_ref(), &value.as_ssz_bytes())
fn set_config_encoded<K: AsRef<[u8]>, T: Encode>(
&self,
key: &K,
value: &T,
dest: &str,
) -> Result<()> {
self.set_config(key.as_ref(), &value.as_ssz_bytes(), dest)
}
fn remove_config_by_key<K: AsRef<[u8]>>(&self, key: &K) -> Result<()> {
self.remove_config(key.as_ref())
fn remove_config_by_key<K: AsRef<[u8]>>(&self, key: &K, dest: &str) -> Result<()> {
self.remove_config(key.as_ref(), dest)
}
}
impl<T: ?Sized + Configurable> ConfigurableExt for T {}
impl Configurable for LogManager {
fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.flow_db.get(COL_MISC, key)?)
fn get_config(&self, key: &[u8], dest: &str) -> Result<Option<Vec<u8>>> {
db_operation!(self, dest, get, key)
}
fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.flow_db.put(COL_MISC, key, value)?;
Ok(())
fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()> {
db_operation!(self, dest, put, key, value)
}
fn remove_config(&self, key: &[u8]) -> Result<()> {
Ok(self.flow_db.delete(COL_MISC, key)?)
fn remove_config(&self, key: &[u8], dest: &str) -> Result<()> {
db_operation!(self, dest, delete, key)
}
fn exec_configs(&self, tx: ConfigTx) -> Result<()> {
let mut db_tx = self.flow_db.transaction();
db_tx.ops = tx.ops;
self.flow_db.write(db_tx)?;
Ok(())
fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()> {
db_operation!(self, dest, transaction, tx)
}
}

View File

@ -1,4 +1,5 @@
use super::load_chunk::EntryBatch;
use super::log_manager::{COL_PAD_DATA_LIST, COL_PAD_DATA_SYNC_HEIGH};
use super::seal_task_manager::SealTaskManager;
use super::{MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig;
@ -25,14 +26,16 @@ use tracing::{debug, error, trace};
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
pub struct FlowStore {
flow_db: Arc<FlowDBStore>,
data_db: Arc<FlowDBStore>,
seal_manager: SealTaskManager,
config: FlowConfig,
}
impl FlowStore {
pub fn new(data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
pub fn new(flow_db: Arc<FlowDBStore>, data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
Self {
flow_db,
data_db,
seal_manager: Default::default(),
config,
@ -199,6 +202,14 @@ impl FlowRead for FlowStore {
fn get_shard_config(&self) -> ShardConfig {
*self.config.shard_config.read()
}
fn get_pad_data(&self, start_index: u64) -> crate::error::Result<Option<Vec<PadPair>>> {
self.flow_db.get_pad_data(start_index)
}
fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
self.data_db.get_pad_data_sync_height()
}
}
impl FlowWrite for FlowStore {
@ -266,6 +277,14 @@ impl FlowWrite for FlowStore {
fn update_shard_config(&self, shard_config: ShardConfig) {
*self.config.shard_config.write() = shard_config;
}
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> crate::error::Result<()> {
self.flow_db.put_pad_data(data_sizes, tx_seq)
}
fn put_pad_data_sync_height(&self, sync_index: u64) -> crate::error::Result<()> {
self.data_db.put_pad_data_sync_height(sync_index)
}
}
impl FlowSeal for FlowStore {
@ -343,6 +362,12 @@ impl FlowSeal for FlowStore {
}
}
#[derive(Debug, PartialEq, DeriveEncode, DeriveDecode)]
pub struct PadPair {
pub start_index: u64,
pub data_size: u64,
}
pub struct FlowDBStore {
kvdb: Arc<dyn ZgsKeyValueDB>,
}
@ -443,6 +468,48 @@ impl FlowDBStore {
}
Ok(self.kvdb.write(tx)?)
}
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()> {
let mut tx = self.kvdb.transaction();
let mut buffer = Vec::new();
for item in data_sizes {
buffer.extend(item.as_ssz_bytes());
}
tx.put(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes(), &buffer);
self.kvdb.write(tx)?;
Ok(())
}
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()> {
let mut tx = self.kvdb.transaction();
tx.put(
COL_PAD_DATA_SYNC_HEIGH,
b"sync_height",
&tx_seq.to_be_bytes(),
);
self.kvdb.write(tx)?;
Ok(())
}
fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
match self.kvdb.get(COL_PAD_DATA_SYNC_HEIGH, b"sync_height")? {
Some(v) => Ok(Some(u64::from_be_bytes(
v.try_into().map_err(|e| anyhow!("{:?}", e))?,
))),
None => Ok(None),
}
}
fn get_pad_data(&self, tx_seq: u64) -> Result<Option<Vec<PadPair>>> {
match self.kvdb.get(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes())? {
Some(v) => Ok(Some(
Vec::<PadPair>::from_ssz_bytes(&v).map_err(Error::from)?,
)),
None => Ok(None),
}
}
}
#[derive(DeriveEncode, DeriveDecode, Clone, Debug)]

View File

@ -1,10 +1,11 @@
use super::tx_store::BlockHashAndSubmissionIndex;
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig;
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
use crate::log_store::tx_store::TransactionStore;
use crate::log_store::flow_store::{
batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair,
};
use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore};
use crate::log_store::{
FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead,
LogStoreWrite, MineLoadChunk, SealAnswer, SealTask,
};
use crate::{try_option, ZgsKeyValueDB};
use anyhow::{anyhow, bail, Result};
@ -24,8 +25,8 @@ use shared_types::{
use std::cmp::Ordering;
use std::path::Path;
use std::sync::mpsc;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info, instrument, trace, warn};
/// 256 Bytes
@ -33,17 +34,21 @@ pub const ENTRY_SIZE: usize = 256;
/// 1024 Entries.
pub const PORA_CHUNK_SIZE: usize = 1024;
pub const COL_TX: u32 = 0;
pub const COL_ENTRY_BATCH: u32 = 1;
pub const COL_TX_DATA_ROOT_INDEX: u32 = 2;
pub const COL_ENTRY_BATCH_ROOT: u32 = 3;
pub const COL_TX_COMPLETED: u32 = 4;
pub const COL_MISC: u32 = 5;
pub const COL_SEAL_CONTEXT: u32 = 6;
pub const COL_FLOW_MPT_NODES: u32 = 7;
pub const COL_BLOCK_PROGRESS: u32 = 8;
pub const COL_TX: u32 = 0; // flow db
pub const COL_ENTRY_BATCH: u32 = 1; // data db
pub const COL_TX_DATA_ROOT_INDEX: u32 = 2; // flow db
pub const COL_TX_COMPLETED: u32 = 3; // data db
pub const COL_MISC: u32 = 4; // flow db & data db
pub const COL_FLOW_MPT_NODES: u32 = 5; // flow db
pub const COL_BLOCK_PROGRESS: u32 = 6; // flow db
pub const COL_PAD_DATA_LIST: u32 = 7; // flow db
pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 8; // data db
pub const COL_NUM: u32 = 9;
pub const DATA_DB_KEY: &str = "data_db";
pub const FLOW_DB_KEY: &str = "flow_db";
const PAD_DELAY: Duration = Duration::from_secs(2);
// Process at most 1M entries (256MB) pad data at a time.
const PAD_MAX_SIZE: usize = 1 << 20;
@ -62,10 +67,10 @@ pub struct UpdateFlowMessage {
pub struct LogManager {
pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
pub(crate) data_db: Arc<dyn ZgsKeyValueDB>,
tx_store: TransactionStore,
flow_store: Arc<FlowStore>,
merkle: RwLock<MerkleManager>,
sender: mpsc::Sender<UpdateFlowMessage>,
}
struct MerkleManager {
@ -264,7 +269,12 @@ impl LogStoreWrite for LogManager {
}
let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned();
// TODO(zz): Should we validate received tx?
self.append_subtree_list(tx.start_entry_index, tx.merkle_nodes.clone(), &mut merkle)?;
self.append_subtree_list(
tx.seq,
tx.start_entry_index,
tx.merkle_nodes.clone(),
&mut merkle,
)?;
merkle.commit_merkle(tx.seq)?;
debug!(
"commit flow root: root={:?}",
@ -401,6 +411,42 @@ impl LogStoreWrite for LogManager {
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
self.flow_store.submit_seal_result(answers)
}
fn start_padding(&self, executor: &task_executor::TaskExecutor) {
let store = self.flow_store.clone();
executor.spawn(
async move {
let current_height = store.get_pad_data_sync_height().unwrap();
let mut start_index = current_height.unwrap_or(0);
loop {
match store.get_pad_data(start_index) {
std::result::Result::Ok(data) => {
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
if let Some(data) = data {
for pad in data {
store
.append_entries(ChunkArray {
data: vec![0; pad.data_size as usize],
start_index: pad.start_index,
})
.unwrap();
}
};
store.put_pad_data_sync_height(start_index).unwrap();
start_index += 1;
}
std::result::Result::Err(_) => {
debug!("Unable to get pad data, start_index={}", start_index);
tokio::time::sleep(PAD_DELAY).await;
}
};
}
},
"pad_tx",
);
}
}
impl LogStoreChunkRead for LogManager {
@ -614,31 +660,33 @@ impl LogManager {
config: LogConfig,
flow_path: impl AsRef<Path>,
data_path: impl AsRef<Path>,
executor: task_executor::TaskExecutor,
) -> Result<Self> {
let mut db_config = DatabaseConfig::with_columns(COL_NUM);
db_config.enable_statistics = true;
let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?);
let data_db_source = Arc::new(Database::open(&db_config, data_path)?);
Self::new(flow_db_source, data_db_source, config, executor)
Self::new(flow_db_source, data_db_source, config)
}
pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
pub fn memorydb(config: LogConfig) -> Result<Self> {
let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM));
let data_db = Arc::new(kvdb_memorydb::create(COL_NUM));
Self::new(flow_db, data_db, config, executor)
Self::new(flow_db, data_db, config)
}
fn new(
flow_db_source: Arc<dyn ZgsKeyValueDB>,
data_db_source: Arc<dyn ZgsKeyValueDB>,
config: LogConfig,
executor: task_executor::TaskExecutor,
) -> Result<Self> {
let tx_store = TransactionStore::new(flow_db_source.clone())?;
let tx_store = TransactionStore::new(data_db_source.clone())?;
let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
let flow_store = Arc::new(FlowStore::new(data_db.clone(), config.flow.clone()));
let flow_store = Arc::new(FlowStore::new(
flow_db.clone(),
data_db.clone(),
config.flow.clone(),
));
// If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
// first and call `put_tx` later.
let next_tx_seq = tx_store.next_tx_seq();
@ -739,18 +787,14 @@ impl LogManager {
last_chunk_merkle,
});
let (sender, receiver) = mpsc::channel();
let mut log_manager = Self {
let log_manager = Self {
flow_db: flow_db_source,
data_db: data_db_source,
tx_store,
flow_store,
merkle,
sender,
};
log_manager.start_receiver(receiver, executor);
if let Some(tx) = last_tx_to_insert {
log_manager.put_tx(tx)?;
}
@ -765,40 +809,6 @@ impl LogManager {
Ok(log_manager)
}
fn start_receiver(
&mut self,
rx: mpsc::Receiver<UpdateFlowMessage>,
executor: task_executor::TaskExecutor,
) {
let flow_store = self.flow_store.clone();
executor.spawn(
async move {
loop {
match rx.recv() {
std::result::Result::Ok(data) => {
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
flow_store
.append_entries(ChunkArray {
data: vec![0; data.pad_data],
start_index: data.tx_start_flow_index,
})
.unwrap();
}
std::result::Result::Err(_) => {
debug!("Log manager inner channel closed");
break;
}
};
}
},
"pad_tx",
);
// Wait for the spawned thread to finish
// let _ = handle.join().expect("Thread panicked");
}
fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
match maybe_root {
None => self.gen_proof_at_version(flow_index, None),
@ -854,6 +864,7 @@ impl LogManager {
#[instrument(skip(self, merkle))]
fn append_subtree_list(
&self,
tx_seq: u64,
tx_start_index: u64,
merkle_list: Vec<(usize, DataRoot)>,
merkle: &mut MerkleManager,
@ -862,7 +873,7 @@ impl LogManager {
return Ok(());
}
self.pad_tx(tx_start_index, &mut *merkle)?;
self.pad_tx(tx_seq, tx_start_index, &mut *merkle)?;
for (subtree_depth, subtree_root) in merkle_list {
let subtree_size = 1 << (subtree_depth - 1);
@ -900,7 +911,7 @@ impl LogManager {
}
#[instrument(skip(self, merkle))]
fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
fn pad_tx(&self, tx_seq: u64, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
// Check if we need to pad the flow.
let mut tx_start_flow_index =
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
@ -910,6 +921,7 @@ impl LogManager {
merkle.pora_chunks_merkle.leaves(),
merkle.last_chunk_merkle.leaves()
);
let mut pad_list = vec![];
if pad_size != 0 {
for pad_data in Self::padding(pad_size as usize) {
let mut is_full_empty = true;
@ -954,10 +966,10 @@ impl LogManager {
let data_size = pad_data.len() / ENTRY_SIZE;
if is_full_empty {
self.sender.send(UpdateFlowMessage {
pad_data: pad_data.len(),
tx_start_flow_index,
})?;
pad_list.push(PadPair {
data_size: pad_data.len() as u64,
start_index: tx_start_flow_index,
});
} else {
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
@ -979,6 +991,8 @@ impl LogManager {
merkle.pora_chunks_merkle.leaves(),
merkle.last_chunk_merkle.leaves()
);
self.flow_store.put_pad_data(&pad_list, tx_seq)?;
Ok(())
}

View File

@ -1,6 +1,7 @@
use crate::config::ShardConfig;
use ethereum_types::H256;
use flow_store::PadPair;
use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
Transaction,
@ -158,6 +159,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
fn update_shard_config(&self, shard_config: ShardConfig);
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
fn start_padding(&self, executor: &task_executor::TaskExecutor);
}
pub trait LogStoreChunkWrite {
@ -217,6 +220,10 @@ pub trait FlowRead {
fn get_num_entries(&self) -> Result<u64>;
fn get_shard_config(&self) -> ShardConfig;
fn get_pad_data(&self, start_index: u64) -> Result<Option<Vec<PadPair>>>;
fn get_pad_data_sync_height(&self) -> Result<Option<u64>>;
}
pub trait FlowWrite {
@ -231,6 +238,10 @@ pub trait FlowWrite {
/// Update the shard config.
fn update_shard_config(&self, shard_config: ShardConfig);
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>;
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>;
}
pub struct SealTask {
@ -269,3 +280,23 @@ pub trait FlowSeal {
pub trait Flow: FlowRead + FlowWrite + FlowSeal {}
impl<T: FlowRead + FlowWrite + FlowSeal> Flow for T {}
pub trait PadDataStoreRead {
fn get_pad_data(&self, start_index: u64) -> Result<Option<Vec<PadPair>>>;
fn get_pad_data_sync_height(&self) -> Result<Option<u64>>;
}
pub trait PadDataStoreWrite {
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>;
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>;
fn start_padding(&mut self, executor: &task_executor::TaskExecutor);
}
pub trait PadDataStore:
PadDataStoreRead + PadDataStoreWrite + config::Configurable + Send + Sync + 'static
{
}
impl<T: PadDataStoreRead + PadDataStoreWrite + config::Configurable + Send + Sync + 'static>
PadDataStore for T
{
}

View File

@ -9,15 +9,10 @@ use rand::random;
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
use std::cmp;
use task_executor::test_utils::TestRuntime;
#[test]
fn test_put_get() {
let config = LogConfig::default();
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store = LogManager::memorydb(config.clone(), executor).unwrap();
let store = LogManager::memorydb(config.clone()).unwrap();
let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
// Aligned with size.
let start_offset = 1024;
@ -174,10 +169,7 @@ fn test_put_tx() {
fn create_store() -> LogManager {
let config = LogConfig::default();
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
LogManager::memorydb(config, executor).unwrap()
LogManager::memorydb(config).unwrap()
}
fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) {

View File

@ -1,7 +1,10 @@
use super::tx_store::TxStore;
use anyhow::Result;
use std::sync::Arc;
use storage::log_store::config::{ConfigTx, ConfigurableExt};
use storage::log_store::{
config::{ConfigTx, ConfigurableExt},
log_manager::DATA_DB_KEY,
};
use storage_async::Store;
use tokio::sync::RwLock;
@ -66,10 +69,10 @@ impl SyncStore {
let store = async_store.get_store();
// load next_tx_seq
let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ)?;
let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ, DATA_DB_KEY)?;
// load max_tx_seq
let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ)?;
let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ, DATA_DB_KEY)?;
Ok((next_tx_seq, max_tx_seq))
}
@ -77,13 +80,13 @@ impl SyncStore {
pub async fn set_next_tx_seq(&self, tx_seq: u64) -> Result<()> {
let async_store = self.store.write().await;
let store = async_store.get_store();
store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq)
store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq, DATA_DB_KEY)
}
pub async fn set_max_tx_seq(&self, tx_seq: u64) -> Result<()> {
let async_store = self.store.write().await;
let store = async_store.get_store();
store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq)
store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq, DATA_DB_KEY)
}
pub async fn contains(&self, tx_seq: u64) -> Result<Option<Queue>> {
@ -114,7 +117,7 @@ impl SyncStore {
}
let removed = self.pending_txs.remove(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx)?;
store.exec_configs(tx, DATA_DB_KEY)?;
if removed {
Ok(InsertResult::Upgraded)
@ -128,7 +131,7 @@ impl SyncStore {
}
let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx)?;
store.exec_configs(tx, DATA_DB_KEY)?;
if removed {
Ok(InsertResult::Downgraded)
@ -151,7 +154,7 @@ impl SyncStore {
let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx)?;
store.exec_configs(tx, DATA_DB_KEY)?;
Ok(added)
}

View File

@ -1,6 +1,7 @@
use anyhow::Result;
use rand::Rng;
use storage::log_store::config::{ConfigTx, ConfigurableExt};
use storage::log_store::log_manager::DATA_DB_KEY;
use storage::log_store::Store;
/// TxStore is used to store pending transactions that to be synchronized in advance.
@ -32,11 +33,11 @@ impl TxStore {
}
fn index_of(&self, store: &dyn Store, tx_seq: u64) -> Result<Option<usize>> {
store.get_config_decoded(&self.key_seq_to_index(tx_seq))
store.get_config_decoded(&self.key_seq_to_index(tx_seq), DATA_DB_KEY)
}
fn at(&self, store: &dyn Store, index: usize) -> Result<Option<u64>> {
store.get_config_decoded(&self.key_index_to_seq(index))
store.get_config_decoded(&self.key_index_to_seq(index), DATA_DB_KEY)
}
pub fn has(&self, store: &dyn Store, tx_seq: u64) -> Result<bool> {
@ -45,7 +46,7 @@ impl TxStore {
pub fn count(&self, store: &dyn Store) -> Result<usize> {
store
.get_config_decoded(&self.key_count)
.get_config_decoded(&self.key_count, DATA_DB_KEY)
.map(|x| x.unwrap_or(0))
}
@ -70,7 +71,7 @@ impl TxStore {
if let Some(db_tx) = db_tx {
db_tx.append(&mut tx);
} else {
store.exec_configs(tx)?;
store.exec_configs(tx, DATA_DB_KEY)?;
}
Ok(true)
@ -130,7 +131,7 @@ impl TxStore {
if let Some(db_tx) = db_tx {
db_tx.append(&mut tx);
} else {
store.exec_configs(tx)?;
store.exec_configs(tx, DATA_DB_KEY)?;
}
Ok(true)

View File

@ -1657,7 +1657,7 @@ mod tests {
let num_chunks = 123;
let config = LogConfig::default();
let store = Arc::new(LogManager::memorydb(config, task_executor.clone()).unwrap());
let store = Arc::new(LogManager::memorydb(config).unwrap());
create_controller(task_executor, peer_id, store, tx_id, num_chunks)
}

View File

@ -1348,9 +1348,7 @@ mod tests {
let config = LogConfig::default();
let executor = runtime.task_executor.clone();
let store = Arc::new(LogManager::memorydb(config.clone(), executor).unwrap());
let store = Arc::new(LogManager::memorydb(config.clone()).unwrap());
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
let file_location_cache: Arc<FileLocationCache> =

View File

@ -9,8 +9,6 @@ use storage::{
LogManager,
};
use task_executor::test_utils::TestRuntime;
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
/// The first store is for local node, and data not stored. The second store is for peers, and all
/// transactions are finalized for file sync.
@ -24,11 +22,8 @@ pub fn create_2_store(
Vec<Vec<u8>>,
) {
let config = LogConfig::default();
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let mut store = LogManager::memorydb(config.clone(), executor.clone()).unwrap();
let mut peer_store = LogManager::memorydb(config, executor).unwrap();
let mut store = LogManager::memorydb(config.clone()).unwrap();
let mut peer_store = LogManager::memorydb(config).unwrap();
let mut offset = 1;
let mut txs = vec![];
@ -120,10 +115,7 @@ pub mod tests {
impl TestStoreRuntime {
pub fn new_store() -> impl LogStore {
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
LogManager::memorydb(LogConfig::default(), executor).unwrap()
LogManager::memorydb(LogConfig::default()).unwrap()
}
pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime {