Compare commits

4 Commits

Author          SHA1        Message                           Date
0g-peterzhb     d35633c335  Merge e2e2f3d84c into da2cdec8a1  2024-10-30 00:24:50 +00:00
Peter Zhang     e2e2f3d84c  separate data db from flow db    2024-10-30 08:24:52 +08:00
Peter Zhang     df03b27844  separate data db from flow db    2024-10-30 08:24:50 +08:00
peilun-conflux  da2cdec8a1  Remove unused. (#251)            2024-10-29 23:23:53 +08:00
6 changed files with 78 additions and 272 deletions

View File

@@ -112,8 +112,13 @@ impl ClientBuilder {
     pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
         let executor = require!("sync", self, runtime_context).clone().executor;
         let store = Arc::new(
-            LogManager::rocksdb(config.log_config.clone(), &config.db_dir, executor)
-                .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
+            LogManager::rocksdb(
+                config.log_config.clone(),
+                config.db_dir.join("flow_db"),
+                config.db_dir.join("data_db"),
+                executor,
+            )
+            .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
         );
         self.store = Some(store.clone());
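
The builder change above derives both database locations from the single configured `db_dir`. A minimal, self-contained sketch of just that path derivation (the base path is hypothetical; the `flow_db`/`data_db` directory names come from the diff):

    use std::path::PathBuf;

    fn main() {
        // Hypothetical base directory; in the node it comes from StorageConfig::db_dir.
        let db_dir = PathBuf::from("/data/zgs/db");
        // One store becomes two sibling RocksDB directories:
        let flow_db_path = db_dir.join("flow_db"); // tx/config metadata stays here
        let data_db_path = db_dir.join("data_db"); // entry batches move here
        assert_eq!(flow_db_path, PathBuf::from("/data/zgs/db/flow_db"));
        assert_eq!(data_db_path, PathBuf::from("/data/zgs/db/data_db"));
    }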

View File

@@ -25,9 +25,14 @@ fn write_performance(c: &mut Criterion) {
     let executor = runtime.task_executor.clone();
     let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
-        LogManager::rocksdb(LogConfig::default(), "db_write", executor)
-            .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
-            .unwrap(),
+        LogManager::rocksdb(
+            LogConfig::default(),
+            "db_flow_write",
+            "db_data_write",
+            executor,
+        )
+        .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
+        .unwrap(),
     ));

     let chunk_count = 2048;
@@ -114,9 +119,14 @@ fn read_performance(c: &mut Criterion) {
     let executor = runtime.task_executor.clone();
     let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
-        LogManager::rocksdb(LogConfig::default(), "db_read", executor)
-            .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
-            .unwrap(),
+        LogManager::rocksdb(
+            LogConfig::default(),
+            "db_flow_read",
+            "db_data_read",
+            executor,
+        )
+        .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
+        .unwrap(),
     ));

     let tx_size = 1000;
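
Each benchmark now opens two RocksDB directories instead of one. A small helper sketch (an assumption, not part of the diff) for clearing the stale benchmark databases named above between runs:

    use std::{fs, io};

    // Assumes the benches run from the crate root, so the four DB directories
    // named in the diff are created in the current working directory.
    fn clean_bench_dbs() -> io::Result<()> {
        for dir in ["db_flow_write", "db_data_write", "db_flow_read", "db_data_read"] {
            match fs::remove_dir_all(dir) {
                Ok(()) => {}
                Err(e) if e.kind() == io::ErrorKind::NotFound => {} // nothing stale
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }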

View File

@@ -63,22 +63,22 @@ impl<T: ?Sized + Configurable> ConfigurableExt for T {}
 impl Configurable for LogManager {
     fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        Ok(self.db.get(COL_MISC, key)?)
+        Ok(self.flow_db.get(COL_MISC, key)?)
     }

     fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> {
-        self.db.put(COL_MISC, key, value)?;
+        self.flow_db.put(COL_MISC, key, value)?;
         Ok(())
     }

     fn remove_config(&self, key: &[u8]) -> Result<()> {
-        Ok(self.db.delete(COL_MISC, key)?)
+        Ok(self.flow_db.delete(COL_MISC, key)?)
     }

     fn exec_configs(&self, tx: ConfigTx) -> Result<()> {
-        let mut db_tx = self.db.transaction();
+        let mut db_tx = self.flow_db.transaction();
         db_tx.ops = tx.ops;
-        self.db.write(db_tx)?;
+        self.flow_db.write(db_tx)?;
         Ok(())
     }
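
After this hunk, every `Configurable` read and write is served by `flow_db`; the new data DB never sees `COL_MISC`. A minimal usage sketch against the trait methods shown above (the key and value are illustrative, and `Result` is the crate's anyhow-based alias):

    fn roundtrip_config(store: &LogManager) -> Result<()> {
        store.set_config(b"example_key", b"example_value")?;
        assert_eq!(
            store.get_config(b"example_key")?,
            Some(b"example_value".to_vec())
        );
        store.remove_config(b"example_key")
    }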

View File

@@ -4,65 +4,62 @@ use super::{MineLoadChunk, SealAnswer, SealTask};
 use crate::config::ShardConfig;
 use crate::error::Error;
 use crate::log_store::log_manager::{
-    bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
-    COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
+    bytes_to_entries, COL_ENTRY_BATCH, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE,
 };
 use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
 use crate::{try_option, ZgsKeyValueDB};
 use any::Any;
 use anyhow::{anyhow, bail, Result};
-use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase, NodeTransaction};
+use append_merkle::{MerkleTreeRead, NodeDatabase, NodeTransaction};
 use itertools::Itertools;
 use kvdb::DBTransaction;
 use parking_lot::RwLock;
-use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
+use shared_types::{ChunkArray, DataRoot, FlowProof};
 use ssz::{Decode, Encode};
 use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
 use std::cmp::Ordering;
 use std::collections::BTreeMap;
 use std::fmt::Debug;
 use std::sync::Arc;
-use std::{any, cmp, mem};
+use std::{any, cmp};
 use tracing::{debug, error, trace};
 use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};

 pub struct FlowStore {
-    db: Arc<FlowDBStore>,
+    flow_db: Arc<FlowDBStore>,
+    data_db: Arc<FlowDBStore>,
     seal_manager: SealTaskManager,
     config: FlowConfig,
 }

 impl FlowStore {
-    pub fn new(db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
+    pub fn new(flow_db: Arc<FlowDBStore>, data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
         Self {
-            db,
+            flow_db,
+            data_db,
             seal_manager: Default::default(),
             config,
         }
     }

-    pub fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
-        self.db.put_batch_root_list(root_map)
-    }
-
     pub fn insert_subtree_list_for_batch(
         &self,
         batch_index: usize,
         subtree_list: Vec<(usize, usize, DataRoot)>,
     ) -> Result<()> {
         let mut batch = self
-            .db
+            .data_db
             .get_entry_batch(batch_index as u64)?
             .unwrap_or_else(|| EntryBatch::new(batch_index as u64));
         batch.set_subtree_list(subtree_list);
-        self.db.put_entry_raw(vec![(batch_index as u64, batch)])?;
+        self.data_db
+            .put_entry_raw(vec![(batch_index as u64, batch)])?;

         Ok(())
     }

     pub fn gen_proof_in_batch(&self, batch_index: usize, sector_index: usize) -> Result<FlowProof> {
         let batch = self
-            .db
+            .data_db
             .get_entry_batch(batch_index as u64)?
             .ok_or_else(|| anyhow!("batch missing, index={}", batch_index))?;
         let merkle = batch.to_merkle_tree(batch_index == 0)?.ok_or_else(|| {
@@ -74,21 +71,9 @@ impl FlowStore {
         merkle.gen_proof(sector_index)
     }

-    pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
-        self.db.put_mpt_node_list(node_list)
-    }
-
     pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
         self.seal_manager.delete_batch_list(batch_list);
-        self.db.delete_batch_list(batch_list)
-    }
-
-    pub fn get_raw_batch(&self, batch_index: u64) -> Result<Option<EntryBatch>> {
-        self.db.get_entry_batch(batch_index)
-    }
-
-    pub fn get_batch_root(&self, batch_index: u64) -> Result<Option<DataRoot>> {
-        self.db.get_batch_root(batch_index)
+        self.data_db.delete_batch_list(batch_list)
     }
 }
@@ -134,7 +119,7 @@ impl FlowRead for FlowStore {
             length -= 1;
         }

-        let entry_batch = try_option!(self.db.get_entry_batch(chunk_index)?);
+        let entry_batch = try_option!(self.data_db.get_entry_batch(chunk_index)?);
         let mut entry_batch_data =
             try_option!(entry_batch.get_unsealed_data(offset as usize, length as usize));
         data.append(&mut entry_batch_data);
@@ -163,7 +148,7 @@ impl FlowRead for FlowStore {
             let chunk_index = start_entry_index / self.config.batch_size as u64;

             if let Some(mut data_list) = self
-                .db
+                .data_db
                 .get_entry_batch(chunk_index)?
                 .map(|b| b.into_data_list(start_entry_index))
             {
@@ -187,13 +172,8 @@ impl FlowRead for FlowStore {
         Ok(entry_list)
     }

-    /// Return the list of all stored chunk roots.
-    fn get_chunk_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>> {
-        self.db.get_batch_root_list()
-    }
-
     fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
-        let batch = try_option!(self.db.get_entry_batch(chunk_index)?);
+        let batch = try_option!(self.data_db.get_entry_batch(chunk_index)?);
         let mut mine_chunk = MineLoadChunk::default();
         for (seal_index, (sealed, validity)) in mine_chunk
             .loaded_chunk
@@ -211,7 +191,7 @@ impl FlowRead for FlowStore {
     fn get_num_entries(&self) -> Result<u64> {
         // This is an over-estimation as it assumes each batch is full.
-        self.db
+        self.data_db
             .kvdb
             .num_keys(COL_ENTRY_BATCH)
             .map(|num_batches| num_batches * PORA_CHUNK_SIZE as u64)
@@ -251,7 +231,7 @@ impl FlowWrite for FlowStore {
             // TODO: Try to avoid loading from db if possible.
             let mut batch = self
-                .db
+                .data_db
                 .get_entry_batch(chunk_index)?
                 .unwrap_or_else(|| EntryBatch::new(chunk_index));
             let completed_seals = batch.insert_data(
@@ -269,12 +249,12 @@ impl FlowWrite for FlowStore {
             batch_list.push((chunk_index, batch));
         }

-        self.db.put_entry_batch_list(batch_list)
+        self.data_db.put_entry_batch_list(batch_list)
     }

     fn truncate(&self, start_index: u64) -> crate::error::Result<()> {
         let mut to_seal_set = self.seal_manager.to_seal_set.write();
-        let to_reseal = self.db.truncate(start_index, self.config.batch_size)?;
+        let to_reseal = self.data_db.truncate(start_index, self.config.batch_size)?;

         to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL));
         let new_seal_version = self.seal_manager.inc_seal_version();
@@ -304,7 +284,7 @@ impl FlowSeal for FlowStore {
         let mut tasks = Vec::with_capacity(SEALS_PER_LOAD);

         let batch_data = self
-            .db
+            .data_db
             .get_entry_batch((first_index / SEALS_PER_LOAD) as u64)?
             .expect("Lost data chunk in to_seal_set");
@@ -343,7 +323,7 @@ impl FlowSeal for FlowStore {
             .chunk_by(|answer| answer.seal_index / SEALS_PER_LOAD as u64)
         {
             let mut batch_chunk = self
-                .db
+                .data_db
                 .get_entry_batch(load_index)?
                 .expect("Can not find chunk data");
             for answer in answers_in_chunk {
@@ -359,7 +339,7 @@ impl FlowSeal for FlowStore {
             to_seal_set.remove(&idx);
         }

-        self.db.put_entry_raw(updated_chunk)?;
+        self.data_db.put_entry_raw(updated_chunk)?;

         Ok(())
     }
@@ -388,12 +368,6 @@ impl FlowDBStore {
             );
             if let Some(root) = batch.build_root(batch_index == 0)? {
                 trace!("complete batch: index={}", batch_index);
-                tx.put(
-                    COL_ENTRY_BATCH_ROOT,
-                    // (batch_index, subtree_depth)
-                    &encode_batch_root_key(batch_index as usize, 1),
-                    root.as_bytes(),
-                );
                 completed_batches.push((batch_index, root));
             }
         }
@@ -419,94 +393,6 @@ impl FlowDBStore {
         Ok(Some(EntryBatch::from_ssz_bytes(&raw).map_err(Error::from)?))
     }

-    fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
-        let mut tx = self.kvdb.transaction();
-        for (batch_index, (root, subtree_depth)) in root_map {
-            tx.put(
-                COL_ENTRY_BATCH_ROOT,
-                &encode_batch_root_key(batch_index, subtree_depth),
-                root.as_bytes(),
-            );
-        }
-        Ok(self.kvdb.write(tx)?)
-    }
-
-    fn get_batch_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>> {
-        let mut range_root = None;
-        // A list of `BatchRoot` that can reconstruct the whole merkle tree structure.
-        let mut root_list = Vec::new();
-        // A list of leaf `(index, root_hash)` in the subtrees of some nodes in `root_list`,
-        // and they will be updated in the merkle tree with `fill_leaf` by the caller.
-        let mut leaf_list = Vec::new();
-        let mut expected_index = 0;
-
-        let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
-        let empty_root = Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
-
-        for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
-            let (index_bytes, root_bytes) = r?;
-            let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?;
-            let root = DataRoot::from_slice(root_bytes.as_ref());
-            debug!(
-                "load root depth={}, index expected={} get={} root={:?}",
-                subtree_depth, expected_index, batch_index, root,
-            );
-            if subtree_depth == 1 {
-                if range_root.is_none() {
-                    // This is expected to be the next leaf.
-                    if batch_index == expected_index {
-                        root_list.push((1, root));
-                        expected_index += 1;
-                    } else {
-                        bail!(
-                            "unexpected chunk leaf, expected={}, get={}",
-                            expected_index,
-                            batch_index
-                        );
-                    }
-                } else {
-                    match batch_index.cmp(&expected_index) {
-                        Ordering::Less => {
-                            // This leaf is within a subtree whose root is known.
-                            leaf_list.push((batch_index, root));
-                        }
-                        Ordering::Equal => {
-                            // A subtree range ends.
-                            range_root = None;
-                            root_list.push((1, root));
-                            expected_index += 1;
-                        }
-                        Ordering::Greater => {
-                            while batch_index > expected_index {
-                                // Fill the gap with empty leaves.
-                                root_list.push((1, empty_root));
-                                expected_index += 1;
-                            }
-                            range_root = None;
-                            root_list.push((1, root));
-                            expected_index += 1;
-                        }
-                    }
-                }
-            } else {
-                while batch_index > expected_index {
-                    // Fill the gap with empty leaves.
-                    root_list.push((1, empty_root));
-                    expected_index += 1;
-                }
-                range_root = Some(BatchRoot::Multiple((subtree_depth, root)));
-                root_list.push((subtree_depth, root));
-                expected_index += 1 << (subtree_depth - 1);
-            }
-        }
-        let extra_node_list = self.get_mpt_node_list()?;
-        Ok(MerkleTreeInitialData {
-            subtree_list: root_list,
-            known_leaves: leaf_list,
-            extra_mpt_nodes: extra_node_list,
-        })
-    }
-
     fn truncate(&self, start_index: u64, batch_size: usize) -> crate::error::Result<Vec<usize>> {
         let mut tx = self.kvdb.transaction();
         let mut start_batch_index = start_index / batch_size as u64;
@@ -547,38 +433,11 @@ impl FlowDBStore {
         };
         for batch_index in start_batch_index as usize..=end {
             tx.delete(COL_ENTRY_BATCH, &batch_index.to_be_bytes());
-            tx.delete_prefix(COL_ENTRY_BATCH_ROOT, &batch_index.to_be_bytes());
         }
         self.kvdb.write(tx)?;
         Ok(index_to_reseal)
     }

-    fn put_mpt_node_list(&self, mpt_node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
-        let mut tx = self.kvdb.transaction();
-        for (layer_index, position, data) in mpt_node_list {
-            tx.put(
-                COL_FLOW_MPT_NODES,
-                &encode_mpt_node_key(layer_index, position),
-                data.as_bytes(),
-            );
-        }
-        Ok(self.kvdb.write(tx)?)
-    }
-
-    fn get_mpt_node_list(&self) -> Result<Vec<(usize, usize, DataRoot)>> {
-        let mut node_list = Vec::new();
-        for r in self.kvdb.iter(COL_FLOW_MPT_NODES) {
-            let (index_bytes, node_bytes) = r?;
-            let (layer_index, position) = decode_mpt_node_key(index_bytes.as_ref())?;
-            node_list.push((
-                layer_index,
-                position,
-                DataRoot::from_slice(node_bytes.as_ref()),
-            ));
-        }
-        Ok(node_list)
-    }
-
     fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
         let mut tx = self.kvdb.transaction();
         for i in batch_list {
@@ -586,16 +445,6 @@ impl FlowDBStore {
         }
         Ok(self.kvdb.write(tx)?)
     }
-
-    fn get_batch_root(&self, batch_index: u64) -> Result<Option<DataRoot>> {
-        Ok(self
-            .kvdb
-            .get(
-                COL_ENTRY_BATCH_ROOT,
-                &encode_batch_root_key(batch_index as usize, 1),
-            )?
-            .map(|v| DataRoot::from_slice(&v)))
-    }
 }

 #[derive(DeriveEncode, DeriveDecode, Clone, Debug)]
@@ -641,37 +490,12 @@ fn decode_batch_index(data: &[u8]) -> Result<usize> {
     try_decode_usize(data)
 }

-/// For the same batch_index, we want to process the larger subtree_depth first in iteration.
-fn encode_batch_root_key(batch_index: usize, subtree_depth: usize) -> Vec<u8> {
-    let mut key = batch_index.to_be_bytes().to_vec();
-    key.extend_from_slice(&(usize::MAX - subtree_depth).to_be_bytes());
-    key
-}
-
-fn decode_batch_root_key(data: &[u8]) -> Result<(usize, usize)> {
-    if data.len() != mem::size_of::<usize>() * 2 {
-        bail!("invalid data length");
-    }
-    let batch_index = try_decode_usize(&data[..mem::size_of::<u64>()])?;
-    let subtree_depth = usize::MAX - try_decode_usize(&data[mem::size_of::<u64>()..])?;
-    Ok((batch_index, subtree_depth))
-}
-
 fn encode_mpt_node_key(layer_index: usize, position: usize) -> Vec<u8> {
     let mut key = layer_index.to_be_bytes().to_vec();
     key.extend_from_slice(&position.to_be_bytes());
     key
 }

-fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
-    if data.len() != mem::size_of::<usize>() * 2 {
-        bail!("invalid data length");
-    }
-    let layer_index = try_decode_usize(&data[..mem::size_of::<u64>()])?;
-    let position = try_decode_usize(&data[mem::size_of::<u64>()..])?;
-    Ok((layer_index, position))
-}
-
 fn layer_size_key(layer: usize) -> Vec<u8> {
     let mut key = "layer_size".as_bytes().to_vec();
     key.extend_from_slice(&layer.to_be_bytes());
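
One behavior worth noting in `truncate` above (unchanged by this diff apart from the `data_db` routing): `split_off` keeps the keys below its argument in place and returns everything at or beyond it, so pending seal tasks for truncated batches are discarded. A standalone illustration with stand-in constants:

    use std::collections::BTreeMap;

    fn main() {
        // Stand-in for the zgs_spec constant; the real value may differ.
        const SECTORS_PER_SEAL: usize = 16;
        // Pending seal tasks keyed by seal index (dummy values).
        let mut to_seal_set: BTreeMap<usize, u64> = (0..10).map(|i| (i, 0)).collect();
        let start_index: usize = 64; // first truncated sector
        // As in `truncate`: drop every task at or beyond the truncation point.
        let dropped = to_seal_set.split_off(&(start_index / SECTORS_PER_SEAL));
        assert!(to_seal_set.keys().all(|&k| k < start_index / SECTORS_PER_SEAL));
        assert!(dropped.keys().all(|&k| k >= start_index / SECTORS_PER_SEAL));
    }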

View File

@@ -22,7 +22,7 @@ use shared_types::{
     ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
 };
 use std::cmp::Ordering;
-use std::collections::BTreeMap;
 use std::path::Path;
 use std::sync::mpsc;
 use std::sync::Arc;
@@ -56,13 +56,12 @@ static PAD_SEGMENT_ROOT: Lazy<H256> = Lazy::new(|| {
     .root()
 });

 pub struct UpdateFlowMessage {
-    pub root_map: BTreeMap<usize, (H256, usize)>,
     pub pad_data: usize,
     pub tx_start_flow_index: u64,
 }

 pub struct LogManager {
-    pub(crate) db: Arc<dyn ZgsKeyValueDB>,
+    pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
     tx_store: TransactionStore,
     flow_store: Arc<FlowStore>,
     merkle: RwLock<MerkleManager>,
@@ -214,12 +213,11 @@ impl LogStoreChunkWrite for LogManager {
         self.append_entries(flow_entry_array, &mut merkle)?;

         if let Some(file_proof) = maybe_file_proof {
-            let updated_node_list = merkle.pora_chunks_merkle.fill_with_file_proof(
+            merkle.pora_chunks_merkle.fill_with_file_proof(
                 file_proof,
                 tx.merkle_nodes,
                 tx.start_entry_index,
             )?;
-            self.flow_store.put_mpt_node_list(updated_node_list)?;
         }
         Ok(true)
     }
@@ -385,10 +383,9 @@ impl LogStoreWrite for LogManager {
         // `merkle` is used in `validate_range_proof`.
         let mut merkle = self.merkle.write();
         if valid {
-            let updated_nodes = merkle
+            merkle
                 .pora_chunks_merkle
                 .fill_with_range_proof(data.proof.clone())?;
-            self.flow_store.put_mpt_node_list(updated_nodes)?;
         }
         Ok(valid)
     }
@@ -615,28 +612,37 @@ impl LogStoreRead for LogManager {
 impl LogManager {
     pub fn rocksdb(
         config: LogConfig,
-        path: impl AsRef<Path>,
+        flow_path: impl AsRef<Path>,
+        data_path: impl AsRef<Path>,
         executor: task_executor::TaskExecutor,
     ) -> Result<Self> {
         let mut db_config = DatabaseConfig::with_columns(COL_NUM);
         db_config.enable_statistics = true;
-        let db = Arc::new(Database::open(&db_config, path)?);
-        Self::new(db, config, executor)
+        let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?);
+        let data_db_source = Arc::new(Database::open(&db_config, data_path)?);
+        Self::new(flow_db_source, data_db_source, config, executor)
     }

     pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
-        let db = Arc::new(kvdb_memorydb::create(COL_NUM));
-        Self::new(db, config, executor)
+        let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM));
+        let data_db = Arc::new(kvdb_memorydb::create(COL_NUM));
+        Self::new(flow_db, data_db, config, executor)
     }

     fn new(
-        db: Arc<dyn ZgsKeyValueDB>,
+        flow_db_source: Arc<dyn ZgsKeyValueDB>,
+        data_db_source: Arc<dyn ZgsKeyValueDB>,
         config: LogConfig,
         executor: task_executor::TaskExecutor,
     ) -> Result<Self> {
-        let tx_store = TransactionStore::new(db.clone())?;
-        let flow_db = Arc::new(FlowDBStore::new(db.clone()));
-        let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
+        let tx_store = TransactionStore::new(flow_db_source.clone())?;
+        let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
+        let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
+        let flow_store = Arc::new(FlowStore::new(
+            flow_db.clone(),
+            data_db.clone(),
+            config.flow.clone(),
+        ));
         // If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
         // first and call `put_tx` later.
         let next_tx_seq = tx_store.next_tx_seq();
@@ -740,7 +746,7 @@ impl LogManager {
         let (sender, receiver) = mpsc::channel();

         let mut log_manager = Self {
-            db,
+            flow_db: flow_db_source,
             tx_store,
             flow_store,
             merkle,
@@ -774,8 +780,6 @@ impl LogManager {
             loop {
                 match rx.recv() {
                     std::result::Result::Ok(data) => {
-                        // Update the root index.
-                        flow_store.put_batch_root_list(data.root_map).unwrap();
                         // Update the flow database.
                         // This should be called before `complete_last_chunk_merkle` so that we do not save
                         // subtrees with data known.
@@ -848,21 +852,7 @@ impl LogManager {
                 .gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
             }
         };
-        let r = entry_proof(&top_proof, &sub_proof);
-        if r.is_err() {
-            let raw_batch = self.flow_store.get_raw_batch(seg_index as u64)?.unwrap();
-            let db_root = self.flow_store.get_batch_root(seg_index as u64)?;
-            error!(
-                ?r,
-                ?db_root,
-                ?seg_index,
-                "gen proof error: top_leaves={}, last={}, raw_batch={}",
-                merkle.pora_chunks_merkle.leaves(),
-                merkle.last_chunk_merkle.leaves(),
-                serde_json::to_string(&raw_batch).unwrap(),
-            );
-        }
-        r
+        entry_proof(&top_proof, &sub_proof)
     }

     #[instrument(skip(self, merkle))]
@@ -878,7 +868,6 @@ impl LogManager {
         self.pad_tx(tx_start_index, &mut *merkle)?;

-        let mut batch_root_map = BTreeMap::new();
         for (subtree_depth, subtree_root) in merkle_list {
             let subtree_size = 1 << (subtree_depth - 1);
             if merkle.last_chunk_merkle.leaves() + subtree_size <= PORA_CHUNK_SIZE {
@@ -896,10 +885,6 @@ impl LogManager {
                     .update_last(merkle.last_chunk_merkle.root());
             }
             if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
-                batch_root_map.insert(
-                    merkle.pora_chunks_merkle.leaves() - 1,
-                    (merkle.last_chunk_merkle.root(), 1),
-                );
                 self.complete_last_chunk_merkle(
                     merkle.pora_chunks_merkle.leaves() - 1,
                     &mut *merkle,
@@ -910,16 +895,11 @@ impl LogManager {
                 // the chunks boundary.
                 assert_eq!(merkle.last_chunk_merkle.leaves(), 0);
                 assert!(subtree_size >= PORA_CHUNK_SIZE);
-                batch_root_map.insert(
-                    merkle.pora_chunks_merkle.leaves(),
-                    (subtree_root, subtree_depth - log2_pow2(PORA_CHUNK_SIZE)),
-                );
                 merkle
                     .pora_chunks_merkle
                     .append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?;
             }
         }
-        self.flow_store.put_batch_root_list(batch_root_map)?;
         Ok(())
     }
@@ -937,7 +917,6 @@ impl LogManager {
         if pad_size != 0 {
             for pad_data in Self::padding(pad_size as usize) {
                 let mut is_full_empty = true;
-                let mut root_map = BTreeMap::new();

                 // Update the in-memory merkle tree.
                 let last_chunk_pad = if merkle.last_chunk_merkle.leaves() == 0 {
@@ -965,10 +944,6 @@ impl LogManager {
                     merkle
                         .pora_chunks_merkle
                         .update_last(merkle.last_chunk_merkle.root());
-                    root_map.insert(
-                        merkle.pora_chunks_merkle.leaves() - 1,
-                        (merkle.last_chunk_merkle.root(), 1),
-                    );
                     completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1);
                 }
@@ -976,10 +951,6 @@ impl LogManager {
                 let mut start_index = last_chunk_pad / ENTRY_SIZE;
                 while pad_data.len() >= (start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE {
                     merkle.pora_chunks_merkle.append(*PAD_SEGMENT_ROOT);
-                    root_map.insert(
-                        merkle.pora_chunks_merkle.leaves() - 1,
-                        (*PAD_SEGMENT_ROOT, 1),
-                    );
                     start_index += PORA_CHUNK_SIZE;
                 }
                 assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
@@ -988,12 +959,10 @@ impl LogManager {
                 let data_size = pad_data.len() / ENTRY_SIZE;
                 if is_full_empty {
                     self.sender.send(UpdateFlowMessage {
-                        root_map,
                         pad_data: pad_data.len(),
                         tx_start_flow_index,
                     })?;
                 } else {
-                    self.flow_store.put_batch_root_list(root_map).unwrap();
                     // Update the flow database.
                     // This should be called before `complete_last_chunk_merkle` so that we do not save
                     // subtrees with data known.
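
The padding and append logic above relies on two pieces of arithmetic: a merkle subtree of depth `d` spans `1 << (d - 1)` leaves, and when a subtree is appended to `pora_chunks_merkle` (whose leaves are whole PoRA chunks) its depth shrinks by `log2_pow2(PORA_CHUNK_SIZE)`. A standalone check of that arithmetic, with the chunk size as an assumed stand-in for the `zgs_spec` constant:

    fn main() {
        const PORA_CHUNK_SIZE: usize = 1024; // assumption; zgs_spec defines the real value
        let log2_pora_chunk = PORA_CHUNK_SIZE.trailing_zeros() as usize; // log2_pow2
        let subtree_depth = 12; // example subtree spanning several chunks
        let subtree_size = 1usize << (subtree_depth - 1); // 2048 entries
        assert!(subtree_size >= PORA_CHUNK_SIZE);
        // Depth of the same subtree counted in whole PoRA chunks, matching
        // `subtree_depth - log2_pow2(PORA_CHUNK_SIZE)` in `append_subtree` above.
        let chunk_level_depth = subtree_depth - log2_pora_chunk;
        assert_eq!(1usize << (chunk_level_depth - 1), subtree_size / PORA_CHUNK_SIZE);
    }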

View File

@@ -1,5 +1,5 @@
 use crate::config::ShardConfig;
-use append_merkle::MerkleTreeInitialData;
 use ethereum_types::H256;
 use shared_types::{
     Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
@@ -211,8 +211,6 @@ pub trait FlowRead {
     /// For simplicity, `index_start` and `index_end` must be at the batch boundaries.
     fn get_available_entries(&self, index_start: u64, index_end: u64) -> Result<Vec<ChunkArray>>;

-    fn get_chunk_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>>;
-
     fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;

     // An estimation of the number of entries in the flow db.