diff --git a/Cargo.lock b/Cargo.lock index 5c3a249..64ec1d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7280,6 +7280,7 @@ dependencies = [ "serde_json", "shared_types", "static_assertions", + "task_executor", "tiny-keccak", "tokio", "tracing", diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 551a313..5ed5f71 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -911,7 +911,9 @@ mod tests { let (network_send, network_recv) = mpsc::unbounded_channel(); let (sync_send, sync_recv) = channel::Channel::unbounded("test"); let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel(); - let store = LogManager::memorydb(LogConfig::default()).unwrap(); + + let executor = runtime.task_executor.clone(); + let store = LogManager::memorydb(LogConfig::default(), executor).unwrap(); Self { runtime, network_globals: Arc::new(network_globals), diff --git a/node/src/client/builder.rs b/node/src/client/builder.rs index 2ff7b0e..4d742e1 100644 --- a/node/src/client/builder.rs +++ b/node/src/client/builder.rs @@ -89,9 +89,10 @@ impl ClientBuilder { /// Initializes in-memory storage. pub fn with_memory_store(mut self) -> Result { + let executor = require!("sync", self, runtime_context).clone().executor; // TODO(zz): Set config. let store = Arc::new( - LogManager::memorydb(LogConfig::default()) + LogManager::memorydb(LogConfig::default(), executor) .map_err(|e| format!("Unable to start in-memory store: {:?}", e))?, ); @@ -109,8 +110,9 @@ impl ClientBuilder { /// Initializes RocksDB storage. pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result { + let executor = require!("sync", self, runtime_context).clone().executor; let store = Arc::new( - LogManager::rocksdb(LogConfig::default(), &config.db_dir) + LogManager::rocksdb(LogConfig::default(), &config.db_dir, executor) .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?, ); diff --git a/node/storage/Cargo.toml b/node/storage/Cargo.toml index 2c89d6e..9c7ebca 100644 --- a/node/storage/Cargo.toml +++ b/node/storage/Cargo.toml @@ -30,6 +30,7 @@ serde = { version = "1.0.197", features = ["derive"] } parking_lot = "0.12.3" serde_json = "1.0.127" tokio = { version = "1.10.0", features = ["sync"] } +task_executor = { path = "../../common/task_executor" } [dev-dependencies] rand = "0.8.5" diff --git a/node/storage/benches/benchmark.rs b/node/storage/benches/benchmark.rs index 6034f1c..43fce03 100644 --- a/node/storage/benches/benchmark.rs +++ b/node/storage/benches/benchmark.rs @@ -14,14 +14,18 @@ use storage::{ }, LogManager, }; +use task_executor::test_utils::TestRuntime; fn write_performance(c: &mut Criterion) { if Path::new("db_write").exists() { fs::remove_dir_all("db_write").unwrap(); } + let runtime = TestRuntime::default(); + + let executor = runtime.task_executor.clone(); let store: Arc> = Arc::new(RwLock::new( - LogManager::rocksdb(LogConfig::default(), "db_write") + LogManager::rocksdb(LogConfig::default(), "db_write", executor) .map_err(|e| format!("Unable to start RocksDB store: {:?}", e)) .unwrap(), )); @@ -105,8 +109,12 @@ fn read_performance(c: &mut Criterion) { fs::remove_dir_all("db_read").unwrap(); } + let runtime = TestRuntime::default(); + + let executor = runtime.task_executor.clone(); + let store: Arc> = Arc::new(RwLock::new( - LogManager::rocksdb(LogConfig::default(), "db_read") + LogManager::rocksdb(LogConfig::default(), "db_read", executor) .map_err(|e| format!("Unable to start RocksDB store: {:?}", e)) .unwrap(), )); diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 9c75e58..79e14c1 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -23,7 +23,6 @@ use std::collections::BTreeMap; use std::path::Path; use std::sync::mpsc; use std::sync::Arc; -use std::thread; use tracing::{debug, error, info, instrument, trace, warn}; use super::tx_store::BlockHashAndSubmissionIndex; @@ -606,19 +605,27 @@ impl LogStoreRead for LogManager { } impl LogManager { - pub fn rocksdb(config: LogConfig, path: impl AsRef) -> Result { + pub fn rocksdb( + config: LogConfig, + path: impl AsRef, + executor: task_executor::TaskExecutor, + ) -> Result { let mut db_config = DatabaseConfig::with_columns(COL_NUM); db_config.enable_statistics = true; let db = Arc::new(Database::open(&db_config, path)?); - Self::new(db, config) + Self::new(db, config, executor) } - pub fn memorydb(config: LogConfig) -> Result { + pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result { let db = Arc::new(kvdb_memorydb::create(COL_NUM)); - Self::new(db, config) + Self::new(db, config, executor) } - fn new(db: Arc, config: LogConfig) -> Result { + fn new( + db: Arc, + config: LogConfig, + executor: task_executor::TaskExecutor, + ) -> Result { let tx_store = TransactionStore::new(db.clone())?; let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow)); let mut initial_data = flow_store.get_chunk_root_list()?; @@ -735,7 +742,7 @@ impl LogManager { sender, }; - log_manager.start_receiver(receiver); + log_manager.start_receiver(receiver, executor); if let Some(tx) = last_tx_to_insert { log_manager.revert_to(tx.seq - 1)?; @@ -758,32 +765,39 @@ impl LogManager { Ok(log_manager) } - fn start_receiver(&mut self, rx: mpsc::Receiver) { + fn start_receiver( + &mut self, + rx: mpsc::Receiver, + executor: task_executor::TaskExecutor, + ) { let flow_store = self.flow_store.clone(); - let handle = thread::spawn(move || -> Result<(), anyhow::Error> { - loop { - match rx.recv() { - std::result::Result::Ok(data) => { - // Update the root index. - flow_store.put_batch_root_list(data.root_map).unwrap(); - // Update the flow database. - // This should be called before `complete_last_chunk_merkle` so that we do not save - // subtrees with data known. - flow_store - .append_entries(ChunkArray { - data: vec![0; data.pad_data], - start_index: data.tx_start_flow_index, - }) - .unwrap(); - } - std::result::Result::Err(_) => { - bail!("Receiver error"); - } + executor.spawn( + async move { + loop { + match rx.recv() { + std::result::Result::Ok(data) => { + // Update the root index. + flow_store.put_batch_root_list(data.root_map).unwrap(); + // Update the flow database. + // This should be called before `complete_last_chunk_merkle` so that we do not save + // subtrees with data known. + flow_store + .append_entries(ChunkArray { + data: vec![0; data.pad_data], + start_index: data.tx_start_flow_index, + }) + .unwrap(); + } + std::result::Result::Err(_) => { + error!("Receiver error"); + } + }; } - } - }); + }, + "pad_tx", + ); // Wait for the spawned thread to finish - let _ = handle.join().expect("Thread panicked"); + // let _ = handle.join().expect("Thread panicked"); } fn gen_proof(&self, flow_index: u64, maybe_root: Option) -> Result { diff --git a/node/storage/src/log_store/tests.rs b/node/storage/src/log_store/tests.rs index 026e78c..d56613d 100644 --- a/node/storage/src/log_store/tests.rs +++ b/node/storage/src/log_store/tests.rs @@ -8,11 +8,15 @@ use ethereum_types::H256; use rand::random; use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE}; use std::cmp; +use task_executor::test_utils::TestRuntime; #[test] fn test_put_get() { let config = LogConfig::default(); - let store = LogManager::memorydb(config.clone()).unwrap(); + let runtime = TestRuntime::default(); + + let executor = runtime.task_executor.clone(); + let store = LogManager::memorydb(config.clone(), executor).unwrap(); let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1; // Aligned with size. let start_offset = 1024; @@ -169,8 +173,10 @@ fn test_put_tx() { fn create_store() -> LogManager { let config = LogConfig::default(); + let runtime = TestRuntime::default(); + let executor = runtime.task_executor.clone(); - LogManager::memorydb(config).unwrap() + LogManager::memorydb(config, executor).unwrap() } fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) { diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index 485e6cf..dcf724a 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -1622,7 +1622,7 @@ mod tests { let num_chunks = 123; let config = LogConfig::default(); - let store = Arc::new(LogManager::memorydb(config).unwrap()); + let store = Arc::new(LogManager::memorydb(config, task_executor.clone()).unwrap()); create_controller(task_executor, peer_id, store, tx_id, num_chunks) } diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index 4132433..7efba4e 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -1294,7 +1294,9 @@ mod tests { let config = LogConfig::default(); - let store = Arc::new(LogManager::memorydb(config.clone()).unwrap()); + let executor = runtime.task_executor.clone(); + + let store = Arc::new(LogManager::memorydb(config.clone(), executor).unwrap()); let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id(); let file_location_cache: Arc = diff --git a/node/sync/src/test_util.rs b/node/sync/src/test_util.rs index d3a0981..0a527a9 100644 --- a/node/sync/src/test_util.rs +++ b/node/sync/src/test_util.rs @@ -9,6 +9,8 @@ use storage::{ LogManager, }; +use task_executor::test_utils::TestRuntime; + /// Creates stores for local node and peers with initialized transaction of specified chunk count. /// The first store is for local node, and data not stored. The second store is for peers, and all /// transactions are finalized for file sync. @@ -22,8 +24,11 @@ pub fn create_2_store( Vec>, ) { let config = LogConfig::default(); - let mut store = LogManager::memorydb(config.clone()).unwrap(); - let mut peer_store = LogManager::memorydb(config).unwrap(); + let runtime = TestRuntime::default(); + + let executor = runtime.task_executor.clone(); + let mut store = LogManager::memorydb(config.clone(), executor.clone()).unwrap(); + let mut peer_store = LogManager::memorydb(config, executor).unwrap(); let mut offset = 1; let mut txs = vec![]; @@ -115,7 +120,10 @@ pub mod tests { impl TestStoreRuntime { pub fn new_store() -> impl LogStore { - LogManager::memorydb(LogConfig::default()).unwrap() + let runtime = TestRuntime::default(); + + let executor = runtime.task_executor.clone(); + LogManager::memorydb(LogConfig::default(), executor).unwrap() } pub fn new(store: Arc) -> TestStoreRuntime {