From 8f37a4a594d9f13cc648dec75a137291c9cbbabd Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Wed, 19 Jun 2024 17:07:28 +0800 Subject: [PATCH] Fix tests and warnings. --- node/miner/src/loader.rs | 2 +- node/miner/src/miner_id.rs | 2 +- node/miner/src/sealer.rs | 1 - node/miner/src/service.rs | 2 +- node/miner/src/submitter.rs | 2 +- node/pruner/src/lib.rs | 2 +- node/src/client/builder.rs | 2 +- node/storage-async/src/lib.rs | 2 +- node/storage/src/log_store/tests.rs | 2 +- node/sync/src/auto_sync/sync_store.rs | 22 +++++++++++----------- node/sync/src/controllers/serial.rs | 6 ++---- node/sync/src/service.rs | 6 +++--- node/sync/src/test_util.rs | 4 ++-- 13 files changed, 26 insertions(+), 29 deletions(-) diff --git a/node/miner/src/loader.rs b/node/miner/src/loader.rs index e8952d4..afb2043 100644 --- a/node/miner/src/loader.rs +++ b/node/miner/src/loader.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use std::sync::Arc; use storage::log_store::{MineLoadChunk, Store}; -use tokio::sync::RwLock; + #[async_trait] pub trait PoraLoader: Send + Sync { diff --git a/node/miner/src/miner_id.rs b/node/miner/src/miner_id.rs index 20a1379..59d265d 100644 --- a/node/miner/src/miner_id.rs +++ b/node/miner/src/miner_id.rs @@ -7,7 +7,7 @@ use ethers::contract::EthEvent; use std::sync::Arc; use storage::log_store::{config::ConfigurableExt, Store}; use storage::H256; -use tokio::sync::RwLock; + const MINER_ID: &str = "mine.miner_id"; diff --git a/node/miner/src/sealer.rs b/node/miner/src/sealer.rs index 0500f3c..2ffc43c 100644 --- a/node/miner/src/sealer.rs +++ b/node/miner/src/sealer.rs @@ -2,7 +2,6 @@ use std::{collections::BTreeMap, sync::Arc}; use ethereum_types::H256; use tokio::{ - sync::RwLock, time::{sleep, Duration, Instant}, }; diff --git a/node/miner/src/service.rs b/node/miner/src/service.rs index b96a143..7d34101 100644 --- a/node/miner/src/service.rs +++ b/node/miner/src/service.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use storage::config::ShardConfig; use storage::log_store::Store; use tokio::sync::mpsc; -use tokio::sync::{broadcast, RwLock}; +use tokio::sync::{broadcast}; #[derive(Clone, Debug)] pub enum MinerMessage { diff --git a/node/miner/src/submitter.rs b/node/miner/src/submitter.rs index 753fb8f..bf7b79e 100644 --- a/node/miner/src/submitter.rs +++ b/node/miner/src/submitter.rs @@ -8,7 +8,7 @@ use shared_types::FlowRangeProof; use std::sync::Arc; use storage::log_store::Store; use task_executor::TaskExecutor; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::{mpsc}; use crate::config::{MineServiceMiddleware, MinerConfig}; use crate::pora::AnswerWithoutProof; diff --git a/node/pruner/src/lib.rs b/node/pruner/src/lib.rs index 914443a..f460288 100644 --- a/node/pruner/src/lib.rs +++ b/node/pruner/src/lib.rs @@ -8,7 +8,7 @@ use storage::config::{ShardConfig, SHARD_CONFIG_KEY}; use storage::log_store::config::ConfigurableExt; use storage::log_store::Store; use task_executor::TaskExecutor; -use tokio::sync::{broadcast, mpsc, RwLock}; +use tokio::sync::{broadcast, mpsc}; use tracing::debug; // Start pruning when the db directory size exceeds 0.9 * limit. diff --git a/node/src/client/builder.rs b/node/src/client/builder.rs index c50df97..70563d7 100644 --- a/node/src/client/builder.rs +++ b/node/src/client/builder.rs @@ -15,7 +15,7 @@ use storage::log_store::log_manager::LogConfig; use storage::log_store::Store; use storage::{LogManager, StorageConfig}; use sync::{SyncSender, SyncService}; -use tokio::sync::{broadcast, mpsc, RwLock}; +use tokio::sync::{broadcast, mpsc}; macro_rules! require { ($component:expr, $self:ident, $e:ident) => { diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs index 81eab35..a6884d5 100644 --- a/node/storage-async/src/lib.rs +++ b/node/storage-async/src/lib.rs @@ -6,7 +6,7 @@ use shared_types::{Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, use std::sync::Arc; use storage::{error, error::Result, log_store::Store as LogStore, H256}; use task_executor::TaskExecutor; -use tokio::sync::{oneshot, RwLock}; +use tokio::sync::{oneshot}; pub use storage::config::ShardConfig; diff --git a/node/storage/src/log_store/tests.rs b/node/storage/src/log_store/tests.rs index 6928540..741b172 100644 --- a/node/storage/src/log_store/tests.rs +++ b/node/storage/src/log_store/tests.rs @@ -12,7 +12,7 @@ use std::cmp; #[test] fn test_put_get() { let config = LogConfig::default(); - let mut store = LogManager::memorydb(config.clone()).unwrap(); + let store = LogManager::memorydb(config.clone()).unwrap(); let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1; // Aligned with size. let start_offset = 1024; diff --git a/node/sync/src/auto_sync/sync_store.rs b/node/sync/src/auto_sync/sync_store.rs index 4797cca..ff885e4 100644 --- a/node/sync/src/auto_sync/sync_store.rs +++ b/node/sync/src/auto_sync/sync_store.rs @@ -1,6 +1,6 @@ use super::tx_store::TxStore; use anyhow::Result; -use std::ops::Deref; + use storage::log_store::config::{ConfigTx, ConfigurableExt}; use storage_async::Store; @@ -57,12 +57,12 @@ impl SyncStore { let store = self.store.get_store(); // already in ready queue - if self.ready_txs.has(store.deref(), tx_seq)? { + if self.ready_txs.has(store, tx_seq)? { return Ok(false); } // always add in pending queue - self.pending_txs.add(store.deref(), None, tx_seq) + self.pending_txs.add(store, None, tx_seq) } pub async fn upgrade_tx_to_ready(&self, tx_seq: u64) -> Result { @@ -73,13 +73,13 @@ impl SyncStore { // not in pending queue if !self .pending_txs - .remove(store.deref(), Some(&mut tx), tx_seq)? + .remove(store, Some(&mut tx), tx_seq)? { return Ok(false); } // move from pending to ready queue - let added = self.ready_txs.add(store.deref(), Some(&mut tx), tx_seq)?; + let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?; store.exec_configs(tx)?; @@ -94,13 +94,13 @@ impl SyncStore { // not in ready queue if !self .ready_txs - .remove(store.deref(), Some(&mut tx), tx_seq)? + .remove(store, Some(&mut tx), tx_seq)? { return Ok(false); } // move from ready to pending queue - let added = self.pending_txs.add(store.deref(), Some(&mut tx), tx_seq)?; + let added = self.pending_txs.add(store, Some(&mut tx), tx_seq)?; store.exec_configs(tx)?; @@ -111,24 +111,24 @@ impl SyncStore { let store = self.store.get_store(); // try to find a tx in ready queue with high priority - if let Some(val) = self.ready_txs.random(store.deref())? { + if let Some(val) = self.ready_txs.random(store)? { return Ok(Some(val)); } // otherwise, find tx in pending queue - self.pending_txs.random(store.deref()) + self.pending_txs.random(store) } pub async fn remove_tx(&self, tx_seq: u64) -> Result { let store = self.store.get_store(); // removed in ready queue - if self.ready_txs.remove(store.deref(), None, tx_seq)? { + if self.ready_txs.remove(store, None, tx_seq)? { return Ok(true); } // otherwise, try to remove in pending queue - self.pending_txs.remove(store.deref(), None, tx_seq) + self.pending_txs.remove(store, None, tx_seq) } } diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index b003bee..b6ef02c 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -651,7 +651,7 @@ mod tests { use storage::H256; use task_executor::{test_utils::TestRuntime, TaskExecutor}; use tokio::sync::mpsc::{self, UnboundedReceiver}; - use tokio::sync::RwLock; + #[test] fn test_status() { @@ -1423,8 +1423,6 @@ mod tests { ); let chunks = peer_store - .read() - .await .get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count) .unwrap() .unwrap(); @@ -1547,7 +1545,7 @@ mod tests { let num_chunks = 123; let config = LogConfig::default(); - let store = Arc::new(RwLock::new(LogManager::memorydb(config).unwrap())); + let store = Arc::new(LogManager::memorydb(config).unwrap()); create_controller(task_executor, peer_id, store, tx_id, num_chunks) } diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index b6bd198..1e717bc 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -23,7 +23,7 @@ use storage::config::ShardConfig; use storage::error::Result as StorageResult; use storage::log_store::Store as LogStore; use storage_async::Store; -use tokio::sync::{broadcast, mpsc, RwLock}; +use tokio::sync::{broadcast, mpsc}; const HEARTBEAT_INTERVAL_SEC: u64 = 5; @@ -1158,7 +1158,7 @@ mod tests { let config = LogConfig::default(); - let store = Arc::new(RwLock::new(LogManager::memorydb(config.clone()).unwrap())); + let store = Arc::new(LogManager::memorydb(config.clone()).unwrap()); let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id(); let file_location_cache: Arc = @@ -1364,7 +1364,7 @@ mod tests { wait_for_tx_finalized(runtime.store.clone(), tx_seq).await; - assert!(!runtime.store.read().await.check_tx_completed(0).unwrap()); + assert!(!runtime.store.check_tx_completed(0).unwrap()); // first file let tx_seq = 0u64; diff --git a/node/sync/src/test_util.rs b/node/sync/src/test_util.rs index ce0fdaa..c1154c4 100644 --- a/node/sync/src/test_util.rs +++ b/node/sync/src/test_util.rs @@ -8,7 +8,7 @@ use storage::{ }, LogManager, }; -use tokio::sync::RwLock; + /// Creates stores for local node and peers with initialized transaction of specified chunk count. /// The first store is for local node, and data not stored. The second store is for peers, and all @@ -101,7 +101,7 @@ pub mod tests { }; use storage_async::Store; use task_executor::test_utils::TestRuntime; - use tokio::sync::RwLock; + pub struct TestStoreRuntime { pub runtime: TestRuntime,