mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-19 03:25:18 +00:00
Fix tests and warnings.
This commit is contained in:
parent
6aa11b7b70
commit
8f37a4a594
@ -1,7 +1,7 @@
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use storage::log_store::{MineLoadChunk, Store};
|
use storage::log_store::{MineLoadChunk, Store};
|
||||||
use tokio::sync::RwLock;
|
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait PoraLoader: Send + Sync {
|
pub trait PoraLoader: Send + Sync {
|
||||||
|
@ -7,7 +7,7 @@ use ethers::contract::EthEvent;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use storage::log_store::{config::ConfigurableExt, Store};
|
use storage::log_store::{config::ConfigurableExt, Store};
|
||||||
use storage::H256;
|
use storage::H256;
|
||||||
use tokio::sync::RwLock;
|
|
||||||
|
|
||||||
const MINER_ID: &str = "mine.miner_id";
|
const MINER_ID: &str = "mine.miner_id";
|
||||||
|
|
||||||
|
@ -2,7 +2,6 @@ use std::{collections::BTreeMap, sync::Arc};
|
|||||||
|
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::RwLock,
|
|
||||||
time::{sleep, Duration, Instant},
|
time::{sleep, Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -7,7 +7,7 @@ use std::sync::Arc;
|
|||||||
use storage::config::ShardConfig;
|
use storage::config::ShardConfig;
|
||||||
use storage::log_store::Store;
|
use storage::log_store::Store;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::sync::{broadcast, RwLock};
|
use tokio::sync::{broadcast};
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum MinerMessage {
|
pub enum MinerMessage {
|
||||||
|
@ -8,7 +8,7 @@ use shared_types::FlowRangeProof;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use storage::log_store::Store;
|
use storage::log_store::Store;
|
||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
use tokio::sync::{mpsc, RwLock};
|
use tokio::sync::{mpsc};
|
||||||
|
|
||||||
use crate::config::{MineServiceMiddleware, MinerConfig};
|
use crate::config::{MineServiceMiddleware, MinerConfig};
|
||||||
use crate::pora::AnswerWithoutProof;
|
use crate::pora::AnswerWithoutProof;
|
||||||
|
@ -8,7 +8,7 @@ use storage::config::{ShardConfig, SHARD_CONFIG_KEY};
|
|||||||
use storage::log_store::config::ConfigurableExt;
|
use storage::log_store::config::ConfigurableExt;
|
||||||
use storage::log_store::Store;
|
use storage::log_store::Store;
|
||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
use tokio::sync::{broadcast, mpsc, RwLock};
|
use tokio::sync::{broadcast, mpsc};
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
// Start pruning when the db directory size exceeds 0.9 * limit.
|
// Start pruning when the db directory size exceeds 0.9 * limit.
|
||||||
|
@ -15,7 +15,7 @@ use storage::log_store::log_manager::LogConfig;
|
|||||||
use storage::log_store::Store;
|
use storage::log_store::Store;
|
||||||
use storage::{LogManager, StorageConfig};
|
use storage::{LogManager, StorageConfig};
|
||||||
use sync::{SyncSender, SyncService};
|
use sync::{SyncSender, SyncService};
|
||||||
use tokio::sync::{broadcast, mpsc, RwLock};
|
use tokio::sync::{broadcast, mpsc};
|
||||||
|
|
||||||
macro_rules! require {
|
macro_rules! require {
|
||||||
($component:expr, $self:ident, $e:ident) => {
|
($component:expr, $self:ident, $e:ident) => {
|
||||||
|
@ -6,7 +6,7 @@ use shared_types::{Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof,
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use storage::{error, error::Result, log_store::Store as LogStore, H256};
|
use storage::{error, error::Result, log_store::Store as LogStore, H256};
|
||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
use tokio::sync::{oneshot, RwLock};
|
use tokio::sync::{oneshot};
|
||||||
|
|
||||||
pub use storage::config::ShardConfig;
|
pub use storage::config::ShardConfig;
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@ use std::cmp;
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_put_get() {
|
fn test_put_get() {
|
||||||
let config = LogConfig::default();
|
let config = LogConfig::default();
|
||||||
let mut store = LogManager::memorydb(config.clone()).unwrap();
|
let store = LogManager::memorydb(config.clone()).unwrap();
|
||||||
let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
|
let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
|
||||||
// Aligned with size.
|
// Aligned with size.
|
||||||
let start_offset = 1024;
|
let start_offset = 1024;
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use super::tx_store::TxStore;
|
use super::tx_store::TxStore;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use std::ops::Deref;
|
|
||||||
use storage::log_store::config::{ConfigTx, ConfigurableExt};
|
use storage::log_store::config::{ConfigTx, ConfigurableExt};
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
|
|
||||||
@ -57,12 +57,12 @@ impl SyncStore {
|
|||||||
let store = self.store.get_store();
|
let store = self.store.get_store();
|
||||||
|
|
||||||
// already in ready queue
|
// already in ready queue
|
||||||
if self.ready_txs.has(store.deref(), tx_seq)? {
|
if self.ready_txs.has(store, tx_seq)? {
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// always add in pending queue
|
// always add in pending queue
|
||||||
self.pending_txs.add(store.deref(), None, tx_seq)
|
self.pending_txs.add(store, None, tx_seq)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn upgrade_tx_to_ready(&self, tx_seq: u64) -> Result<bool> {
|
pub async fn upgrade_tx_to_ready(&self, tx_seq: u64) -> Result<bool> {
|
||||||
@ -73,13 +73,13 @@ impl SyncStore {
|
|||||||
// not in pending queue
|
// not in pending queue
|
||||||
if !self
|
if !self
|
||||||
.pending_txs
|
.pending_txs
|
||||||
.remove(store.deref(), Some(&mut tx), tx_seq)?
|
.remove(store, Some(&mut tx), tx_seq)?
|
||||||
{
|
{
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// move from pending to ready queue
|
// move from pending to ready queue
|
||||||
let added = self.ready_txs.add(store.deref(), Some(&mut tx), tx_seq)?;
|
let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?;
|
||||||
|
|
||||||
store.exec_configs(tx)?;
|
store.exec_configs(tx)?;
|
||||||
|
|
||||||
@ -94,13 +94,13 @@ impl SyncStore {
|
|||||||
// not in ready queue
|
// not in ready queue
|
||||||
if !self
|
if !self
|
||||||
.ready_txs
|
.ready_txs
|
||||||
.remove(store.deref(), Some(&mut tx), tx_seq)?
|
.remove(store, Some(&mut tx), tx_seq)?
|
||||||
{
|
{
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// move from ready to pending queue
|
// move from ready to pending queue
|
||||||
let added = self.pending_txs.add(store.deref(), Some(&mut tx), tx_seq)?;
|
let added = self.pending_txs.add(store, Some(&mut tx), tx_seq)?;
|
||||||
|
|
||||||
store.exec_configs(tx)?;
|
store.exec_configs(tx)?;
|
||||||
|
|
||||||
@ -111,24 +111,24 @@ impl SyncStore {
|
|||||||
let store = self.store.get_store();
|
let store = self.store.get_store();
|
||||||
|
|
||||||
// try to find a tx in ready queue with high priority
|
// try to find a tx in ready queue with high priority
|
||||||
if let Some(val) = self.ready_txs.random(store.deref())? {
|
if let Some(val) = self.ready_txs.random(store)? {
|
||||||
return Ok(Some(val));
|
return Ok(Some(val));
|
||||||
}
|
}
|
||||||
|
|
||||||
// otherwise, find tx in pending queue
|
// otherwise, find tx in pending queue
|
||||||
self.pending_txs.random(store.deref())
|
self.pending_txs.random(store)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn remove_tx(&self, tx_seq: u64) -> Result<bool> {
|
pub async fn remove_tx(&self, tx_seq: u64) -> Result<bool> {
|
||||||
let store = self.store.get_store();
|
let store = self.store.get_store();
|
||||||
|
|
||||||
// removed in ready queue
|
// removed in ready queue
|
||||||
if self.ready_txs.remove(store.deref(), None, tx_seq)? {
|
if self.ready_txs.remove(store, None, tx_seq)? {
|
||||||
return Ok(true);
|
return Ok(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
// otherwise, try to remove in pending queue
|
// otherwise, try to remove in pending queue
|
||||||
self.pending_txs.remove(store.deref(), None, tx_seq)
|
self.pending_txs.remove(store, None, tx_seq)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -651,7 +651,7 @@ mod tests {
|
|||||||
use storage::H256;
|
use storage::H256;
|
||||||
use task_executor::{test_utils::TestRuntime, TaskExecutor};
|
use task_executor::{test_utils::TestRuntime, TaskExecutor};
|
||||||
use tokio::sync::mpsc::{self, UnboundedReceiver};
|
use tokio::sync::mpsc::{self, UnboundedReceiver};
|
||||||
use tokio::sync::RwLock;
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_status() {
|
fn test_status() {
|
||||||
@ -1423,8 +1423,6 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let chunks = peer_store
|
let chunks = peer_store
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
|
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@ -1547,7 +1545,7 @@ mod tests {
|
|||||||
let num_chunks = 123;
|
let num_chunks = 123;
|
||||||
|
|
||||||
let config = LogConfig::default();
|
let config = LogConfig::default();
|
||||||
let store = Arc::new(RwLock::new(LogManager::memorydb(config).unwrap()));
|
let store = Arc::new(LogManager::memorydb(config).unwrap());
|
||||||
|
|
||||||
create_controller(task_executor, peer_id, store, tx_id, num_chunks)
|
create_controller(task_executor, peer_id, store, tx_id, num_chunks)
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@ use storage::config::ShardConfig;
|
|||||||
use storage::error::Result as StorageResult;
|
use storage::error::Result as StorageResult;
|
||||||
use storage::log_store::Store as LogStore;
|
use storage::log_store::Store as LogStore;
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
use tokio::sync::{broadcast, mpsc, RwLock};
|
use tokio::sync::{broadcast, mpsc};
|
||||||
|
|
||||||
const HEARTBEAT_INTERVAL_SEC: u64 = 5;
|
const HEARTBEAT_INTERVAL_SEC: u64 = 5;
|
||||||
|
|
||||||
@ -1158,7 +1158,7 @@ mod tests {
|
|||||||
|
|
||||||
let config = LogConfig::default();
|
let config = LogConfig::default();
|
||||||
|
|
||||||
let store = Arc::new(RwLock::new(LogManager::memorydb(config.clone()).unwrap()));
|
let store = Arc::new(LogManager::memorydb(config.clone()).unwrap());
|
||||||
|
|
||||||
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
|
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
|
||||||
let file_location_cache: Arc<FileLocationCache> =
|
let file_location_cache: Arc<FileLocationCache> =
|
||||||
@ -1364,7 +1364,7 @@ mod tests {
|
|||||||
|
|
||||||
wait_for_tx_finalized(runtime.store.clone(), tx_seq).await;
|
wait_for_tx_finalized(runtime.store.clone(), tx_seq).await;
|
||||||
|
|
||||||
assert!(!runtime.store.read().await.check_tx_completed(0).unwrap());
|
assert!(!runtime.store.check_tx_completed(0).unwrap());
|
||||||
|
|
||||||
// first file
|
// first file
|
||||||
let tx_seq = 0u64;
|
let tx_seq = 0u64;
|
||||||
|
@ -8,7 +8,7 @@ use storage::{
|
|||||||
},
|
},
|
||||||
LogManager,
|
LogManager,
|
||||||
};
|
};
|
||||||
use tokio::sync::RwLock;
|
|
||||||
|
|
||||||
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
|
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
|
||||||
/// The first store is for local node, and data not stored. The second store is for peers, and all
|
/// The first store is for local node, and data not stored. The second store is for peers, and all
|
||||||
@ -101,7 +101,7 @@ pub mod tests {
|
|||||||
};
|
};
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
use task_executor::test_utils::TestRuntime;
|
use task_executor::test_utils::TestRuntime;
|
||||||
use tokio::sync::RwLock;
|
|
||||||
|
|
||||||
pub struct TestStoreRuntime {
|
pub struct TestStoreRuntime {
|
||||||
pub runtime: TestRuntime,
|
pub runtime: TestRuntime,
|
||||||
|
Loading…
Reference in New Issue
Block a user