From 07704fedc98c140ae4862baa40a4fbb9a9ecace0 Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Wed, 30 Oct 2024 18:51:45 +0800 Subject: [PATCH] fix: wait for async storage to complete before receiving the result. --- node/storage-async/src/lib.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs index 95cf6b5..86e94fb 100644 --- a/node/storage-async/src/lib.rs +++ b/node/storage-async/src/lib.rs @@ -1,7 +1,7 @@ #[macro_use] extern crate tracing; -use anyhow::bail; +use anyhow::{anyhow, bail}; use shared_types::{ Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction, }; @@ -136,17 +136,21 @@ impl Store { let store = self.store.clone(); let (tx, rx) = oneshot::channel(); - self.executor.spawn_blocking( - move || { - // FIXME(zz): Not all functions need `write`. Refactor store usage. - let res = f(&*store); + self.executor + .spawn_blocking_handle( + move || { + // FIXME(zz): Not all functions need `write`. Refactor store usage. + let res = f(&*store); - if tx.send(res).is_err() { - error!("Unable to complete async storage operation: the receiver dropped"); - } - }, - WORKER_TASK_NAME, - ); + if tx.send(res).is_err() { + error!("Unable to complete async storage operation: the receiver dropped"); + } + }, + WORKER_TASK_NAME, + ) + .ok_or(anyhow!("Unable to spawn async storage work"))? + .await + .map_err(|e| anyhow!("join error: e={:?}", e))?; rx.await .unwrap_or_else(|_| bail!(error::Error::Custom("Receiver error".to_string())))