diff --git a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs
index a3ead52..a1c6dbc 100644
--- a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs
+++ b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs
@@ -122,7 +122,7 @@ impl LogEntryFetcher {
     pub fn start_remove_finalized_block_task(
         &self,
         executor: &TaskExecutor,
-        store: Arc<RwLock<dyn Store>>,
+        store: Arc<dyn Store>,
         block_hash_cache: Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
         default_finalized_block_count: u64,
         remove_finalized_block_interval_minutes: u64,
@@ -133,7 +133,7 @@ impl LogEntryFetcher {
             loop {
                 debug!("processing finalized block");

-                let processed_block_number = match store.read().await.get_sync_progress() {
+                let processed_block_number = match store.get_sync_progress() {
                     Ok(Some((processed_block_number, _))) => Some(processed_block_number),
                     Ok(None) => None,
                     Err(e) => {
@@ -176,9 +176,7 @@ impl LogEntryFetcher {
                         }

                         for key in pending_keys.into_iter() {
-                            if let Err(e) =
-                                store.write().await.delete_block_hash_by_number(key)
-                            {
+                            if let Err(e) = store.delete_block_hash_by_number(key) {
                                 error!(
                                     "remove block tx for number {} error: e={:?}",
                                     key, e
diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs
index ebcbc85..c074d92 100644
--- a/node/log_entry_sync/src/sync_manager/mod.rs
+++ b/node/log_entry_sync/src/sync_manager/mod.rs
@@ -33,7 +33,7 @@ pub enum LogSyncEvent {
 pub struct LogSyncManager {
     config: LogSyncConfig,
     log_fetcher: LogEntryFetcher,
-    store: Arc<RwLock<dyn Store>>,
+    store: Arc<dyn Store>,
     data_cache: DataCache,

     next_tx_seq: u64,
@@ -48,9 +48,9 @@ impl LogSyncManager {
     pub async fn spawn(
         config: LogSyncConfig,
         executor: TaskExecutor,
-        store: Arc<RwLock<dyn Store>>,
+        store: Arc<dyn Store>,
     ) -> Result<broadcast::Sender<LogSyncEvent>> {
-        let next_tx_seq = store.read().await.next_tx_seq();
+        let next_tx_seq = store.next_tx_seq();

         let executor_clone = executor.clone();
         let mut shutdown_sender = executor.shutdown_sender();
@@ -81,8 +81,6 @@ impl LogSyncManager {

                 let block_hash_cache = Arc::new(RwLock::new(
                     store
-                        .read()
-                        .await
                         .get_block_hashes()?
                         .into_iter()
                         .map(|(x, y)| (x, Some(y)))
@@ -124,7 +122,7 @@ impl LogSyncManager {
                 // TODO(zz): Handle reorg instead of return.
                 let mut need_handle_reorg = false;
                 let (mut start_block_number, mut start_block_hash) =
-                    match log_sync_manager.store.read().await.get_sync_progress()? {
+                    match log_sync_manager.store.get_sync_progress()? {
                         // No previous progress, so just use config.
                         None => {
                             let block_number = log_sync_manager.config.start_block_number;
@@ -176,7 +174,7 @@ impl LogSyncManager {
                 log_sync_manager.handle_data(reorg_rx).await?;

                 if let Some((block_number, block_hash)) =
-                    log_sync_manager.store.read().await.get_sync_progress()?
+                    log_sync_manager.store.get_sync_progress()?
                 {
                     start_block_number = block_number;
                     start_block_hash = block_hash;
@@ -301,7 +299,7 @@ impl LogSyncManager {
     async fn process_reverted(&mut self, tx_seq: u64) {
         warn!("revert for chain reorg: seq={}", tx_seq);
         {
-            let store = self.store.read().await;
+            let store = self.store.clone();
             for seq in tx_seq..self.next_tx_seq {
                 if matches!(store.check_tx_completed(seq), Ok(true)) {
                     if let Ok(Some(tx)) = store.get_tx_by_seq_number(seq) {
@@ -325,7 +323,7 @@ impl LogSyncManager {
         let _ = self.event_send.send(LogSyncEvent::ReorgDetected { tx_seq });

         // TODO(zz): `wrapping_sub` here is a hack to handle the case of tx_seq=0.
-        if let Err(e) = self.store.write().await.revert_to(tx_seq.wrapping_sub(1)) {
+        if let Err(e) = self.store.revert_to(tx_seq.wrapping_sub(1)) {
             error!("revert_to fails: e={:?}", e);
             return;
         }
@@ -353,7 +351,7 @@ impl LogSyncManager {
             );
         }

-        self.store.write().await.put_sync_progress((
+        self.store.put_sync_progress((
             block_number,
             block_hash,
             first_submission_index,
@@ -396,12 +394,12 @@ impl LogSyncManager {
     }

     async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
-        if let Err(e) = self.store.write().await.put_tx(tx.clone()) {
+        if let Err(e) = self.store.put_tx(tx.clone()) {
             error!("put_tx error: e={:?}", e);
             false
         } else {
             if let Some(data) = self.data_cache.pop_data(&tx.data_merkle_root) {
-                let store = self.store.write().await;
+                let store = self.store.clone();
                 // We are holding a mutable reference of LogSyncManager, so no chain reorg is
                 // possible after put_tx.
                 if let Err(e) = store
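The change repeated throughout this diff: the storage trait's methods all take `&self`, with synchronization handled inside the implementation, so the outer `tokio::sync::RwLock` becomes redundant and `Arc<RwLock<dyn Store>>` collapses to `Arc<dyn Store>`, turning two-step async lock acquisitions into plain method calls. A minimal compiling sketch of why that is sound (toy trait and a `Mutex`-guarded field standing in for the real `LogManager` internals, which are not shown in this diff):

```rust
use std::sync::{Arc, Mutex};

// Toy stand-in for the storage trait: every method takes `&self`,
// so the object can be shared freely once it synchronizes internally.
trait Store: Send + Sync {
    fn next_tx_seq(&self) -> u64;
    fn put_tx(&self, seq: u64);
}

// Internal synchronization lives inside the implementation...
struct LogManager {
    next: Mutex<u64>,
}

impl Store for LogManager {
    fn next_tx_seq(&self) -> u64 {
        *self.next.lock().unwrap()
    }
    fn put_tx(&self, seq: u64) {
        *self.next.lock().unwrap() = seq + 1;
    }
}

fn main() {
    // ...so callers share a bare `Arc<dyn Store>` and call methods directly,
    // instead of `store.read().await.next_tx_seq()`.
    let store: Arc<dyn Store> = Arc::new(LogManager { next: Mutex::new(0) });
    store.put_tx(0);
    assert_eq!(store.next_tx_seq(), 1);
}
```

This also explains why `let store = self.store.read().await;` becomes `let store = self.store.clone();` above: cloning an `Arc` is a cheap reference-count bump, not a data copy.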
diff --git a/node/miner/src/loader.rs b/node/miner/src/loader.rs
index 92b7397..e8952d4 100644
--- a/node/miner/src/loader.rs
+++ b/node/miner/src/loader.rs
@@ -9,10 +9,9 @@ pub trait PoraLoader: Send + Sync {
 }

 #[async_trait]
-impl PoraLoader for Arc<RwLock<dyn Store>> {
+impl PoraLoader for Arc<dyn Store> {
     async fn load_sealed_data(&self, chunk_index: u64) -> Option<MineLoadChunk> {
-        let store = &*self.read().await;
-        match store.flow().load_sealed_data(chunk_index) {
+        match self.flow().load_sealed_data(chunk_index) {
             Ok(Some(chunk)) => Some(chunk),
             _ => None,
         }
diff --git a/node/miner/src/miner_id.rs b/node/miner/src/miner_id.rs
index 7b50dcc..20a1379 100644
--- a/node/miner/src/miner_id.rs
+++ b/node/miner/src/miner_id.rs
@@ -21,11 +21,11 @@ fn set_miner_id(store: &dyn Store, miner_id: &H256) -> storage::error::Result<()> {

 pub(crate) async fn check_and_request_miner_id(
     config: &MinerConfig,
-    store: &RwLock<dyn Store>,
+    store: &dyn Store,
     provider: &Arc<MineServiceMiddleware>,
 ) -> Result<H256, String> {
-    let db_miner_id = load_miner_id(&*store.read().await)
-        .map_err(|e| format!("miner_id on db corrupt: {:?}", e))?;
+    let db_miner_id =
+        load_miner_id(store).map_err(|e| format!("miner_id on db corrupt: {:?}", e))?;

     let mine_contract = PoraMine::new(config.mine_address, provider.clone());
@@ -42,7 +42,7 @@ pub(crate) async fn check_and_request_miner_id(
         }
         (None, Some(c_id)) => {
             check_miner_id(&mine_contract, c_id).await?;
-            set_miner_id(&*store.write().await, &c_id)
+            set_miner_id(store, &c_id)
                 .map_err(|e| format!("set miner id on db corrupt: {:?}", e))?;
             Ok(c_id)
         }
@@ -53,8 +53,7 @@ pub(crate) async fn check_and_request_miner_id(
         (None, None) => {
             let beneficiary = provider.address();
             let id = request_miner_id(&mine_contract, beneficiary).await?;
-            set_miner_id(&*store.write().await, &id)
-                .map_err(|e| format!("set miner id on db corrupt: {:?}", e))?;
+            set_miner_id(store, &id).map_err(|e| format!("set miner id on db corrupt: {:?}", e))?;
             Ok(id)
         }
     }
diff --git a/node/miner/src/sealer.rs b/node/miner/src/sealer.rs
index 59390d4..0500f3c 100644
--- a/node/miner/src/sealer.rs
+++ b/node/miner/src/sealer.rs
@@ -22,7 +22,7 @@ const CHAIN_STATUS_QUERY_PERIOD: u64 = 5;

 pub struct Sealer {
     flow_contract: ZgsFlow<MineServiceMiddleware>,
-    store: Arc<RwLock<dyn Store>>,
+    store: Arc<dyn Store>,
     context_cache: BTreeMap<u128, MineContext>,
     last_context_flow_length: u64,
     miner_id: H256,
@@ -32,7 +32,7 @@ impl Sealer {
     pub fn spawn(
         executor: TaskExecutor,
         provider: Arc<MineServiceMiddleware>,
-        store: Arc<RwLock<dyn Store>>,
+        store: Arc<dyn Store>,
         config: &MinerConfig,
         miner_id: H256,
     ) {
@@ -152,19 +152,11 @@ impl Sealer {

     async fn fetch_task(&self) -> Result<Option<Vec<SealTask>>> {
         let seal_index_max = self.last_context_flow_length as usize / SECTORS_PER_SEAL;
-        self.store
-            .read()
-            .await
-            .flow()
-            .pull_seal_chunk(seal_index_max)
+        self.store.flow().pull_seal_chunk(seal_index_max)
     }

     async fn submit_answer(&self, answers: Vec<SealAnswer>) -> Result<()> {
-        self.store
-            .write()
-            .await
-            .flow_mut()
-            .submit_seal_result(answers)
+        self.store.flow().submit_seal_result(answers)
     }

     async fn seal_iteration(&mut self) -> Result<bool> {
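`submit_answer` can drop `flow_mut()` because the seal interfaces move from `&mut self` to `&self` (see `flow_store.rs` and `log_store/mod.rs` later in this diff), keeping mutable state such as `to_seal_set` behind an internal `parking_lot::RwLock`. A sketch of that conversion under illustrative types, not the real `SealAnswer`/`FlowStore` definitions:

```rust
use parking_lot::RwLock;
use std::collections::BTreeSet;

struct SealAnswer {
    seal_index: u64,
}

struct FlowStore {
    // Mutable state moves behind an internal lock so the containing
    // object can be shared as `Arc<FlowStore>` and mutated via `&self`.
    to_seal_set: RwLock<BTreeSet<u64>>,
}

impl FlowStore {
    // Before: `fn submit_seal_result(&mut self, ...)` forced callers to hold
    // a write lock on the whole store. After: `&self` suffices.
    fn submit_seal_result(&self, answers: Vec<SealAnswer>) {
        let mut to_seal_set = self.to_seal_set.write();
        for answer in answers {
            to_seal_set.remove(&answer.seal_index);
        }
    }
}

fn main() {
    let store = FlowStore {
        to_seal_set: RwLock::new((0..4).collect()),
    };
    store.submit_seal_result(vec![SealAnswer { seal_index: 2 }]);
    assert_eq!(store.to_seal_set.read().len(), 3);
}
```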
diff --git a/node/miner/src/service.rs b/node/miner/src/service.rs
index d31ad88..b96a143 100644
--- a/node/miner/src/service.rs
+++ b/node/miner/src/service.rs
@@ -29,13 +29,13 @@ impl MineService {
         executor: task_executor::TaskExecutor,
         _network_send: mpsc::UnboundedSender<NetworkMessage>,
         config: MinerConfig,
-        store: Arc<RwLock<dyn Store>>,
+        store: Arc<dyn Store>,
     ) -> Result<broadcast::Sender<MinerMessage>, String> {
         let provider = Arc::new(config.make_provider().await?);

         let (msg_send, msg_recv) = broadcast::channel(1024);

-        let miner_id = check_and_request_miner_id(&config, &store, &provider).await?;
+        let miner_id = check_and_request_miner_id(&config, store.as_ref(), &provider).await?;
         debug!("miner id setting complete.");

         let mine_context_receiver = MineContextWatcher::spawn(
diff --git a/node/miner/src/submitter.rs b/node/miner/src/submitter.rs
index ad76b5c..753fb8f 100644
--- a/node/miner/src/submitter.rs
+++ b/node/miner/src/submitter.rs
@@ -22,7 +22,7 @@ pub struct Submitter {
     mine_contract: PoraMine<MineServiceMiddleware>,
     flow_contract: ZgsFlow<MineServiceMiddleware>,
     default_gas_limit: Option<U256>,
-    store: Arc<RwLock<dyn Store>>,
+    store: Arc<dyn Store>,
 }

 impl Submitter {
@@ -30,7 +30,7 @@ impl Submitter {
         executor: TaskExecutor,
         mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
         provider: Arc<MineServiceMiddleware>,
-        store: Arc<RwLock<dyn Store>>,
+        store: Arc<dyn Store>,
         config: &MinerConfig,
     ) {
         let mine_contract = PoraMine::new(config.mine_address, provider.clone());
@@ -80,8 +80,6 @@ impl Submitter {

         let flow_proof = self
             .store
-            .read()
-            .await
             .get_proof_at_root(
                 &mine_answer.context_flow_root,
                 mine_answer.recall_position,
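With the lock gone, borrowing callees take `&dyn Store` and the `Arc`-holding caller hands out `store.as_ref()`, as in the `check_and_request_miner_id` call above. A compiling sketch of the coercion (toy trait; the `check_miner_id` helper here is hypothetical, not the real one):

```rust
use std::sync::Arc;

trait Store: Send + Sync {
    fn load_miner_id(&self) -> Option<u64>;
}

struct Mem;
impl Store for Mem {
    fn load_miner_id(&self) -> Option<u64> {
        Some(7)
    }
}

// Hypothetical stand-in for check_and_request_miner_id: it borrows the
// store only for the duration of the call and needs no locking.
fn check_miner_id(store: &dyn Store) -> u64 {
    store.load_miner_id().unwrap_or_default()
}

fn main() {
    let store: Arc<dyn Store> = Arc::new(Mem);
    // `Arc::as_ref` turns the owning handle into the borrowed trait object
    // the callee expects, matching `store.as_ref()` in service.rs above.
    assert_eq!(check_miner_id(store.as_ref()), 7);
}
```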
diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs
index 8e1cf0d..914443a 100644
--- a/node/pruner/src/lib.rs
+++ b/node/pruner/src/lib.rs
@@ -32,7 +32,7 @@ impl PrunerConfig {

 pub struct Pruner {
     config: PrunerConfig,
-    store: Arc<RwLock<dyn Store>>,
+    store: Arc<dyn Store>,

     sender: mpsc::UnboundedSender<PrunerMessage>,
     miner_sender: Option<broadcast::Sender<MinerMessage>>,
@@ -42,10 +42,10 @@ impl Pruner {
     pub async fn spawn(
         executor: TaskExecutor,
         mut config: PrunerConfig,
-        store: Arc<RwLock<dyn Store>>,
+        store: Arc<dyn Store>,
         miner_sender: Option<broadcast::Sender<MinerMessage>>,
     ) -> Result<mpsc::UnboundedReceiver<PrunerMessage>> {
-        if let Some(shard_config) = get_shard_config(&store).await? {
+        if let Some(shard_config) = get_shard_config(store.as_ref()).await? {
             config.shard_config = shard_config;
         }
         let (tx, rx) = mpsc::unbounded_channel();
@@ -76,7 +76,7 @@ impl Pruner {
                 batch.push(index);
                 if batch.len() == self.config.batch_size || iter.peek().is_none() {
                     debug!(start = batch.first(), end = batch.last(), "prune batch");
-                    self.store.write().await.remove_chunks_batch(&batch)?;
+                    self.store.remove_chunks_batch(&batch)?;
                     batch = Vec::with_capacity(self.config.batch_size);
                     tokio::time::sleep(self.config.batch_wait_time).await;
                 }
@@ -87,7 +87,7 @@ impl Pruner {
     }

     async fn maybe_update(&mut self) -> Result<Option<Box<dyn Send + Iterator<Item = u64>>>> {
-        let current_size = self.store.read().await.flow().get_num_entries()?;
+        let current_size = self.store.flow().get_num_entries()?;
         debug!(
             current_size = current_size,
             config = ?self.config.shard_config,
@@ -109,7 +109,7 @@ impl Pruner {
         config.num_shard *= 2;

         // Generate delete list
-        let flow_len = self.store.read().await.get_context()?.1;
+        let flow_len = self.store.get_context()?.1;
         let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard;
         return Ok(Some(Box::new(
             (start_index as u64..flow_len).step_by(config.num_shard),
@@ -124,16 +124,16 @@ impl Pruner {
         }
         self.sender
             .send(PrunerMessage::ChangeShardConfig(self.config.shard_config))?;
-        let mut store = self.store.write().await;
-        store
-            .flow_mut()
+        self.store
+            .flow()
             .update_shard_config(self.config.shard_config);
-        store.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
+        self.store
+            .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
     }
 }

-async fn get_shard_config(store: &RwLock<dyn Store>) -> Result<Option<ShardConfig>> {
-    store.read().await.get_config_decoded(&SHARD_CONFIG_KEY)
+async fn get_shard_config(store: &dyn Store) -> Result<Option<ShardConfig>> {
+    store.get_config_decoded(&SHARD_CONFIG_KEY)
 }

 #[derive(Debug)]
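For context on the delete list built in `maybe_update` above: when `num_shard` doubles, the node keeps one half of its old shard and prunes the flow entries belonging to the discarded half, stepping through the flow by the new shard count. A sketch of that arithmetic under stated assumptions (constants illustrative; the real code draws `rand_bit` randomly and works through `PrunerConfig`):

```rust
// Build the list of flow indices to delete after a shard split.
// Mirrors `old_shard_id + (!rand_bit) as usize * old_num_shard` and
// `(start_index as u64..flow_len).step_by(config.num_shard)` above.
fn delete_list(
    old_shard_id: usize,
    old_num_shard: usize,
    rand_bit: bool,
    flow_len: u64,
) -> impl Iterator<Item = u64> {
    let num_shard = old_num_shard * 2;
    // Keep `old_shard_id + rand_bit * old_num_shard`; delete the other half.
    let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard;
    (start_index as u64..flow_len).step_by(num_shard)
}

fn main() {
    // Splitting shard 1 of 2 with rand_bit = true keeps shard 3 of 4
    // (indices = 3 mod 4) and deletes indices = 1 mod 4 below flow_len.
    let deleted: Vec<u64> = delete_list(1, 2, true, 16).collect();
    assert_eq!(deleted, vec![1, 5, 9, 13]);
}
```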
diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs
index 678578d..002dd1e 100644
--- a/node/router/src/libp2p_event_handler.rs
+++ b/node/router/src/libp2p_event_handler.rs
@@ -269,13 +269,7 @@ impl Libp2pEventHandler {
         };
         let timestamp = timestamp_now();

-        let shard_config = self
-            .store
-            .get_store()
-            .read()
-            .await
-            .flow()
-            .get_shard_config();
+        let shard_config = self.store.get_store().flow().get_shard_config();

         let msg = AnnounceFile {
             tx_id,
@@ -598,7 +592,7 @@ mod tests {
         sync_recv: SyncReceiver,
         chunk_pool_send: mpsc::UnboundedSender<ChunkPoolMessage>,
         chunk_pool_recv: mpsc::UnboundedReceiver<ChunkPoolMessage>,
-        store: Arc<RwLock<dyn Store>>,
+        store: Arc<dyn Store>,
         file_location_cache: Arc<FileLocationCache>,
         peers: Arc<RwLock<PeerManager>>,
     }
@@ -621,7 +615,7 @@ mod tests {
                 sync_recv,
                 chunk_pool_send,
                 chunk_pool_recv,
-                store: Arc::new(RwLock::new(store)),
+                store: Arc::new(store),
                 file_location_cache: Arc::new(FileLocationCache::default()),
                 peers: Arc::new(RwLock::new(PeerManager::new(Config::default()))),
             }
diff --git a/node/router/src/service.rs b/node/router/src/service.rs
index 0d5f4b8..f1590a2 100644
--- a/node/router/src/service.rs
+++ b/node/router/src/service.rs
@@ -58,7 +58,7 @@ impl RouterService {
         _miner_send: Option<broadcast::Sender<MinerMessage>>,
         chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
         pruner_recv: Option<mpsc::UnboundedReceiver<PrunerMessage>>,
-        store: Arc<RwLock<dyn Store>>,
+        store: Arc<dyn Store>,
         file_location_cache: Arc<FileLocationCache>,
         local_keypair: Keypair,
         config: Config,
diff --git a/node/rpc/src/zgs/impl.rs b/node/rpc/src/zgs/impl.rs
index 46d7b6d..36ba47e 100644
--- a/node/rpc/src/zgs/impl.rs
+++ b/node/rpc/src/zgs/impl.rs
@@ -23,8 +23,6 @@ impl RpcServer for RpcServerImpl {
             .ctx
             .log_store
             .get_store()
-            .read()
-            .await
             .get_sync_progress()?
             .unwrap_or_default();
@@ -156,14 +154,7 @@ impl RpcServer for RpcServerImpl {

     async fn get_shard_config(&self) -> RpcResult<ShardConfig> {
         debug!("zgs_getShardConfig");
-        let shard_config = self
-            .ctx
-            .log_store
-            .get_store()
-            .read()
-            .await
-            .flow()
-            .get_shard_config();
+        let shard_config = self.ctx.log_store.get_store().flow().get_shard_config();
         Ok(shard_config)
     }
 }
diff --git a/node/src/client/builder.rs b/node/src/client/builder.rs
index 3933557..c50df97 100644
--- a/node/src/client/builder.rs
+++ b/node/src/client/builder.rs
@@ -68,7 +68,7 @@ struct ChunkPoolComponents {
 #[derive(Default)]
 pub struct ClientBuilder {
     runtime_context: Option<RuntimeContext>,
-    store: Option<Arc<RwLock<dyn Store>>>,
+    store: Option<Arc<dyn Store>>,
     async_store: Option<storage_async::Store>,
     file_location_cache: Option<Arc<FileLocationCache>>,
     network: Option<NetworkComponents>,
@@ -89,10 +89,10 @@ impl ClientBuilder {
     /// Initializes in-memory storage.
     pub fn with_memory_store(mut self) -> Result<Self, String> {
         // TODO(zz): Set config.
-        let store = Arc::new(RwLock::new(
+        let store = Arc::new(
             LogManager::memorydb(LogConfig::default())
                 .map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
-        ));
+        );

         self.store = Some(store.clone());

@@ -105,10 +105,10 @@ impl ClientBuilder {

     /// Initializes RocksDB storage.
     pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
-        let store = Arc::new(RwLock::new(
+        let store = Arc::new(
             LogManager::rocksdb(LogConfig::default(), &config.db_dir)
                 .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
-        ));
+        );

         self.store = Some(store.clone());
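`with_memory_store` and `with_rocksdb_store` now build the concrete `LogManager` once and rely on unsized coercion to store it as `Arc<dyn Store>`; every component then receives a cheap `Arc` clone instead of a share of a lock. A toy sketch of that builder shape (names mirror the diff but the types are stand-ins):

```rust
use std::sync::Arc;

trait Store: Send + Sync {
    fn num_entries(&self) -> u64;
}

struct LogManager;
impl Store for LogManager {
    fn num_entries(&self) -> u64 {
        0
    }
}

struct ClientBuilder {
    store: Option<Arc<dyn Store>>,
}

impl ClientBuilder {
    // Mirrors `with_memory_store`: build the concrete store once; the
    // `Option<Arc<dyn Store>>` field drives the `Arc<LogManager>` ->
    // `Arc<dyn Store>` coercion at the assignment.
    fn with_memory_store(mut self) -> Self {
        let store = Arc::new(LogManager);
        self.store = Some(store.clone());
        self
    }
}

fn main() {
    let builder = ClientBuilder { store: None }.with_memory_store();
    let store = builder.store.expect("store initialized");
    let for_miner = store.clone(); // each service keeps its own Arc
    assert_eq!(for_miner.num_entries(), 0);
}
```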
diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs
index 012d819..81eab35 100644
--- a/node/storage-async/src/lib.rs
+++ b/node/storage-async/src/lib.rs
@@ -28,14 +28,14 @@ macro_rules! delegate {
 #[derive(Clone)]
 pub struct Store {
     /// Log and transaction storage.
-    store: Arc<RwLock<dyn LogStore>>,
+    store: Arc<dyn LogStore>,

     /// Tokio executor for spawning worker tasks.
     executor: TaskExecutor,
 }

 impl Store {
-    pub fn new(store: Arc<RwLock<dyn LogStore>>, executor: TaskExecutor) -> Self {
+    pub fn new(store: Arc<dyn LogStore>, executor: TaskExecutor) -> Self {
         Store { store, executor }
     }
@@ -64,7 +64,7 @@ impl Store {

     async fn spawn<F, T>(&self, f: F) -> Result<T>
     where
-        F: FnOnce(&mut dyn LogStore) -> Result<T> + Send + 'static,
+        F: FnOnce(&dyn LogStore) -> Result<T> + Send + 'static,
         T: Send + 'static,
     {
         let store = self.store.clone();
@@ -73,7 +73,7 @@ impl Store {
         self.executor.spawn(
             async move {
                 // FIXME(zz): Not all functions need `write`. Refactor store usage.
-                let res = f(&mut *store.write().await);
+                let res = f(&*store);

                 if tx.send(res).is_err() {
                     error!("Unable to complete async storage operation: the receiver dropped");
@@ -87,7 +87,7 @@ impl Store {
     }

     // FIXME(zz): Refactor the lock and async call here.
-    pub fn get_store(&self) -> &RwLock<dyn LogStore> {
+    pub fn get_store(&self) -> &dyn LogStore {
         self.store.as_ref()
     }
 }
diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs
index 7566078..a1c581a 100644
--- a/node/storage/src/log_store/flow_store.rs
+++ b/node/storage/src/log_store/flow_store.rs
@@ -87,7 +87,7 @@ impl FlowStore {
 #[derive(Clone, Debug)]
 pub struct FlowConfig {
     pub batch_size: usize,
-    pub shard_config: ShardConfig,
+    pub shard_config: Arc<RwLock<ShardConfig>>,
 }

 impl Default for FlowConfig {
@@ -208,7 +208,7 @@ impl FlowRead for FlowStore {
     }

     fn get_shard_config(&self) -> ShardConfig {
-        self.config.shard_config
+        self.config.shard_config.read().clone()
     }
 }

@@ -233,7 +233,7 @@ impl FlowWrite for FlowStore {
             .expect("in range");

         let chunk_index = chunk.start_index / self.config.batch_size as u64;

-        if !self.config.shard_config.in_range(chunk_index) {
+        if !self.config.shard_config.read().in_range(chunk_index) {
             // The data are in a shard range that we are not storing.
             continue;
         }
@@ -273,8 +273,8 @@ impl FlowWrite for FlowStore {
         Ok(())
     }

-    fn update_shard_config(&mut self, shard_config: ShardConfig) {
-        self.config.shard_config = shard_config;
+    fn update_shard_config(&self, shard_config: ShardConfig) {
+        *self.config.shard_config.write() = shard_config;
     }
 }
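The `spawn` hunk above is the piece that previously forced a write lock per storage operation; after the change, the worker closure borrows `&dyn LogStore` immutably. A condensed, compiling sketch of the same shape (toy trait; `tokio::task::spawn_blocking` stands in for the repo's `TaskExecutor`, and tokio's `rt`, `sync`, and `macros` features are assumed):

```rust
use std::sync::Arc;
use tokio::sync::oneshot;

trait LogStore: Send + Sync {
    fn next_tx_seq(&self) -> u64;
}

struct Mem;
impl LogStore for Mem {
    fn next_tx_seq(&self) -> u64 {
        42
    }
}

struct Store {
    store: Arc<dyn LogStore>,
}

impl Store {
    // The closure takes `&dyn LogStore`, matching the new
    // `F: FnOnce(&dyn LogStore) -> Result<T>` bound in the diff.
    async fn spawn<F, T>(&self, f: F) -> T
    where
        F: FnOnce(&dyn LogStore) -> T + Send + 'static,
        T: Send + 'static,
    {
        let store = self.store.clone();
        let (tx, rx) = oneshot::channel();
        tokio::task::spawn_blocking(move || {
            // `&*store` reborrows the Arc's contents as `&dyn LogStore`,
            // the analogue of the diff's `f(&*store)`.
            let _ = tx.send(f(&*store));
        });
        rx.await.expect("worker dropped")
    }
}

#[tokio::main]
async fn main() {
    let store = Store { store: Arc::new(Mem) };
    assert_eq!(store.spawn(|s| s.next_tx_seq()).await, 42);
}
```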
diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs
index 4e0f391..e903cc6 100644
--- a/node/storage/src/log_store/mod.rs
+++ b/node/storage/src/log_store/mod.rs
@@ -213,7 +213,7 @@ pub trait FlowWrite {
     fn truncate(&self, start_index: u64) -> Result<()>;

     /// Update the shard config.
-    fn update_shard_config(&mut self, shard_config: ShardConfig);
+    fn update_shard_config(&self, shard_config: ShardConfig);
 }

 pub struct SealTask {
@@ -247,7 +247,7 @@ pub trait FlowSeal {

     /// Submit sealing result
-    fn submit_seal_result(&mut self, answers: Vec<SealAnswer>) -> Result<()>;
+    fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
 }

 pub trait Flow: FlowRead + FlowWrite + FlowSeal {}
diff --git a/node/sync/src/auto_sync/sync_store.rs b/node/sync/src/auto_sync/sync_store.rs
index 7fa8bb5..4797cca 100644
--- a/node/sync/src/auto_sync/sync_store.rs
+++ b/node/sync/src/auto_sync/sync_store.rs
@@ -29,7 +29,7 @@ impl SyncStore {
     }

     pub async fn get_tx_seq_range(&self) -> Result<(Option<u64>, Option<u64>)> {
-        let store = self.store.get_store().read().await;
+        let store = self.store.get_store();

         // load next_tx_seq
         let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ)?;
@@ -43,8 +43,6 @@ impl SyncStore {
     pub async fn set_next_tx_seq(&self, tx_seq: u64) -> Result<()> {
         self.store
             .get_store()
-            .write()
-            .await
             .set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq)
     }

@@ -52,13 +50,11 @@ impl SyncStore {
         debug!(%tx_seq, "set_max_tx_seq");
         self.store
             .get_store()
-            .write()
-            .await
             .set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq)
     }

     pub async fn add_pending_tx(&self, tx_seq: u64) -> Result<bool> {
-        let store = self.store.get_store().write().await;
+        let store = self.store.get_store();

         // already in ready queue
         if self.ready_txs.has(store.deref(), tx_seq)? {
@@ -70,7 +66,7 @@ impl SyncStore {
     }

     pub async fn upgrade_tx_to_ready(&self, tx_seq: u64) -> Result<bool> {
-        let store = self.store.get_store().write().await;
+        let store = self.store.get_store();

         let mut tx = ConfigTx::default();
@@ -91,7 +87,7 @@ impl SyncStore {
     }

     pub async fn downgrade_tx_to_pending(&self, tx_seq: u64) -> Result<bool> {
-        let store = self.store.get_store().write().await;
+        let store = self.store.get_store();

         let mut tx = ConfigTx::default();
@@ -112,7 +108,7 @@ impl SyncStore {
     }

     pub async fn random_tx(&self) -> Result<Option<u64>> {
-        let store = self.store.get_store().read().await;
+        let store = self.store.get_store();

         // try to find a tx in ready queue with high priority
         if let Some(val) = self.ready_txs.random(store.deref())? {
@@ -124,7 +120,7 @@ impl SyncStore {
     }

     pub async fn remove_tx(&self, tx_seq: u64) -> Result<bool> {
-        let store = self.store.get_store().write().await;
+        let store = self.store.get_store();

         // removed in ready queue
         if self.ready_txs.remove(store.deref(), None, tx_seq)? {
diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs
index 9e5cc9d..b003bee 100644
--- a/node/sync/src/controllers/serial.rs
+++ b/node/sync/src/controllers/serial.rs
@@ -424,8 +424,6 @@ impl SerialSyncController {
         let validation_result = self
             .store
             .get_store()
-            .write()
-            .await
             .validate_and_insert_range_proof(self.tx_seq, &response);

         match validation_result {
@@ -447,13 +445,7 @@ impl SerialSyncController {

         self.failures = 0;

-        let shard_config = self
-            .store
-            .get_store()
-            .read()
-            .await
-            .flow()
-            .get_shard_config();
+        let shard_config = self.store.get_store().flow().get_shard_config();
         let next_chunk = shard_config.next_segment_index(
             (from_chunk / PORA_CHUNK_SIZE as u64) as usize,
             (self.tx_start_chunk_in_flow / PORA_CHUNK_SIZE as u64) as usize,
@@ -1107,8 +1099,6 @@ mod tests {
         );

         let chunks = peer_store
-            .read()
-            .await
             .get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
             .unwrap()
             .unwrap();
@@ -1141,8 +1131,6 @@ mod tests {
         );

         let mut chunks = peer_store
-            .read()
-            .await
             .get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
             .unwrap()
             .unwrap();
@@ -1210,8 +1198,6 @@ mod tests {
         );

         let chunks = peer_store
-            .read()
-            .await
             .get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
             .unwrap()
             .unwrap();
@@ -1276,8 +1262,6 @@ mod tests {
         );

         let chunks = peer_store
-            .read()
-            .await
             .get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
             .unwrap()
             .unwrap();
@@ -1350,8 +1334,6 @@ mod tests {
         );

         let chunks = peer_store
-            .read()
-            .await
             .get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
             .unwrap()
             .unwrap();
@@ -1395,8 +1377,6 @@ mod tests {
         );

         let chunks = peer_store
-            .read()
-            .await
             .get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, 1024)
             .unwrap()
             .unwrap();
@@ -1575,7 +1555,7 @@ mod tests {
     fn create_controller(
         task_executor: TaskExecutor,
         peer_id: Option<PeerId>,
-        store: Arc<RwLock<dyn Store>>,
+        store: Arc<dyn Store>,
         tx_id: TxID,
         num_chunks: usize,
     ) -> (SerialSyncController, UnboundedReceiver<NetworkMessage>) {
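A detail worth noting in the `sync_store.rs` hunks earlier in this diff: the unchanged `store.deref()` call sites still compile because `get_store()` now returns `&dyn LogStore`, and `Deref` on a plain reference is a no-op reborrow rather than a guard unwrap. A minimal demonstration (toy trait):

```rust
use std::ops::Deref;

trait LogStore {
    fn num_entries(&self) -> usize;
}

struct Mem;
impl LogStore for Mem {
    fn num_entries(&self) -> usize {
        0
    }
}

fn takes_store(store: &dyn LogStore) -> usize {
    store.num_entries()
}

fn main() {
    let store: &dyn LogStore = &Mem;
    // With a lock guard, `deref()` unwrapped the guard to `&dyn LogStore`;
    // with a plain reference it is a reborrow yielding `&dyn LogStore` again,
    // so the surrounding code needs no edits.
    assert_eq!(takes_store(store.deref()), 0);
}
```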
diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs
index 148d2b0..b6bd198 100644
--- a/node/sync/src/service.rs
+++ b/node/sync/src/service.rs
@@ -129,7 +129,7 @@ impl SyncService {
     pub async fn spawn(
         executor: task_executor::TaskExecutor,
         network_send: mpsc::UnboundedSender<NetworkMessage>,
-        store: Arc<RwLock<dyn Store>>,
+        store: Arc<dyn Store>,
         file_location_cache: Arc<FileLocationCache>,
         event_recv: broadcast::Receiver<LogSyncEvent>,
     ) -> Result<SyncSender> {
@@ -148,7 +148,7 @@ impl SyncService {
         config: Config,
         executor: task_executor::TaskExecutor,
         network_send: mpsc::UnboundedSender<NetworkMessage>,
-        store: Arc<RwLock<dyn Store>>,
+        store: Arc<dyn Store>,
         file_location_cache: Arc<FileLocationCache>,
         event_recv: broadcast::Receiver<LogSyncEvent>,
     ) -> Result<SyncSender> {
@@ -723,8 +723,8 @@ mod tests {
         runtime: TestRuntime,

         chunk_count: usize,
-        store: Arc<RwLock<dyn Store>>,
-        peer_store: Arc<RwLock<dyn Store>>,
+        store: Arc<dyn Store>,
+        peer_store: Arc<dyn Store>,
         txs: Vec<Transaction>,
         init_data: Vec<u8>,
@@ -913,8 +913,6 @@ mod tests {

         runtime
             .peer_store
-            .read()
-            .await
             .validate_range_proof(0, &response)
             .expect("validate proof");
     }
@@ -1186,10 +1184,7 @@ mod tests {
             .unwrap();

         thread::sleep(Duration::from_millis(1000));

-        assert_eq!(
-            store.read().await.get_tx_by_seq_number(tx_seq).unwrap(),
-            None
-        );
+        assert_eq!(store.get_tx_by_seq_number(tx_seq).unwrap(), None);
         assert!(network_recv.try_recv().is_err());
     }
@@ -1205,18 +1200,13 @@ mod tests {
             .unwrap();

         thread::sleep(Duration::from_millis(1000));

-        assert!(runtime
-            .peer_store
-            .read()
-            .await
-            .check_tx_completed(tx_seq)
-            .unwrap());
+        assert!(runtime.peer_store.check_tx_completed(tx_seq).unwrap());

         assert!(runtime.network_recv.try_recv().is_err());
     }

-    async fn wait_for_tx_finalized(store: Arc<RwLock<dyn Store>>, tx_seq: u64) {
+    async fn wait_for_tx_finalized(store: Arc<dyn Store>, tx_seq: u64) {
         let deadline = Instant::now() + Duration::from_millis(5000);
-        while !store.read().await.check_tx_completed(tx_seq).unwrap() {
+        while !store.check_tx_completed(tx_seq).unwrap() {
             if Instant::now() >= deadline {
                 panic!("Failed to wait tx completed");
             }
@@ -1238,12 +1228,7 @@ mod tests {

         receive_dial(&mut runtime, &sync_send).await;

-        assert!(!runtime
-            .store
-            .read()
-            .await
-            .check_tx_completed(tx_seq)
-            .unwrap());
+        assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());

         assert!(!matches!(
             sync_send
@@ -1313,12 +1298,7 @@ mod tests {

         receive_dial(&mut runtime, &sync_send).await;

-        assert!(!runtime
-            .store
-            .read()
-            .await
-            .check_tx_completed(tx_seq)
-            .unwrap());
+        assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());

         receive_chunk_request(
             &mut runtime.network_recv,
@@ -1368,13 +1348,8 @@ mod tests {

         receive_dial(&mut runtime, &sync_send).await;

-        assert!(!runtime
-            .store
-            .read()
-            .await
-            .check_tx_completed(tx_seq)
-            .unwrap());
-        assert!(!runtime.store.read().await.check_tx_completed(0).unwrap());
+        assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());
+        assert!(!runtime.store.check_tx_completed(0).unwrap());

         receive_chunk_request(
             &mut runtime.network_recv,
@@ -1460,12 +1435,7 @@ mod tests {

         receive_dial(&mut runtime, &sync_send).await;

-        assert!(!runtime
-            .store
-            .read()
-            .await
-            .check_tx_completed(tx_seq)
-            .unwrap());
+        assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());

         receive_chunk_request(
             &mut runtime.network_recv,
@@ -1503,12 +1473,7 @@ mod tests {

         receive_dial(&mut runtime, &sync_send).await;

-        assert!(!runtime
-            .store
-            .read()
-            .await
-            .check_tx_completed(tx_seq)
-            .unwrap());
+        assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());

         receive_chunk_request(
             &mut runtime.network_recv,
@@ -1589,12 +1554,7 @@ mod tests {

         receive_dial(&mut runtime, &sync_send).await;

-        assert!(!runtime
-            .store
-            .read()
-            .await
-            .check_tx_completed(tx_seq)
-            .unwrap());
+        assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());

         assert!(!matches!(
             sync_send
@@ -1620,7 +1580,7 @@ mod tests {
     async fn receive_chunk_request(
         network_recv: &mut UnboundedReceiver<NetworkMessage>,
         sync_send: &SyncSender,
-        peer_store: Arc<RwLock<dyn Store>>,
+        peer_store: Arc<dyn Store>,
         init_peer_id: PeerId,
         tx_seq: u64,
         index_start: u64,
@@ -1654,8 +1614,6 @@ mod tests {
         };

         let chunks = peer_store
-            .read()
-            .await
             .get_chunks_with_proof_by_tx_and_index_range(
                 tx_seq,
                 req.index_start as usize,
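The test hunks above show the payoff in test code: polling a shared store becomes a direct call, and an `Arc<ConcreteStore>` coerces to `Arc<dyn Store>` at the call site. A self-contained sketch of the `wait_for_tx_finalized` pattern (toy trait and an `AtomicBool` standing in for real finalization; timings copied from the quoted test):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

trait Store: Send + Sync {
    fn check_tx_completed(&self, tx_seq: u64) -> bool;
}

struct Mem(AtomicBool);
impl Store for Mem {
    fn check_tx_completed(&self, _tx_seq: u64) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

// Mirrors the diff: no `.read().await` inside the polling loop.
async fn wait_for_tx_finalized(store: Arc<dyn Store>, tx_seq: u64) {
    let deadline = Instant::now() + Duration::from_millis(5000);
    while !store.check_tx_completed(tx_seq) {
        if Instant::now() >= deadline {
            panic!("Failed to wait tx completed");
        }
        tokio::time::sleep(Duration::from_millis(300)).await;
    }
}

#[tokio::main]
async fn main() {
    let store = Arc::new(Mem(AtomicBool::new(false)));
    let writer = store.clone();
    tokio::spawn(async move { writer.0.store(true, Ordering::SeqCst) });
    // `Arc<Mem>` coerces to `Arc<dyn Store>` here.
    wait_for_tx_finalized(store, 0).await;
}
```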
diff --git a/node/sync/src/test_util.rs b/node/sync/src/test_util.rs
index 4fd04f5..ce0fdaa 100644
--- a/node/sync/src/test_util.rs
+++ b/node/sync/src/test_util.rs
@@ -17,8 +17,8 @@ use tokio::sync::RwLock;

 pub fn create_2_store(
     chunk_count: Vec<usize>,
 ) -> (
-    Arc<RwLock<dyn Store>>,
-    Arc<RwLock<dyn Store>>,
+    Arc<dyn Store>,
+    Arc<dyn Store>,
     Vec<Transaction>,
     Vec<Vec<u8>>,
 ) {
@@ -37,12 +37,7 @@ pub fn create_2_store(
         offset = ret.2;
     }

-    (
-        Arc::new(RwLock::new(store)),
-        Arc::new(RwLock::new(peer_store)),
-        txs,
-        data,
-    )
+    (Arc::new(store), Arc::new(peer_store), txs, data)
 }

 fn generate_data(
@@ -115,7 +110,7 @@ pub mod tests {

     impl Default for TestStoreRuntime {
         fn default() -> Self {
-            let store = Arc::new(RwLock::new(Self::new_store()));
+            let store = Arc::new(Self::new_store());
             Self::new(store)
         }
     }
@@ -125,7 +120,7 @@ pub mod tests {
         LogManager::memorydb(LogConfig::default()).unwrap()
     }

-    pub fn new(store: Arc<RwLock<dyn Store>>) -> TestStoreRuntime {
+    pub fn new(store: Arc<dyn Store>) -> TestStoreRuntime {
         let runtime = TestRuntime::default();
         let executor = runtime.task_executor.clone();
         Self {