mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-18 02:55:17 +00:00
Remove unused. (#251)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
This commit is contained in:
parent
9b68a8b7d7
commit
da2cdec8a1
@ -4,25 +4,23 @@ use super::{MineLoadChunk, SealAnswer, SealTask};
|
||||
use crate::config::ShardConfig;
|
||||
use crate::error::Error;
|
||||
use crate::log_store::log_manager::{
|
||||
bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
|
||||
COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
|
||||
bytes_to_entries, COL_ENTRY_BATCH, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE,
|
||||
};
|
||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
||||
use crate::{try_option, ZgsKeyValueDB};
|
||||
use any::Any;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase, NodeTransaction};
|
||||
use append_merkle::{MerkleTreeRead, NodeDatabase, NodeTransaction};
|
||||
use itertools::Itertools;
|
||||
use kvdb::DBTransaction;
|
||||
use parking_lot::RwLock;
|
||||
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
|
||||
use shared_types::{ChunkArray, DataRoot, FlowProof};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
use std::{any, cmp, mem};
|
||||
use std::{any, cmp};
|
||||
use tracing::{debug, error, trace};
|
||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
||||
|
||||
@ -41,10 +39,6 @@ impl FlowStore {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
|
||||
self.db.put_batch_root_list(root_map)
|
||||
}
|
||||
|
||||
pub fn insert_subtree_list_for_batch(
|
||||
&self,
|
||||
batch_index: usize,
|
||||
@ -74,22 +68,10 @@ impl FlowStore {
|
||||
merkle.gen_proof(sector_index)
|
||||
}
|
||||
|
||||
pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
|
||||
self.db.put_mpt_node_list(node_list)
|
||||
}
|
||||
|
||||
pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
|
||||
self.seal_manager.delete_batch_list(batch_list);
|
||||
self.db.delete_batch_list(batch_list)
|
||||
}
|
||||
|
||||
pub fn get_raw_batch(&self, batch_index: u64) -> Result<Option<EntryBatch>> {
|
||||
self.db.get_entry_batch(batch_index)
|
||||
}
|
||||
|
||||
pub fn get_batch_root(&self, batch_index: u64) -> Result<Option<DataRoot>> {
|
||||
self.db.get_batch_root(batch_index)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@ -187,11 +169,6 @@ impl FlowRead for FlowStore {
|
||||
Ok(entry_list)
|
||||
}
|
||||
|
||||
/// Return the list of all stored chunk roots.
|
||||
fn get_chunk_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>> {
|
||||
self.db.get_batch_root_list()
|
||||
}
|
||||
|
||||
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
|
||||
let batch = try_option!(self.db.get_entry_batch(chunk_index)?);
|
||||
let mut mine_chunk = MineLoadChunk::default();
|
||||
@ -388,12 +365,6 @@ impl FlowDBStore {
|
||||
);
|
||||
if let Some(root) = batch.build_root(batch_index == 0)? {
|
||||
trace!("complete batch: index={}", batch_index);
|
||||
tx.put(
|
||||
COL_ENTRY_BATCH_ROOT,
|
||||
// (batch_index, subtree_depth)
|
||||
&encode_batch_root_key(batch_index as usize, 1),
|
||||
root.as_bytes(),
|
||||
);
|
||||
completed_batches.push((batch_index, root));
|
||||
}
|
||||
}
|
||||
@ -419,94 +390,6 @@ impl FlowDBStore {
|
||||
Ok(Some(EntryBatch::from_ssz_bytes(&raw).map_err(Error::from)?))
|
||||
}
|
||||
|
||||
fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
|
||||
let mut tx = self.kvdb.transaction();
|
||||
for (batch_index, (root, subtree_depth)) in root_map {
|
||||
tx.put(
|
||||
COL_ENTRY_BATCH_ROOT,
|
||||
&encode_batch_root_key(batch_index, subtree_depth),
|
||||
root.as_bytes(),
|
||||
);
|
||||
}
|
||||
Ok(self.kvdb.write(tx)?)
|
||||
}
|
||||
|
||||
fn get_batch_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>> {
|
||||
let mut range_root = None;
|
||||
// A list of `BatchRoot` that can reconstruct the whole merkle tree structure.
|
||||
let mut root_list = Vec::new();
|
||||
// A list of leaf `(index, root_hash)` in the subtrees of some nodes in `root_list`,
|
||||
// and they will be updated in the merkle tree with `fill_leaf` by the caller.
|
||||
let mut leaf_list = Vec::new();
|
||||
let mut expected_index = 0;
|
||||
|
||||
let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
|
||||
let empty_root = Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
|
||||
|
||||
for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
|
||||
let (index_bytes, root_bytes) = r?;
|
||||
let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?;
|
||||
let root = DataRoot::from_slice(root_bytes.as_ref());
|
||||
debug!(
|
||||
"load root depth={}, index expected={} get={} root={:?}",
|
||||
subtree_depth, expected_index, batch_index, root,
|
||||
);
|
||||
if subtree_depth == 1 {
|
||||
if range_root.is_none() {
|
||||
// This is expected to be the next leaf.
|
||||
if batch_index == expected_index {
|
||||
root_list.push((1, root));
|
||||
expected_index += 1;
|
||||
} else {
|
||||
bail!(
|
||||
"unexpected chunk leaf, expected={}, get={}",
|
||||
expected_index,
|
||||
batch_index
|
||||
);
|
||||
}
|
||||
} else {
|
||||
match batch_index.cmp(&expected_index) {
|
||||
Ordering::Less => {
|
||||
// This leaf is within a subtree whose root is known.
|
||||
leaf_list.push((batch_index, root));
|
||||
}
|
||||
Ordering::Equal => {
|
||||
// A subtree range ends.
|
||||
range_root = None;
|
||||
root_list.push((1, root));
|
||||
expected_index += 1;
|
||||
}
|
||||
Ordering::Greater => {
|
||||
while batch_index > expected_index {
|
||||
// Fill the gap with empty leaves.
|
||||
root_list.push((1, empty_root));
|
||||
expected_index += 1;
|
||||
}
|
||||
range_root = None;
|
||||
root_list.push((1, root));
|
||||
expected_index += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
while batch_index > expected_index {
|
||||
// Fill the gap with empty leaves.
|
||||
root_list.push((1, empty_root));
|
||||
expected_index += 1;
|
||||
}
|
||||
range_root = Some(BatchRoot::Multiple((subtree_depth, root)));
|
||||
root_list.push((subtree_depth, root));
|
||||
expected_index += 1 << (subtree_depth - 1);
|
||||
}
|
||||
}
|
||||
let extra_node_list = self.get_mpt_node_list()?;
|
||||
Ok(MerkleTreeInitialData {
|
||||
subtree_list: root_list,
|
||||
known_leaves: leaf_list,
|
||||
extra_mpt_nodes: extra_node_list,
|
||||
})
|
||||
}
|
||||
|
||||
fn truncate(&self, start_index: u64, batch_size: usize) -> crate::error::Result<Vec<usize>> {
|
||||
let mut tx = self.kvdb.transaction();
|
||||
let mut start_batch_index = start_index / batch_size as u64;
|
||||
@ -547,38 +430,11 @@ impl FlowDBStore {
|
||||
};
|
||||
for batch_index in start_batch_index as usize..=end {
|
||||
tx.delete(COL_ENTRY_BATCH, &batch_index.to_be_bytes());
|
||||
tx.delete_prefix(COL_ENTRY_BATCH_ROOT, &batch_index.to_be_bytes());
|
||||
}
|
||||
self.kvdb.write(tx)?;
|
||||
Ok(index_to_reseal)
|
||||
}
|
||||
|
||||
fn put_mpt_node_list(&self, mpt_node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
|
||||
let mut tx = self.kvdb.transaction();
|
||||
for (layer_index, position, data) in mpt_node_list {
|
||||
tx.put(
|
||||
COL_FLOW_MPT_NODES,
|
||||
&encode_mpt_node_key(layer_index, position),
|
||||
data.as_bytes(),
|
||||
);
|
||||
}
|
||||
Ok(self.kvdb.write(tx)?)
|
||||
}
|
||||
|
||||
fn get_mpt_node_list(&self) -> Result<Vec<(usize, usize, DataRoot)>> {
|
||||
let mut node_list = Vec::new();
|
||||
for r in self.kvdb.iter(COL_FLOW_MPT_NODES) {
|
||||
let (index_bytes, node_bytes) = r?;
|
||||
let (layer_index, position) = decode_mpt_node_key(index_bytes.as_ref())?;
|
||||
node_list.push((
|
||||
layer_index,
|
||||
position,
|
||||
DataRoot::from_slice(node_bytes.as_ref()),
|
||||
));
|
||||
}
|
||||
Ok(node_list)
|
||||
}
|
||||
|
||||
fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
|
||||
let mut tx = self.kvdb.transaction();
|
||||
for i in batch_list {
|
||||
@ -586,16 +442,6 @@ impl FlowDBStore {
|
||||
}
|
||||
Ok(self.kvdb.write(tx)?)
|
||||
}
|
||||
|
||||
fn get_batch_root(&self, batch_index: u64) -> Result<Option<DataRoot>> {
|
||||
Ok(self
|
||||
.kvdb
|
||||
.get(
|
||||
COL_ENTRY_BATCH_ROOT,
|
||||
&encode_batch_root_key(batch_index as usize, 1),
|
||||
)?
|
||||
.map(|v| DataRoot::from_slice(&v)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(DeriveEncode, DeriveDecode, Clone, Debug)]
|
||||
@ -641,37 +487,12 @@ fn decode_batch_index(data: &[u8]) -> Result<usize> {
|
||||
try_decode_usize(data)
|
||||
}
|
||||
|
||||
/// For the same batch_index, we want to process the larger subtree_depth first in iteration.
|
||||
fn encode_batch_root_key(batch_index: usize, subtree_depth: usize) -> Vec<u8> {
|
||||
let mut key = batch_index.to_be_bytes().to_vec();
|
||||
key.extend_from_slice(&(usize::MAX - subtree_depth).to_be_bytes());
|
||||
key
|
||||
}
|
||||
|
||||
fn decode_batch_root_key(data: &[u8]) -> Result<(usize, usize)> {
|
||||
if data.len() != mem::size_of::<usize>() * 2 {
|
||||
bail!("invalid data length");
|
||||
}
|
||||
let batch_index = try_decode_usize(&data[..mem::size_of::<u64>()])?;
|
||||
let subtree_depth = usize::MAX - try_decode_usize(&data[mem::size_of::<u64>()..])?;
|
||||
Ok((batch_index, subtree_depth))
|
||||
}
|
||||
|
||||
fn encode_mpt_node_key(layer_index: usize, position: usize) -> Vec<u8> {
|
||||
let mut key = layer_index.to_be_bytes().to_vec();
|
||||
key.extend_from_slice(&position.to_be_bytes());
|
||||
key
|
||||
}
|
||||
|
||||
fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
|
||||
if data.len() != mem::size_of::<usize>() * 2 {
|
||||
bail!("invalid data length");
|
||||
}
|
||||
let layer_index = try_decode_usize(&data[..mem::size_of::<u64>()])?;
|
||||
let position = try_decode_usize(&data[mem::size_of::<u64>()..])?;
|
||||
Ok((layer_index, position))
|
||||
}
|
||||
|
||||
fn layer_size_key(layer: usize) -> Vec<u8> {
|
||||
let mut key = "layer_size".as_bytes().to_vec();
|
||||
key.extend_from_slice(&layer.to_be_bytes());
|
||||
|
@ -22,7 +22,7 @@ use shared_types::{
|
||||
ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
|
||||
};
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::Arc;
|
||||
@ -56,7 +56,6 @@ static PAD_SEGMENT_ROOT: Lazy<H256> = Lazy::new(|| {
|
||||
.root()
|
||||
});
|
||||
pub struct UpdateFlowMessage {
|
||||
pub root_map: BTreeMap<usize, (H256, usize)>,
|
||||
pub pad_data: usize,
|
||||
pub tx_start_flow_index: u64,
|
||||
}
|
||||
@ -214,12 +213,11 @@ impl LogStoreChunkWrite for LogManager {
|
||||
self.append_entries(flow_entry_array, &mut merkle)?;
|
||||
|
||||
if let Some(file_proof) = maybe_file_proof {
|
||||
let updated_node_list = merkle.pora_chunks_merkle.fill_with_file_proof(
|
||||
merkle.pora_chunks_merkle.fill_with_file_proof(
|
||||
file_proof,
|
||||
tx.merkle_nodes,
|
||||
tx.start_entry_index,
|
||||
)?;
|
||||
self.flow_store.put_mpt_node_list(updated_node_list)?;
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
@ -385,10 +383,9 @@ impl LogStoreWrite for LogManager {
|
||||
// `merkle` is used in `validate_range_proof`.
|
||||
let mut merkle = self.merkle.write();
|
||||
if valid {
|
||||
let updated_nodes = merkle
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.fill_with_range_proof(data.proof.clone())?;
|
||||
self.flow_store.put_mpt_node_list(updated_nodes)?;
|
||||
}
|
||||
Ok(valid)
|
||||
}
|
||||
@ -774,8 +771,6 @@ impl LogManager {
|
||||
loop {
|
||||
match rx.recv() {
|
||||
std::result::Result::Ok(data) => {
|
||||
// Update the root index.
|
||||
flow_store.put_batch_root_list(data.root_map).unwrap();
|
||||
// Update the flow database.
|
||||
// This should be called before `complete_last_chunk_merkle` so that we do not save
|
||||
// subtrees with data known.
|
||||
@ -848,21 +843,7 @@ impl LogManager {
|
||||
.gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
|
||||
}
|
||||
};
|
||||
let r = entry_proof(&top_proof, &sub_proof);
|
||||
if r.is_err() {
|
||||
let raw_batch = self.flow_store.get_raw_batch(seg_index as u64)?.unwrap();
|
||||
let db_root = self.flow_store.get_batch_root(seg_index as u64)?;
|
||||
error!(
|
||||
?r,
|
||||
?db_root,
|
||||
?seg_index,
|
||||
"gen proof error: top_leaves={}, last={}, raw_batch={}",
|
||||
merkle.pora_chunks_merkle.leaves(),
|
||||
merkle.last_chunk_merkle.leaves(),
|
||||
serde_json::to_string(&raw_batch).unwrap(),
|
||||
);
|
||||
}
|
||||
r
|
||||
entry_proof(&top_proof, &sub_proof)
|
||||
}
|
||||
|
||||
#[instrument(skip(self, merkle))]
|
||||
@ -878,7 +859,6 @@ impl LogManager {
|
||||
|
||||
self.pad_tx(tx_start_index, &mut *merkle)?;
|
||||
|
||||
let mut batch_root_map = BTreeMap::new();
|
||||
for (subtree_depth, subtree_root) in merkle_list {
|
||||
let subtree_size = 1 << (subtree_depth - 1);
|
||||
if merkle.last_chunk_merkle.leaves() + subtree_size <= PORA_CHUNK_SIZE {
|
||||
@ -896,10 +876,6 @@ impl LogManager {
|
||||
.update_last(merkle.last_chunk_merkle.root());
|
||||
}
|
||||
if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
|
||||
batch_root_map.insert(
|
||||
merkle.pora_chunks_merkle.leaves() - 1,
|
||||
(merkle.last_chunk_merkle.root(), 1),
|
||||
);
|
||||
self.complete_last_chunk_merkle(
|
||||
merkle.pora_chunks_merkle.leaves() - 1,
|
||||
&mut *merkle,
|
||||
@ -910,16 +886,11 @@ impl LogManager {
|
||||
// the chunks boundary.
|
||||
assert_eq!(merkle.last_chunk_merkle.leaves(), 0);
|
||||
assert!(subtree_size >= PORA_CHUNK_SIZE);
|
||||
batch_root_map.insert(
|
||||
merkle.pora_chunks_merkle.leaves(),
|
||||
(subtree_root, subtree_depth - log2_pow2(PORA_CHUNK_SIZE)),
|
||||
);
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?;
|
||||
}
|
||||
}
|
||||
self.flow_store.put_batch_root_list(batch_root_map)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -937,7 +908,6 @@ impl LogManager {
|
||||
if pad_size != 0 {
|
||||
for pad_data in Self::padding(pad_size as usize) {
|
||||
let mut is_full_empty = true;
|
||||
let mut root_map = BTreeMap::new();
|
||||
|
||||
// Update the in-memory merkle tree.
|
||||
let last_chunk_pad = if merkle.last_chunk_merkle.leaves() == 0 {
|
||||
@ -965,10 +935,6 @@ impl LogManager {
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.update_last(merkle.last_chunk_merkle.root());
|
||||
root_map.insert(
|
||||
merkle.pora_chunks_merkle.leaves() - 1,
|
||||
(merkle.last_chunk_merkle.root(), 1),
|
||||
);
|
||||
completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1);
|
||||
}
|
||||
|
||||
@ -976,10 +942,6 @@ impl LogManager {
|
||||
let mut start_index = last_chunk_pad / ENTRY_SIZE;
|
||||
while pad_data.len() >= (start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE {
|
||||
merkle.pora_chunks_merkle.append(*PAD_SEGMENT_ROOT);
|
||||
root_map.insert(
|
||||
merkle.pora_chunks_merkle.leaves() - 1,
|
||||
(*PAD_SEGMENT_ROOT, 1),
|
||||
);
|
||||
start_index += PORA_CHUNK_SIZE;
|
||||
}
|
||||
assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
|
||||
@ -988,12 +950,10 @@ impl LogManager {
|
||||
let data_size = pad_data.len() / ENTRY_SIZE;
|
||||
if is_full_empty {
|
||||
self.sender.send(UpdateFlowMessage {
|
||||
root_map,
|
||||
pad_data: pad_data.len(),
|
||||
tx_start_flow_index,
|
||||
})?;
|
||||
} else {
|
||||
self.flow_store.put_batch_root_list(root_map).unwrap();
|
||||
// Update the flow database.
|
||||
// This should be called before `complete_last_chunk_merkle` so that we do not save
|
||||
// subtrees with data known.
|
||||
|
@ -1,5 +1,5 @@
|
||||
use crate::config::ShardConfig;
|
||||
use append_merkle::MerkleTreeInitialData;
|
||||
|
||||
use ethereum_types::H256;
|
||||
use shared_types::{
|
||||
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
|
||||
@ -211,8 +211,6 @@ pub trait FlowRead {
|
||||
/// For simplicity, `index_start` and `index_end` must be at the batch boundaries.
|
||||
fn get_available_entries(&self, index_start: u64, index_end: u64) -> Result<Vec<ChunkArray>>;
|
||||
|
||||
fn get_chunk_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>>;
|
||||
|
||||
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
|
||||
|
||||
// An estimation of the number of entries in the flow db.
|
||||
|
Loading…
Reference in New Issue
Block a user