only async full empty segment

This commit is contained in:
Peter Zhang 2024-09-14 14:29:14 +08:00
parent 814bc35b3b
commit f29af9c872
3 changed files with 33 additions and 20 deletions

View File

@ -110,8 +110,7 @@ impl Store {
} }
pub async fn get_num_entries(&self) -> Result<u64> { pub async fn get_num_entries(&self) -> Result<u64> {
self.spawn(move |store| store.get_num_entries()) self.spawn(move |store| store.get_num_entries()).await
.await
} }
pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> { pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> {

View File

@ -21,9 +21,9 @@ use shared_types::{
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::path::Path; use std::path::Path;
use std::sync::mpsc;
use std::sync::Arc; use std::sync::Arc;
use std::thread; use std::thread;
use std::sync::mpsc;
use tracing::{debug, error, info, instrument, trace, warn}; use tracing::{debug, error, info, instrument, trace, warn};
use super::tx_store::BlockHashAndSubmissionIndex; use super::tx_store::BlockHashAndSubmissionIndex;
@ -393,7 +393,7 @@ impl LogStoreWrite for LogManager {
fn update_shard_config(&self, shard_config: ShardConfig) { fn update_shard_config(&self, shard_config: ShardConfig) {
self.flow_store.update_shard_config(shard_config) self.flow_store.update_shard_config(shard_config)
} }
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> { fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
self.flow_store.submit_seal_result(answers) self.flow_store.submit_seal_result(answers)
} }
@ -732,7 +732,7 @@ impl LogManager {
tx_store, tx_store,
flow_store, flow_store,
merkle, merkle,
sender sender,
}; };
log_manager.start_receiver(receiver); log_manager.start_receiver(receiver);
@ -768,10 +768,12 @@ impl LogManager {
// Update the flow database. // Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save // This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known. // subtrees with data known.
flow_store.append_entries(ChunkArray { flow_store
data: data.pad_data, .append_entries(ChunkArray {
start_index: data.tx_start_flow_index, data: data.pad_data,
}).unwrap(); start_index: data.tx_start_flow_index,
})
.unwrap();
} }
std::result::Result::Err(_) => { std::result::Result::Err(_) => {
bail!("Receiver error"); bail!("Receiver error");
@ -917,6 +919,7 @@ impl LogManager {
); );
if extra != 0 { if extra != 0 {
for pad_data in Self::padding((first_subtree_size - extra) as usize) { for pad_data in Self::padding((first_subtree_size - extra) as usize) {
let mut is_full_empty = true;
let mut root_map = BTreeMap::new(); let mut root_map = BTreeMap::new();
// Update the in-memory merkle tree. // Update the in-memory merkle tree.
@ -928,6 +931,7 @@ impl LogManager {
let mut completed_chunk_index = None; let mut completed_chunk_index = None;
if pad_data.len() < last_chunk_pad { if pad_data.len() < last_chunk_pad {
is_full_empty = false;
merkle merkle
.last_chunk_merkle .last_chunk_merkle
.append_list(data_to_merkle_leaves(&pad_data)?); .append_list(data_to_merkle_leaves(&pad_data)?);
@ -936,6 +940,7 @@ impl LogManager {
.update_last(*merkle.last_chunk_merkle.root()); .update_last(*merkle.last_chunk_merkle.root());
} else { } else {
if last_chunk_pad != 0 { if last_chunk_pad != 0 {
is_full_empty = false;
// Pad the last chunk. // Pad the last chunk.
merkle merkle
.last_chunk_merkle .last_chunk_merkle
@ -964,13 +969,26 @@ impl LogManager {
assert_eq!(pad_data.len(), start_index * ENTRY_SIZE); assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
} }
let data_size = pad_data.len() / ENTRY_SIZE; let data_size = pad_data.len() / ENTRY_SIZE;
self.sender.send(UpdateFlowMessage { if is_full_empty {
root_map, self.sender.send(UpdateFlowMessage {
pad_data: pad_data.to_vec(), root_map,
tx_start_flow_index, pad_data: pad_data.to_vec(),
})?; tx_start_flow_index,
})?;
} else {
self.flow_store.put_batch_root_list(root_map).unwrap();
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
self.flow_store
.append_entries(ChunkArray {
data: pad_data.to_vec(),
start_index: tx_start_flow_index,
})
.unwrap();
}
tx_start_flow_index += data_size as u64; tx_start_flow_index += data_size as u64;
if let Some(index) = completed_chunk_index { if let Some(index) = completed_chunk_index {
self.complete_last_chunk_merkle(index, &mut *merkle)?; self.complete_last_chunk_merkle(index, &mut *merkle)?;

View File

@ -183,11 +183,7 @@ pub trait Store:
LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static
{ {
} }
impl< impl<T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static> Store for T {}
T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static,
> Store for T
{
}
pub struct MineLoadChunk { pub struct MineLoadChunk {
// Use `Vec` instead of array to avoid thread stack overflow. // Use `Vec` instead of array to avoid thread stack overflow.