From 814bc35b3bae78a14ff7769b1403acacc17846ef Mon Sep 17 00:00:00 2001
From: Peter Zhang
Date: Sat, 14 Sep 2024 00:11:31 +0800
Subject: [PATCH] support async padding

---
 Cargo.lock                                |  1 +
 node/router/src/libp2p_event_handler.rs   |  4 +-
 node/rpc/src/zgs/impl.rs                  |  2 +-
 node/storage-async/src/lib.rs             | 10 +--
 node/storage/Cargo.toml                   |  1 +
 node/storage/src/log_store/log_manager.rs | 95 +++++++++++++++++------
 node/storage/src/log_store/mod.rs         | 21 +++--
 node/sync/src/controllers/serial.rs       |  2 +-
 node/sync/src/service.rs                  |  2 +-
 9 files changed, 99 insertions(+), 39 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9dca6e3..b10db2c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7243,6 +7243,7 @@ dependencies = [
  "static_assertions",
  "tempdir",
  "tiny-keccak",
+ "tokio",
  "tracing",
  "typenum",
  "zgs_seal",
diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs
index ae80c11..eb5dec3 100644
--- a/node/router/src/libp2p_event_handler.rs
+++ b/node/router/src/libp2p_event_handler.rs
@@ -423,7 +423,7 @@ impl Libp2pEventHandler {
         let addr = self.get_listen_addr_or_add().await?;
         let timestamp = timestamp_now();
 
-        let shard_config = self.store.get_store().flow().get_shard_config();
+        let shard_config = self.store.get_store().get_shard_config();
 
         let msg = AnnounceFile {
             tx_ids,
@@ -699,7 +699,7 @@ impl Libp2pEventHandler {
         }
 
         // notify sync layer if shard config matches
-        let my_shard_config = self.store.get_store().flow().get_shard_config();
+        let my_shard_config = self.store.get_store().get_shard_config();
         if my_shard_config.intersect(&announced_shard_config) {
             for tx_id in msg.tx_ids.iter() {
                 self.send_to_sync(SyncMessage::AnnounceFileGossip {
diff --git a/node/rpc/src/zgs/impl.rs b/node/rpc/src/zgs/impl.rs
index 7d5934f..3e203f1 100644
--- a/node/rpc/src/zgs/impl.rs
+++ b/node/rpc/src/zgs/impl.rs
@@ -180,7 +180,7 @@ impl RpcServer for RpcServerImpl {
     async fn get_shard_config(&self) -> RpcResult<ShardConfig> {
         debug!("zgs_getShardConfig");
 
-        let shard_config = self.ctx.log_store.get_store().flow().get_shard_config();
+        let shard_config = self.ctx.log_store.get_store().get_shard_config();
 
         Ok(shard_config)
     }
diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs
index 7a3da13..e2ea3d7 100644
--- a/node/storage-async/src/lib.rs
+++ b/node/storage-async/src/lib.rs
@@ -95,22 +95,22 @@ impl Store {
         &self,
         seal_index_max: usize,
     ) -> anyhow::Result<Option<Vec<SealTask>>> {
-        self.spawn(move |store| store.flow().pull_seal_chunk(seal_index_max))
+        self.spawn(move |store| store.pull_seal_chunk(seal_index_max))
             .await
     }
 
     pub async fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> anyhow::Result<()> {
-        self.spawn(move |store| store.flow().submit_seal_result(answers))
+        self.spawn(move |store| store.submit_seal_result(answers))
             .await
     }
 
     pub async fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
-        self.spawn(move |store| store.flow().load_sealed_data(chunk_index))
+        self.spawn(move |store| store.load_sealed_data(chunk_index))
             .await
     }
 
     pub async fn get_num_entries(&self) -> Result<u64> {
-        self.spawn(move |store| store.flow().get_num_entries())
+        self.spawn(move |store| store.get_num_entries())
             .await
     }
 
@@ -122,7 +122,7 @@ impl Store {
 
     pub async fn update_shard_config(&self, shard_config: ShardConfig) {
         self.spawn(move |store| {
-            store.flow().update_shard_config(shard_config);
+            store.update_shard_config(shard_config);
             Ok(())
         })
         .await
diff --git a/node/storage/Cargo.toml b/node/storage/Cargo.toml
index 8388510..e446b28 100644
--- a/node/storage/Cargo.toml
+++ b/node/storage/Cargo.toml
@@ -29,6 +29,7 @@ itertools = "0.13.0"
 serde = { version = "1.0.197", features = ["derive"] }
 parking_lot = "0.12.3"
 serde_json = "1.0.127"
+tokio = { version = "1.10.0", features = ["sync"] }
 
 [dev-dependencies]
 tempdir = "0.3.7"
diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs
index 1c43a7e..3126adf 100644
--- a/node/storage/src/log_store/log_manager.rs
+++ b/node/storage/src/log_store/log_manager.rs
@@ -1,3 +1,4 @@
+use crate::config::ShardConfig;
 use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowStore};
 use crate::log_store::tx_store::TransactionStore;
 use crate::log_store::{
@@ -21,10 +22,12 @@ use std::cmp::Ordering;
 use std::collections::BTreeMap;
 use std::path::Path;
 use std::sync::Arc;
+use std::thread;
+use std::sync::mpsc;
 use tracing::{debug, error, info, instrument, trace, warn};
 
 use super::tx_store::BlockHashAndSubmissionIndex;
-use super::LogStoreInner;
+use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
 
 /// 256 Bytes
 pub const ENTRY_SIZE: usize = 256;
@@ -45,11 +48,18 @@ pub const COL_NUM: u32 = 9;
 // Process at most 1M entries (256MB) pad data at a time.
 const PAD_MAX_SIZE: usize = 1 << 20;
 
+pub struct UpdateFlowMessage {
+    pub root_map: BTreeMap<usize, (DataRoot, usize)>,
+    pub pad_data: Vec<u8>,
+    pub tx_start_flow_index: u64,
+}
+
 pub struct LogManager {
     pub(crate) db: Arc<dyn ZgsKeyValueDB>,
     tx_store: TransactionStore,
-    flow_store: FlowStore,
+    flow_store: Arc<FlowStore>,
     merkle: RwLock<MerkleManager>,
+    sender: mpsc::Sender<UpdateFlowMessage>,
 }
 
 struct MerkleManager {
@@ -139,16 +149,6 @@ pub struct LogConfig {
     pub flow: FlowConfig,
 }
 
-impl LogStoreInner for LogManager {
-    fn flow(&self) -> &dyn super::Flow {
-        &self.flow_store
-    }
-
-    fn flow_mut(&mut self) -> &mut dyn super::Flow {
-        &mut self.flow_store
-    }
-}
-
 impl LogStoreChunkWrite for LogManager {
     fn put_chunks(&self, tx_seq: u64, chunks: ChunkArray) -> Result<()> {
         let mut merkle = self.merkle.write();
@@ -389,6 +389,14 @@ impl LogStoreWrite for LogManager {
     fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> {
         self.tx_store.delete_block_hash_by_number(block_number)
     }
+
+    fn update_shard_config(&self, shard_config: ShardConfig) {
+        self.flow_store.update_shard_config(shard_config)
+    }
+
+    fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
+        self.flow_store.submit_seal_result(answers)
+    }
 }
 
 impl LogStoreChunkRead for LogManager {
@@ -579,6 +587,22 @@ impl LogStoreRead for LogManager {
     fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result<bool> {
         self.tx_store.check_tx_pruned(tx_seq)
     }
+
+    fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> {
+        self.flow_store.pull_seal_chunk(seal_index_max)
+    }
+
+    fn get_num_entries(&self) -> Result<u64> {
+        self.flow_store.get_num_entries()
+    }
+
+    fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
+        self.flow_store.load_sealed_data(chunk_index)
+    }
+
+    fn get_shard_config(&self) -> ShardConfig {
+        self.flow_store.get_shard_config()
+    }
 }
 
 impl LogManager {
@@ -596,7 +620,7 @@ impl LogManager {
     fn new(db: Arc<dyn ZgsKeyValueDB>, config: LogConfig) -> Result<Self> {
         let tx_store = TransactionStore::new(db.clone())?;
-        let flow_store = FlowStore::new(db.clone(), config.flow);
+        let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow));
         let mut initial_data = flow_store.get_chunk_root_list()?;
         // If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
         // first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
@@ -700,13 +724,19 @@ impl LogManager {
             pora_chunks_merkle,
             last_chunk_merkle,
         });
-        let log_manager = Self {
+
+        let (sender, receiver) = mpsc::channel();
+
+        let mut log_manager = Self {
             db,
             tx_store,
             flow_store,
             merkle,
+            sender
         };
+
+        log_manager.start_receiver(receiver);
+
         if let Some(tx) = last_tx_to_insert {
             log_manager.put_tx(tx)?;
             let mut merkle = log_manager.merkle.write();
@@ -727,6 +757,30 @@ impl LogManager {
         Ok(log_manager)
     }
 
+    fn start_receiver(&mut self, rx: mpsc::Receiver<UpdateFlowMessage>) {
+        let flow_store = self.flow_store.clone();
+        thread::spawn(move || -> Result<(), anyhow::Error> {
+            loop {
+                match rx.recv() {
+                    std::result::Result::Ok(data) => {
+                        // Update the root index.
+                        flow_store.put_batch_root_list(data.root_map).unwrap();
+                        // Update the flow database.
+                        // This should be called before `complete_last_chunk_merkle` so that we do not save
+                        // subtrees with data known.
+                        flow_store.append_entries(ChunkArray {
+                            data: data.pad_data,
+                            start_index: data.tx_start_flow_index,
+                        }).unwrap();
+                    }
+                    std::result::Result::Err(_) => {
+                        bail!("Receiver error");
+                    }
+                }
+            }
+        });
+    }
+
     fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
         match maybe_root {
             None => self.gen_proof_at_version(flow_index, None),
@@ -910,15 +964,12 @@ impl LogManager {
             assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
         }
 
-        // Update the root index.
-        self.flow_store.put_batch_root_list(root_map)?;
-        // Update the flow database.
-        // This should be called before `complete_last_chunk_merkle` so that we do not save
-        // subtrees with data known.
+
         let data_size = pad_data.len() / ENTRY_SIZE;
-        self.flow_store.append_entries(ChunkArray {
-            data: pad_data,
-            start_index: tx_start_flow_index,
+        self.sender.send(UpdateFlowMessage {
+            root_map,
+            pad_data: pad_data.to_vec(),
+            tx_start_flow_index,
         })?;
         tx_start_flow_index += data_size as u64;
         if let Some(index) = completed_chunk_index {
diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs
index a2a6442..26280a0 100644
--- a/node/storage/src/log_store/mod.rs
+++ b/node/storage/src/log_store/mod.rs
@@ -76,6 +76,14 @@ pub trait LogStoreRead: LogStoreChunkRead {
     /// Return flow root and length.
     fn get_context(&self) -> Result<(DataRoot, u64)>;
+
+    fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>>;
+
+    fn get_num_entries(&self) -> Result<u64>;
+
+    fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
+
+    fn get_shard_config(&self) -> ShardConfig;
 }
 
 pub trait LogStoreChunkRead {
@@ -145,6 +153,10 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
     ) -> Result<bool>;
 
     fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()>;
+
+    fn update_shard_config(&self, shard_config: ShardConfig);
+
+    fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
 }
 
 pub trait LogStoreChunkWrite {
@@ -168,20 +180,15 @@ pub trait LogChunkStore: LogStoreChunkRead + LogStoreChunkWrite + Send + Sync +
 impl<T: LogStoreChunkRead + LogStoreChunkWrite + Send + Sync + 'static> LogChunkStore for T {}
 pub trait Store:
-    LogStoreRead + LogStoreWrite + LogStoreInner + config::Configurable + Send + Sync + 'static
+    LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static
 {
 }
 
 impl<
-    T: LogStoreRead + LogStoreWrite + LogStoreInner + config::Configurable + Send + Sync + 'static,
+    T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static,
 > Store for T
 {
 }
 
-pub trait LogStoreInner {
-    fn flow(&self) -> &dyn Flow;
-    fn flow_mut(&mut self) -> &mut dyn Flow;
-}
-
 pub struct MineLoadChunk {
     // Use `Vec` instead of array to avoid thread stack overflow.
     pub loaded_chunk: Vec<[u8; BYTES_PER_SEAL]>,
diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs
index f45d989..55a080c 100644
--- a/node/sync/src/controllers/serial.rs
+++ b/node/sync/src/controllers/serial.rs
@@ -478,7 +478,7 @@ impl SerialSyncController {
         metrics::SERIAL_SYNC_SEGMENT_LATENCY.update_since(since.0);
 
-        let shard_config = self.store.get_store().flow().get_shard_config();
+        let shard_config = self.store.get_store().get_shard_config();
         let next_chunk = segment_to_sector(shard_config.next_segment_index(
             sector_to_segment(from_chunk),
             sector_to_segment(self.tx_start_chunk_in_flow),
diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs
index de362c7..4132433 100644
--- a/node/sync/src/service.rs
+++ b/node/sync/src/service.rs
@@ -807,7 +807,7 @@ impl SyncService {
     }
 
 async fn tx_sync_start_index(store: &Store, tx: &Transaction) -> Result<Option<u64>> {
-    let shard_config = store.get_store().flow().get_shard_config();
+    let shard_config = store.get_store().get_shard_config();
     let start_segment = sector_to_segment(tx.start_entry_index());
     let end = bytes_to_chunks(usize::try_from(tx.size).map_err(|e| anyhow!("tx size e={}", e))?);
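
Below is a minimal, self-contained sketch of the async-padding pattern this patch introduces: the writer queues pad data on a std::sync::mpsc channel and a background thread applies it to the flow store, so the padding path no longer blocks on the database write. The names here (PadMessage, MockFlowStore, apply, the [u8; 32] root type) are illustrative stand-ins, not the real UpdateFlowMessage/FlowStore API; only the channel-plus-thread structure mirrors LogManager::start_receiver above.

use std::collections::BTreeMap;
use std::sync::{mpsc, Arc};
use std::thread;

// Message carrying one batch of padding work (stand-in for `UpdateFlowMessage`).
struct PadMessage {
    root_map: BTreeMap<usize, [u8; 32]>,
    pad_data: Vec<u8>,
    tx_start_flow_index: u64,
}

// Stand-in for `FlowStore`; `apply` plays the role of
// `put_batch_root_list` followed by `append_entries`.
struct MockFlowStore;

impl MockFlowStore {
    fn apply(&self, msg: &PadMessage) {
        println!(
            "applied {} root(s), {} pad byte(s) at flow index {}",
            msg.root_map.len(),
            msg.pad_data.len(),
            msg.tx_start_flow_index
        );
    }
}

// Spawn the background receiver and hand the sender back to the writer,
// analogous to `LogManager::start_receiver` in the patch.
fn start_receiver(
    store: Arc<MockFlowStore>,
) -> (mpsc::Sender<PadMessage>, thread::JoinHandle<()>) {
    let (sender, receiver) = mpsc::channel::<PadMessage>();
    let handle = thread::spawn(move || {
        // `recv` returns Err once every sender is dropped, ending the loop.
        while let Ok(msg) = receiver.recv() {
            store.apply(&msg);
        }
    });
    (sender, handle)
}

fn main() {
    let store = Arc::new(MockFlowStore);
    let (sender, handle) = start_receiver(store);

    // The writer only enqueues the message and returns immediately;
    // the store update happens on the receiver thread.
    sender
        .send(PadMessage {
            root_map: BTreeMap::new(),
            pad_data: vec![0u8; 256],
            tx_start_flow_index: 0,
        })
        .expect("receiver thread alive");

    // Dropping the sender closes the channel; join waits for the queue to drain.
    drop(sender);
    handle.join().unwrap();
}

Because mpsc is FIFO and a single thread drains it, flow-store updates are applied in the order they were queued, which is what lets the padding work move off the caller without reordering writes.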