Compare commits

..

No commits in common. "b708ff2f34fa0d8c7efcf747aac1b3b5943189e3" and "e946400b82cc93688004a2bea974d074124158c9" have entirely different histories.

18 changed files with 232 additions and 279 deletions

View File

@ -15,13 +15,19 @@ abigen!(
); );
#[cfg(feature = "dev")] #[cfg(feature = "dev")]
abigen!(ZgsFlow, "../../storage-contracts-abis/Flow.json"); abigen!(
ZgsFlow,
"../../0g-storage-contracts-dev/artifacts/contracts/dataFlow/Flow.sol/Flow.json"
);
#[cfg(feature = "dev")] #[cfg(feature = "dev")]
abigen!(PoraMine, "../../storage-contracts-abis/PoraMine.json"); abigen!(
PoraMine,
"../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json"
);
#[cfg(feature = "dev")] #[cfg(feature = "dev")]
abigen!( abigen!(
ChunkLinearReward, ChunkLinearReward,
"../../storage-contracts-abis/ChunkLinearReward.json" "../../0g-storage-contracts-dev/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json"
); );

View File

@ -278,7 +278,6 @@ impl LogSyncManager {
); );
// start the pad data store // start the pad data store
log_sync_manager.store.start_padding(&executor_clone);
let (watch_progress_tx, watch_progress_rx) = let (watch_progress_tx, watch_progress_rx) =
tokio::sync::mpsc::unbounded_channel(); tokio::sync::mpsc::unbounded_channel();
@ -512,7 +511,6 @@ impl LogSyncManager {
} }
} }
self.data_cache.garbage_collect(self.next_tx_seq); self.data_cache.garbage_collect(self.next_tx_seq);
self.next_tx_seq += 1; self.next_tx_seq += 1;
// Check if the computed data root matches on-chain state. // Check if the computed data root matches on-chain state.

View File

@ -5,20 +5,17 @@ use ethereum_types::Address;
use ethers::contract::ContractCall; use ethers::contract::ContractCall;
use ethers::contract::EthEvent; use ethers::contract::EthEvent;
use std::sync::Arc; use std::sync::Arc;
use storage::log_store::log_manager::DATA_DB_KEY;
use storage::H256; use storage::H256;
use storage_async::Store; use storage_async::Store;
const MINER_ID: &str = "mine.miner_id"; const MINER_ID: &str = "mine.miner_id";
pub async fn load_miner_id(store: &Store) -> storage::error::Result<Option<H256>> { pub async fn load_miner_id(store: &Store) -> storage::error::Result<Option<H256>> {
store.get_config_decoded(&MINER_ID, DATA_DB_KEY).await store.get_config_decoded(&MINER_ID).await
} }
async fn set_miner_id(store: &Store, miner_id: &H256) -> storage::error::Result<()> { async fn set_miner_id(store: &Store, miner_id: &H256) -> storage::error::Result<()> {
store store.set_config_encoded(&MINER_ID, miner_id).await
.set_config_encoded(&MINER_ID, miner_id, DATA_DB_KEY)
.await
} }
pub(crate) async fn check_and_request_miner_id( pub(crate) async fn check_and_request_miner_id(

View File

@ -11,7 +11,7 @@ use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use storage::config::{ShardConfig, SHARD_CONFIG_KEY}; use storage::config::{ShardConfig, SHARD_CONFIG_KEY};
use storage::log_store::log_manager::{DATA_DB_KEY, FLOW_DB_KEY, PORA_CHUNK_SIZE}; use storage::log_store::log_manager::PORA_CHUNK_SIZE;
use storage_async::Store; use storage_async::Store;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc};
@ -252,7 +252,7 @@ impl Pruner {
.update_shard_config(self.config.shard_config) .update_shard_config(self.config.shard_config)
.await; .await;
self.store self.store
.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config, DATA_DB_KEY) .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
.await .await
} }
@ -265,22 +265,17 @@ impl Pruner {
.set_config_encoded( .set_config_encoded(
&FIRST_REWARDABLE_CHUNK_KEY, &FIRST_REWARDABLE_CHUNK_KEY,
&(new_first_rewardable_chunk, new_first_tx_seq), &(new_first_rewardable_chunk, new_first_tx_seq),
FLOW_DB_KEY,
) )
.await .await
} }
} }
async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> { async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
store store.get_config_decoded(&SHARD_CONFIG_KEY).await
.get_config_decoded(&SHARD_CONFIG_KEY, DATA_DB_KEY)
.await
} }
async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> { async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> {
store store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY, FLOW_DB_KEY)
.await
} }
#[derive(Debug)] #[derive(Debug)]

View File

@ -1054,7 +1054,8 @@ mod tests {
let (sync_send, sync_recv) = channel::Channel::unbounded("test"); let (sync_send, sync_recv) = channel::Channel::unbounded("test");
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel(); let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
let store = LogManager::memorydb(LogConfig::default()).unwrap(); let executor = runtime.task_executor.clone();
let store = LogManager::memorydb(LogConfig::default(), executor).unwrap();
Self { Self {
runtime, runtime,
network_globals: Arc::new(network_globals), network_globals: Arc::new(network_globals),

View File

@ -89,9 +89,10 @@ impl ClientBuilder {
/// Initializes in-memory storage. /// Initializes in-memory storage.
pub fn with_memory_store(mut self) -> Result<Self, String> { pub fn with_memory_store(mut self) -> Result<Self, String> {
let executor = require!("sync", self, runtime_context).clone().executor;
// TODO(zz): Set config. // TODO(zz): Set config.
let store = Arc::new( let store = Arc::new(
LogManager::memorydb(LogConfig::default()) LogManager::memorydb(LogConfig::default(), executor)
.map_err(|e| format!("Unable to start in-memory store: {:?}", e))?, .map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
); );
@ -109,11 +110,13 @@ impl ClientBuilder {
/// Initializes RocksDB storage. /// Initializes RocksDB storage.
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> { pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
let executor = require!("sync", self, runtime_context).clone().executor;
let store = Arc::new( let store = Arc::new(
LogManager::rocksdb( LogManager::rocksdb(
config.log_config.clone(), config.log_config.clone(),
config.db_dir.join("flow_db"), config.db_dir.join("flow_db"),
config.db_dir.join("data_db"), config.db_dir.join("data_db"),
executor,
) )
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?, .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
); );

View File

@ -74,11 +74,9 @@ impl Store {
pub async fn get_config_decoded<K: AsRef<[u8]> + Send + Sync, T: Decode + Send + 'static>( pub async fn get_config_decoded<K: AsRef<[u8]> + Send + Sync, T: Decode + Send + 'static>(
&self, &self,
key: &K, key: &K,
dest: &str,
) -> Result<Option<T>> { ) -> Result<Option<T>> {
let key = key.as_ref().to_vec(); let key = key.as_ref().to_vec();
let dest = dest.to_string(); self.spawn(move |store| store.get_config_decoded(&key))
self.spawn(move |store| store.get_config_decoded(&key, &dest))
.await .await
} }
@ -86,12 +84,10 @@ impl Store {
&self, &self,
key: &K, key: &K,
value: &T, value: &T,
dest: &str,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let key = key.as_ref().to_vec(); let key = key.as_ref().to_vec();
let value = value.as_ssz_bytes(); let value = value.as_ssz_bytes();
let dest = dest.to_string(); self.spawn(move |store| store.set_config(&key, &value))
self.spawn(move |store| store.set_config(&key, &value, &dest))
.await .await
} }

View File

@ -14,14 +14,23 @@ use storage::{
}, },
LogManager, LogManager,
}; };
use task_executor::test_utils::TestRuntime;
fn write_performance(c: &mut Criterion) { fn write_performance(c: &mut Criterion) {
if Path::new("db_write").exists() { if Path::new("db_write").exists() {
fs::remove_dir_all("db_write").unwrap(); fs::remove_dir_all("db_write").unwrap();
} }
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new( let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
LogManager::rocksdb(LogConfig::default(), "db_flow_write", "db_data_write") LogManager::rocksdb(
LogConfig::default(),
"db_flow_write",
"db_data_write",
executor,
)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e)) .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(), .unwrap(),
)); ));
@ -105,8 +114,17 @@ fn read_performance(c: &mut Criterion) {
fs::remove_dir_all("db_read").unwrap(); fs::remove_dir_all("db_read").unwrap();
} }
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new( let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
LogManager::rocksdb(LogConfig::default(), "db_flow_read", "db_data_read") LogManager::rocksdb(
LogConfig::default(),
"db_flow_read",
"db_data_read",
executor,
)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e)) .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(), .unwrap(),
)); ));

View File

@ -2,55 +2,16 @@ use anyhow::{anyhow, Result};
use kvdb::{DBKey, DBOp}; use kvdb::{DBKey, DBOp};
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
use crate::log_store::log_manager::{COL_MISC, DATA_DB_KEY, FLOW_DB_KEY};
use crate::LogManager; use crate::LogManager;
macro_rules! db_operation { use super::log_manager::COL_MISC;
($self:expr, $dest:expr, get, $key:expr) => {{
let db = match $dest {
DATA_DB_KEY => &$self.data_db,
FLOW_DB_KEY => &$self.flow_db,
_ => return Err(anyhow!("Invalid destination")),
};
Ok(db.get(COL_MISC, $key)?)
}};
($self:expr, $dest:expr, put, $key:expr, $value:expr) => {{
let db = match $dest {
DATA_DB_KEY => &$self.data_db,
FLOW_DB_KEY => &$self.flow_db,
_ => return Err(anyhow!("Invalid destination")),
};
Ok(db.put(COL_MISC, $key, $value)?)
}};
($self:expr, $dest:expr, delete, $key:expr) => {{
let db = match $dest {
DATA_DB_KEY => &$self.data_db,
FLOW_DB_KEY => &$self.flow_db,
_ => return Err(anyhow!("Invalid destination")),
};
Ok(db.delete(COL_MISC, $key)?)
}};
($self:expr, $dest:expr, transaction, $tx:expr) => {{
let db = match $dest {
DATA_DB_KEY => &$self.data_db,
FLOW_DB_KEY => &$self.flow_db,
_ => return Err(anyhow!("Invalid destination")),
};
let mut db_tx = db.transaction();
db_tx.ops = $tx.ops;
Ok(db.write(db_tx)?)
}};
}
pub trait Configurable { pub trait Configurable {
fn get_config(&self, key: &[u8], dest: &str) -> Result<Option<Vec<u8>>>; fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()>; fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()>;
fn remove_config(&self, key: &[u8], dest: &str) -> Result<()>; fn remove_config(&self, key: &[u8]) -> Result<()>;
fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()>; fn exec_configs(&self, tx: ConfigTx) -> Result<()>;
} }
#[derive(Default)] #[derive(Default)]
@ -80,12 +41,8 @@ impl ConfigTx {
} }
pub trait ConfigurableExt: Configurable { pub trait ConfigurableExt: Configurable {
fn get_config_decoded<K: AsRef<[u8]>, T: Decode>( fn get_config_decoded<K: AsRef<[u8]>, T: Decode>(&self, key: &K) -> Result<Option<T>> {
&self, match self.get_config(key.as_ref())? {
key: &K,
dest: &str,
) -> Result<Option<T>> {
match self.get_config(key.as_ref(), dest)? {
Some(val) => Ok(Some( Some(val) => Ok(Some(
T::from_ssz_bytes(&val).map_err(|e| anyhow!("SSZ decode error: {:?}", e))?, T::from_ssz_bytes(&val).map_err(|e| anyhow!("SSZ decode error: {:?}", e))?,
)), )),
@ -93,36 +50,36 @@ pub trait ConfigurableExt: Configurable {
} }
} }
fn set_config_encoded<K: AsRef<[u8]>, T: Encode>( fn set_config_encoded<K: AsRef<[u8]>, T: Encode>(&self, key: &K, value: &T) -> Result<()> {
&self, self.set_config(key.as_ref(), &value.as_ssz_bytes())
key: &K,
value: &T,
dest: &str,
) -> Result<()> {
self.set_config(key.as_ref(), &value.as_ssz_bytes(), dest)
} }
fn remove_config_by_key<K: AsRef<[u8]>>(&self, key: &K, dest: &str) -> Result<()> { fn remove_config_by_key<K: AsRef<[u8]>>(&self, key: &K) -> Result<()> {
self.remove_config(key.as_ref(), dest) self.remove_config(key.as_ref())
} }
} }
impl<T: ?Sized + Configurable> ConfigurableExt for T {} impl<T: ?Sized + Configurable> ConfigurableExt for T {}
impl Configurable for LogManager { impl Configurable for LogManager {
fn get_config(&self, key: &[u8], dest: &str) -> Result<Option<Vec<u8>>> { fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
db_operation!(self, dest, get, key) Ok(self.data_db.get(COL_MISC, key)?)
} }
fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()> { fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> {
db_operation!(self, dest, put, key, value) self.data_db.put(COL_MISC, key, value)?;
Ok(())
} }
fn remove_config(&self, key: &[u8], dest: &str) -> Result<()> { fn remove_config(&self, key: &[u8]) -> Result<()> {
db_operation!(self, dest, delete, key) Ok(self.data_db.delete(COL_MISC, key)?)
} }
fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()> { fn exec_configs(&self, tx: ConfigTx) -> Result<()> {
db_operation!(self, dest, transaction, tx) let mut db_tx = self.data_db.transaction();
db_tx.ops = tx.ops;
self.data_db.write(db_tx)?;
Ok(())
} }
} }

View File

@ -26,16 +26,14 @@ use tracing::{debug, error, trace};
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL}; use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
pub struct FlowStore { pub struct FlowStore {
flow_db: Arc<FlowDBStore>,
data_db: Arc<FlowDBStore>, data_db: Arc<FlowDBStore>,
seal_manager: SealTaskManager, seal_manager: SealTaskManager,
config: FlowConfig, config: FlowConfig,
} }
impl FlowStore { impl FlowStore {
pub fn new(flow_db: Arc<FlowDBStore>, data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self { pub fn new(data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
Self { Self {
flow_db,
data_db, data_db,
seal_manager: Default::default(), seal_manager: Default::default(),
config, config,
@ -202,14 +200,6 @@ impl FlowRead for FlowStore {
fn get_shard_config(&self) -> ShardConfig { fn get_shard_config(&self) -> ShardConfig {
*self.config.shard_config.read() *self.config.shard_config.read()
} }
fn get_pad_data(&self, start_index: u64) -> crate::error::Result<Option<Vec<PadPair>>> {
self.flow_db.get_pad_data(start_index)
}
fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
self.data_db.get_pad_data_sync_height()
}
} }
impl FlowWrite for FlowStore { impl FlowWrite for FlowStore {
@ -277,14 +267,6 @@ impl FlowWrite for FlowStore {
fn update_shard_config(&self, shard_config: ShardConfig) { fn update_shard_config(&self, shard_config: ShardConfig) {
*self.config.shard_config.write() = shard_config; *self.config.shard_config.write() = shard_config;
} }
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> crate::error::Result<()> {
self.flow_db.put_pad_data(data_sizes, tx_seq)
}
fn put_pad_data_sync_height(&self, sync_index: u64) -> crate::error::Result<()> {
self.data_db.put_pad_data_sync_height(sync_index)
}
} }
impl FlowSeal for FlowStore { impl FlowSeal for FlowStore {
@ -362,10 +344,33 @@ impl FlowSeal for FlowStore {
} }
} }
#[derive(Debug, PartialEq, DeriveEncode, DeriveDecode)] pub struct PadDataStore {
pub struct PadPair { flow_db: Arc<FlowDBStore>,
pub start_index: u64, data_db: Arc<FlowDBStore>,
pub data_size: u64, }
impl PadDataStore {
pub fn new(flow_db: Arc<FlowDBStore>, data_db: Arc<FlowDBStore>) -> Self {
Self { flow_db, data_db }
}
pub fn put_pad_data(&self, data_size: usize, start_index: u64) -> Result<()> {
self.flow_db.put_pad_data(data_size, start_index)?;
Ok(())
}
pub fn put_pad_data_sync_height(&self, sync_index: u64) -> Result<()> {
self.data_db.put_pad_data_sync_height(sync_index)?;
Ok(())
}
pub fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
self.data_db.get_pad_data_sync_heigh()
}
pub fn get_pad_data(&self, start_index: u64) -> Result<Option<usize>> {
self.flow_db.get_pad_data(start_index)
}
} }
pub struct FlowDBStore { pub struct FlowDBStore {
@ -469,31 +474,29 @@ impl FlowDBStore {
Ok(self.kvdb.write(tx)?) Ok(self.kvdb.write(tx)?)
} }
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()> { fn put_pad_data(&self, data_size: usize, start_index: u64) -> Result<()> {
let mut tx = self.kvdb.transaction();
let mut buffer = Vec::new();
for item in data_sizes {
buffer.extend(item.as_ssz_bytes());
}
tx.put(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes(), &buffer);
self.kvdb.write(tx)?;
Ok(())
}
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()> {
let mut tx = self.kvdb.transaction(); let mut tx = self.kvdb.transaction();
tx.put( tx.put(
COL_PAD_DATA_SYNC_HEIGH, COL_PAD_DATA_LIST,
b"sync_height", &start_index.to_be_bytes(),
&tx_seq.to_be_bytes(), &data_size.to_be_bytes(),
); );
self.kvdb.write(tx)?; self.kvdb.write(tx)?;
Ok(()) Ok(())
} }
fn get_pad_data_sync_height(&self) -> Result<Option<u64>> { fn put_pad_data_sync_height(&self, sync_index: u64) -> Result<()> {
let mut tx = self.kvdb.transaction();
tx.put(
COL_PAD_DATA_SYNC_HEIGH,
b"sync_height",
&sync_index.to_be_bytes(),
);
self.kvdb.write(tx)?;
Ok(())
}
fn get_pad_data_sync_heigh(&self) -> Result<Option<u64>> {
match self.kvdb.get(COL_PAD_DATA_SYNC_HEIGH, b"sync_height")? { match self.kvdb.get(COL_PAD_DATA_SYNC_HEIGH, b"sync_height")? {
Some(v) => Ok(Some(u64::from_be_bytes( Some(v) => Ok(Some(u64::from_be_bytes(
v.try_into().map_err(|e| anyhow!("{:?}", e))?, v.try_into().map_err(|e| anyhow!("{:?}", e))?,
@ -502,11 +505,14 @@ impl FlowDBStore {
} }
} }
fn get_pad_data(&self, tx_seq: u64) -> Result<Option<Vec<PadPair>>> { fn get_pad_data(&self, start_index: u64) -> Result<Option<usize>> {
match self.kvdb.get(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes())? { match self
Some(v) => Ok(Some( .kvdb
Vec::<PadPair>::from_ssz_bytes(&v).map_err(Error::from)?, .get(COL_PAD_DATA_LIST, &start_index.to_be_bytes())?
)), {
Some(v) => Ok(Some(usize::from_be_bytes(
v.try_into().map_err(|e| anyhow!("{:?}", e))?,
))),
None => Ok(None), None => Ok(None),
} }
} }

View File

@ -1,11 +1,12 @@
use super::tx_store::BlockHashAndSubmissionIndex;
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig; use crate::config::ShardConfig;
use crate::log_store::flow_store::{ use crate::log_store::flow_store::{
batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair, batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadDataStore,
}; };
use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore}; use crate::log_store::tx_store::TransactionStore;
use crate::log_store::{ use crate::log_store::{
FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
LogStoreWrite, MineLoadChunk, SealAnswer, SealTask,
}; };
use crate::{try_option, ZgsKeyValueDB}; use crate::{try_option, ZgsKeyValueDB};
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
@ -25,6 +26,7 @@ use shared_types::{
use std::cmp::Ordering; use std::cmp::Ordering;
use std::path::Path; use std::path::Path;
use std::sync::mpsc;
use std::sync::Arc; use std::sync::Arc;
use tracing::{debug, error, info, instrument, trace, warn}; use tracing::{debug, error, info, instrument, trace, warn};
@ -33,19 +35,18 @@ pub const ENTRY_SIZE: usize = 256;
/// 1024 Entries. /// 1024 Entries.
pub const PORA_CHUNK_SIZE: usize = 1024; pub const PORA_CHUNK_SIZE: usize = 1024;
pub const COL_TX: u32 = 0; // flow db pub const COL_TX: u32 = 0;
pub const COL_ENTRY_BATCH: u32 = 1; // data db pub const COL_ENTRY_BATCH: u32 = 1;
pub const COL_TX_DATA_ROOT_INDEX: u32 = 2; // flow db pub const COL_TX_DATA_ROOT_INDEX: u32 = 2;
pub const COL_TX_COMPLETED: u32 = 3; // data db pub const COL_ENTRY_BATCH_ROOT: u32 = 3;
pub const COL_MISC: u32 = 4; // flow db & data db pub const COL_TX_COMPLETED: u32 = 4;
pub const COL_FLOW_MPT_NODES: u32 = 5; // flow db pub const COL_MISC: u32 = 5;
pub const COL_BLOCK_PROGRESS: u32 = 6; // flow db pub const COL_SEAL_CONTEXT: u32 = 6;
pub const COL_PAD_DATA_LIST: u32 = 7; // flow db pub const COL_FLOW_MPT_NODES: u32 = 7;
pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 8; // data db pub const COL_BLOCK_PROGRESS: u32 = 8;
pub const COL_NUM: u32 = 9; pub const COL_NUM: u32 = 9;
pub const COL_PAD_DATA_LIST: u32 = 10;
pub const DATA_DB_KEY: &str = "data_db"; pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 11;
pub const FLOW_DB_KEY: &str = "flow_db";
// Process at most 1M entries (256MB) pad data at a time. // Process at most 1M entries (256MB) pad data at a time.
const PAD_MAX_SIZE: usize = 1 << 20; const PAD_MAX_SIZE: usize = 1 << 20;
@ -64,11 +65,12 @@ pub struct UpdateFlowMessage {
} }
pub struct LogManager { pub struct LogManager {
pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>, pub(crate) pad_db: Arc<PadDataStore>,
pub(crate) data_db: Arc<dyn ZgsKeyValueDB>, pub(crate) data_db: Arc<dyn ZgsKeyValueDB>,
tx_store: TransactionStore, tx_store: TransactionStore,
flow_store: Arc<FlowStore>, flow_store: Arc<FlowStore>,
merkle: RwLock<MerkleManager>, merkle: RwLock<MerkleManager>,
sender: mpsc::Sender<UpdateFlowMessage>,
} }
struct MerkleManager { struct MerkleManager {
@ -267,12 +269,7 @@ impl LogStoreWrite for LogManager {
} }
let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned(); let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned();
// TODO(zz): Should we validate received tx? // TODO(zz): Should we validate received tx?
self.append_subtree_list( self.append_subtree_list(tx.start_entry_index, tx.merkle_nodes.clone(), &mut merkle)?;
tx.seq,
tx.start_entry_index,
tx.merkle_nodes.clone(),
&mut merkle,
)?;
merkle.commit_merkle(tx.seq)?; merkle.commit_merkle(tx.seq)?;
debug!( debug!(
"commit flow root: root={:?}", "commit flow root: root={:?}",
@ -409,41 +406,6 @@ impl LogStoreWrite for LogManager {
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> { fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
self.flow_store.submit_seal_result(answers) self.flow_store.submit_seal_result(answers)
} }
fn start_padding(&self, executor: &task_executor::TaskExecutor) {
let store = self.flow_store.clone();
executor.spawn(
async move {
let current_height = store.get_pad_data_sync_height().unwrap();
let mut start_index = current_height.unwrap_or(0);
loop {
match store.get_pad_data(start_index) {
std::result::Result::Ok(data) => {
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
if let Some(data) = data {
for pad in data {
store
.append_entries(ChunkArray {
data: vec![0; pad.data_size as usize],
start_index: pad.start_index,
})
.unwrap();
}
};
store.put_pad_data_sync_height(start_index).unwrap();
start_index += 1;
}
std::result::Result::Err(_) => {
debug!("Unable to get pad data, start_index={}", start_index);
}
};
}
},
"pad_tx",
);
}
} }
impl LogStoreChunkRead for LogManager { impl LogStoreChunkRead for LogManager {
@ -657,33 +619,32 @@ impl LogManager {
config: LogConfig, config: LogConfig,
flow_path: impl AsRef<Path>, flow_path: impl AsRef<Path>,
data_path: impl AsRef<Path>, data_path: impl AsRef<Path>,
executor: task_executor::TaskExecutor,
) -> Result<Self> { ) -> Result<Self> {
let mut db_config = DatabaseConfig::with_columns(COL_NUM); let mut db_config = DatabaseConfig::with_columns(COL_NUM);
db_config.enable_statistics = true; db_config.enable_statistics = true;
let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?); let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?);
let data_db_source = Arc::new(Database::open(&db_config, data_path)?); let data_db_source = Arc::new(Database::open(&db_config, data_path)?);
Self::new(flow_db_source, data_db_source, config) Self::new(flow_db_source, data_db_source, config, executor)
} }
pub fn memorydb(config: LogConfig) -> Result<Self> { pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM)); let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM));
let data_db = Arc::new(kvdb_memorydb::create(COL_NUM)); let data_db = Arc::new(kvdb_memorydb::create(COL_NUM));
Self::new(flow_db, data_db, config) Self::new(flow_db, data_db, config, executor)
} }
fn new( fn new(
flow_db_source: Arc<dyn ZgsKeyValueDB>, flow_db_source: Arc<dyn ZgsKeyValueDB>,
data_db_source: Arc<dyn ZgsKeyValueDB>, data_db_source: Arc<dyn ZgsKeyValueDB>,
config: LogConfig, config: LogConfig,
executor: task_executor::TaskExecutor,
) -> Result<Self> { ) -> Result<Self> {
let tx_store = TransactionStore::new(data_db_source.clone())?; let tx_store = TransactionStore::new(data_db_source.clone())?;
let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone())); let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
let data_db = Arc::new(FlowDBStore::new(data_db_source.clone())); let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
let flow_store = Arc::new(FlowStore::new( let flow_store = Arc::new(FlowStore::new(data_db.clone(), config.flow.clone()));
flow_db.clone(), let pad_db = Arc::new(PadDataStore::new(flow_db.clone(), data_db.clone()));
data_db.clone(),
config.flow.clone(),
));
// If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle` // If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
// first and call `put_tx` later. // first and call `put_tx` later.
let next_tx_seq = tx_store.next_tx_seq(); let next_tx_seq = tx_store.next_tx_seq();
@ -784,14 +745,19 @@ impl LogManager {
last_chunk_merkle, last_chunk_merkle,
}); });
let log_manager = Self { let (sender, receiver) = mpsc::channel();
flow_db: flow_db_source,
let mut log_manager = Self {
pad_db,
data_db: data_db_source, data_db: data_db_source,
tx_store, tx_store,
flow_store, flow_store,
merkle, merkle,
sender,
}; };
log_manager.start_receiver(receiver, executor);
if let Some(tx) = last_tx_to_insert { if let Some(tx) = last_tx_to_insert {
log_manager.put_tx(tx)?; log_manager.put_tx(tx)?;
} }
@ -806,6 +772,40 @@ impl LogManager {
Ok(log_manager) Ok(log_manager)
} }
fn start_receiver(
&mut self,
rx: mpsc::Receiver<UpdateFlowMessage>,
executor: task_executor::TaskExecutor,
) {
let flow_store = self.flow_store.clone();
executor.spawn(
async move {
loop {
match rx.recv() {
std::result::Result::Ok(data) => {
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
flow_store
.append_entries(ChunkArray {
data: vec![0; data.pad_data],
start_index: data.tx_start_flow_index,
})
.unwrap();
}
std::result::Result::Err(_) => {
debug!("Log manager inner channel closed");
break;
}
};
}
},
"pad_tx",
);
// Wait for the spawned thread to finish
// let _ = handle.join().expect("Thread panicked");
}
fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> { fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
match maybe_root { match maybe_root {
None => self.gen_proof_at_version(flow_index, None), None => self.gen_proof_at_version(flow_index, None),
@ -861,7 +861,6 @@ impl LogManager {
#[instrument(skip(self, merkle))] #[instrument(skip(self, merkle))]
fn append_subtree_list( fn append_subtree_list(
&self, &self,
tx_seq: u64,
tx_start_index: u64, tx_start_index: u64,
merkle_list: Vec<(usize, DataRoot)>, merkle_list: Vec<(usize, DataRoot)>,
merkle: &mut MerkleManager, merkle: &mut MerkleManager,
@ -870,7 +869,7 @@ impl LogManager {
return Ok(()); return Ok(());
} }
self.pad_tx(tx_seq, tx_start_index, &mut *merkle)?; self.pad_tx(tx_start_index, &mut *merkle)?;
for (subtree_depth, subtree_root) in merkle_list { for (subtree_depth, subtree_root) in merkle_list {
let subtree_size = 1 << (subtree_depth - 1); let subtree_size = 1 << (subtree_depth - 1);
@ -908,7 +907,7 @@ impl LogManager {
} }
#[instrument(skip(self, merkle))] #[instrument(skip(self, merkle))]
fn pad_tx(&self, tx_seq: u64, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> { fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
// Check if we need to pad the flow. // Check if we need to pad the flow.
let mut tx_start_flow_index = let mut tx_start_flow_index =
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64; merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
@ -918,7 +917,6 @@ impl LogManager {
merkle.pora_chunks_merkle.leaves(), merkle.pora_chunks_merkle.leaves(),
merkle.last_chunk_merkle.leaves() merkle.last_chunk_merkle.leaves()
); );
let mut pad_list = vec![];
if pad_size != 0 { if pad_size != 0 {
for pad_data in Self::padding(pad_size as usize) { for pad_data in Self::padding(pad_size as usize) {
let mut is_full_empty = true; let mut is_full_empty = true;
@ -963,10 +961,6 @@ impl LogManager {
let data_size = pad_data.len() / ENTRY_SIZE; let data_size = pad_data.len() / ENTRY_SIZE;
if is_full_empty { if is_full_empty {
pad_list.push(PadPair {
data_size: pad_data.len() as u64,
start_index: tx_start_flow_index,
});
} else { } else {
// Update the flow database. // Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save // This should be called before `complete_last_chunk_merkle` so that we do not save
@ -988,8 +982,6 @@ impl LogManager {
merkle.pora_chunks_merkle.leaves(), merkle.pora_chunks_merkle.leaves(),
merkle.last_chunk_merkle.leaves() merkle.last_chunk_merkle.leaves()
); );
self.flow_store.put_pad_data(&pad_list, tx_seq)?;
Ok(()) Ok(())
} }

View File

@ -1,7 +1,6 @@
use crate::config::ShardConfig; use crate::config::ShardConfig;
use ethereum_types::H256; use ethereum_types::H256;
use flow_store::PadPair;
use shared_types::{ use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
Transaction, Transaction,
@ -16,6 +15,7 @@ pub mod config;
mod flow_store; mod flow_store;
mod load_chunk; mod load_chunk;
pub mod log_manager; pub mod log_manager;
mod pad_data_store;
mod seal_task_manager; mod seal_task_manager;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
@ -159,8 +159,6 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
fn update_shard_config(&self, shard_config: ShardConfig); fn update_shard_config(&self, shard_config: ShardConfig);
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>; fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
fn start_padding(&self, executor: &task_executor::TaskExecutor);
} }
pub trait LogStoreChunkWrite { pub trait LogStoreChunkWrite {
@ -220,10 +218,6 @@ pub trait FlowRead {
fn get_num_entries(&self) -> Result<u64>; fn get_num_entries(&self) -> Result<u64>;
fn get_shard_config(&self) -> ShardConfig; fn get_shard_config(&self) -> ShardConfig;
fn get_pad_data(&self, start_index: u64) -> Result<Option<Vec<PadPair>>>;
fn get_pad_data_sync_height(&self) -> Result<Option<u64>>;
} }
pub trait FlowWrite { pub trait FlowWrite {
@ -238,10 +232,6 @@ pub trait FlowWrite {
/// Update the shard config. /// Update the shard config.
fn update_shard_config(&self, shard_config: ShardConfig); fn update_shard_config(&self, shard_config: ShardConfig);
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>;
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>;
} }
pub struct SealTask { pub struct SealTask {
@ -280,23 +270,3 @@ pub trait FlowSeal {
pub trait Flow: FlowRead + FlowWrite + FlowSeal {} pub trait Flow: FlowRead + FlowWrite + FlowSeal {}
impl<T: FlowRead + FlowWrite + FlowSeal> Flow for T {} impl<T: FlowRead + FlowWrite + FlowSeal> Flow for T {}
pub trait PadDataStoreRead {
fn get_pad_data(&self, start_index: u64) -> Result<Option<Vec<PadPair>>>;
fn get_pad_data_sync_height(&self) -> Result<Option<u64>>;
}
pub trait PadDataStoreWrite {
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>;
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>;
fn start_padding(&mut self, executor: &task_executor::TaskExecutor);
}
pub trait PadDataStore:
PadDataStoreRead + PadDataStoreWrite + config::Configurable + Send + Sync + 'static
{
}
impl<T: PadDataStoreRead + PadDataStoreWrite + config::Configurable + Send + Sync + 'static>
PadDataStore for T
{
}

View File

@ -9,10 +9,15 @@ use rand::random;
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE}; use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
use std::cmp; use std::cmp;
use task_executor::test_utils::TestRuntime;
#[test] #[test]
fn test_put_get() { fn test_put_get() {
let config = LogConfig::default(); let config = LogConfig::default();
let store = LogManager::memorydb(config.clone()).unwrap(); let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store = LogManager::memorydb(config.clone(), executor).unwrap();
let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1; let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
// Aligned with size. // Aligned with size.
let start_offset = 1024; let start_offset = 1024;
@ -169,7 +174,10 @@ fn test_put_tx() {
fn create_store() -> LogManager { fn create_store() -> LogManager {
let config = LogConfig::default(); let config = LogConfig::default();
LogManager::memorydb(config).unwrap() let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
LogManager::memorydb(config, executor).unwrap()
} }
fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) { fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) {

View File

@ -1,10 +1,7 @@
use super::tx_store::TxStore; use super::tx_store::TxStore;
use anyhow::Result; use anyhow::Result;
use std::sync::Arc; use std::sync::Arc;
use storage::log_store::{ use storage::log_store::config::{ConfigTx, ConfigurableExt};
config::{ConfigTx, ConfigurableExt},
log_manager::DATA_DB_KEY,
};
use storage_async::Store; use storage_async::Store;
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -69,10 +66,10 @@ impl SyncStore {
let store = async_store.get_store(); let store = async_store.get_store();
// load next_tx_seq // load next_tx_seq
let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ, DATA_DB_KEY)?; let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ)?;
// load max_tx_seq // load max_tx_seq
let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ, DATA_DB_KEY)?; let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ)?;
Ok((next_tx_seq, max_tx_seq)) Ok((next_tx_seq, max_tx_seq))
} }
@ -80,13 +77,13 @@ impl SyncStore {
pub async fn set_next_tx_seq(&self, tx_seq: u64) -> Result<()> { pub async fn set_next_tx_seq(&self, tx_seq: u64) -> Result<()> {
let async_store = self.store.write().await; let async_store = self.store.write().await;
let store = async_store.get_store(); let store = async_store.get_store();
store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq, DATA_DB_KEY) store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq)
} }
pub async fn set_max_tx_seq(&self, tx_seq: u64) -> Result<()> { pub async fn set_max_tx_seq(&self, tx_seq: u64) -> Result<()> {
let async_store = self.store.write().await; let async_store = self.store.write().await;
let store = async_store.get_store(); let store = async_store.get_store();
store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq, DATA_DB_KEY) store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq)
} }
pub async fn contains(&self, tx_seq: u64) -> Result<Option<Queue>> { pub async fn contains(&self, tx_seq: u64) -> Result<Option<Queue>> {
@ -117,7 +114,7 @@ impl SyncStore {
} }
let removed = self.pending_txs.remove(store, Some(&mut tx), tx_seq)?; let removed = self.pending_txs.remove(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx, DATA_DB_KEY)?; store.exec_configs(tx)?;
if removed { if removed {
Ok(InsertResult::Upgraded) Ok(InsertResult::Upgraded)
@ -131,7 +128,7 @@ impl SyncStore {
} }
let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?; let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx, DATA_DB_KEY)?; store.exec_configs(tx)?;
if removed { if removed {
Ok(InsertResult::Downgraded) Ok(InsertResult::Downgraded)
@ -154,7 +151,7 @@ impl SyncStore {
let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?; let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx, DATA_DB_KEY)?; store.exec_configs(tx)?;
Ok(added) Ok(added)
} }

View File

@ -1,7 +1,6 @@
use anyhow::Result; use anyhow::Result;
use rand::Rng; use rand::Rng;
use storage::log_store::config::{ConfigTx, ConfigurableExt}; use storage::log_store::config::{ConfigTx, ConfigurableExt};
use storage::log_store::log_manager::DATA_DB_KEY;
use storage::log_store::Store; use storage::log_store::Store;
/// TxStore is used to store pending transactions that to be synchronized in advance. /// TxStore is used to store pending transactions that to be synchronized in advance.
@ -33,11 +32,11 @@ impl TxStore {
} }
fn index_of(&self, store: &dyn Store, tx_seq: u64) -> Result<Option<usize>> { fn index_of(&self, store: &dyn Store, tx_seq: u64) -> Result<Option<usize>> {
store.get_config_decoded(&self.key_seq_to_index(tx_seq), DATA_DB_KEY) store.get_config_decoded(&self.key_seq_to_index(tx_seq))
} }
fn at(&self, store: &dyn Store, index: usize) -> Result<Option<u64>> { fn at(&self, store: &dyn Store, index: usize) -> Result<Option<u64>> {
store.get_config_decoded(&self.key_index_to_seq(index), DATA_DB_KEY) store.get_config_decoded(&self.key_index_to_seq(index))
} }
pub fn has(&self, store: &dyn Store, tx_seq: u64) -> Result<bool> { pub fn has(&self, store: &dyn Store, tx_seq: u64) -> Result<bool> {
@ -46,7 +45,7 @@ impl TxStore {
pub fn count(&self, store: &dyn Store) -> Result<usize> { pub fn count(&self, store: &dyn Store) -> Result<usize> {
store store
.get_config_decoded(&self.key_count, DATA_DB_KEY) .get_config_decoded(&self.key_count)
.map(|x| x.unwrap_or(0)) .map(|x| x.unwrap_or(0))
} }
@ -71,7 +70,7 @@ impl TxStore {
if let Some(db_tx) = db_tx { if let Some(db_tx) = db_tx {
db_tx.append(&mut tx); db_tx.append(&mut tx);
} else { } else {
store.exec_configs(tx, DATA_DB_KEY)?; store.exec_configs(tx)?;
} }
Ok(true) Ok(true)
@ -131,7 +130,7 @@ impl TxStore {
if let Some(db_tx) = db_tx { if let Some(db_tx) = db_tx {
db_tx.append(&mut tx); db_tx.append(&mut tx);
} else { } else {
store.exec_configs(tx, DATA_DB_KEY)?; store.exec_configs(tx)?;
} }
Ok(true) Ok(true)

View File

@ -1657,7 +1657,7 @@ mod tests {
let num_chunks = 123; let num_chunks = 123;
let config = LogConfig::default(); let config = LogConfig::default();
let store = Arc::new(LogManager::memorydb(config).unwrap()); let store = Arc::new(LogManager::memorydb(config, task_executor.clone()).unwrap());
create_controller(task_executor, peer_id, store, tx_id, num_chunks) create_controller(task_executor, peer_id, store, tx_id, num_chunks)
} }

View File

@ -1348,7 +1348,9 @@ mod tests {
let config = LogConfig::default(); let config = LogConfig::default();
let store = Arc::new(LogManager::memorydb(config.clone()).unwrap()); let executor = runtime.task_executor.clone();
let store = Arc::new(LogManager::memorydb(config.clone(), executor).unwrap());
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id(); let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
let file_location_cache: Arc<FileLocationCache> = let file_location_cache: Arc<FileLocationCache> =

View File

@ -9,6 +9,8 @@ use storage::{
LogManager, LogManager,
}; };
use task_executor::test_utils::TestRuntime;
/// Creates stores for local node and peers with initialized transaction of specified chunk count. /// Creates stores for local node and peers with initialized transaction of specified chunk count.
/// The first store is for local node, and data not stored. The second store is for peers, and all /// The first store is for local node, and data not stored. The second store is for peers, and all
/// transactions are finalized for file sync. /// transactions are finalized for file sync.
@ -22,8 +24,11 @@ pub fn create_2_store(
Vec<Vec<u8>>, Vec<Vec<u8>>,
) { ) {
let config = LogConfig::default(); let config = LogConfig::default();
let mut store = LogManager::memorydb(config.clone()).unwrap(); let runtime = TestRuntime::default();
let mut peer_store = LogManager::memorydb(config).unwrap();
let executor = runtime.task_executor.clone();
let mut store = LogManager::memorydb(config.clone(), executor.clone()).unwrap();
let mut peer_store = LogManager::memorydb(config, executor).unwrap();
let mut offset = 1; let mut offset = 1;
let mut txs = vec![]; let mut txs = vec![];
@ -115,7 +120,10 @@ pub mod tests {
impl TestStoreRuntime { impl TestStoreRuntime {
pub fn new_store() -> impl LogStore { pub fn new_store() -> impl LogStore {
LogManager::memorydb(LogConfig::default()).unwrap() let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
LogManager::memorydb(LogConfig::default(), executor).unwrap()
} }
pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime { pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime {