From f29af9c872eec6c1d133ae57a418afc90f12ce0c Mon Sep 17 00:00:00 2001 From: Peter Zhang Date: Sat, 14 Sep 2024 14:29:14 +0800 Subject: [PATCH] only async full empty segment --- node/storage-async/src/lib.rs | 3 +- node/storage/src/log_store/log_manager.rs | 44 ++++++++++++++++------- node/storage/src/log_store/mod.rs | 6 +--- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/node/storage-async/src/lib.rs b/node/storage-async/src/lib.rs index e2ea3d7..95cf6b5 100644 --- a/node/storage-async/src/lib.rs +++ b/node/storage-async/src/lib.rs @@ -110,8 +110,7 @@ impl Store { } pub async fn get_num_entries(&self) -> Result { - self.spawn(move |store| store.get_num_entries()) - .await + self.spawn(move |store| store.get_num_entries()).await } pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> { diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 3126adf..0aa0bba 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -21,9 +21,9 @@ use shared_types::{ use std::cmp::Ordering; use std::collections::BTreeMap; use std::path::Path; +use std::sync::mpsc; use std::sync::Arc; use std::thread; -use std::sync::mpsc; use tracing::{debug, error, info, instrument, trace, warn}; use super::tx_store::BlockHashAndSubmissionIndex; @@ -393,7 +393,7 @@ impl LogStoreWrite for LogManager { fn update_shard_config(&self, shard_config: ShardConfig) { self.flow_store.update_shard_config(shard_config) } - + fn submit_seal_result(&self, answers: Vec) -> Result<()> { self.flow_store.submit_seal_result(answers) } @@ -732,7 +732,7 @@ impl LogManager { tx_store, flow_store, merkle, - sender + sender, }; log_manager.start_receiver(receiver); @@ -768,10 +768,12 @@ impl LogManager { // Update the flow database. // This should be called before `complete_last_chunk_merkle` so that we do not save // subtrees with data known. - flow_store.append_entries(ChunkArray { - data: data.pad_data, - start_index: data.tx_start_flow_index, - }).unwrap(); + flow_store + .append_entries(ChunkArray { + data: data.pad_data, + start_index: data.tx_start_flow_index, + }) + .unwrap(); } std::result::Result::Err(_) => { bail!("Receiver error"); @@ -917,6 +919,7 @@ impl LogManager { ); if extra != 0 { for pad_data in Self::padding((first_subtree_size - extra) as usize) { + let mut is_full_empty = true; let mut root_map = BTreeMap::new(); // Update the in-memory merkle tree. @@ -928,6 +931,7 @@ impl LogManager { let mut completed_chunk_index = None; if pad_data.len() < last_chunk_pad { + is_full_empty = false; merkle .last_chunk_merkle .append_list(data_to_merkle_leaves(&pad_data)?); @@ -936,6 +940,7 @@ impl LogManager { .update_last(*merkle.last_chunk_merkle.root()); } else { if last_chunk_pad != 0 { + is_full_empty = false; // Pad the last chunk. merkle .last_chunk_merkle @@ -964,13 +969,26 @@ impl LogManager { assert_eq!(pad_data.len(), start_index * ENTRY_SIZE); } - let data_size = pad_data.len() / ENTRY_SIZE; - self.sender.send(UpdateFlowMessage { - root_map, - pad_data: pad_data.to_vec(), - tx_start_flow_index, - })?; + if is_full_empty { + self.sender.send(UpdateFlowMessage { + root_map, + pad_data: pad_data.to_vec(), + tx_start_flow_index, + })?; + } else { + self.flow_store.put_batch_root_list(root_map).unwrap(); + // Update the flow database. + // This should be called before `complete_last_chunk_merkle` so that we do not save + // subtrees with data known. + self.flow_store + .append_entries(ChunkArray { + data: pad_data.to_vec(), + start_index: tx_start_flow_index, + }) + .unwrap(); + } + tx_start_flow_index += data_size as u64; if let Some(index) = completed_chunk_index { self.complete_last_chunk_merkle(index, &mut *merkle)?; diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index 26280a0..48581e0 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -183,11 +183,7 @@ pub trait Store: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static { } -impl< - T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static, - > Store for T -{ -} +impl Store for T {} pub struct MineLoadChunk { // Use `Vec` instead of array to avoid thread stack overflow.