From 814bc35b3bae78a14ff7769b1403acacc17846ef Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Sat, 14 Sep 2024 00:11:31 +0800 Subject: [PATCH 1/4] support async padding --- Cargo.lock | 1 + node/router/src/libp2p_event_handler.rs | 4 +- node/rpc/src/zgs/impl.rs | 2 +- node/storage-async/src/lib.rs | 10 +-- node/storage/Cargo.toml | 1 + node/storage/src/log_store/log_manager.rs | 95 +++++++++++++++++------ node/storage/src/log_store/mod.rs | 21 +++-- node/sync/src/controllers/serial.rs | 2 +- node/sync/src/service.rs | 2 +- 9 files changed, 99 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9dca6e3..b10db2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7243,6 +7243,7 @@ dependencies = [ "static_assertions", "tempdir", "tiny-keccak", + "tokio", "tracing", "typenum", "zgs_seal", diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index ae80c11..eb5dec3 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -423,7 +423,7 @@ impl Libp2pEventHandler { let addr = self.get_listen_addr_or_add().await?; let timestamp = timestamp_now(); - let shard_config = self.store.get_store().flow().get_shard_config(); + let shard_config = self.store.get_store().get_shard_config(); let msg = AnnounceFile { tx_ids, @@ -699,7 +699,7 @@ impl Libp2pEventHandler { } // notify sync layer if shard config matches - let my_shard_config = self.store.get_store().flow().get_shard_config(); + let my_shard_config = self.store.get_store().get_shard_config(); if my_shard_config.intersect(&announced_shard_config) { for tx_id in msg.tx_ids.iter() { self.send_to_sync(SyncMessage::AnnounceFileGossip { diff --git a/node/rpc/src/zgs/impl.rs b/node/rpc/src/zgs/impl.rs index 7d5934f..3e203f1 100644 --- a/node/rpc/src/zgs/impl.rs +++ b/node/rpc/src/zgs/impl.rs @@ -180,7 +180,7 @@ impl RpcServer for RpcServerImpl { async fn get_shard_config(&self) -> RpcResult { debug!("zgs_getShardConfig"); - let shard_config = self.ctx.log_store.get_store().flow().get_shard_config(); + let shard_config = self.ctx.log_store.get_store().get_shard_config(); Ok(shard_config) } diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs index 7a3da13..e2ea3d7 100644 --- a/node/storage-async/src/lib.rs +++ b/node/storage-async/src/lib.rs @@ -95,22 +95,22 @@ impl Store { &self, seal_index_max: usize, ) -> anyhow::Result>> { - self.spawn(move |store| store.flow().pull_seal_chunk(seal_index_max)) + self.spawn(move |store| store.pull_seal_chunk(seal_index_max)) .await } pub async fn submit_seal_result(&self, answers: Vec) -> anyhow::Result<()> { - self.spawn(move |store| store.flow().submit_seal_result(answers)) + self.spawn(move |store| store.submit_seal_result(answers)) .await } pub async fn load_sealed_data(&self, chunk_index: u64) -> Result> { - self.spawn(move |store| store.flow().load_sealed_data(chunk_index)) + self.spawn(move |store| store.load_sealed_data(chunk_index)) .await } pub async fn get_num_entries(&self) -> Result { - self.spawn(move |store| store.flow().get_num_entries()) + self.spawn(move |store| store.get_num_entries()) .await } @@ -122,7 +122,7 @@ impl Store { pub async fn update_shard_config(&self, shard_config: ShardConfig) { self.spawn(move |store| { - store.flow().update_shard_config(shard_config); + store.update_shard_config(shard_config); Ok(()) }) .await diff --git a/node/storage/Cargo.toml b/node/storage/Cargo.toml index 8388510..e446b28 100644 --- a/node/storage/Cargo.toml +++ b/node/storage/Cargo.toml @@ -29,6 +29,7 @@ itertools = "0.13.0" serde = { version = "1.0.197", features = ["derive"] } parking_lot = "0.12.3" serde_json = "1.0.127" +tokio = { version = "1.10.0", features = ["sync"] } [dev-dependencies] tempdir = "0.3.7" diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 1c43a7e..3126adf 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -1,3 +1,4 @@ +use crate::config::ShardConfig; use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowStore}; use crate::log_store::tx_store::TransactionStore; use crate::log_store::{ @@ -21,10 +22,12 @@ use std::cmp::Ordering; use std::collections::BTreeMap; use std::path::Path; use std::sync::Arc; +use std::thread; +use std::sync::mpsc; use tracing::{debug, error, info, instrument, trace, warn}; use super::tx_store::BlockHashAndSubmissionIndex; -use super::LogStoreInner; +use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask}; /// 256 Bytes pub const ENTRY_SIZE: usize = 256; @@ -45,11 +48,18 @@ pub const COL_NUM: u32 = 9; // Process at most 1M entries (256MB) pad data at a time. const PAD_MAX_SIZE: usize = 1 << 20; +pub struct UpdateFlowMessage { + pub root_map: BTreeMap, + pub pad_data: Vec, + pub tx_start_flow_index: u64, +} + pub struct LogManager { pub(crate) db: Arc, tx_store: TransactionStore, - flow_store: FlowStore, + flow_store: Arc, merkle: RwLock, + sender: mpsc::Sender, } struct MerkleManager { @@ -139,16 +149,6 @@ pub struct LogConfig { pub flow: FlowConfig, } -impl LogStoreInner for LogManager { - fn flow(&self) -> &dyn super::Flow { - &self.flow_store - } - - fn flow_mut(&mut self) -> &mut dyn super::Flow { - &mut self.flow_store - } -} - impl LogStoreChunkWrite for LogManager { fn put_chunks(&self, tx_seq: u64, chunks: ChunkArray) -> Result<()> { let mut merkle = self.merkle.write(); @@ -389,6 +389,14 @@ impl LogStoreWrite for LogManager { fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> { self.tx_store.delete_block_hash_by_number(block_number) } + + fn update_shard_config(&self, shard_config: ShardConfig) { + self.flow_store.update_shard_config(shard_config) + } + + fn submit_seal_result(&self, answers: Vec) -> Result<()> { + self.flow_store.submit_seal_result(answers) + } } impl LogStoreChunkRead for LogManager { @@ -579,6 +587,22 @@ impl LogStoreRead for LogManager { fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result { self.tx_store.check_tx_pruned(tx_seq) } + + fn pull_seal_chunk(&self, seal_index_max: usize) -> Result>> { + self.flow_store.pull_seal_chunk(seal_index_max) + } + + fn get_num_entries(&self) -> Result { + self.flow_store.get_num_entries() + } + + fn load_sealed_data(&self, chunk_index: u64) -> Result> { + self.flow_store.load_sealed_data(chunk_index) + } + + fn get_shard_config(&self) -> ShardConfig { + self.flow_store.get_shard_config() + } } impl LogManager { @@ -596,7 +620,7 @@ impl LogManager { fn new(db: Arc, config: LogConfig) -> Result { let tx_store = TransactionStore::new(db.clone())?; - let flow_store = FlowStore::new(db.clone(), config.flow); + let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow)); let mut initial_data = flow_store.get_chunk_root_list()?; // If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list` // first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves` @@ -700,13 +724,19 @@ impl LogManager { pora_chunks_merkle, last_chunk_merkle, }); - let log_manager = Self { + + let (sender, receiver) = mpsc::channel(); + + let mut log_manager = Self { db, tx_store, flow_store, merkle, + sender }; + log_manager.start_receiver(receiver); + if let Some(tx) = last_tx_to_insert { log_manager.put_tx(tx)?; let mut merkle = log_manager.merkle.write(); @@ -727,6 +757,30 @@ impl LogManager { Ok(log_manager) } + fn start_receiver(&mut self, rx: mpsc::Receiver) { + let flow_store = self.flow_store.clone(); + thread::spawn(move || -> Result<(), anyhow::Error> { + loop { + match rx.recv() { + std::result::Result::Ok(data) => { + // Update the root index. + flow_store.put_batch_root_list(data.root_map).unwrap(); + // Update the flow database. + // This should be called before `complete_last_chunk_merkle` so that we do not save + // subtrees with data known. + flow_store.append_entries(ChunkArray { + data: data.pad_data, + start_index: data.tx_start_flow_index, + }).unwrap(); + } + std::result::Result::Err(_) => { + bail!("Receiver error"); + } + } + } + }); + } + fn gen_proof(&self, flow_index: u64, maybe_root: Option) -> Result { match maybe_root { None => self.gen_proof_at_version(flow_index, None), @@ -910,15 +964,12 @@ impl LogManager { assert_eq!(pad_data.len(), start_index * ENTRY_SIZE); } - // Update the root index. - self.flow_store.put_batch_root_list(root_map)?; - // Update the flow database. - // This should be called before `complete_last_chunk_merkle` so that we do not save - // subtrees with data known. + let data_size = pad_data.len() / ENTRY_SIZE; - self.flow_store.append_entries(ChunkArray { - data: pad_data, - start_index: tx_start_flow_index, + self.sender.send(UpdateFlowMessage { + root_map, + pad_data: pad_data.to_vec(), + tx_start_flow_index, })?; tx_start_flow_index += data_size as u64; if let Some(index) = completed_chunk_index { diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index a2a6442..26280a0 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -76,6 +76,14 @@ pub trait LogStoreRead: LogStoreChunkRead { /// Return flow root and length. fn get_context(&self) -> Result<(DataRoot, u64)>; + + fn pull_seal_chunk(&self, seal_index_max: usize) -> Result>>; + + fn get_num_entries(&self) -> Result; + + fn load_sealed_data(&self, chunk_index: u64) -> Result>; + + fn get_shard_config(&self) -> ShardConfig; } pub trait LogStoreChunkRead { @@ -145,6 +153,10 @@ pub trait LogStoreWrite: LogStoreChunkWrite { ) -> Result; fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()>; + + fn update_shard_config(&self, shard_config: ShardConfig); + + fn submit_seal_result(&self, answers: Vec) -> Result<()>; } pub trait LogStoreChunkWrite { @@ -168,20 +180,15 @@ pub trait LogChunkStore: LogStoreChunkRead + LogStoreChunkWrite + Send + Sync + impl LogChunkStore for T {} pub trait Store: - LogStoreRead + LogStoreWrite + LogStoreInner + config::Configurable + Send + Sync + 'static + LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static { } impl< - T: LogStoreRead + LogStoreWrite + LogStoreInner + config::Configurable + Send + Sync + 'static, + T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static, > Store for T { } -pub trait LogStoreInner { - fn flow(&self) -> &dyn Flow; - fn flow_mut(&mut self) -> &mut dyn Flow; -} - pub struct MineLoadChunk { // Use `Vec` instead of array to avoid thread stack overflow. pub loaded_chunk: Vec<[u8; BYTES_PER_SEAL]>, diff --git a/node/sync/src/controllers/serial.rs b/node/sync/src/controllers/serial.rs index f45d989..55a080c 100644 --- a/node/sync/src/controllers/serial.rs +++ b/node/sync/src/controllers/serial.rs @@ -478,7 +478,7 @@ impl SerialSyncController { metrics::SERIAL_SYNC_SEGMENT_LATENCY.update_since(since.0); - let shard_config = self.store.get_store().flow().get_shard_config(); + let shard_config = self.store.get_store().get_shard_config(); let next_chunk = segment_to_sector(shard_config.next_segment_index( sector_to_segment(from_chunk), sector_to_segment(self.tx_start_chunk_in_flow), diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index de362c7..4132433 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -807,7 +807,7 @@ impl SyncService { } async fn tx_sync_start_index(store: &Store, tx: &Transaction) -> Result> { - let shard_config = store.get_store().flow().get_shard_config(); + let shard_config = store.get_store().get_shard_config(); let start_segment = sector_to_segment(tx.start_entry_index()); let end = bytes_to_chunks(usize::try_from(tx.size).map_err(|e| anyhow!("tx size e={}", e))?); From f29af9c872eec6c1d133ae57a418afc90f12ce0c Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Sat, 14 Sep 2024 14:29:14 +0800 Subject: [PATCH 2/4] only async full empty segment --- node/storage-async/src/lib.rs | 3 +- node/storage/src/log_store/log_manager.rs | 44 ++++++++++++++++------- node/storage/src/log_store/mod.rs | 6 +--- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs index e2ea3d7..95cf6b5 100644 --- a/node/storage-async/src/lib.rs +++ b/node/storage-async/src/lib.rs @@ -110,8 +110,7 @@ impl Store { } pub async fn get_num_entries(&self) -> Result { - self.spawn(move |store| store.get_num_entries()) - .await + self.spawn(move |store| store.get_num_entries()).await } pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> { diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 3126adf..0aa0bba 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -21,9 +21,9 @@ use shared_types::{ use std::cmp::Ordering; use std::collections::BTreeMap; use std::path::Path; +use std::sync::mpsc; use std::sync::Arc; use std::thread; -use std::sync::mpsc; use tracing::{debug, error, info, instrument, trace, warn}; use super::tx_store::BlockHashAndSubmissionIndex; @@ -393,7 +393,7 @@ impl LogStoreWrite for LogManager { fn update_shard_config(&self, shard_config: ShardConfig) { self.flow_store.update_shard_config(shard_config) } - + fn submit_seal_result(&self, answers: Vec) -> Result<()> { self.flow_store.submit_seal_result(answers) } @@ -732,7 +732,7 @@ impl LogManager { tx_store, flow_store, merkle, - sender + sender, }; log_manager.start_receiver(receiver); @@ -768,10 +768,12 @@ impl LogManager { // Update the flow database. // This should be called before `complete_last_chunk_merkle` so that we do not save // subtrees with data known. - flow_store.append_entries(ChunkArray { - data: data.pad_data, - start_index: data.tx_start_flow_index, - }).unwrap(); + flow_store + .append_entries(ChunkArray { + data: data.pad_data, + start_index: data.tx_start_flow_index, + }) + .unwrap(); } std::result::Result::Err(_) => { bail!("Receiver error"); @@ -917,6 +919,7 @@ impl LogManager { ); if extra != 0 { for pad_data in Self::padding((first_subtree_size - extra) as usize) { + let mut is_full_empty = true; let mut root_map = BTreeMap::new(); // Update the in-memory merkle tree. @@ -928,6 +931,7 @@ impl LogManager { let mut completed_chunk_index = None; if pad_data.len() < last_chunk_pad { + is_full_empty = false; merkle .last_chunk_merkle .append_list(data_to_merkle_leaves(&pad_data)?); @@ -936,6 +940,7 @@ impl LogManager { .update_last(*merkle.last_chunk_merkle.root()); } else { if last_chunk_pad != 0 { + is_full_empty = false; // Pad the last chunk. merkle .last_chunk_merkle @@ -964,13 +969,26 @@ impl LogManager { assert_eq!(pad_data.len(), start_index * ENTRY_SIZE); } - let data_size = pad_data.len() / ENTRY_SIZE; - self.sender.send(UpdateFlowMessage { - root_map, - pad_data: pad_data.to_vec(), - tx_start_flow_index, - })?; + if is_full_empty { + self.sender.send(UpdateFlowMessage { + root_map, + pad_data: pad_data.to_vec(), + tx_start_flow_index, + })?; + } else { + self.flow_store.put_batch_root_list(root_map).unwrap(); + // Update the flow database. + // This should be called before `complete_last_chunk_merkle` so that we do not save + // subtrees with data known. + self.flow_store + .append_entries(ChunkArray { + data: pad_data.to_vec(), + start_index: tx_start_flow_index, + }) + .unwrap(); + } + tx_start_flow_index += data_size as u64; if let Some(index) = completed_chunk_index { self.complete_last_chunk_merkle(index, &mut *merkle)?; diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index 26280a0..48581e0 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -183,11 +183,7 @@ pub trait Store: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static { } -impl< - T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static, - > Store for T -{ -} +impl Store for T {} pub struct MineLoadChunk { // Use `Vec` instead of array to avoid thread stack overflow. From e58f6ca1019e7057d4233645abf693480a0f9fd9 Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Sat, 14 Sep 2024 17:08:30 +0800 Subject: [PATCH 3/4] add missing padded subtree leaves to root map when recover the merkle --- node/storage/src/log_store/flow_store.rs | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index cdbc672..8083af7 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -3,7 +3,8 @@ use super::{MineLoadChunk, SealAnswer, SealTask}; use crate::config::ShardConfig; use crate::error::Error; use crate::log_store::log_manager::{ - bytes_to_entries, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE, + bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT, + COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE, }; use crate::log_store::{FlowRead, FlowSeal, FlowWrite}; use crate::{try_option, ZgsKeyValueDB}; @@ -11,7 +12,7 @@ use anyhow::{anyhow, bail, Result}; use append_merkle::{MerkleTreeInitialData, MerkleTreeRead}; use itertools::Itertools; use parking_lot::RwLock; -use shared_types::{ChunkArray, DataRoot, FlowProof}; +use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle}; use ssz::{Decode, Encode}; use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode}; use std::cmp::Ordering; @@ -441,6 +442,10 @@ impl FlowDBStore { // and they will be updated in the merkle tree with `fill_leaf` by the caller. let mut leaf_list = Vec::new(); let mut expected_index = 0; + + let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE]; + let empty_root = *Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root(); + for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) { let (index_bytes, root_bytes) = r?; let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?; @@ -475,12 +480,14 @@ impl FlowDBStore { expected_index += 1; } Ordering::Greater => { - bail!( - "unexpected chunk leaf in range, expected={}, get={}, range={:?}", - expected_index, - batch_index, - range_root, - ); + while batch_index > expected_index { + // Fill the gap with empty leaves. + root_list.push((1, empty_root)); + expected_index += 1; + } + range_root = None; + root_list.push((1, root)); + expected_index += 1; } } } From 93885bb25ca649707a73ec10b4fefb58afa33ed7 Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Sat, 14 Sep 2024 17:38:29 +0800 Subject: [PATCH 4/4] range root should also add back missing padded data --- node/storage/src/log_store/flow_store.rs | 13 ++++++------- node/storage/src/log_store/log_manager.rs | 6 +++--- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index 8083af7..11f20f0 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -491,16 +491,15 @@ impl FlowDBStore { } } } - } else if expected_index == batch_index { + } else { + while batch_index > expected_index { + // Fill the gap with empty leaves. + root_list.push((1, empty_root)); + expected_index += 1; + } range_root = Some(BatchRoot::Multiple((subtree_depth, root))); root_list.push((subtree_depth, root)); expected_index += 1 << (subtree_depth - 1); - } else { - bail!( - "unexpected range root: expected={} get={}", - expected_index, - batch_index - ); } } let extra_node_list = self.get_mpt_node_list()?; diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 0aa0bba..6471667 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -50,7 +50,7 @@ const PAD_MAX_SIZE: usize = 1 << 20; pub struct UpdateFlowMessage { pub root_map: BTreeMap, - pub pad_data: Vec, + pub pad_data: usize, pub tx_start_flow_index: u64, } @@ -770,7 +770,7 @@ impl LogManager { // subtrees with data known. flow_store .append_entries(ChunkArray { - data: data.pad_data, + data: vec![0; data.pad_data], start_index: data.tx_start_flow_index, }) .unwrap(); @@ -973,7 +973,7 @@ impl LogManager { if is_full_empty { self.sender.send(UpdateFlowMessage { root_map, - pad_data: pad_data.to_vec(), + pad_data: pad_data.len(), tx_start_flow_index, })?; } else {