mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-19 03:25:18 +00:00
Use spawn_blocking for storage task.
This commit is contained in:
parent
8f37a4a594
commit
c67ae6f835
@ -2,7 +2,6 @@ use async_trait::async_trait;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use storage::log_store::{MineLoadChunk, Store};
|
use storage::log_store::{MineLoadChunk, Store};
|
||||||
|
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait PoraLoader: Send + Sync {
|
pub trait PoraLoader: Send + Sync {
|
||||||
async fn load_sealed_data(&self, index: u64) -> Option<MineLoadChunk>;
|
async fn load_sealed_data(&self, index: u64) -> Option<MineLoadChunk>;
|
||||||
|
@ -8,7 +8,6 @@ use std::sync::Arc;
|
|||||||
use storage::log_store::{config::ConfigurableExt, Store};
|
use storage::log_store::{config::ConfigurableExt, Store};
|
||||||
use storage::H256;
|
use storage::H256;
|
||||||
|
|
||||||
|
|
||||||
const MINER_ID: &str = "mine.miner_id";
|
const MINER_ID: &str = "mine.miner_id";
|
||||||
|
|
||||||
pub fn load_miner_id(store: &dyn Store) -> storage::error::Result<Option<H256>> {
|
pub fn load_miner_id(store: &dyn Store) -> storage::error::Result<Option<H256>> {
|
||||||
|
@ -1,9 +1,7 @@
|
|||||||
use std::{collections::BTreeMap, sync::Arc};
|
use std::{collections::BTreeMap, sync::Arc};
|
||||||
|
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use tokio::{
|
use tokio::time::{sleep, Duration, Instant};
|
||||||
time::{sleep, Duration, Instant},
|
|
||||||
};
|
|
||||||
|
|
||||||
use contract_interface::{EpochRangeWithContextDigest, ZgsFlow};
|
use contract_interface::{EpochRangeWithContextDigest, ZgsFlow};
|
||||||
use storage::{
|
use storage::{
|
||||||
|
@ -6,8 +6,8 @@ use network::NetworkMessage;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use storage::config::ShardConfig;
|
use storage::config::ShardConfig;
|
||||||
use storage::log_store::Store;
|
use storage::log_store::Store;
|
||||||
|
use tokio::sync::broadcast;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::sync::{broadcast};
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum MinerMessage {
|
pub enum MinerMessage {
|
||||||
|
@ -8,7 +8,7 @@ use shared_types::FlowRangeProof;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use storage::log_store::Store;
|
use storage::log_store::Store;
|
||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
use tokio::sync::{mpsc};
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use crate::config::{MineServiceMiddleware, MinerConfig};
|
use crate::config::{MineServiceMiddleware, MinerConfig};
|
||||||
use crate::pora::AnswerWithoutProof;
|
use crate::pora::AnswerWithoutProof;
|
||||||
|
@ -6,7 +6,7 @@ use shared_types::{Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof,
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use storage::{error, error::Result, log_store::Store as LogStore, H256};
|
use storage::{error, error::Result, log_store::Store as LogStore, H256};
|
||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
use tokio::sync::{oneshot};
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
pub use storage::config::ShardConfig;
|
pub use storage::config::ShardConfig;
|
||||||
|
|
||||||
@ -70,8 +70,8 @@ impl Store {
|
|||||||
let store = self.store.clone();
|
let store = self.store.clone();
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
self.executor.spawn(
|
self.executor.spawn_blocking(
|
||||||
async move {
|
move || {
|
||||||
// FIXME(zz): Not all functions need `write`. Refactor store usage.
|
// FIXME(zz): Not all functions need `write`. Refactor store usage.
|
||||||
let res = f(&*store);
|
let res = f(&*store);
|
||||||
|
|
||||||
|
@ -71,10 +71,7 @@ impl SyncStore {
|
|||||||
let mut tx = ConfigTx::default();
|
let mut tx = ConfigTx::default();
|
||||||
|
|
||||||
// not in pending queue
|
// not in pending queue
|
||||||
if !self
|
if !self.pending_txs.remove(store, Some(&mut tx), tx_seq)? {
|
||||||
.pending_txs
|
|
||||||
.remove(store, Some(&mut tx), tx_seq)?
|
|
||||||
{
|
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,10 +89,7 @@ impl SyncStore {
|
|||||||
let mut tx = ConfigTx::default();
|
let mut tx = ConfigTx::default();
|
||||||
|
|
||||||
// not in ready queue
|
// not in ready queue
|
||||||
if !self
|
if !self.ready_txs.remove(store, Some(&mut tx), tx_seq)? {
|
||||||
.ready_txs
|
|
||||||
.remove(store, Some(&mut tx), tx_seq)?
|
|
||||||
{
|
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -651,7 +651,6 @@ mod tests {
|
|||||||
use storage::H256;
|
use storage::H256;
|
||||||
use task_executor::{test_utils::TestRuntime, TaskExecutor};
|
use task_executor::{test_utils::TestRuntime, TaskExecutor};
|
||||||
use tokio::sync::mpsc::{self, UnboundedReceiver};
|
use tokio::sync::mpsc::{self, UnboundedReceiver};
|
||||||
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_status() {
|
fn test_status() {
|
||||||
|
@ -9,7 +9,6 @@ use storage::{
|
|||||||
LogManager,
|
LogManager,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
|
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
|
||||||
/// The first store is for local node, and data not stored. The second store is for peers, and all
|
/// The first store is for local node, and data not stored. The second store is for peers, and all
|
||||||
/// transactions are finalized for file sync.
|
/// transactions are finalized for file sync.
|
||||||
@ -101,7 +100,6 @@ pub mod tests {
|
|||||||
};
|
};
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
use task_executor::test_utils::TestRuntime;
|
use task_executor::test_utils::TestRuntime;
|
||||||
|
|
||||||
|
|
||||||
pub struct TestStoreRuntime {
|
pub struct TestStoreRuntime {
|
||||||
pub runtime: TestRuntime,
|
pub runtime: TestRuntime,
|
||||||
|
Loading…
Reference in New Issue
Block a user