change to executor

This commit is contained in:
Peter Zhang 2024-10-03 18:38:18 +08:00
parent 7e0da61256
commit c8fbb13443
10 changed files with 86 additions and 42 deletions

1
Cargo.lock generated
View File

@ -7280,6 +7280,7 @@ dependencies = [
"serde_json",
"shared_types",
"static_assertions",
"task_executor",
"tiny-keccak",
"tokio",
"tracing",

View File

@ -911,7 +911,9 @@ mod tests {
let (network_send, network_recv) = mpsc::unbounded_channel();
let (sync_send, sync_recv) = channel::Channel::unbounded("test");
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
let store = LogManager::memorydb(LogConfig::default()).unwrap();
let executor = runtime.task_executor.clone();
let store = LogManager::memorydb(LogConfig::default(), executor).unwrap();
Self {
runtime,
network_globals: Arc::new(network_globals),

View File

@ -89,9 +89,10 @@ impl ClientBuilder {
/// Initializes in-memory storage.
pub fn with_memory_store(mut self) -> Result<Self, String> {
let executor = require!("sync", self, runtime_context).clone().executor;
// TODO(zz): Set config.
let store = Arc::new(
LogManager::memorydb(LogConfig::default())
LogManager::memorydb(LogConfig::default(), executor)
.map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
);
@ -109,8 +110,9 @@ impl ClientBuilder {
/// Initializes RocksDB storage.
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
let executor = require!("sync", self, runtime_context).clone().executor;
let store = Arc::new(
LogManager::rocksdb(LogConfig::default(), &config.db_dir)
LogManager::rocksdb(LogConfig::default(), &config.db_dir, executor)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
);

View File

@ -30,6 +30,7 @@ serde = { version = "1.0.197", features = ["derive"] }
parking_lot = "0.12.3"
serde_json = "1.0.127"
tokio = { version = "1.10.0", features = ["sync"] }
task_executor = { path = "../../common/task_executor" }
[dev-dependencies]
rand = "0.8.5"

View File

@ -14,14 +14,18 @@ use storage::{
},
LogManager,
};
use task_executor::test_utils::TestRuntime;
fn write_performance(c: &mut Criterion) {
if Path::new("db_write").exists() {
fs::remove_dir_all("db_write").unwrap();
}
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
LogManager::rocksdb(LogConfig::default(), "db_write")
LogManager::rocksdb(LogConfig::default(), "db_write", executor)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
));
@ -105,8 +109,12 @@ fn read_performance(c: &mut Criterion) {
fs::remove_dir_all("db_read").unwrap();
}
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
LogManager::rocksdb(LogConfig::default(), "db_read")
LogManager::rocksdb(LogConfig::default(), "db_read", executor)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
));

View File

@ -23,7 +23,6 @@ use std::collections::BTreeMap;
use std::path::Path;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use tracing::{debug, error, info, instrument, trace, warn};
use super::tx_store::BlockHashAndSubmissionIndex;
@ -606,19 +605,27 @@ impl LogStoreRead for LogManager {
}
impl LogManager {
pub fn rocksdb(config: LogConfig, path: impl AsRef<Path>) -> Result<Self> {
pub fn rocksdb(
config: LogConfig,
path: impl AsRef<Path>,
executor: task_executor::TaskExecutor,
) -> Result<Self> {
let mut db_config = DatabaseConfig::with_columns(COL_NUM);
db_config.enable_statistics = true;
let db = Arc::new(Database::open(&db_config, path)?);
Self::new(db, config)
Self::new(db, config, executor)
}
pub fn memorydb(config: LogConfig) -> Result<Self> {
pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
let db = Arc::new(kvdb_memorydb::create(COL_NUM));
Self::new(db, config)
Self::new(db, config, executor)
}
fn new(db: Arc<dyn ZgsKeyValueDB>, config: LogConfig) -> Result<Self> {
fn new(
db: Arc<dyn ZgsKeyValueDB>,
config: LogConfig,
executor: task_executor::TaskExecutor,
) -> Result<Self> {
let tx_store = TransactionStore::new(db.clone())?;
let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow));
let mut initial_data = flow_store.get_chunk_root_list()?;
@ -735,7 +742,7 @@ impl LogManager {
sender,
};
log_manager.start_receiver(receiver);
log_manager.start_receiver(receiver, executor);
if let Some(tx) = last_tx_to_insert {
log_manager.revert_to(tx.seq - 1)?;
@ -758,32 +765,39 @@ impl LogManager {
Ok(log_manager)
}
fn start_receiver(&mut self, rx: mpsc::Receiver<UpdateFlowMessage>) {
fn start_receiver(
&mut self,
rx: mpsc::Receiver<UpdateFlowMessage>,
executor: task_executor::TaskExecutor,
) {
let flow_store = self.flow_store.clone();
let handle = thread::spawn(move || -> Result<(), anyhow::Error> {
loop {
match rx.recv() {
std::result::Result::Ok(data) => {
// Update the root index.
flow_store.put_batch_root_list(data.root_map).unwrap();
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
flow_store
.append_entries(ChunkArray {
data: vec![0; data.pad_data],
start_index: data.tx_start_flow_index,
})
.unwrap();
}
std::result::Result::Err(_) => {
bail!("Receiver error");
}
executor.spawn(
async move {
loop {
match rx.recv() {
std::result::Result::Ok(data) => {
// Update the root index.
flow_store.put_batch_root_list(data.root_map).unwrap();
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
flow_store
.append_entries(ChunkArray {
data: vec![0; data.pad_data],
start_index: data.tx_start_flow_index,
})
.unwrap();
}
std::result::Result::Err(_) => {
error!("Receiver error");
}
};
}
}
});
},
"pad_tx",
);
// Wait for the spawned thread to finish
let _ = handle.join().expect("Thread panicked");
// let _ = handle.join().expect("Thread panicked");
}
fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {

View File

@ -8,11 +8,15 @@ use ethereum_types::H256;
use rand::random;
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
use std::cmp;
use task_executor::test_utils::TestRuntime;
#[test]
fn test_put_get() {
let config = LogConfig::default();
let store = LogManager::memorydb(config.clone()).unwrap();
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store = LogManager::memorydb(config.clone(), executor).unwrap();
let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
// Aligned with size.
let start_offset = 1024;
@ -169,8 +173,10 @@ fn test_put_tx() {
fn create_store() -> LogManager {
let config = LogConfig::default();
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
LogManager::memorydb(config).unwrap()
LogManager::memorydb(config, executor).unwrap()
}
fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) {

View File

@ -1622,7 +1622,7 @@ mod tests {
let num_chunks = 123;
let config = LogConfig::default();
let store = Arc::new(LogManager::memorydb(config).unwrap());
let store = Arc::new(LogManager::memorydb(config, task_executor.clone()).unwrap());
create_controller(task_executor, peer_id, store, tx_id, num_chunks)
}

View File

@ -1294,7 +1294,9 @@ mod tests {
let config = LogConfig::default();
let store = Arc::new(LogManager::memorydb(config.clone()).unwrap());
let executor = runtime.task_executor.clone();
let store = Arc::new(LogManager::memorydb(config.clone(), executor).unwrap());
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
let file_location_cache: Arc<FileLocationCache> =

View File

@ -9,6 +9,8 @@ use storage::{
LogManager,
};
use task_executor::test_utils::TestRuntime;
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
/// The first store is for local node, and data not stored. The second store is for peers, and all
/// transactions are finalized for file sync.
@ -22,8 +24,11 @@ pub fn create_2_store(
Vec<Vec<u8>>,
) {
let config = LogConfig::default();
let mut store = LogManager::memorydb(config.clone()).unwrap();
let mut peer_store = LogManager::memorydb(config).unwrap();
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let mut store = LogManager::memorydb(config.clone(), executor.clone()).unwrap();
let mut peer_store = LogManager::memorydb(config, executor).unwrap();
let mut offset = 1;
let mut txs = vec![];
@ -115,7 +120,10 @@ pub mod tests {
impl TestStoreRuntime {
pub fn new_store() -> impl LogStore {
LogManager::memorydb(LogConfig::default()).unwrap()
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
LogManager::memorydb(LogConfig::default(), executor).unwrap()
}
pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime {