only async full empty segment

This commit is contained in:
Peter Zhang 2024-09-14 14:29:14 +08:00
parent 14d19b812e
commit 9960598d0e
3 changed files with 33 additions and 20 deletions

View File

@ -110,8 +110,7 @@ impl Store {
}
pub async fn get_num_entries(&self) -> Result<u64> {
self.spawn(move |store| store.get_num_entries())
.await
self.spawn(move |store| store.get_num_entries()).await
}
pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> {

View File

@ -21,9 +21,9 @@ use shared_types::{
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::path::Path;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::sync::mpsc;
use tracing::{debug, error, info, instrument, trace, warn};
use super::tx_store::BlockHashAndSubmissionIndex;
@ -732,7 +732,7 @@ impl LogManager {
tx_store,
flow_store,
merkle,
sender
sender,
};
log_manager.start_receiver(receiver);
@ -769,10 +769,12 @@ impl LogManager {
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
flow_store.append_entries(ChunkArray {
data: data.pad_data,
start_index: data.tx_start_flow_index,
}).unwrap();
flow_store
.append_entries(ChunkArray {
data: data.pad_data,
start_index: data.tx_start_flow_index,
})
.unwrap();
}
std::result::Result::Err(_) => {
bail!("Receiver error");
@ -918,6 +920,7 @@ impl LogManager {
);
if extra != 0 {
for pad_data in Self::padding((first_subtree_size - extra) as usize) {
let mut is_full_empty = true;
let mut root_map = BTreeMap::new();
// Update the in-memory merkle tree.
@ -929,6 +932,7 @@ impl LogManager {
let mut completed_chunk_index = None;
if pad_data.len() < last_chunk_pad {
is_full_empty = false;
merkle
.last_chunk_merkle
.append_list(data_to_merkle_leaves(&pad_data)?);
@ -937,6 +941,7 @@ impl LogManager {
.update_last(*merkle.last_chunk_merkle.root());
} else {
if last_chunk_pad != 0 {
is_full_empty = false;
// Pad the last chunk.
merkle
.last_chunk_merkle
@ -965,13 +970,26 @@ impl LogManager {
assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
}
let data_size = pad_data.len() / ENTRY_SIZE;
self.sender.send(UpdateFlowMessage {
root_map,
pad_data: pad_data.to_vec(),
tx_start_flow_index,
})?;
if is_full_empty {
self.sender.send(UpdateFlowMessage {
root_map,
pad_data: pad_data.to_vec(),
tx_start_flow_index,
})?;
} else {
self.flow_store.put_batch_root_list(root_map).unwrap();
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
self.flow_store
.append_entries(ChunkArray {
data: pad_data.to_vec(),
start_index: tx_start_flow_index,
})
.unwrap();
}
tx_start_flow_index += data_size as u64;
if let Some(index) = completed_chunk_index {
self.complete_last_chunk_merkle(index, &mut *merkle)?;

View File

@ -183,11 +183,7 @@ pub trait Store:
LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static
{
}
impl<
T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static,
> Store for T
{
}
impl<T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static> Store for T {}
pub struct MineLoadChunk {
// Use `Vec` instead of array to avoid thread stack overflow.