Remove async lock for storage.

This commit is contained in:
Peilun Li 2024-06-19 16:45:03 +08:00
parent 38cbb8e182
commit 6aa11b7b70
19 changed files with 92 additions and 194 deletions

View File

@ -122,7 +122,7 @@ impl LogEntryFetcher {
pub fn start_remove_finalized_block_task(
&self,
executor: &TaskExecutor,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
block_hash_cache: Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
default_finalized_block_count: u64,
remove_finalized_block_interval_minutes: u64,
@ -133,7 +133,7 @@ impl LogEntryFetcher {
loop {
debug!("processing finalized block");
let processed_block_number = match store.read().await.get_sync_progress() {
let processed_block_number = match store.get_sync_progress() {
Ok(Some((processed_block_number, _))) => Some(processed_block_number),
Ok(None) => None,
Err(e) => {
@ -176,9 +176,7 @@ impl LogEntryFetcher {
}
for key in pending_keys.into_iter() {
if let Err(e) =
store.write().await.delete_block_hash_by_number(key)
{
if let Err(e) = store.delete_block_hash_by_number(key) {
error!(
"remove block tx for number {} error: e={:?}",
key, e

View File

@ -33,7 +33,7 @@ pub enum LogSyncEvent {
pub struct LogSyncManager {
config: LogSyncConfig,
log_fetcher: LogEntryFetcher,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
data_cache: DataCache,
next_tx_seq: u64,
@ -48,9 +48,9 @@ impl LogSyncManager {
pub async fn spawn(
config: LogSyncConfig,
executor: TaskExecutor,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
) -> Result<broadcast::Sender<LogSyncEvent>> {
let next_tx_seq = store.read().await.next_tx_seq();
let next_tx_seq = store.next_tx_seq();
let executor_clone = executor.clone();
let mut shutdown_sender = executor.shutdown_sender();
@ -81,8 +81,6 @@ impl LogSyncManager {
let block_hash_cache = Arc::new(RwLock::new(
store
.read()
.await
.get_block_hashes()?
.into_iter()
.map(|(x, y)| (x, Some(y)))
@ -124,7 +122,7 @@ impl LogSyncManager {
// TODO(zz): Handle reorg instead of return.
let mut need_handle_reorg = false;
let (mut start_block_number, mut start_block_hash) =
match log_sync_manager.store.read().await.get_sync_progress()? {
match log_sync_manager.store.get_sync_progress()? {
// No previous progress, so just use config.
None => {
let block_number = log_sync_manager.config.start_block_number;
@ -176,7 +174,7 @@ impl LogSyncManager {
log_sync_manager.handle_data(reorg_rx).await?;
if let Some((block_number, block_hash)) =
log_sync_manager.store.read().await.get_sync_progress()?
log_sync_manager.store.get_sync_progress()?
{
start_block_number = block_number;
start_block_hash = block_hash;
@ -301,7 +299,7 @@ impl LogSyncManager {
async fn process_reverted(&mut self, tx_seq: u64) {
warn!("revert for chain reorg: seq={}", tx_seq);
{
let store = self.store.read().await;
let store = self.store.clone();
for seq in tx_seq..self.next_tx_seq {
if matches!(store.check_tx_completed(seq), Ok(true)) {
if let Ok(Some(tx)) = store.get_tx_by_seq_number(seq) {
@ -325,7 +323,7 @@ impl LogSyncManager {
let _ = self.event_send.send(LogSyncEvent::ReorgDetected { tx_seq });
// TODO(zz): `wrapping_sub` here is a hack to handle the case of tx_seq=0.
if let Err(e) = self.store.write().await.revert_to(tx_seq.wrapping_sub(1)) {
if let Err(e) = self.store.revert_to(tx_seq.wrapping_sub(1)) {
error!("revert_to fails: e={:?}", e);
return;
}
@ -353,7 +351,7 @@ impl LogSyncManager {
);
}
self.store.write().await.put_sync_progress((
self.store.put_sync_progress((
block_number,
block_hash,
first_submission_index,
@ -396,12 +394,12 @@ impl LogSyncManager {
}
async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
if let Err(e) = self.store.write().await.put_tx(tx.clone()) {
if let Err(e) = self.store.put_tx(tx.clone()) {
error!("put_tx error: e={:?}", e);
false
} else {
if let Some(data) = self.data_cache.pop_data(&tx.data_merkle_root) {
let store = self.store.write().await;
let store = self.store.clone();
// We are holding a mutable reference of LogSyncManager, so no chain reorg is
// possible after put_tx.
if let Err(e) = store

View File

@ -9,10 +9,9 @@ pub trait PoraLoader: Send + Sync {
}
#[async_trait]
impl PoraLoader for Arc<RwLock<dyn Store>> {
impl PoraLoader for Arc<dyn Store> {
async fn load_sealed_data(&self, chunk_index: u64) -> Option<MineLoadChunk> {
let store = &*self.read().await;
match store.flow().load_sealed_data(chunk_index) {
match self.flow().load_sealed_data(chunk_index) {
Ok(Some(chunk)) => Some(chunk),
_ => None,
}

View File

@ -21,11 +21,11 @@ fn set_miner_id(store: &dyn Store, miner_id: &H256) -> storage::error::Result<()
pub(crate) async fn check_and_request_miner_id(
config: &MinerConfig,
store: &RwLock<dyn Store>,
store: &dyn Store,
provider: &Arc<MineServiceMiddleware>,
) -> Result<H256, String> {
let db_miner_id = load_miner_id(&*store.read().await)
.map_err(|e| format!("miner_id on db corrupt: {:?}", e))?;
let db_miner_id =
load_miner_id(store).map_err(|e| format!("miner_id on db corrupt: {:?}", e))?;
let mine_contract = PoraMine::new(config.mine_address, provider.clone());
@ -42,7 +42,7 @@ pub(crate) async fn check_and_request_miner_id(
}
(None, Some(c_id)) => {
check_miner_id(&mine_contract, c_id).await?;
set_miner_id(&*store.write().await, &c_id)
set_miner_id(store, &c_id)
.map_err(|e| format!("set miner id on db corrupt: {:?}", e))?;
Ok(c_id)
}
@ -53,8 +53,7 @@ pub(crate) async fn check_and_request_miner_id(
(None, None) => {
let beneficiary = provider.address();
let id = request_miner_id(&mine_contract, beneficiary).await?;
set_miner_id(&*store.write().await, &id)
.map_err(|e| format!("set miner id on db corrupt: {:?}", e))?;
set_miner_id(store, &id).map_err(|e| format!("set miner id on db corrupt: {:?}", e))?;
Ok(id)
}
}

View File

@ -22,7 +22,7 @@ const CHAIN_STATUS_QUERY_PERIOD: u64 = 5;
pub struct Sealer {
flow_contract: ZgsFlow<MineServiceMiddleware>,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
context_cache: BTreeMap<u128, EpochRangeWithContextDigest>,
last_context_flow_length: u64,
miner_id: H256,
@ -32,7 +32,7 @@ impl Sealer {
pub fn spawn(
executor: TaskExecutor,
provider: Arc<MineServiceMiddleware>,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
config: &MinerConfig,
miner_id: H256,
) {
@ -152,19 +152,11 @@ impl Sealer {
async fn fetch_task(&self) -> Result<Option<Vec<SealTask>>> {
let seal_index_max = self.last_context_flow_length as usize / SECTORS_PER_SEAL;
self.store
.read()
.await
.flow()
.pull_seal_chunk(seal_index_max)
self.store.flow().pull_seal_chunk(seal_index_max)
}
async fn submit_answer(&self, answers: Vec<SealAnswer>) -> Result<()> {
self.store
.write()
.await
.flow_mut()
.submit_seal_result(answers)
self.store.flow().submit_seal_result(answers)
}
async fn seal_iteration(&mut self) -> Result<bool> {

View File

@ -29,13 +29,13 @@ impl MineService {
executor: task_executor::TaskExecutor,
_network_send: mpsc::UnboundedSender<NetworkMessage>,
config: MinerConfig,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
) -> Result<broadcast::Sender<MinerMessage>, String> {
let provider = Arc::new(config.make_provider().await?);
let (msg_send, msg_recv) = broadcast::channel(1024);
let miner_id = check_and_request_miner_id(&config, &store, &provider).await?;
let miner_id = check_and_request_miner_id(&config, store.as_ref(), &provider).await?;
debug!("miner id setting complete.");
let mine_context_receiver = MineContextWatcher::spawn(

View File

@ -22,7 +22,7 @@ pub struct Submitter {
mine_contract: PoraMine<MineServiceMiddleware>,
flow_contract: ZgsFlow<MineServiceMiddleware>,
default_gas_limit: Option<U256>,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
}
impl Submitter {
@ -30,7 +30,7 @@ impl Submitter {
executor: TaskExecutor,
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
provider: Arc<MineServiceMiddleware>,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
config: &MinerConfig,
) {
let mine_contract = PoraMine::new(config.mine_address, provider.clone());
@ -80,8 +80,6 @@ impl Submitter {
let flow_proof = self
.store
.read()
.await
.get_proof_at_root(
&mine_answer.context_flow_root,
mine_answer.recall_position,

View File

@ -32,7 +32,7 @@ impl PrunerConfig {
pub struct Pruner {
config: PrunerConfig,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
sender: mpsc::UnboundedSender<PrunerMessage>,
miner_sender: Option<broadcast::Sender<MinerMessage>>,
@ -42,10 +42,10 @@ impl Pruner {
pub async fn spawn(
executor: TaskExecutor,
mut config: PrunerConfig,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
miner_sender: Option<broadcast::Sender<MinerMessage>>,
) -> Result<mpsc::UnboundedReceiver<PrunerMessage>> {
if let Some(shard_config) = get_shard_config(&store).await? {
if let Some(shard_config) = get_shard_config(store.as_ref()).await? {
config.shard_config = shard_config;
}
let (tx, rx) = mpsc::unbounded_channel();
@ -76,7 +76,7 @@ impl Pruner {
batch.push(index);
if batch.len() == self.config.batch_size || iter.peek().is_none() {
debug!(start = batch.first(), end = batch.last(), "prune batch");
self.store.write().await.remove_chunks_batch(&batch)?;
self.store.remove_chunks_batch(&batch)?;
batch = Vec::with_capacity(self.config.batch_size);
tokio::time::sleep(self.config.batch_wait_time).await;
}
@ -87,7 +87,7 @@ impl Pruner {
}
async fn maybe_update(&mut self) -> Result<Option<Box<dyn Send + Iterator<Item = u64>>>> {
let current_size = self.store.read().await.flow().get_num_entries()?;
let current_size = self.store.flow().get_num_entries()?;
debug!(
current_size = current_size,
config = ?self.config.shard_config,
@ -109,7 +109,7 @@ impl Pruner {
config.num_shard *= 2;
// Generate delete list
let flow_len = self.store.read().await.get_context()?.1;
let flow_len = self.store.get_context()?.1;
let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard;
return Ok(Some(Box::new(
(start_index as u64..flow_len).step_by(config.num_shard),
@ -124,16 +124,16 @@ impl Pruner {
}
self.sender
.send(PrunerMessage::ChangeShardConfig(self.config.shard_config))?;
let mut store = self.store.write().await;
store
.flow_mut()
self.store
.flow()
.update_shard_config(self.config.shard_config);
store.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
self.store
.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
}
}
async fn get_shard_config(store: &RwLock<dyn Store>) -> Result<Option<ShardConfig>> {
store.read().await.get_config_decoded(&SHARD_CONFIG_KEY)
async fn get_shard_config(store: &dyn Store) -> Result<Option<ShardConfig>> {
store.get_config_decoded(&SHARD_CONFIG_KEY)
}
#[derive(Debug)]

View File

@ -269,13 +269,7 @@ impl Libp2pEventHandler {
};
let timestamp = timestamp_now();
let shard_config = self
.store
.get_store()
.read()
.await
.flow()
.get_shard_config();
let shard_config = self.store.get_store().flow().get_shard_config();
let msg = AnnounceFile {
tx_id,
@ -598,7 +592,7 @@ mod tests {
sync_recv: SyncReceiver,
chunk_pool_send: mpsc::UnboundedSender<ChunkPoolMessage>,
chunk_pool_recv: mpsc::UnboundedReceiver<ChunkPoolMessage>,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
file_location_cache: Arc<FileLocationCache>,
peers: Arc<RwLock<PeerManager>>,
}
@ -621,7 +615,7 @@ mod tests {
sync_recv,
chunk_pool_send,
chunk_pool_recv,
store: Arc::new(RwLock::new(store)),
store: Arc::new(store),
file_location_cache: Arc::new(FileLocationCache::default()),
peers: Arc::new(RwLock::new(PeerManager::new(Config::default()))),
}

View File

@ -58,7 +58,7 @@ impl RouterService {
_miner_send: Option<broadcast::Sender<MinerMessage>>,
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
pruner_recv: Option<mpsc::UnboundedReceiver<PrunerMessage>>,
store: Arc<RwLock<dyn LogStore>>,
store: Arc<dyn LogStore>,
file_location_cache: Arc<FileLocationCache>,
local_keypair: Keypair,
config: Config,

View File

@ -23,8 +23,6 @@ impl RpcServer for RpcServerImpl {
.ctx
.log_store
.get_store()
.read()
.await
.get_sync_progress()?
.unwrap_or_default();
@ -156,14 +154,7 @@ impl RpcServer for RpcServerImpl {
async fn get_shard_config(&self) -> RpcResult<ShardConfig> {
debug!("zgs_getShardConfig");
let shard_config = self
.ctx
.log_store
.get_store()
.read()
.await
.flow()
.get_shard_config();
let shard_config = self.ctx.log_store.get_store().flow().get_shard_config();
Ok(shard_config)
}
}

View File

@ -68,7 +68,7 @@ struct ChunkPoolComponents {
#[derive(Default)]
pub struct ClientBuilder {
runtime_context: Option<RuntimeContext>,
store: Option<Arc<RwLock<dyn Store>>>,
store: Option<Arc<dyn Store>>,
async_store: Option<storage_async::Store>,
file_location_cache: Option<Arc<FileLocationCache>>,
network: Option<NetworkComponents>,
@ -89,10 +89,10 @@ impl ClientBuilder {
/// Initializes in-memory storage.
pub fn with_memory_store(mut self) -> Result<Self, String> {
// TODO(zz): Set config.
let store = Arc::new(RwLock::new(
let store = Arc::new(
LogManager::memorydb(LogConfig::default())
.map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
));
);
self.store = Some(store.clone());
@ -105,10 +105,10 @@ impl ClientBuilder {
/// Initializes RocksDB storage.
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
let store = Arc::new(RwLock::new(
let store = Arc::new(
LogManager::rocksdb(LogConfig::default(), &config.db_dir)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
));
);
self.store = Some(store.clone());

View File

@ -28,14 +28,14 @@ macro_rules! delegate {
#[derive(Clone)]
pub struct Store {
/// Log and transaction storage.
store: Arc<RwLock<dyn LogStore>>,
store: Arc<dyn LogStore>,
/// Tokio executor for spawning worker tasks.
executor: TaskExecutor,
}
impl Store {
pub fn new(store: Arc<RwLock<dyn LogStore>>, executor: TaskExecutor) -> Self {
pub fn new(store: Arc<dyn LogStore>, executor: TaskExecutor) -> Self {
Store { store, executor }
}
@ -64,7 +64,7 @@ impl Store {
async fn spawn<T, F>(&self, f: F) -> Result<T>
where
F: FnOnce(&mut dyn LogStore) -> Result<T> + Send + 'static,
F: FnOnce(&dyn LogStore) -> Result<T> + Send + 'static,
T: Send + 'static,
{
let store = self.store.clone();
@ -73,7 +73,7 @@ impl Store {
self.executor.spawn(
async move {
// FIXME(zz): Not all functions need `write`. Refactor store usage.
let res = f(&mut *store.write().await);
let res = f(&*store);
if tx.send(res).is_err() {
error!("Unable to complete async storage operation: the receiver dropped");
@ -87,7 +87,7 @@ impl Store {
}
// FIXME(zz): Refactor the lock and async call here.
pub fn get_store(&self) -> &RwLock<dyn LogStore> {
pub fn get_store(&self) -> &dyn LogStore {
self.store.as_ref()
}
}

View File

@ -87,7 +87,7 @@ impl FlowStore {
#[derive(Clone, Debug)]
pub struct FlowConfig {
pub batch_size: usize,
pub shard_config: ShardConfig,
pub shard_config: Arc<RwLock<ShardConfig>>,
}
impl Default for FlowConfig {
@ -208,7 +208,7 @@ impl FlowRead for FlowStore {
}
fn get_shard_config(&self) -> ShardConfig {
self.config.shard_config
self.config.shard_config.read().clone()
}
}
@ -233,7 +233,7 @@ impl FlowWrite for FlowStore {
.expect("in range");
let chunk_index = chunk.start_index / self.config.batch_size as u64;
if !self.config.shard_config.in_range(chunk_index) {
if !self.config.shard_config.read().in_range(chunk_index) {
// The data are in a shard range that we are not storing.
continue;
}
@ -273,8 +273,8 @@ impl FlowWrite for FlowStore {
Ok(())
}
fn update_shard_config(&mut self, shard_config: ShardConfig) {
self.config.shard_config = shard_config;
fn update_shard_config(&self, shard_config: ShardConfig) {
*self.config.shard_config.write() = shard_config;
}
}
@ -313,7 +313,7 @@ impl FlowSeal for FlowStore {
Ok(Some(tasks))
}
fn submit_seal_result(&mut self, answers: Vec<SealAnswer>) -> Result<()> {
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
let mut to_seal_set = self.to_seal_set.write();
let is_consistent = |answer: &SealAnswer| {
to_seal_set

View File

@ -213,7 +213,7 @@ pub trait FlowWrite {
fn truncate(&self, start_index: u64) -> Result<()>;
/// Update the shard config.
fn update_shard_config(&mut self, shard_config: ShardConfig);
fn update_shard_config(&self, shard_config: ShardConfig);
}
pub struct SealTask {
@ -247,7 +247,7 @@ pub trait FlowSeal {
/// Submit sealing result
fn submit_seal_result(&mut self, answers: Vec<SealAnswer>) -> Result<()>;
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
}
pub trait Flow: FlowRead + FlowWrite + FlowSeal {}

View File

@ -29,7 +29,7 @@ impl SyncStore {
}
pub async fn get_tx_seq_range(&self) -> Result<(Option<u64>, Option<u64>)> {
let store = self.store.get_store().read().await;
let store = self.store.get_store();
// load next_tx_seq
let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ)?;
@ -43,8 +43,6 @@ impl SyncStore {
pub async fn set_next_tx_seq(&self, tx_seq: u64) -> Result<()> {
self.store
.get_store()
.write()
.await
.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq)
}
@ -52,13 +50,11 @@ impl SyncStore {
debug!(%tx_seq, "set_max_tx_seq");
self.store
.get_store()
.write()
.await
.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq)
}
pub async fn add_pending_tx(&self, tx_seq: u64) -> Result<bool> {
let store = self.store.get_store().write().await;
let store = self.store.get_store();
// already in ready queue
if self.ready_txs.has(store.deref(), tx_seq)? {
@ -70,7 +66,7 @@ impl SyncStore {
}
pub async fn upgrade_tx_to_ready(&self, tx_seq: u64) -> Result<bool> {
let store = self.store.get_store().write().await;
let store = self.store.get_store();
let mut tx = ConfigTx::default();
@ -91,7 +87,7 @@ impl SyncStore {
}
pub async fn downgrade_tx_to_pending(&self, tx_seq: u64) -> Result<bool> {
let store = self.store.get_store().write().await;
let store = self.store.get_store();
let mut tx = ConfigTx::default();
@ -112,7 +108,7 @@ impl SyncStore {
}
pub async fn random_tx(&self) -> Result<Option<u64>> {
let store = self.store.get_store().read().await;
let store = self.store.get_store();
// try to find a tx in ready queue with high priority
if let Some(val) = self.ready_txs.random(store.deref())? {
@ -124,7 +120,7 @@ impl SyncStore {
}
pub async fn remove_tx(&self, tx_seq: u64) -> Result<bool> {
let store = self.store.get_store().write().await;
let store = self.store.get_store();
// removed in ready queue
if self.ready_txs.remove(store.deref(), None, tx_seq)? {

View File

@ -424,8 +424,6 @@ impl SerialSyncController {
let validation_result = self
.store
.get_store()
.write()
.await
.validate_and_insert_range_proof(self.tx_seq, &response);
match validation_result {
@ -447,13 +445,7 @@ impl SerialSyncController {
self.failures = 0;
let shard_config = self
.store
.get_store()
.read()
.await
.flow()
.get_shard_config();
let shard_config = self.store.get_store().flow().get_shard_config();
let next_chunk = shard_config.next_segment_index(
(from_chunk / PORA_CHUNK_SIZE as u64) as usize,
(self.tx_start_chunk_in_flow / PORA_CHUNK_SIZE as u64) as usize,
@ -1107,8 +1099,6 @@ mod tests {
);
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.unwrap()
.unwrap();
@ -1141,8 +1131,6 @@ mod tests {
);
let mut chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.unwrap()
.unwrap();
@ -1210,8 +1198,6 @@ mod tests {
);
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.unwrap()
.unwrap();
@ -1276,8 +1262,6 @@ mod tests {
);
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.unwrap()
.unwrap();
@ -1350,8 +1334,6 @@ mod tests {
);
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.unwrap()
.unwrap();
@ -1395,8 +1377,6 @@ mod tests {
);
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, 1024)
.unwrap()
.unwrap();
@ -1575,7 +1555,7 @@ mod tests {
fn create_controller(
task_executor: TaskExecutor,
peer_id: Option<PeerId>,
store: Arc<RwLock<LogManager>>,
store: Arc<LogManager>,
tx_id: TxID,
num_chunks: usize,
) -> (SerialSyncController, UnboundedReceiver<NetworkMessage>) {

View File

@ -129,7 +129,7 @@ impl SyncService {
pub async fn spawn(
executor: task_executor::TaskExecutor,
network_send: mpsc::UnboundedSender<NetworkMessage>,
store: Arc<RwLock<dyn LogStore>>,
store: Arc<dyn LogStore>,
file_location_cache: Arc<FileLocationCache>,
event_recv: broadcast::Receiver<LogSyncEvent>,
) -> Result<SyncSender> {
@ -148,7 +148,7 @@ impl SyncService {
config: Config,
executor: task_executor::TaskExecutor,
network_send: mpsc::UnboundedSender<NetworkMessage>,
store: Arc<RwLock<dyn LogStore>>,
store: Arc<dyn LogStore>,
file_location_cache: Arc<FileLocationCache>,
event_recv: broadcast::Receiver<LogSyncEvent>,
) -> Result<SyncSender> {
@ -723,8 +723,8 @@ mod tests {
runtime: TestRuntime,
chunk_count: usize,
store: Arc<RwLock<LogManager>>,
peer_store: Arc<RwLock<LogManager>>,
store: Arc<LogManager>,
peer_store: Arc<LogManager>,
txs: Vec<Transaction>,
init_data: Vec<u8>,
@ -913,8 +913,6 @@ mod tests {
runtime
.peer_store
.read()
.await
.validate_range_proof(0, &response)
.expect("validate proof");
}
@ -1186,10 +1184,7 @@ mod tests {
.unwrap();
thread::sleep(Duration::from_millis(1000));
assert_eq!(
store.read().await.get_tx_by_seq_number(tx_seq).unwrap(),
None
);
assert_eq!(store.get_tx_by_seq_number(tx_seq).unwrap(), None);
assert!(network_recv.try_recv().is_err());
}
@ -1205,18 +1200,13 @@ mod tests {
.unwrap();
thread::sleep(Duration::from_millis(1000));
assert!(runtime
.peer_store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(runtime.peer_store.check_tx_completed(tx_seq).unwrap());
assert!(runtime.network_recv.try_recv().is_err());
}
async fn wait_for_tx_finalized(store: Arc<RwLock<LogManager>>, tx_seq: u64) {
async fn wait_for_tx_finalized(store: Arc<LogManager>, tx_seq: u64) {
let deadline = Instant::now() + Duration::from_millis(5000);
while !store.read().await.check_tx_completed(tx_seq).unwrap() {
while !store.check_tx_completed(tx_seq).unwrap() {
if Instant::now() >= deadline {
panic!("Failed to wait tx completed");
}
@ -1238,12 +1228,7 @@ mod tests {
receive_dial(&mut runtime, &sync_send).await;
assert!(!runtime
.store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());
assert!(!matches!(
sync_send
@ -1313,12 +1298,7 @@ mod tests {
receive_dial(&mut runtime, &sync_send).await;
assert!(!runtime
.store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());
receive_chunk_request(
&mut runtime.network_recv,
@ -1368,13 +1348,8 @@ mod tests {
receive_dial(&mut runtime, &sync_send).await;
assert!(!runtime
.store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(!runtime.store.read().await.check_tx_completed(0).unwrap());
assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());
assert!(!runtime.store.check_tx_completed(0).unwrap());
receive_chunk_request(
&mut runtime.network_recv,
@ -1460,12 +1435,7 @@ mod tests {
receive_dial(&mut runtime, &sync_send).await;
assert!(!runtime
.store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());
receive_chunk_request(
&mut runtime.network_recv,
@ -1503,12 +1473,7 @@ mod tests {
receive_dial(&mut runtime, &sync_send).await;
assert!(!runtime
.store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());
receive_chunk_request(
&mut runtime.network_recv,
@ -1589,12 +1554,7 @@ mod tests {
receive_dial(&mut runtime, &sync_send).await;
assert!(!runtime
.store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());
assert!(!matches!(
sync_send
@ -1620,7 +1580,7 @@ mod tests {
async fn receive_chunk_request(
network_recv: &mut UnboundedReceiver<NetworkMessage>,
sync_send: &SyncSender,
peer_store: Arc<RwLock<LogManager>>,
peer_store: Arc<LogManager>,
init_peer_id: PeerId,
tx_seq: u64,
index_start: u64,
@ -1654,8 +1614,6 @@ mod tests {
};
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(
tx_seq,
req.index_start as usize,

View File

@ -17,8 +17,8 @@ use tokio::sync::RwLock;
pub fn create_2_store(
chunk_count: Vec<usize>,
) -> (
Arc<RwLock<LogManager>>,
Arc<RwLock<LogManager>>,
Arc<LogManager>,
Arc<LogManager>,
Vec<Transaction>,
Vec<Vec<u8>>,
) {
@ -37,12 +37,7 @@ pub fn create_2_store(
offset = ret.2;
}
(
Arc::new(RwLock::new(store)),
Arc::new(RwLock::new(peer_store)),
txs,
data,
)
(Arc::new(store), Arc::new(peer_store), txs, data)
}
fn generate_data(
@ -115,7 +110,7 @@ pub mod tests {
impl Default for TestStoreRuntime {
fn default() -> Self {
let store = Arc::new(RwLock::new(Self::new_store()));
let store = Arc::new(Self::new_store());
Self::new(store)
}
}
@ -125,7 +120,7 @@ pub mod tests {
LogManager::memorydb(LogConfig::default()).unwrap()
}
pub fn new(store: Arc<RwLock<dyn LogStore>>) -> TestStoreRuntime {
pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime {
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
Self {