mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-19 03:25:18 +00:00
Use inner lock in storage.
This commit is contained in:
parent
d8d8c28c64
commit
fad3b93a31
39
Cargo.lock
generated
39
Cargo.lock
generated
@ -2286,7 +2286,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"hashlink 0.8.4",
|
||||
"network",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"priority-queue",
|
||||
"rand 0.8.5",
|
||||
"shared_types",
|
||||
@ -3342,7 +3342,7 @@ dependencies = [
|
||||
"hyper",
|
||||
"jsonrpsee-types",
|
||||
"lazy_static",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"rand 0.8.5",
|
||||
"rustc-hash",
|
||||
"serde",
|
||||
@ -3531,7 +3531,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bf7a85fe66f9ff9cd74e169fdd2c94c6e1e74c412c99a73b4df3200b5d3760b2"
|
||||
dependencies = [
|
||||
"kvdb",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -3542,7 +3542,7 @@ checksum = "b644c70b92285f66bfc2032922a79000ea30af7bc2ab31902992a5dcb9b434f6"
|
||||
dependencies = [
|
||||
"kvdb",
|
||||
"num_cpus",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"regex",
|
||||
"rocksdb",
|
||||
"smallvec",
|
||||
@ -3650,7 +3650,7 @@ dependencies = [
|
||||
"libp2p-websocket",
|
||||
"libp2p-yamux",
|
||||
"multiaddr 0.14.0",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"pin-project 1.1.5",
|
||||
"rand 0.7.3",
|
||||
"smallvec",
|
||||
@ -3729,7 +3729,7 @@ dependencies = [
|
||||
"multiaddr 0.14.0",
|
||||
"multihash 0.16.3",
|
||||
"multistream-select 0.11.0",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"pin-project 1.1.5",
|
||||
"prost 0.9.0",
|
||||
"prost-build 0.9.0",
|
||||
@ -3764,7 +3764,7 @@ dependencies = [
|
||||
"multiaddr 0.14.0",
|
||||
"multihash 0.16.3",
|
||||
"multistream-select 0.11.0",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"pin-project 1.1.5",
|
||||
"prost 0.10.4",
|
||||
"prost-build 0.10.4",
|
||||
@ -3800,7 +3800,7 @@ dependencies = [
|
||||
"futures",
|
||||
"libp2p-core 0.33.0",
|
||||
"log",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"smallvec",
|
||||
"trust-dns-resolver",
|
||||
]
|
||||
@ -3949,7 +3949,7 @@ dependencies = [
|
||||
"libp2p-core 0.33.0",
|
||||
"log",
|
||||
"nohash-hasher",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"rand 0.7.3",
|
||||
"smallvec",
|
||||
"unsigned-varint",
|
||||
@ -4177,7 +4177,7 @@ dependencies = [
|
||||
"futures-rustls",
|
||||
"libp2p-core 0.33.0",
|
||||
"log",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"quicksink",
|
||||
"rw-stream-sink 0.3.0",
|
||||
"soketto",
|
||||
@ -4193,7 +4193,7 @@ checksum = "8fe653639ad74877c759720febb0cbcbf4caa221adde4eed2d3126ce5c6f381f"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"libp2p-core 0.33.0",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"thiserror",
|
||||
"yamux",
|
||||
]
|
||||
@ -4688,7 +4688,7 @@ dependencies = [
|
||||
"libp2p",
|
||||
"lighthouse_metrics",
|
||||
"lru",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"prometheus-client",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
@ -5008,9 +5008,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.1"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
|
||||
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
"parking_lot_core 0.9.9",
|
||||
@ -5562,7 +5562,7 @@ dependencies = [
|
||||
"fnv",
|
||||
"lazy_static",
|
||||
"memchr",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"protobuf",
|
||||
"thiserror",
|
||||
]
|
||||
@ -6884,6 +6884,7 @@ dependencies = [
|
||||
"kvdb-rocksdb",
|
||||
"merkle_light",
|
||||
"merkle_tree",
|
||||
"parking_lot 0.12.3",
|
||||
"rand 0.8.5",
|
||||
"rayon",
|
||||
"serde",
|
||||
@ -6917,7 +6918,7 @@ checksum = "f91138e76242f575eb1d3b38b4f1362f10d3a43f47d182a5b359af488a02293b"
|
||||
dependencies = [
|
||||
"new_debug_unreachable",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"phf_shared 0.10.0",
|
||||
"precomputed-hash",
|
||||
]
|
||||
@ -7271,7 +7272,7 @@ dependencies = [
|
||||
"libc",
|
||||
"mio",
|
||||
"num_cpus",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"pin-project-lite 0.2.14",
|
||||
"signal-hook-registry",
|
||||
"socket2 0.5.6",
|
||||
@ -7605,7 +7606,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"log",
|
||||
"lru-cache",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"resolv-conf",
|
||||
"smallvec",
|
||||
"thiserror",
|
||||
@ -8315,7 +8316,7 @@ dependencies = [
|
||||
"futures",
|
||||
"log",
|
||||
"nohash-hasher",
|
||||
"parking_lot 0.12.1",
|
||||
"parking_lot 0.12.3",
|
||||
"rand 0.8.5",
|
||||
"static_assertions",
|
||||
]
|
||||
|
@ -27,6 +27,7 @@ static_assertions = "1.1"
|
||||
tiny-keccak = "*"
|
||||
itertools = "0.13.0"
|
||||
serde = { version = "1.0.197", features = ["derive"] }
|
||||
parking_lot = "0.12.3"
|
||||
|
||||
[dev-dependencies]
|
||||
tempdir = "0.3.7"
|
||||
|
@ -10,6 +10,7 @@ use crate::{try_option, ZgsKeyValueDB};
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
|
||||
use itertools::Itertools;
|
||||
use parking_lot::RwLock;
|
||||
use shared_types::{ChunkArray, DataRoot, FlowProof};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
|
||||
@ -24,10 +25,10 @@ use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_S
|
||||
pub struct FlowStore {
|
||||
db: FlowDBStore,
|
||||
// TODO(kevin): This is an in-memory cache for recording which chunks are ready for sealing. It should be persisted on disk.
|
||||
to_seal_set: BTreeMap<usize, usize>,
|
||||
to_seal_set: RwLock<BTreeMap<usize, usize>>,
|
||||
// Data sealing is an asynchronized process.
|
||||
// The sealing service uses the version number to distinguish if revert happens during sealing.
|
||||
to_seal_version: usize,
|
||||
to_seal_version: RwLock<usize>,
|
||||
config: FlowConfig,
|
||||
}
|
||||
|
||||
@ -36,7 +37,7 @@ impl FlowStore {
|
||||
Self {
|
||||
db: FlowDBStore::new(db),
|
||||
to_seal_set: Default::default(),
|
||||
to_seal_version: 0,
|
||||
to_seal_version: RwLock::new(0),
|
||||
config,
|
||||
}
|
||||
}
|
||||
@ -214,7 +215,8 @@ impl FlowRead for FlowStore {
|
||||
impl FlowWrite for FlowStore {
|
||||
/// Return the roots of completed chunks. The order is guaranteed to be increasing
|
||||
/// by chunk index.
|
||||
fn append_entries(&mut self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
|
||||
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
|
||||
let mut to_seal_set = self.to_seal_set.write();
|
||||
trace!("append_entries: {} {}", data.start_index, data.data.len());
|
||||
if data.data.len() % BYTES_PER_SECTOR != 0 {
|
||||
bail!("append_entries: invalid data size, len={}", data.data.len());
|
||||
@ -246,9 +248,9 @@ impl FlowWrite for FlowStore {
|
||||
chunk.data,
|
||||
)?;
|
||||
completed_seals.into_iter().for_each(|x| {
|
||||
self.to_seal_set.insert(
|
||||
to_seal_set.insert(
|
||||
chunk_index as usize * SEALS_PER_LOAD + x as usize,
|
||||
self.to_seal_version,
|
||||
*self.to_seal_version.read(),
|
||||
);
|
||||
});
|
||||
|
||||
@ -257,15 +259,16 @@ impl FlowWrite for FlowStore {
|
||||
self.db.put_entry_batch_list(batch_list)
|
||||
}
|
||||
|
||||
fn truncate(&mut self, start_index: u64) -> crate::error::Result<()> {
|
||||
fn truncate(&self, start_index: u64) -> crate::error::Result<()> {
|
||||
let mut to_seal_set = self.to_seal_set.write();
|
||||
let mut to_seal_version = self.to_seal_version.write();
|
||||
let to_reseal = self.db.truncate(start_index, self.config.batch_size)?;
|
||||
|
||||
self.to_seal_set
|
||||
.split_off(&(start_index as usize / SECTORS_PER_SEAL));
|
||||
self.to_seal_version += 1;
|
||||
to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL));
|
||||
*to_seal_version += 1;
|
||||
|
||||
to_reseal.into_iter().for_each(|x| {
|
||||
self.to_seal_set.insert(x, self.to_seal_version);
|
||||
to_seal_set.insert(x, *to_seal_version);
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
@ -277,7 +280,8 @@ impl FlowWrite for FlowStore {
|
||||
|
||||
impl FlowSeal for FlowStore {
|
||||
fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> {
|
||||
let mut to_seal_iter = self.to_seal_set.iter();
|
||||
let to_seal_set = self.to_seal_set.read();
|
||||
let mut to_seal_iter = to_seal_set.iter();
|
||||
let (&first_index, &first_version) = try_option!(to_seal_iter.next());
|
||||
if first_index >= seal_index_max {
|
||||
return Ok(None);
|
||||
@ -310,8 +314,9 @@ impl FlowSeal for FlowStore {
|
||||
}
|
||||
|
||||
fn submit_seal_result(&mut self, answers: Vec<SealAnswer>) -> Result<()> {
|
||||
let mut to_seal_set = self.to_seal_set.write();
|
||||
let is_consistent = |answer: &SealAnswer| {
|
||||
self.to_seal_set
|
||||
to_seal_set
|
||||
.get(&(answer.seal_index as usize))
|
||||
.map_or(false, |cur_ver| cur_ver == &answer.version)
|
||||
};
|
||||
@ -337,7 +342,7 @@ impl FlowSeal for FlowStore {
|
||||
debug!("Seal chunks: indices = {:?}", removed_seal_index);
|
||||
|
||||
for idx in removed_seal_index.into_iter() {
|
||||
self.to_seal_set.remove(&idx);
|
||||
to_seal_set.remove(&idx);
|
||||
}
|
||||
|
||||
self.db.put_entry_raw(updated_chunk)?;
|
||||
|
@ -10,6 +10,7 @@ use ethereum_types::H256;
|
||||
use kvdb_rocksdb::{Database, DatabaseConfig};
|
||||
use merkle_light::merkle::{log2_pow2, MerkleTree};
|
||||
use merkle_tree::RawLeafSha3Algorithm;
|
||||
use parking_lot::RwLock;
|
||||
use rayon::iter::ParallelIterator;
|
||||
use rayon::prelude::ParallelSlice;
|
||||
use shared_types::{
|
||||
@ -48,6 +49,10 @@ pub struct LogManager {
|
||||
pub(crate) db: Arc<dyn ZgsKeyValueDB>,
|
||||
tx_store: TransactionStore,
|
||||
flow_store: FlowStore,
|
||||
merkle: RwLock<MerkleManager>,
|
||||
}
|
||||
|
||||
struct MerkleManager {
|
||||
// TODO(zz): Refactor the in-memory merkle and in-disk storage together.
|
||||
pora_chunks_merkle: Merkle,
|
||||
/// The in-memory structure of the sub merkle tree of the last chunk.
|
||||
@ -55,6 +60,80 @@ pub struct LogManager {
|
||||
last_chunk_merkle: Merkle,
|
||||
}
|
||||
|
||||
impl MerkleManager {
|
||||
fn last_chunk_start_index(&self) -> u64 {
|
||||
if self.pora_chunks_merkle.leaves() == 0 {
|
||||
0
|
||||
} else {
|
||||
PORA_CHUNK_SIZE as u64
|
||||
* if self.last_chunk_merkle.leaves() == 0 {
|
||||
// The last chunk is empty and its root hash is not in `pora_chunk_merkle`,
|
||||
// so all chunks in `pora_chunk_merkle` is complete.
|
||||
self.pora_chunks_merkle.leaves()
|
||||
} else {
|
||||
// The last chunk has data, so we need to exclude it from `pora_chunks_merkle`.
|
||||
self.pora_chunks_merkle.leaves() - 1
|
||||
} as u64
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
fn commit_merkle(&mut self, tx_seq: u64) -> Result<()> {
|
||||
self.pora_chunks_merkle.commit(Some(tx_seq));
|
||||
self.last_chunk_merkle.commit(Some(tx_seq));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn revert_merkle_tree(&mut self, tx_seq: u64, tx_store: &TransactionStore) -> Result<()> {
|
||||
// Special case for reverting tx_seq == 0
|
||||
if tx_seq == u64::MAX {
|
||||
self.pora_chunks_merkle.reset();
|
||||
self.last_chunk_merkle.reset();
|
||||
return Ok(());
|
||||
}
|
||||
let old_leaves = self.pora_chunks_merkle.leaves();
|
||||
self.pora_chunks_merkle.revert_to(tx_seq)?;
|
||||
if old_leaves == self.pora_chunks_merkle.leaves() {
|
||||
self.last_chunk_merkle.revert_to(tx_seq)?;
|
||||
} else {
|
||||
// We are reverting to a position before the current last_chunk.
|
||||
self.last_chunk_merkle =
|
||||
tx_store.rebuild_last_chunk_merkle(self.pora_chunks_merkle.leaves() - 1, tx_seq)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn try_initialize(&mut self, flow_store: &FlowStore) -> Result<()> {
|
||||
if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 {
|
||||
self.last_chunk_merkle.append(H256::zero());
|
||||
self.pora_chunks_merkle
|
||||
.update_last(*self.last_chunk_merkle.root());
|
||||
} else if self.last_chunk_merkle.leaves() != 0 {
|
||||
let last_chunk_start_index = self.last_chunk_start_index();
|
||||
let last_chunk_data = flow_store.get_available_entries(
|
||||
last_chunk_start_index,
|
||||
last_chunk_start_index + PORA_CHUNK_SIZE as u64,
|
||||
)?;
|
||||
for e in last_chunk_data {
|
||||
let start_index = e.start_index - last_chunk_start_index;
|
||||
for i in 0..e.data.len() / ENTRY_SIZE {
|
||||
let index = i + start_index as usize;
|
||||
if index >= self.last_chunk_merkle.leaves() {
|
||||
// We revert the merkle tree before truncate the flow store,
|
||||
// so last_chunk_data may include data that should have been truncated.
|
||||
break;
|
||||
}
|
||||
self.last_chunk_merkle.fill_leaf(
|
||||
index,
|
||||
Sha3Algorithm::leaf(&e.data[i * ENTRY_SIZE..(i + 1) * ENTRY_SIZE]),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct LogConfig {
|
||||
pub flow: FlowConfig,
|
||||
@ -72,6 +151,7 @@ impl LogStoreInner for LogManager {
|
||||
|
||||
impl LogStoreChunkWrite for LogManager {
|
||||
fn put_chunks(&mut self, tx_seq: u64, chunks: ChunkArray) -> Result<()> {
|
||||
let mut merkle = self.merkle.write();
|
||||
let tx = self
|
||||
.tx_store
|
||||
.get_tx_by_seq_number(tx_seq)?
|
||||
@ -90,7 +170,7 @@ impl LogStoreChunkWrite for LogManager {
|
||||
// TODO: Use another struct to avoid confusion.
|
||||
let mut flow_entry_array = chunks;
|
||||
flow_entry_array.start_index += tx.start_entry_index;
|
||||
self.append_entries(flow_entry_array)?;
|
||||
self.append_entries(flow_entry_array, &mut *merkle)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -101,6 +181,7 @@ impl LogStoreChunkWrite for LogManager {
|
||||
chunks: ChunkArray,
|
||||
maybe_file_proof: Option<FlowProof>,
|
||||
) -> Result<bool> {
|
||||
let mut merkle = self.merkle.write();
|
||||
let tx = self
|
||||
.tx_store
|
||||
.get_tx_by_seq_number(tx_seq)?
|
||||
@ -122,10 +203,10 @@ impl LogStoreChunkWrite for LogManager {
|
||||
// TODO: Use another struct to avoid confusion.
|
||||
let mut flow_entry_array = chunks;
|
||||
flow_entry_array.start_index += tx.start_entry_index;
|
||||
self.append_entries(flow_entry_array)?;
|
||||
self.append_entries(flow_entry_array, &mut *merkle)?;
|
||||
|
||||
if let Some(file_proof) = maybe_file_proof {
|
||||
let updated_node_list = self.pora_chunks_merkle.fill_with_file_proof(
|
||||
let updated_node_list = merkle.pora_chunks_merkle.fill_with_file_proof(
|
||||
file_proof,
|
||||
tx.merkle_nodes,
|
||||
tx.start_entry_index,
|
||||
@ -162,6 +243,7 @@ impl LogStoreWrite for LogManager {
|
||||
/// `put_tx` for the last tx when we restart the node to ensure that it succeeds.
|
||||
///
|
||||
fn put_tx(&mut self, tx: Transaction) -> Result<()> {
|
||||
let mut merkle = self.merkle.write();
|
||||
debug!("put_tx: tx={:?}", tx);
|
||||
let expected_seq = self.next_tx_seq();
|
||||
if tx.seq != expected_seq {
|
||||
@ -176,12 +258,14 @@ impl LogStoreWrite for LogManager {
|
||||
}
|
||||
let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned();
|
||||
// TODO(zz): Should we validate received tx?
|
||||
self.append_subtree_list(tx.merkle_nodes.clone())?;
|
||||
self.commit_merkle(tx.seq)?;
|
||||
self.append_subtree_list(tx.merkle_nodes.clone(), &mut *merkle)?;
|
||||
merkle.commit_merkle(tx.seq)?;
|
||||
debug!(
|
||||
"commit flow root: root={:?}",
|
||||
self.pora_chunks_merkle.root()
|
||||
merkle.pora_chunks_merkle.root()
|
||||
);
|
||||
// Drop the lock because `copy_tx_data` will lock again.
|
||||
drop(merkle);
|
||||
|
||||
if let Some(old_tx_seq) = maybe_same_data_tx_seq {
|
||||
if self.check_tx_completed(old_tx_seq)? {
|
||||
@ -261,9 +345,17 @@ impl LogStoreWrite for LogManager {
|
||||
/// `tx_seq == u64::MAX` is a special case for reverting all transactions.
|
||||
fn revert_to(&mut self, tx_seq: u64) -> Result<Vec<Transaction>> {
|
||||
// FIXME(zz): If this revert is triggered by chain reorg after restarts, this will fail.
|
||||
self.revert_merkle_tree(tx_seq)?;
|
||||
let start_index = self.last_chunk_start_index() * PORA_CHUNK_SIZE as u64
|
||||
+ self.last_chunk_merkle.leaves() as u64;
|
||||
let mut merkle = self.merkle.write();
|
||||
merkle.revert_merkle_tree(tx_seq, &self.tx_store)?;
|
||||
merkle.try_initialize(&self.flow_store)?;
|
||||
assert_eq!(
|
||||
Some(*merkle.last_chunk_merkle.root()),
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.leaf_at(merkle.pora_chunks_merkle.leaves() - 1)?
|
||||
);
|
||||
let start_index = merkle.last_chunk_start_index() * PORA_CHUNK_SIZE as u64
|
||||
+ merkle.last_chunk_merkle.leaves() as u64;
|
||||
self.flow_store.truncate(start_index)?;
|
||||
let start = if tx_seq != u64::MAX { tx_seq + 1 } else { 0 };
|
||||
self.tx_store.remove_tx_after(start)
|
||||
@ -275,8 +367,10 @@ impl LogStoreWrite for LogManager {
|
||||
data: &ChunkArrayWithProof,
|
||||
) -> Result<bool> {
|
||||
let valid = self.validate_range_proof(tx_seq, data)?;
|
||||
// `merkle` is used in `validate_range_proof`.
|
||||
let mut merkle = self.merkle.write();
|
||||
if valid {
|
||||
let updated_nodes = self
|
||||
let updated_nodes = merkle
|
||||
.pora_chunks_merkle
|
||||
.fill_with_range_proof(data.proof.clone())?;
|
||||
self.flow_store.put_mpt_node_list(updated_nodes)?;
|
||||
@ -421,7 +515,11 @@ impl LogStoreRead for LogManager {
|
||||
&leaves,
|
||||
(data.chunks.start_index + tx.start_entry_index) as usize,
|
||||
)?;
|
||||
Ok(self.pora_chunks_merkle.check_root(&data.proof.root()))
|
||||
Ok(self
|
||||
.merkle
|
||||
.read_recursive()
|
||||
.pora_chunks_merkle
|
||||
.check_root(&data.proof.root()))
|
||||
}
|
||||
|
||||
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>> {
|
||||
@ -455,9 +553,10 @@ impl LogStoreRead for LogManager {
|
||||
}
|
||||
|
||||
fn get_context(&self) -> crate::error::Result<(DataRoot, u64)> {
|
||||
let merkle = self.merkle.read_recursive();
|
||||
Ok((
|
||||
*self.pora_chunks_merkle.root(),
|
||||
self.last_chunk_start_index() + self.last_chunk_merkle.leaves() as u64,
|
||||
*merkle.pora_chunks_merkle.root(),
|
||||
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
|
||||
))
|
||||
}
|
||||
}
|
||||
@ -578,19 +677,23 @@ impl LogManager {
|
||||
// update the merkle root
|
||||
pora_chunks_merkle.commit(start_tx_seq);
|
||||
}
|
||||
let merkle = RwLock::new(MerkleManager {
|
||||
pora_chunks_merkle,
|
||||
last_chunk_merkle,
|
||||
});
|
||||
let mut log_manager = Self {
|
||||
db,
|
||||
tx_store,
|
||||
flow_store,
|
||||
pora_chunks_merkle,
|
||||
last_chunk_merkle,
|
||||
merkle,
|
||||
};
|
||||
|
||||
if let Some(tx) = last_tx_to_insert {
|
||||
log_manager.put_tx(tx)?;
|
||||
let mut merkle = log_manager.merkle.write();
|
||||
for (index, h) in extra_leaves {
|
||||
if index < log_manager.pora_chunks_merkle.leaves() {
|
||||
log_manager.pora_chunks_merkle.fill_leaf(index, h);
|
||||
if index < merkle.pora_chunks_merkle.leaves() {
|
||||
merkle.pora_chunks_merkle.fill_leaf(index, h);
|
||||
} else {
|
||||
error!("out of range extra leaf: index={} hash={:?}", index, h);
|
||||
}
|
||||
@ -598,45 +701,19 @@ impl LogManager {
|
||||
} else {
|
||||
assert!(extra_leaves.is_empty());
|
||||
}
|
||||
log_manager.try_initialize()?;
|
||||
log_manager
|
||||
.merkle
|
||||
.write()
|
||||
.try_initialize(&log_manager.flow_store)?;
|
||||
Ok(log_manager)
|
||||
}
|
||||
|
||||
fn try_initialize(&mut self) -> Result<()> {
|
||||
if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 {
|
||||
self.last_chunk_merkle.append(H256::zero());
|
||||
self.pora_chunks_merkle
|
||||
.update_last(*self.last_chunk_merkle.root());
|
||||
} else if self.last_chunk_merkle.leaves() != 0 {
|
||||
let last_chunk_start_index = self.last_chunk_start_index();
|
||||
let last_chunk_data = self.flow_store.get_available_entries(
|
||||
last_chunk_start_index,
|
||||
last_chunk_start_index + PORA_CHUNK_SIZE as u64,
|
||||
)?;
|
||||
for e in last_chunk_data {
|
||||
let start_index = e.start_index - last_chunk_start_index;
|
||||
for i in 0..e.data.len() / ENTRY_SIZE {
|
||||
let index = i + start_index as usize;
|
||||
if index >= self.last_chunk_merkle.leaves() {
|
||||
// We revert the merkle tree before truncate the flow store,
|
||||
// so last_chunk_data may include data that should have been truncated.
|
||||
break;
|
||||
}
|
||||
self.last_chunk_merkle.fill_leaf(
|
||||
index,
|
||||
Sha3Algorithm::leaf(&e.data[i * ENTRY_SIZE..(i + 1) * ENTRY_SIZE]),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
|
||||
let merkle = self.merkle.read_recursive();
|
||||
let chunk_index = flow_index / PORA_CHUNK_SIZE as u64;
|
||||
let top_proof = match maybe_root {
|
||||
None => self.pora_chunks_merkle.gen_proof(chunk_index as usize)?,
|
||||
Some(root) => self
|
||||
None => merkle.pora_chunks_merkle.gen_proof(chunk_index as usize)?,
|
||||
Some(root) => merkle
|
||||
.pora_chunks_merkle
|
||||
.at_root_version(&root)?
|
||||
.gen_proof(chunk_index as usize)?,
|
||||
@ -649,17 +726,17 @@ impl LogManager {
|
||||
// and `flow_index` must be within a complete PoRA chunk. For possible future usages,
|
||||
// we'll need to find the flow length at the given root and load a partial chunk
|
||||
// if `flow_index` is in the last chunk.
|
||||
let sub_proof = if chunk_index as usize != self.pora_chunks_merkle.leaves() - 1
|
||||
|| self.last_chunk_merkle.leaves() == 0
|
||||
let sub_proof = if chunk_index as usize != merkle.pora_chunks_merkle.leaves() - 1
|
||||
|| merkle.last_chunk_merkle.leaves() == 0
|
||||
{
|
||||
self.flow_store
|
||||
.gen_proof_in_batch(chunk_index as usize, flow_index as usize % PORA_CHUNK_SIZE)?
|
||||
} else {
|
||||
match maybe_root {
|
||||
None => self
|
||||
None => merkle
|
||||
.last_chunk_merkle
|
||||
.gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
|
||||
Some(root) => self
|
||||
Some(root) => merkle
|
||||
.last_chunk_merkle
|
||||
.at_root_version(&root)?
|
||||
.gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
|
||||
@ -668,45 +745,56 @@ impl LogManager {
|
||||
entry_proof(&top_proof, &sub_proof)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
fn append_subtree_list(&mut self, merkle_list: Vec<(usize, DataRoot)>) -> Result<()> {
|
||||
#[instrument(skip(self, merkle))]
|
||||
fn append_subtree_list(
|
||||
&self,
|
||||
merkle_list: Vec<(usize, DataRoot)>,
|
||||
merkle: &mut MerkleManager,
|
||||
) -> Result<()> {
|
||||
if merkle_list.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.pad_tx(1 << (merkle_list[0].0 - 1))?;
|
||||
self.pad_tx(1 << (merkle_list[0].0 - 1), &mut *merkle)?;
|
||||
|
||||
let mut batch_root_map = BTreeMap::new();
|
||||
for (subtree_depth, subtree_root) in merkle_list {
|
||||
let subtree_size = 1 << (subtree_depth - 1);
|
||||
if self.last_chunk_merkle.leaves() + subtree_size <= PORA_CHUNK_SIZE {
|
||||
self.last_chunk_merkle
|
||||
if merkle.last_chunk_merkle.leaves() + subtree_size <= PORA_CHUNK_SIZE {
|
||||
merkle
|
||||
.last_chunk_merkle
|
||||
.append_subtree(subtree_depth, subtree_root)?;
|
||||
if self.last_chunk_merkle.leaves() == subtree_size {
|
||||
if merkle.last_chunk_merkle.leaves() == subtree_size {
|
||||
// `last_chunk_merkle` was empty, so this is a new leaf in the top_tree.
|
||||
self.pora_chunks_merkle
|
||||
.append_subtree(1, *self.last_chunk_merkle.root())?;
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.append_subtree(1, *merkle.last_chunk_merkle.root())?;
|
||||
} else {
|
||||
self.pora_chunks_merkle
|
||||
.update_last(*self.last_chunk_merkle.root());
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.update_last(*merkle.last_chunk_merkle.root());
|
||||
}
|
||||
if self.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
|
||||
if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
|
||||
batch_root_map.insert(
|
||||
self.pora_chunks_merkle.leaves() - 1,
|
||||
(*self.last_chunk_merkle.root(), 1),
|
||||
merkle.pora_chunks_merkle.leaves() - 1,
|
||||
(*merkle.last_chunk_merkle.root(), 1),
|
||||
);
|
||||
self.complete_last_chunk_merkle(self.pora_chunks_merkle.leaves() - 1)?;
|
||||
self.complete_last_chunk_merkle(
|
||||
merkle.pora_chunks_merkle.leaves() - 1,
|
||||
&mut *merkle,
|
||||
)?;
|
||||
}
|
||||
} else {
|
||||
// `last_chunk_merkle` has been padded here, so a subtree should not be across
|
||||
// the chunks boundary.
|
||||
assert_eq!(self.last_chunk_merkle.leaves(), 0);
|
||||
assert_eq!(merkle.last_chunk_merkle.leaves(), 0);
|
||||
assert!(subtree_size >= PORA_CHUNK_SIZE);
|
||||
batch_root_map.insert(
|
||||
self.pora_chunks_merkle.leaves(),
|
||||
merkle.pora_chunks_merkle.leaves(),
|
||||
(subtree_root, subtree_depth - log2_pow2(PORA_CHUNK_SIZE)),
|
||||
);
|
||||
self.pora_chunks_merkle
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?;
|
||||
}
|
||||
}
|
||||
@ -714,46 +802,50 @@ impl LogManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
fn pad_tx(&mut self, first_subtree_size: u64) -> Result<()> {
|
||||
#[instrument(skip(self, merkle))]
|
||||
fn pad_tx(&self, first_subtree_size: u64, merkle: &mut MerkleManager) -> Result<()> {
|
||||
// Check if we need to pad the flow.
|
||||
let mut tx_start_flow_index =
|
||||
self.last_chunk_start_index() + self.last_chunk_merkle.leaves() as u64;
|
||||
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
|
||||
let extra = tx_start_flow_index % first_subtree_size;
|
||||
trace!(
|
||||
"before pad_tx {} {}",
|
||||
self.pora_chunks_merkle.leaves(),
|
||||
self.last_chunk_merkle.leaves()
|
||||
merkle.pora_chunks_merkle.leaves(),
|
||||
merkle.last_chunk_merkle.leaves()
|
||||
);
|
||||
if extra != 0 {
|
||||
for pad_data in Self::padding((first_subtree_size - extra) as usize) {
|
||||
let mut root_map = BTreeMap::new();
|
||||
|
||||
// Update the in-memory merkle tree.
|
||||
let last_chunk_pad = if self.last_chunk_merkle.leaves() == 0 {
|
||||
let last_chunk_pad = if merkle.last_chunk_merkle.leaves() == 0 {
|
||||
0
|
||||
} else {
|
||||
(PORA_CHUNK_SIZE - self.last_chunk_merkle.leaves()) * ENTRY_SIZE
|
||||
(PORA_CHUNK_SIZE - merkle.last_chunk_merkle.leaves()) * ENTRY_SIZE
|
||||
};
|
||||
|
||||
let mut completed_chunk_index = None;
|
||||
if pad_data.len() < last_chunk_pad {
|
||||
self.last_chunk_merkle
|
||||
merkle
|
||||
.last_chunk_merkle
|
||||
.append_list(data_to_merkle_leaves(&pad_data)?);
|
||||
self.pora_chunks_merkle
|
||||
.update_last(*self.last_chunk_merkle.root());
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.update_last(*merkle.last_chunk_merkle.root());
|
||||
} else {
|
||||
if last_chunk_pad != 0 {
|
||||
// Pad the last chunk.
|
||||
self.last_chunk_merkle
|
||||
merkle
|
||||
.last_chunk_merkle
|
||||
.append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?);
|
||||
self.pora_chunks_merkle
|
||||
.update_last(*self.last_chunk_merkle.root());
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.update_last(*merkle.last_chunk_merkle.root());
|
||||
root_map.insert(
|
||||
self.pora_chunks_merkle.leaves() - 1,
|
||||
(*self.last_chunk_merkle.root(), 1),
|
||||
merkle.pora_chunks_merkle.leaves() - 1,
|
||||
(*merkle.last_chunk_merkle.root(), 1),
|
||||
);
|
||||
completed_chunk_index = Some(self.pora_chunks_merkle.leaves() - 1);
|
||||
completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1);
|
||||
}
|
||||
|
||||
// Pad with more complete chunks.
|
||||
@ -763,8 +855,8 @@ impl LogManager {
|
||||
..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE]
|
||||
.to_vec();
|
||||
let root = *Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
|
||||
self.pora_chunks_merkle.append(root);
|
||||
root_map.insert(self.pora_chunks_merkle.leaves() - 1, (root, 1));
|
||||
merkle.pora_chunks_merkle.append(root);
|
||||
root_map.insert(merkle.pora_chunks_merkle.leaves() - 1, (root, 1));
|
||||
start_index += PORA_CHUNK_SIZE;
|
||||
}
|
||||
assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
|
||||
@ -782,20 +874,24 @@ impl LogManager {
|
||||
})?;
|
||||
tx_start_flow_index += data_size as u64;
|
||||
if let Some(index) = completed_chunk_index {
|
||||
self.complete_last_chunk_merkle(index)?;
|
||||
self.complete_last_chunk_merkle(index, &mut *merkle)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!(
|
||||
"after pad_tx {} {}",
|
||||
self.pora_chunks_merkle.leaves(),
|
||||
self.last_chunk_merkle.leaves()
|
||||
merkle.pora_chunks_merkle.leaves(),
|
||||
merkle.last_chunk_merkle.leaves()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn append_entries(&mut self, flow_entry_array: ChunkArray) -> Result<()> {
|
||||
let last_chunk_start_index = self.last_chunk_start_index();
|
||||
fn append_entries(
|
||||
&self,
|
||||
flow_entry_array: ChunkArray,
|
||||
merkle: &mut MerkleManager,
|
||||
) -> Result<()> {
|
||||
let last_chunk_start_index = merkle.last_chunk_start_index();
|
||||
if flow_entry_array.start_index + bytes_to_chunks(flow_entry_array.data.len()) as u64
|
||||
> last_chunk_start_index
|
||||
{
|
||||
@ -822,14 +918,16 @@ impl LogManager {
|
||||
.chunks_exact(ENTRY_SIZE)
|
||||
.enumerate()
|
||||
{
|
||||
self.last_chunk_merkle
|
||||
merkle
|
||||
.last_chunk_merkle
|
||||
.fill_leaf(chunk_start_index + local_index, Sha3Algorithm::leaf(entry));
|
||||
}
|
||||
}
|
||||
let chunk_roots = self.flow_store.append_entries(flow_entry_array)?;
|
||||
for (chunk_index, chunk_root) in chunk_roots {
|
||||
if chunk_index < self.pora_chunks_merkle.leaves() as u64 {
|
||||
self.pora_chunks_merkle
|
||||
if chunk_index < merkle.pora_chunks_merkle.leaves() as u64 {
|
||||
merkle
|
||||
.pora_chunks_merkle
|
||||
.fill_leaf(chunk_index as usize, chunk_root);
|
||||
} else {
|
||||
// TODO(zz): This assumption may be false in the future.
|
||||
@ -857,56 +955,6 @@ impl LogManager {
|
||||
vec![0; len * ENTRY_SIZE]
|
||||
}
|
||||
|
||||
fn last_chunk_start_index(&self) -> u64 {
|
||||
if self.pora_chunks_merkle.leaves() == 0 {
|
||||
0
|
||||
} else {
|
||||
PORA_CHUNK_SIZE as u64
|
||||
* if self.last_chunk_merkle.leaves() == 0 {
|
||||
// The last chunk is empty and its root hash is not in `pora_chunk_merkle`,
|
||||
// so all chunks in `pora_chunk_merkle` is complete.
|
||||
self.pora_chunks_merkle.leaves()
|
||||
} else {
|
||||
// The last chunk has data, so we need to exclude it from `pora_chunks_merkle`.
|
||||
self.pora_chunks_merkle.leaves() - 1
|
||||
} as u64
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
fn commit_merkle(&mut self, tx_seq: u64) -> Result<()> {
|
||||
self.pora_chunks_merkle.commit(Some(tx_seq));
|
||||
self.last_chunk_merkle.commit(Some(tx_seq));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn revert_merkle_tree(&mut self, tx_seq: u64) -> Result<()> {
|
||||
// Special case for reverting tx_seq == 0
|
||||
if tx_seq == u64::MAX {
|
||||
self.pora_chunks_merkle.reset();
|
||||
self.last_chunk_merkle.reset();
|
||||
self.try_initialize()?;
|
||||
return Ok(());
|
||||
}
|
||||
let old_leaves = self.pora_chunks_merkle.leaves();
|
||||
self.pora_chunks_merkle.revert_to(tx_seq)?;
|
||||
if old_leaves == self.pora_chunks_merkle.leaves() {
|
||||
self.last_chunk_merkle.revert_to(tx_seq)?;
|
||||
} else {
|
||||
// We are reverting to a position before the current last_chunk.
|
||||
self.last_chunk_merkle = self
|
||||
.tx_store
|
||||
.rebuild_last_chunk_merkle(self.pora_chunks_merkle.leaves() - 1, tx_seq)?;
|
||||
self.try_initialize()?;
|
||||
assert_eq!(
|
||||
Some(*self.last_chunk_merkle.root()),
|
||||
self.pora_chunks_merkle
|
||||
.leaf_at(self.pora_chunks_merkle.leaves() - 1)?
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn flow_store(&self) -> &FlowStore {
|
||||
&self.flow_store
|
||||
@ -960,6 +1008,7 @@ impl LogManager {
|
||||
}
|
||||
|
||||
fn copy_tx_data(&mut self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
|
||||
let mut merkle = self.merkle.write();
|
||||
// We have all the data need for this tx, so just copy them.
|
||||
let old_tx = self
|
||||
.get_tx_by_seq_number(from_tx_seq)?
|
||||
@ -992,7 +1041,7 @@ impl LogManager {
|
||||
for (_, offset) in &to_tx_offset_list {
|
||||
let mut data = batch_data.clone();
|
||||
data.start_index += offset;
|
||||
self.append_entries(data)?;
|
||||
self.append_entries(data, &mut *merkle)?;
|
||||
}
|
||||
}
|
||||
// num_entries() includes the rear padding data, so no need for more padding.
|
||||
@ -1007,9 +1056,9 @@ impl LogManager {
|
||||
/// we can still provide proof for known data in it.
|
||||
/// Another choice is to insert these subtrees earlier in `put_tx`. To insert them here can
|
||||
/// batch them and avoid inserting for the subtrees with all data known.
|
||||
fn complete_last_chunk_merkle(&mut self, index: usize) -> Result<()> {
|
||||
let subtree_list = self.last_chunk_merkle.get_subtrees();
|
||||
self.last_chunk_merkle =
|
||||
fn complete_last_chunk_merkle(&self, index: usize, merkle: &mut MerkleManager) -> Result<()> {
|
||||
let subtree_list = merkle.last_chunk_merkle.get_subtrees();
|
||||
merkle.last_chunk_merkle =
|
||||
Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None);
|
||||
|
||||
// Only insert non-leave subtrees. The leave data should have been available.
|
||||
|
@ -206,11 +206,11 @@ pub trait FlowWrite {
|
||||
/// Append data to the flow. `start_index` is included in `ChunkArray`, so
|
||||
/// it's possible to append arrays in any place.
|
||||
/// Return the list of completed chunks.
|
||||
fn append_entries(&mut self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>>;
|
||||
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>>;
|
||||
|
||||
/// Remove all the entries after `start_index`.
|
||||
/// This is used to remove deprecated data in case of chain reorg.
|
||||
fn truncate(&mut self, start_index: u64) -> Result<()>;
|
||||
fn truncate(&self, start_index: u64) -> Result<()>;
|
||||
|
||||
/// Update the shard config.
|
||||
fn update_shard_config(&mut self, shard_config: ShardConfig);
|
||||
|
Loading…
Reference in New Issue
Block a user