Compare commits

...

3 Commits

Author SHA1 Message Date
Peter Zhang
93885bb25c range root should also add back missing padded data 2024-09-14 17:38:29 +08:00
Peter Zhang
e58f6ca101 add missing padded subtree leaves to root map when recover the merkle 2024-09-14 17:08:30 +08:00
Peter Zhang
f29af9c872 only async full empty segment 2024-09-14 14:29:14 +08:00
4 changed files with 55 additions and 36 deletions

View File

@ -110,8 +110,7 @@ impl Store {
} }
pub async fn get_num_entries(&self) -> Result<u64> { pub async fn get_num_entries(&self) -> Result<u64> {
self.spawn(move |store| store.get_num_entries()) self.spawn(move |store| store.get_num_entries()).await
.await
} }
pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> { pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> {

View File

@ -3,7 +3,8 @@ use super::{MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig; use crate::config::ShardConfig;
use crate::error::Error; use crate::error::Error;
use crate::log_store::log_manager::{ use crate::log_store::log_manager::{
bytes_to_entries, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE, bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
}; };
use crate::log_store::{FlowRead, FlowSeal, FlowWrite}; use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
use crate::{try_option, ZgsKeyValueDB}; use crate::{try_option, ZgsKeyValueDB};
@ -11,7 +12,7 @@ use anyhow::{anyhow, bail, Result};
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead}; use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
use itertools::Itertools; use itertools::Itertools;
use parking_lot::RwLock; use parking_lot::RwLock;
use shared_types::{ChunkArray, DataRoot, FlowProof}; use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode}; use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
use std::cmp::Ordering; use std::cmp::Ordering;
@ -441,6 +442,10 @@ impl FlowDBStore {
// and they will be updated in the merkle tree with `fill_leaf` by the caller. // and they will be updated in the merkle tree with `fill_leaf` by the caller.
let mut leaf_list = Vec::new(); let mut leaf_list = Vec::new();
let mut expected_index = 0; let mut expected_index = 0;
let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
let empty_root = *Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) { for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
let (index_bytes, root_bytes) = r?; let (index_bytes, root_bytes) = r?;
let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?; let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?;
@ -475,25 +480,26 @@ impl FlowDBStore {
expected_index += 1; expected_index += 1;
} }
Ordering::Greater => { Ordering::Greater => {
bail!( while batch_index > expected_index {
"unexpected chunk leaf in range, expected={}, get={}, range={:?}", // Fill the gap with empty leaves.
expected_index, root_list.push((1, empty_root));
batch_index, expected_index += 1;
range_root, }
); range_root = None;
root_list.push((1, root));
expected_index += 1;
} }
} }
} }
} else if expected_index == batch_index { } else {
while batch_index > expected_index {
// Fill the gap with empty leaves.
root_list.push((1, empty_root));
expected_index += 1;
}
range_root = Some(BatchRoot::Multiple((subtree_depth, root))); range_root = Some(BatchRoot::Multiple((subtree_depth, root)));
root_list.push((subtree_depth, root)); root_list.push((subtree_depth, root));
expected_index += 1 << (subtree_depth - 1); expected_index += 1 << (subtree_depth - 1);
} else {
bail!(
"unexpected range root: expected={} get={}",
expected_index,
batch_index
);
} }
} }
let extra_node_list = self.get_mpt_node_list()?; let extra_node_list = self.get_mpt_node_list()?;

View File

@ -21,9 +21,9 @@ use shared_types::{
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::path::Path; use std::path::Path;
use std::sync::mpsc;
use std::sync::Arc; use std::sync::Arc;
use std::thread; use std::thread;
use std::sync::mpsc;
use tracing::{debug, error, info, instrument, trace, warn}; use tracing::{debug, error, info, instrument, trace, warn};
use super::tx_store::BlockHashAndSubmissionIndex; use super::tx_store::BlockHashAndSubmissionIndex;
@ -50,7 +50,7 @@ const PAD_MAX_SIZE: usize = 1 << 20;
pub struct UpdateFlowMessage { pub struct UpdateFlowMessage {
pub root_map: BTreeMap<usize, (H256, usize)>, pub root_map: BTreeMap<usize, (H256, usize)>,
pub pad_data: Vec<u8>, pub pad_data: usize,
pub tx_start_flow_index: u64, pub tx_start_flow_index: u64,
} }
@ -393,7 +393,7 @@ impl LogStoreWrite for LogManager {
fn update_shard_config(&self, shard_config: ShardConfig) { fn update_shard_config(&self, shard_config: ShardConfig) {
self.flow_store.update_shard_config(shard_config) self.flow_store.update_shard_config(shard_config)
} }
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> { fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
self.flow_store.submit_seal_result(answers) self.flow_store.submit_seal_result(answers)
} }
@ -732,7 +732,7 @@ impl LogManager {
tx_store, tx_store,
flow_store, flow_store,
merkle, merkle,
sender sender,
}; };
log_manager.start_receiver(receiver); log_manager.start_receiver(receiver);
@ -768,10 +768,12 @@ impl LogManager {
// Update the flow database. // Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save // This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known. // subtrees with data known.
flow_store.append_entries(ChunkArray { flow_store
data: data.pad_data, .append_entries(ChunkArray {
start_index: data.tx_start_flow_index, data: vec![0; data.pad_data],
}).unwrap(); start_index: data.tx_start_flow_index,
})
.unwrap();
} }
std::result::Result::Err(_) => { std::result::Result::Err(_) => {
bail!("Receiver error"); bail!("Receiver error");
@ -917,6 +919,7 @@ impl LogManager {
); );
if extra != 0 { if extra != 0 {
for pad_data in Self::padding((first_subtree_size - extra) as usize) { for pad_data in Self::padding((first_subtree_size - extra) as usize) {
let mut is_full_empty = true;
let mut root_map = BTreeMap::new(); let mut root_map = BTreeMap::new();
// Update the in-memory merkle tree. // Update the in-memory merkle tree.
@ -928,6 +931,7 @@ impl LogManager {
let mut completed_chunk_index = None; let mut completed_chunk_index = None;
if pad_data.len() < last_chunk_pad { if pad_data.len() < last_chunk_pad {
is_full_empty = false;
merkle merkle
.last_chunk_merkle .last_chunk_merkle
.append_list(data_to_merkle_leaves(&pad_data)?); .append_list(data_to_merkle_leaves(&pad_data)?);
@ -936,6 +940,7 @@ impl LogManager {
.update_last(*merkle.last_chunk_merkle.root()); .update_last(*merkle.last_chunk_merkle.root());
} else { } else {
if last_chunk_pad != 0 { if last_chunk_pad != 0 {
is_full_empty = false;
// Pad the last chunk. // Pad the last chunk.
merkle merkle
.last_chunk_merkle .last_chunk_merkle
@ -964,13 +969,26 @@ impl LogManager {
assert_eq!(pad_data.len(), start_index * ENTRY_SIZE); assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
} }
let data_size = pad_data.len() / ENTRY_SIZE; let data_size = pad_data.len() / ENTRY_SIZE;
self.sender.send(UpdateFlowMessage { if is_full_empty {
root_map, self.sender.send(UpdateFlowMessage {
pad_data: pad_data.to_vec(), root_map,
tx_start_flow_index, pad_data: pad_data.len(),
})?; tx_start_flow_index,
})?;
} else {
self.flow_store.put_batch_root_list(root_map).unwrap();
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
self.flow_store
.append_entries(ChunkArray {
data: pad_data.to_vec(),
start_index: tx_start_flow_index,
})
.unwrap();
}
tx_start_flow_index += data_size as u64; tx_start_flow_index += data_size as u64;
if let Some(index) = completed_chunk_index { if let Some(index) = completed_chunk_index {
self.complete_last_chunk_merkle(index, &mut *merkle)?; self.complete_last_chunk_merkle(index, &mut *merkle)?;

View File

@ -183,11 +183,7 @@ pub trait Store:
LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static
{ {
} }
impl< impl<T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static> Store for T {}
T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static,
> Store for T
{
}
pub struct MineLoadChunk { pub struct MineLoadChunk {
// Use `Vec` instead of array to avoid thread stack overflow. // Use `Vec` instead of array to avoid thread stack overflow.