From 3b4ed1502b0cf63a62c6ecd9cf23710eef1be1a3 Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Thu, 7 Nov 2024 17:19:52 +0800 Subject: [PATCH] support async padding after sync phase --- common/contract-interface/src/lib.rs | 12 +- node/log_entry_sync/src/sync_manager/mod.rs | 2 + node/router/src/libp2p_event_handler.rs | 3 +- node/src/client/builder.rs | 5 +- node/storage/benches/benchmark.rs | 30 +--- node/storage/src/log_store/flow_store.rs | 84 ++++++------ node/storage/src/log_store/log_manager.rs | 143 ++++++++++---------- node/storage/src/log_store/mod.rs | 32 ++++- node/storage/src/log_store/tests.rs | 12 +- node/sync/src/controllers/serial.rs | 2 +- node/sync/src/service.rs | 4 +- node/sync/src/test_util.rs | 14 +- 12 files changed, 163 insertions(+), 180 deletions(-) diff --git a/common/contract-interface/src/lib.rs b/common/contract-interface/src/lib.rs index 5e3badb..900f189 100644 --- a/common/contract-interface/src/lib.rs +++ b/common/contract-interface/src/lib.rs @@ -15,19 +15,13 @@ abigen!( ); #[cfg(feature = "dev")] -abigen!( - ZgsFlow, - "../../0g-storage-contracts-dev/artifacts/contracts/dataFlow/Flow.sol/Flow.json" -); +abigen!(ZgsFlow, "../../storage-contracts-abis/Flow.json"); #[cfg(feature = "dev")] -abigen!( - PoraMine, - "../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json" -); +abigen!(PoraMine, "../../storage-contracts-abis/PoraMine.json"); #[cfg(feature = "dev")] abigen!( ChunkLinearReward, - "../../0g-storage-contracts-dev/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json" + "../../storage-contracts-abis/ChunkLinearReward.json" ); diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs index d665eee..2412990 100644 --- a/node/log_entry_sync/src/sync_manager/mod.rs +++ b/node/log_entry_sync/src/sync_manager/mod.rs @@ -278,6 +278,7 @@ impl LogSyncManager { ); // start the pad data store + log_sync_manager.store.start_padding(&executor_clone); let (watch_progress_tx, watch_progress_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -511,6 +512,7 @@ impl LogSyncManager { } } self.data_cache.garbage_collect(self.next_tx_seq); + self.next_tx_seq += 1; // Check if the computed data root matches on-chain state. diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 439dafa..09a0693 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -1054,8 +1054,7 @@ mod tests { let (sync_send, sync_recv) = channel::Channel::unbounded("test"); let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel(); - let executor = runtime.task_executor.clone(); - let store = LogManager::memorydb(LogConfig::default(), executor).unwrap(); + let store = LogManager::memorydb(LogConfig::default()).unwrap(); Self { runtime, network_globals: Arc::new(network_globals), diff --git a/node/src/client/builder.rs b/node/src/client/builder.rs index 59b2f4c..1b4df65 100644 --- a/node/src/client/builder.rs +++ b/node/src/client/builder.rs @@ -89,10 +89,9 @@ impl ClientBuilder { /// Initializes in-memory storage. pub fn with_memory_store(mut self) -> Result { - let executor = require!("sync", self, runtime_context).clone().executor; // TODO(zz): Set config. let store = Arc::new( - LogManager::memorydb(LogConfig::default(), executor) + LogManager::memorydb(LogConfig::default()) .map_err(|e| format!("Unable to start in-memory store: {:?}", e))?, ); @@ -110,13 +109,11 @@ impl ClientBuilder { /// Initializes RocksDB storage. pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result { - let executor = require!("sync", self, runtime_context).clone().executor; let store = Arc::new( LogManager::rocksdb( config.log_config.clone(), config.db_dir.join("flow_db"), config.db_dir.join("data_db"), - executor, ) .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?, ); diff --git a/node/storage/benches/benchmark.rs b/node/storage/benches/benchmark.rs index 4516c9b..a12442a 100644 --- a/node/storage/benches/benchmark.rs +++ b/node/storage/benches/benchmark.rs @@ -14,25 +14,16 @@ use storage::{ }, LogManager, }; -use task_executor::test_utils::TestRuntime; fn write_performance(c: &mut Criterion) { if Path::new("db_write").exists() { fs::remove_dir_all("db_write").unwrap(); } - let runtime = TestRuntime::default(); - - let executor = runtime.task_executor.clone(); let store: Arc> = Arc::new(RwLock::new( - LogManager::rocksdb( - LogConfig::default(), - "db_flow_write", - "db_data_write", - executor, - ) - .map_err(|e| format!("Unable to start RocksDB store: {:?}", e)) - .unwrap(), + LogManager::rocksdb(LogConfig::default(), "db_flow_write", "db_data_write") + .map_err(|e| format!("Unable to start RocksDB store: {:?}", e)) + .unwrap(), )); let chunk_count = 2048; @@ -114,19 +105,10 @@ fn read_performance(c: &mut Criterion) { fs::remove_dir_all("db_read").unwrap(); } - let runtime = TestRuntime::default(); - - let executor = runtime.task_executor.clone(); - let store: Arc> = Arc::new(RwLock::new( - LogManager::rocksdb( - LogConfig::default(), - "db_flow_read", - "db_data_read", - executor, - ) - .map_err(|e| format!("Unable to start RocksDB store: {:?}", e)) - .unwrap(), + LogManager::rocksdb(LogConfig::default(), "db_flow_read", "db_data_read") + .map_err(|e| format!("Unable to start RocksDB store: {:?}", e)) + .unwrap(), )); let tx_size = 1000; diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index 386c76f..c1f7bc8 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -26,14 +26,16 @@ use tracing::{debug, error, trace}; use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL}; pub struct FlowStore { + flow_db: Arc, data_db: Arc, seal_manager: SealTaskManager, config: FlowConfig, } impl FlowStore { - pub fn new(data_db: Arc, config: FlowConfig) -> Self { + pub fn new(flow_db: Arc, data_db: Arc, config: FlowConfig) -> Self { Self { + flow_db, data_db, seal_manager: Default::default(), config, @@ -200,6 +202,14 @@ impl FlowRead for FlowStore { fn get_shard_config(&self) -> ShardConfig { *self.config.shard_config.read() } + + fn get_pad_data(&self, start_index: u64) -> crate::error::Result>> { + self.flow_db.get_pad_data(start_index) + } + + fn get_pad_data_sync_height(&self) -> Result> { + self.data_db.get_pad_data_sync_height() + } } impl FlowWrite for FlowStore { @@ -267,6 +277,14 @@ impl FlowWrite for FlowStore { fn update_shard_config(&self, shard_config: ShardConfig) { *self.config.shard_config.write() = shard_config; } + + fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> crate::error::Result<()> { + self.flow_db.put_pad_data(data_sizes, tx_seq) + } + + fn put_pad_data_sync_height(&self, sync_index: u64) -> crate::error::Result<()> { + self.data_db.put_pad_data_sync_height(sync_index) + } } impl FlowSeal for FlowStore { @@ -344,33 +362,10 @@ impl FlowSeal for FlowStore { } } -pub struct PadDataStore { - flow_db: Arc, - data_db: Arc, -} - -impl PadDataStore { - pub fn new(flow_db: Arc, data_db: Arc) -> Self { - Self { flow_db, data_db } - } - - pub fn put_pad_data(&self, data_size: usize, start_index: u64) -> Result<()> { - self.flow_db.put_pad_data(data_size, start_index)?; - Ok(()) - } - - pub fn put_pad_data_sync_height(&self, sync_index: u64) -> Result<()> { - self.data_db.put_pad_data_sync_height(sync_index)?; - Ok(()) - } - - pub fn get_pad_data_sync_height(&self) -> Result> { - self.data_db.get_pad_data_sync_heigh() - } - - pub fn get_pad_data(&self, start_index: u64) -> Result> { - self.flow_db.get_pad_data(start_index) - } +#[derive(Debug, PartialEq, DeriveEncode, DeriveDecode)] +pub struct PadPair { + pub start_index: u64, + pub data_size: u64, } pub struct FlowDBStore { @@ -474,29 +469,31 @@ impl FlowDBStore { Ok(self.kvdb.write(tx)?) } - fn put_pad_data(&self, data_size: usize, start_index: u64) -> Result<()> { + fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()> { let mut tx = self.kvdb.transaction(); - tx.put( - COL_PAD_DATA_LIST, - &start_index.to_be_bytes(), - &data_size.to_be_bytes(), - ); + + let mut buffer = Vec::new(); + for item in data_sizes { + buffer.extend(item.as_ssz_bytes()); + } + + tx.put(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes(), &buffer); self.kvdb.write(tx)?; Ok(()) } - fn put_pad_data_sync_height(&self, sync_index: u64) -> Result<()> { + fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()> { let mut tx = self.kvdb.transaction(); tx.put( COL_PAD_DATA_SYNC_HEIGH, b"sync_height", - &sync_index.to_be_bytes(), + &tx_seq.to_be_bytes(), ); self.kvdb.write(tx)?; Ok(()) } - fn get_pad_data_sync_heigh(&self) -> Result> { + fn get_pad_data_sync_height(&self) -> Result> { match self.kvdb.get(COL_PAD_DATA_SYNC_HEIGH, b"sync_height")? { Some(v) => Ok(Some(u64::from_be_bytes( v.try_into().map_err(|e| anyhow!("{:?}", e))?, @@ -505,14 +502,11 @@ impl FlowDBStore { } } - fn get_pad_data(&self, start_index: u64) -> Result> { - match self - .kvdb - .get(COL_PAD_DATA_LIST, &start_index.to_be_bytes())? - { - Some(v) => Ok(Some(usize::from_be_bytes( - v.try_into().map_err(|e| anyhow!("{:?}", e))?, - ))), + fn get_pad_data(&self, tx_seq: u64) -> Result>> { + match self.kvdb.get(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes())? { + Some(v) => Ok(Some( + Vec::::from_ssz_bytes(&v).map_err(Error::from)?, + )), None => Ok(None), } } diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 7beb3aa..6d165a7 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -1,12 +1,11 @@ -use super::tx_store::BlockHashAndSubmissionIndex; -use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask}; use crate::config::ShardConfig; use crate::log_store::flow_store::{ - batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadDataStore, + batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair, }; -use crate::log_store::tx_store::TransactionStore; +use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore}; use crate::log_store::{ - FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite, + FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, + LogStoreWrite, MineLoadChunk, SealAnswer, SealTask, }; use crate::{try_option, ZgsKeyValueDB}; use anyhow::{anyhow, bail, Result}; @@ -26,7 +25,6 @@ use shared_types::{ use std::cmp::Ordering; use std::path::Path; -use std::sync::mpsc; use std::sync::Arc; use tracing::{debug, error, info, instrument, trace, warn}; @@ -35,18 +33,16 @@ pub const ENTRY_SIZE: usize = 256; /// 1024 Entries. pub const PORA_CHUNK_SIZE: usize = 1024; -pub const COL_TX: u32 = 0; -pub const COL_ENTRY_BATCH: u32 = 1; -pub const COL_TX_DATA_ROOT_INDEX: u32 = 2; -pub const COL_ENTRY_BATCH_ROOT: u32 = 3; -pub const COL_TX_COMPLETED: u32 = 4; -pub const COL_MISC: u32 = 5; -pub const COL_SEAL_CONTEXT: u32 = 6; -pub const COL_FLOW_MPT_NODES: u32 = 7; -pub const COL_BLOCK_PROGRESS: u32 = 8; +pub const COL_TX: u32 = 0; // flow db +pub const COL_ENTRY_BATCH: u32 = 1; // data db +pub const COL_TX_DATA_ROOT_INDEX: u32 = 2; // flow db +pub const COL_TX_COMPLETED: u32 = 3; // data db +pub const COL_MISC: u32 = 4; // flow db & data db +pub const COL_FLOW_MPT_NODES: u32 = 5; // flow db +pub const COL_BLOCK_PROGRESS: u32 = 6; // flow db +pub const COL_PAD_DATA_LIST: u32 = 7; // flow db +pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 8; // data db pub const COL_NUM: u32 = 9; -pub const COL_PAD_DATA_LIST: u32 = 10; -pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 11; // Process at most 1M entries (256MB) pad data at a time. const PAD_MAX_SIZE: usize = 1 << 20; @@ -65,12 +61,10 @@ pub struct UpdateFlowMessage { } pub struct LogManager { - pub(crate) pad_db: Arc, pub(crate) data_db: Arc, tx_store: TransactionStore, flow_store: Arc, merkle: RwLock, - sender: mpsc::Sender, } struct MerkleManager { @@ -269,7 +263,12 @@ impl LogStoreWrite for LogManager { } let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned(); // TODO(zz): Should we validate received tx? - self.append_subtree_list(tx.start_entry_index, tx.merkle_nodes.clone(), &mut merkle)?; + self.append_subtree_list( + tx.seq, + tx.start_entry_index, + tx.merkle_nodes.clone(), + &mut merkle, + )?; merkle.commit_merkle(tx.seq)?; debug!( "commit flow root: root={:?}", @@ -406,6 +405,41 @@ impl LogStoreWrite for LogManager { fn submit_seal_result(&self, answers: Vec) -> Result<()> { self.flow_store.submit_seal_result(answers) } + + fn start_padding(&self, executor: &task_executor::TaskExecutor) { + let store = self.flow_store.clone(); + executor.spawn( + async move { + let current_height = store.get_pad_data_sync_height().unwrap(); + let mut start_index = current_height.unwrap_or(0); + loop { + match store.get_pad_data(start_index) { + std::result::Result::Ok(data) => { + // Update the flow database. + // This should be called before `complete_last_chunk_merkle` so that we do not save + // subtrees with data known. + if let Some(data) = data { + for pad in data { + store + .append_entries(ChunkArray { + data: vec![0; pad.data_size as usize], + start_index: pad.start_index, + }) + .unwrap(); + } + }; + store.put_pad_data_sync_height(start_index).unwrap(); + start_index += 1; + } + std::result::Result::Err(_) => { + debug!("Unable to get pad data, start_index={}", start_index); + } + }; + } + }, + "pad_tx", + ); + } } impl LogStoreChunkRead for LogManager { @@ -619,32 +653,33 @@ impl LogManager { config: LogConfig, flow_path: impl AsRef, data_path: impl AsRef, - executor: task_executor::TaskExecutor, ) -> Result { let mut db_config = DatabaseConfig::with_columns(COL_NUM); db_config.enable_statistics = true; let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?); let data_db_source = Arc::new(Database::open(&db_config, data_path)?); - Self::new(flow_db_source, data_db_source, config, executor) + Self::new(flow_db_source, data_db_source, config) } - pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result { + pub fn memorydb(config: LogConfig) -> Result { let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM)); let data_db = Arc::new(kvdb_memorydb::create(COL_NUM)); - Self::new(flow_db, data_db, config, executor) + Self::new(flow_db, data_db, config) } fn new( flow_db_source: Arc, data_db_source: Arc, config: LogConfig, - executor: task_executor::TaskExecutor, ) -> Result { let tx_store = TransactionStore::new(data_db_source.clone())?; let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone())); let data_db = Arc::new(FlowDBStore::new(data_db_source.clone())); - let flow_store = Arc::new(FlowStore::new(data_db.clone(), config.flow.clone())); - let pad_db = Arc::new(PadDataStore::new(flow_db.clone(), data_db.clone())); + let flow_store = Arc::new(FlowStore::new( + flow_db.clone(), + data_db.clone(), + config.flow.clone(), + )); // If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle` // first and call `put_tx` later. let next_tx_seq = tx_store.next_tx_seq(); @@ -745,19 +780,13 @@ impl LogManager { last_chunk_merkle, }); - let (sender, receiver) = mpsc::channel(); - - let mut log_manager = Self { - pad_db, + let log_manager = Self { data_db: data_db_source, tx_store, flow_store, merkle, - sender, }; - log_manager.start_receiver(receiver, executor); - if let Some(tx) = last_tx_to_insert { log_manager.put_tx(tx)?; } @@ -772,40 +801,6 @@ impl LogManager { Ok(log_manager) } - fn start_receiver( - &mut self, - rx: mpsc::Receiver, - executor: task_executor::TaskExecutor, - ) { - let flow_store = self.flow_store.clone(); - executor.spawn( - async move { - loop { - match rx.recv() { - std::result::Result::Ok(data) => { - // Update the flow database. - // This should be called before `complete_last_chunk_merkle` so that we do not save - // subtrees with data known. - flow_store - .append_entries(ChunkArray { - data: vec![0; data.pad_data], - start_index: data.tx_start_flow_index, - }) - .unwrap(); - } - std::result::Result::Err(_) => { - debug!("Log manager inner channel closed"); - break; - } - }; - } - }, - "pad_tx", - ); - // Wait for the spawned thread to finish - // let _ = handle.join().expect("Thread panicked"); - } - fn gen_proof(&self, flow_index: u64, maybe_root: Option) -> Result { match maybe_root { None => self.gen_proof_at_version(flow_index, None), @@ -861,6 +856,7 @@ impl LogManager { #[instrument(skip(self, merkle))] fn append_subtree_list( &self, + tx_seq: u64, tx_start_index: u64, merkle_list: Vec<(usize, DataRoot)>, merkle: &mut MerkleManager, @@ -869,7 +865,7 @@ impl LogManager { return Ok(()); } - self.pad_tx(tx_start_index, &mut *merkle)?; + self.pad_tx(tx_seq, tx_start_index, &mut *merkle)?; for (subtree_depth, subtree_root) in merkle_list { let subtree_size = 1 << (subtree_depth - 1); @@ -907,7 +903,7 @@ impl LogManager { } #[instrument(skip(self, merkle))] - fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> { + fn pad_tx(&self, tx_seq: u64, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> { // Check if we need to pad the flow. let mut tx_start_flow_index = merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64; @@ -917,6 +913,7 @@ impl LogManager { merkle.pora_chunks_merkle.leaves(), merkle.last_chunk_merkle.leaves() ); + let mut pad_list = vec![]; if pad_size != 0 { for pad_data in Self::padding(pad_size as usize) { let mut is_full_empty = true; @@ -961,6 +958,10 @@ impl LogManager { let data_size = pad_data.len() / ENTRY_SIZE; if is_full_empty { + pad_list.push(PadPair { + data_size: pad_data.len() as u64, + start_index: tx_start_flow_index, + }); } else { // Update the flow database. // This should be called before `complete_last_chunk_merkle` so that we do not save @@ -982,6 +983,8 @@ impl LogManager { merkle.pora_chunks_merkle.leaves(), merkle.last_chunk_merkle.leaves() ); + + self.flow_store.put_pad_data(&pad_list, tx_seq)?; Ok(()) } diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index 99389f1..f869dbf 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -1,6 +1,7 @@ use crate::config::ShardConfig; use ethereum_types::H256; +use flow_store::PadPair; use shared_types::{ Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction, @@ -15,7 +16,6 @@ pub mod config; mod flow_store; mod load_chunk; pub mod log_manager; -mod pad_data_store; mod seal_task_manager; #[cfg(test)] mod tests; @@ -159,6 +159,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite { fn update_shard_config(&self, shard_config: ShardConfig); fn submit_seal_result(&self, answers: Vec) -> Result<()>; + + fn start_padding(&self, executor: &task_executor::TaskExecutor); } pub trait LogStoreChunkWrite { @@ -218,6 +220,10 @@ pub trait FlowRead { fn get_num_entries(&self) -> Result; fn get_shard_config(&self) -> ShardConfig; + + fn get_pad_data(&self, start_index: u64) -> Result>>; + + fn get_pad_data_sync_height(&self) -> Result>; } pub trait FlowWrite { @@ -232,6 +238,10 @@ pub trait FlowWrite { /// Update the shard config. fn update_shard_config(&self, shard_config: ShardConfig); + + fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>; + + fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>; } pub struct SealTask { @@ -270,3 +280,23 @@ pub trait FlowSeal { pub trait Flow: FlowRead + FlowWrite + FlowSeal {} impl Flow for T {} + +pub trait PadDataStoreRead { + fn get_pad_data(&self, start_index: u64) -> Result>>; + fn get_pad_data_sync_height(&self) -> Result>; +} + +pub trait PadDataStoreWrite { + fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>; + fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>; + fn start_padding(&mut self, executor: &task_executor::TaskExecutor); +} + +pub trait PadDataStore: + PadDataStoreRead + PadDataStoreWrite + config::Configurable + Send + Sync + 'static +{ +} +impl + PadDataStore for T +{ +} diff --git a/node/storage/src/log_store/tests.rs b/node/storage/src/log_store/tests.rs index 50b4cc5..5b9f42c 100644 --- a/node/storage/src/log_store/tests.rs +++ b/node/storage/src/log_store/tests.rs @@ -9,15 +9,10 @@ use rand::random; use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE}; use std::cmp; -use task_executor::test_utils::TestRuntime; - #[test] fn test_put_get() { let config = LogConfig::default(); - let runtime = TestRuntime::default(); - - let executor = runtime.task_executor.clone(); - let store = LogManager::memorydb(config.clone(), executor).unwrap(); + let store = LogManager::memorydb(config.clone()).unwrap(); let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1; // Aligned with size. let start_offset = 1024; @@ -174,10 +169,7 @@ fn test_put_tx() { fn create_store() -> LogManager { let config = LogConfig::default(); - let runtime = TestRuntime::default(); - let executor = runtime.task_executor.clone(); - - LogManager::memorydb(config, executor).unwrap() + LogManager::memorydb(config).unwrap() } fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) { diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 876f6d7..6a5561b 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -1657,7 +1657,7 @@ mod tests { let num_chunks = 123; let config = LogConfig::default(); - let store = Arc::new(LogManager::memorydb(config, task_executor.clone()).unwrap()); + let store = Arc::new(LogManager::memorydb(config).unwrap()); create_controller(task_executor, peer_id, store, tx_id, num_chunks) } diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 3fd12f6..be34080 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -1348,9 +1348,7 @@ mod tests { let config = LogConfig::default(); - let executor = runtime.task_executor.clone(); - - let store = Arc::new(LogManager::memorydb(config.clone(), executor).unwrap()); + let store = Arc::new(LogManager::memorydb(config.clone()).unwrap()); let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id(); let file_location_cache: Arc = diff --git a/node/sync/src/test_util.rs b/node/sync/src/test_util.rs index 0a527a9..d3a0981 100644 --- a/node/sync/src/test_util.rs +++ b/node/sync/src/test_util.rs @@ -9,8 +9,6 @@ use storage::{ LogManager, }; -use task_executor::test_utils::TestRuntime; - /// Creates stores for local node and peers with initialized transaction of specified chunk count. /// The first store is for local node, and data not stored. The second store is for peers, and all /// transactions are finalized for file sync. @@ -24,11 +22,8 @@ pub fn create_2_store( Vec>, ) { let config = LogConfig::default(); - let runtime = TestRuntime::default(); - - let executor = runtime.task_executor.clone(); - let mut store = LogManager::memorydb(config.clone(), executor.clone()).unwrap(); - let mut peer_store = LogManager::memorydb(config, executor).unwrap(); + let mut store = LogManager::memorydb(config.clone()).unwrap(); + let mut peer_store = LogManager::memorydb(config).unwrap(); let mut offset = 1; let mut txs = vec![]; @@ -120,10 +115,7 @@ pub mod tests { impl TestStoreRuntime { pub fn new_store() -> impl LogStore { - let runtime = TestRuntime::default(); - - let executor = runtime.task_executor.clone(); - LogManager::memorydb(LogConfig::default(), executor).unwrap() + LogManager::memorydb(LogConfig::default()).unwrap() } pub fn new(store: Arc) -> TestStoreRuntime {