From fad3b93a31d745a369c354326609dc0dce3f21e7 Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Mon, 17 Jun 2024 23:14:01 +0800 Subject: [PATCH] Use inner lock in storage. --- Cargo.lock | 39 +-- node/storage/Cargo.toml | 1 + node/storage/src/log_store/flow_store.rs | 33 +- node/storage/src/log_store/log_manager.rs | 351 ++++++++++++---------- node/storage/src/log_store/mod.rs | 4 +- 5 files changed, 242 insertions(+), 186 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 84d2561..9033c97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2286,7 +2286,7 @@ version = "0.1.0" dependencies = [ "hashlink 0.8.4", "network", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "priority-queue", "rand 0.8.5", "shared_types", @@ -3342,7 +3342,7 @@ dependencies = [ "hyper", "jsonrpsee-types", "lazy_static", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "rand 0.8.5", "rustc-hash", "serde", @@ -3531,7 +3531,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf7a85fe66f9ff9cd74e169fdd2c94c6e1e74c412c99a73b4df3200b5d3760b2" dependencies = [ "kvdb", - "parking_lot 0.12.1", + "parking_lot 0.12.3", ] [[package]] @@ -3542,7 +3542,7 @@ checksum = "b644c70b92285f66bfc2032922a79000ea30af7bc2ab31902992a5dcb9b434f6" dependencies = [ "kvdb", "num_cpus", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "regex", "rocksdb", "smallvec", @@ -3650,7 +3650,7 @@ dependencies = [ "libp2p-websocket", "libp2p-yamux", "multiaddr 0.14.0", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "pin-project 1.1.5", "rand 0.7.3", "smallvec", @@ -3729,7 +3729,7 @@ dependencies = [ "multiaddr 0.14.0", "multihash 0.16.3", "multistream-select 0.11.0", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "pin-project 1.1.5", "prost 0.9.0", "prost-build 0.9.0", @@ -3764,7 +3764,7 @@ dependencies = [ "multiaddr 0.14.0", "multihash 0.16.3", "multistream-select 0.11.0", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "pin-project 1.1.5", "prost 0.10.4", "prost-build 0.10.4", @@ -3800,7 +3800,7 @@ dependencies = [ "futures", "libp2p-core 0.33.0", "log", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "smallvec", "trust-dns-resolver", ] @@ -3949,7 +3949,7 @@ dependencies = [ "libp2p-core 0.33.0", "log", "nohash-hasher", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "rand 0.7.3", "smallvec", "unsigned-varint", @@ -4177,7 +4177,7 @@ dependencies = [ "futures-rustls", "libp2p-core 0.33.0", "log", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "quicksink", "rw-stream-sink 0.3.0", "soketto", @@ -4193,7 +4193,7 @@ checksum = "8fe653639ad74877c759720febb0cbcbf4caa221adde4eed2d3126ce5c6f381f" dependencies = [ "futures", "libp2p-core 0.33.0", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "thiserror", "yamux", ] @@ -4688,7 +4688,7 @@ dependencies = [ "libp2p", "lighthouse_metrics", "lru", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "prometheus-client", "rand 0.8.5", "regex", @@ -5008,9 +5008,9 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.12.1" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", "parking_lot_core 0.9.9", @@ -5562,7 +5562,7 @@ dependencies = [ "fnv", "lazy_static", "memchr", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "protobuf", "thiserror", ] @@ -6884,6 +6884,7 @@ dependencies = [ "kvdb-rocksdb", "merkle_light", "merkle_tree", + "parking_lot 0.12.3", "rand 0.8.5", "rayon", "serde", @@ -6917,7 +6918,7 @@ checksum = "f91138e76242f575eb1d3b38b4f1362f10d3a43f47d182a5b359af488a02293b" dependencies = [ "new_debug_unreachable", "once_cell", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "phf_shared 0.10.0", "precomputed-hash", ] @@ -7271,7 +7272,7 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "pin-project-lite 0.2.14", "signal-hook-registry", "socket2 0.5.6", @@ -7605,7 +7606,7 @@ dependencies = [ "lazy_static", "log", "lru-cache", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "resolv-conf", "smallvec", "thiserror", @@ -8315,7 +8316,7 @@ dependencies = [ "futures", "log", "nohash-hasher", - "parking_lot 0.12.1", + "parking_lot 0.12.3", "rand 0.8.5", "static_assertions", ] diff --git a/node/storage/Cargo.toml b/node/storage/Cargo.toml index 43547d0..9eff9e0 100644 --- a/node/storage/Cargo.toml +++ b/node/storage/Cargo.toml @@ -27,6 +27,7 @@ static_assertions = "1.1" tiny-keccak = "*" itertools = "0.13.0" serde = { version = "1.0.197", features = ["derive"] } +parking_lot = "0.12.3" [dev-dependencies] tempdir = "0.3.7" diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index c4f63f0..7566078 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -10,6 +10,7 @@ use crate::{try_option, ZgsKeyValueDB}; use anyhow::{anyhow, bail, Result}; use append_merkle::{MerkleTreeInitialData, MerkleTreeRead}; use itertools::Itertools; +use parking_lot::RwLock; use shared_types::{ChunkArray, DataRoot, FlowProof}; use ssz::{Decode, Encode}; use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode}; @@ -24,10 +25,10 @@ use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_S pub struct FlowStore { db: FlowDBStore, // TODO(kevin): This is an in-memory cache for recording which chunks are ready for sealing. It should be persisted on disk. - to_seal_set: BTreeMap, + to_seal_set: RwLock>, // Data sealing is an asynchronized process. // The sealing service uses the version number to distinguish if revert happens during sealing. - to_seal_version: usize, + to_seal_version: RwLock, config: FlowConfig, } @@ -36,7 +37,7 @@ impl FlowStore { Self { db: FlowDBStore::new(db), to_seal_set: Default::default(), - to_seal_version: 0, + to_seal_version: RwLock::new(0), config, } } @@ -214,7 +215,8 @@ impl FlowRead for FlowStore { impl FlowWrite for FlowStore { /// Return the roots of completed chunks. The order is guaranteed to be increasing /// by chunk index. - fn append_entries(&mut self, data: ChunkArray) -> Result> { + fn append_entries(&self, data: ChunkArray) -> Result> { + let mut to_seal_set = self.to_seal_set.write(); trace!("append_entries: {} {}", data.start_index, data.data.len()); if data.data.len() % BYTES_PER_SECTOR != 0 { bail!("append_entries: invalid data size, len={}", data.data.len()); @@ -246,9 +248,9 @@ impl FlowWrite for FlowStore { chunk.data, )?; completed_seals.into_iter().for_each(|x| { - self.to_seal_set.insert( + to_seal_set.insert( chunk_index as usize * SEALS_PER_LOAD + x as usize, - self.to_seal_version, + *self.to_seal_version.read(), ); }); @@ -257,15 +259,16 @@ impl FlowWrite for FlowStore { self.db.put_entry_batch_list(batch_list) } - fn truncate(&mut self, start_index: u64) -> crate::error::Result<()> { + fn truncate(&self, start_index: u64) -> crate::error::Result<()> { + let mut to_seal_set = self.to_seal_set.write(); + let mut to_seal_version = self.to_seal_version.write(); let to_reseal = self.db.truncate(start_index, self.config.batch_size)?; - self.to_seal_set - .split_off(&(start_index as usize / SECTORS_PER_SEAL)); - self.to_seal_version += 1; + to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL)); + *to_seal_version += 1; to_reseal.into_iter().for_each(|x| { - self.to_seal_set.insert(x, self.to_seal_version); + to_seal_set.insert(x, *to_seal_version); }); Ok(()) } @@ -277,7 +280,8 @@ impl FlowWrite for FlowStore { impl FlowSeal for FlowStore { fn pull_seal_chunk(&self, seal_index_max: usize) -> Result>> { - let mut to_seal_iter = self.to_seal_set.iter(); + let to_seal_set = self.to_seal_set.read(); + let mut to_seal_iter = to_seal_set.iter(); let (&first_index, &first_version) = try_option!(to_seal_iter.next()); if first_index >= seal_index_max { return Ok(None); @@ -310,8 +314,9 @@ impl FlowSeal for FlowStore { } fn submit_seal_result(&mut self, answers: Vec) -> Result<()> { + let mut to_seal_set = self.to_seal_set.write(); let is_consistent = |answer: &SealAnswer| { - self.to_seal_set + to_seal_set .get(&(answer.seal_index as usize)) .map_or(false, |cur_ver| cur_ver == &answer.version) }; @@ -337,7 +342,7 @@ impl FlowSeal for FlowStore { debug!("Seal chunks: indices = {:?}", removed_seal_index); for idx in removed_seal_index.into_iter() { - self.to_seal_set.remove(&idx); + to_seal_set.remove(&idx); } self.db.put_entry_raw(updated_chunk)?; diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 7d1b1c7..b86d7a2 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -10,6 +10,7 @@ use ethereum_types::H256; use kvdb_rocksdb::{Database, DatabaseConfig}; use merkle_light::merkle::{log2_pow2, MerkleTree}; use merkle_tree::RawLeafSha3Algorithm; +use parking_lot::RwLock; use rayon::iter::ParallelIterator; use rayon::prelude::ParallelSlice; use shared_types::{ @@ -48,6 +49,10 @@ pub struct LogManager { pub(crate) db: Arc, tx_store: TransactionStore, flow_store: FlowStore, + merkle: RwLock, +} + +struct MerkleManager { // TODO(zz): Refactor the in-memory merkle and in-disk storage together. pora_chunks_merkle: Merkle, /// The in-memory structure of the sub merkle tree of the last chunk. @@ -55,6 +60,80 @@ pub struct LogManager { last_chunk_merkle: Merkle, } +impl MerkleManager { + fn last_chunk_start_index(&self) -> u64 { + if self.pora_chunks_merkle.leaves() == 0 { + 0 + } else { + PORA_CHUNK_SIZE as u64 + * if self.last_chunk_merkle.leaves() == 0 { + // The last chunk is empty and its root hash is not in `pora_chunk_merkle`, + // so all chunks in `pora_chunk_merkle` is complete. + self.pora_chunks_merkle.leaves() + } else { + // The last chunk has data, so we need to exclude it from `pora_chunks_merkle`. + self.pora_chunks_merkle.leaves() - 1 + } as u64 + } + } + + #[instrument(skip(self))] + fn commit_merkle(&mut self, tx_seq: u64) -> Result<()> { + self.pora_chunks_merkle.commit(Some(tx_seq)); + self.last_chunk_merkle.commit(Some(tx_seq)); + Ok(()) + } + + fn revert_merkle_tree(&mut self, tx_seq: u64, tx_store: &TransactionStore) -> Result<()> { + // Special case for reverting tx_seq == 0 + if tx_seq == u64::MAX { + self.pora_chunks_merkle.reset(); + self.last_chunk_merkle.reset(); + return Ok(()); + } + let old_leaves = self.pora_chunks_merkle.leaves(); + self.pora_chunks_merkle.revert_to(tx_seq)?; + if old_leaves == self.pora_chunks_merkle.leaves() { + self.last_chunk_merkle.revert_to(tx_seq)?; + } else { + // We are reverting to a position before the current last_chunk. + self.last_chunk_merkle = + tx_store.rebuild_last_chunk_merkle(self.pora_chunks_merkle.leaves() - 1, tx_seq)?; + } + Ok(()) + } + + fn try_initialize(&mut self, flow_store: &FlowStore) -> Result<()> { + if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 { + self.last_chunk_merkle.append(H256::zero()); + self.pora_chunks_merkle + .update_last(*self.last_chunk_merkle.root()); + } else if self.last_chunk_merkle.leaves() != 0 { + let last_chunk_start_index = self.last_chunk_start_index(); + let last_chunk_data = flow_store.get_available_entries( + last_chunk_start_index, + last_chunk_start_index + PORA_CHUNK_SIZE as u64, + )?; + for e in last_chunk_data { + let start_index = e.start_index - last_chunk_start_index; + for i in 0..e.data.len() / ENTRY_SIZE { + let index = i + start_index as usize; + if index >= self.last_chunk_merkle.leaves() { + // We revert the merkle tree before truncate the flow store, + // so last_chunk_data may include data that should have been truncated. + break; + } + self.last_chunk_merkle.fill_leaf( + index, + Sha3Algorithm::leaf(&e.data[i * ENTRY_SIZE..(i + 1) * ENTRY_SIZE]), + ); + } + } + } + Ok(()) + } +} + #[derive(Clone, Default)] pub struct LogConfig { pub flow: FlowConfig, @@ -72,6 +151,7 @@ impl LogStoreInner for LogManager { impl LogStoreChunkWrite for LogManager { fn put_chunks(&mut self, tx_seq: u64, chunks: ChunkArray) -> Result<()> { + let mut merkle = self.merkle.write(); let tx = self .tx_store .get_tx_by_seq_number(tx_seq)? @@ -90,7 +170,7 @@ impl LogStoreChunkWrite for LogManager { // TODO: Use another struct to avoid confusion. let mut flow_entry_array = chunks; flow_entry_array.start_index += tx.start_entry_index; - self.append_entries(flow_entry_array)?; + self.append_entries(flow_entry_array, &mut *merkle)?; Ok(()) } @@ -101,6 +181,7 @@ impl LogStoreChunkWrite for LogManager { chunks: ChunkArray, maybe_file_proof: Option, ) -> Result { + let mut merkle = self.merkle.write(); let tx = self .tx_store .get_tx_by_seq_number(tx_seq)? @@ -122,10 +203,10 @@ impl LogStoreChunkWrite for LogManager { // TODO: Use another struct to avoid confusion. let mut flow_entry_array = chunks; flow_entry_array.start_index += tx.start_entry_index; - self.append_entries(flow_entry_array)?; + self.append_entries(flow_entry_array, &mut *merkle)?; if let Some(file_proof) = maybe_file_proof { - let updated_node_list = self.pora_chunks_merkle.fill_with_file_proof( + let updated_node_list = merkle.pora_chunks_merkle.fill_with_file_proof( file_proof, tx.merkle_nodes, tx.start_entry_index, @@ -162,6 +243,7 @@ impl LogStoreWrite for LogManager { /// `put_tx` for the last tx when we restart the node to ensure that it succeeds. /// fn put_tx(&mut self, tx: Transaction) -> Result<()> { + let mut merkle = self.merkle.write(); debug!("put_tx: tx={:?}", tx); let expected_seq = self.next_tx_seq(); if tx.seq != expected_seq { @@ -176,12 +258,14 @@ impl LogStoreWrite for LogManager { } let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned(); // TODO(zz): Should we validate received tx? - self.append_subtree_list(tx.merkle_nodes.clone())?; - self.commit_merkle(tx.seq)?; + self.append_subtree_list(tx.merkle_nodes.clone(), &mut *merkle)?; + merkle.commit_merkle(tx.seq)?; debug!( "commit flow root: root={:?}", - self.pora_chunks_merkle.root() + merkle.pora_chunks_merkle.root() ); + // Drop the lock because `copy_tx_data` will lock again. + drop(merkle); if let Some(old_tx_seq) = maybe_same_data_tx_seq { if self.check_tx_completed(old_tx_seq)? { @@ -261,9 +345,17 @@ impl LogStoreWrite for LogManager { /// `tx_seq == u64::MAX` is a special case for reverting all transactions. fn revert_to(&mut self, tx_seq: u64) -> Result> { // FIXME(zz): If this revert is triggered by chain reorg after restarts, this will fail. - self.revert_merkle_tree(tx_seq)?; - let start_index = self.last_chunk_start_index() * PORA_CHUNK_SIZE as u64 - + self.last_chunk_merkle.leaves() as u64; + let mut merkle = self.merkle.write(); + merkle.revert_merkle_tree(tx_seq, &self.tx_store)?; + merkle.try_initialize(&self.flow_store)?; + assert_eq!( + Some(*merkle.last_chunk_merkle.root()), + merkle + .pora_chunks_merkle + .leaf_at(merkle.pora_chunks_merkle.leaves() - 1)? + ); + let start_index = merkle.last_chunk_start_index() * PORA_CHUNK_SIZE as u64 + + merkle.last_chunk_merkle.leaves() as u64; self.flow_store.truncate(start_index)?; let start = if tx_seq != u64::MAX { tx_seq + 1 } else { 0 }; self.tx_store.remove_tx_after(start) @@ -275,8 +367,10 @@ impl LogStoreWrite for LogManager { data: &ChunkArrayWithProof, ) -> Result { let valid = self.validate_range_proof(tx_seq, data)?; + // `merkle` is used in `validate_range_proof`. + let mut merkle = self.merkle.write(); if valid { - let updated_nodes = self + let updated_nodes = merkle .pora_chunks_merkle .fill_with_range_proof(data.proof.clone())?; self.flow_store.put_mpt_node_list(updated_nodes)?; @@ -421,7 +515,11 @@ impl LogStoreRead for LogManager { &leaves, (data.chunks.start_index + tx.start_entry_index) as usize, )?; - Ok(self.pora_chunks_merkle.check_root(&data.proof.root())) + Ok(self + .merkle + .read_recursive() + .pora_chunks_merkle + .check_root(&data.proof.root())) } fn get_sync_progress(&self) -> Result> { @@ -455,9 +553,10 @@ impl LogStoreRead for LogManager { } fn get_context(&self) -> crate::error::Result<(DataRoot, u64)> { + let merkle = self.merkle.read_recursive(); Ok(( - *self.pora_chunks_merkle.root(), - self.last_chunk_start_index() + self.last_chunk_merkle.leaves() as u64, + *merkle.pora_chunks_merkle.root(), + merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64, )) } } @@ -578,19 +677,23 @@ impl LogManager { // update the merkle root pora_chunks_merkle.commit(start_tx_seq); } + let merkle = RwLock::new(MerkleManager { + pora_chunks_merkle, + last_chunk_merkle, + }); let mut log_manager = Self { db, tx_store, flow_store, - pora_chunks_merkle, - last_chunk_merkle, + merkle, }; if let Some(tx) = last_tx_to_insert { log_manager.put_tx(tx)?; + let mut merkle = log_manager.merkle.write(); for (index, h) in extra_leaves { - if index < log_manager.pora_chunks_merkle.leaves() { - log_manager.pora_chunks_merkle.fill_leaf(index, h); + if index < merkle.pora_chunks_merkle.leaves() { + merkle.pora_chunks_merkle.fill_leaf(index, h); } else { error!("out of range extra leaf: index={} hash={:?}", index, h); } @@ -598,45 +701,19 @@ impl LogManager { } else { assert!(extra_leaves.is_empty()); } - log_manager.try_initialize()?; + log_manager + .merkle + .write() + .try_initialize(&log_manager.flow_store)?; Ok(log_manager) } - fn try_initialize(&mut self) -> Result<()> { - if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 { - self.last_chunk_merkle.append(H256::zero()); - self.pora_chunks_merkle - .update_last(*self.last_chunk_merkle.root()); - } else if self.last_chunk_merkle.leaves() != 0 { - let last_chunk_start_index = self.last_chunk_start_index(); - let last_chunk_data = self.flow_store.get_available_entries( - last_chunk_start_index, - last_chunk_start_index + PORA_CHUNK_SIZE as u64, - )?; - for e in last_chunk_data { - let start_index = e.start_index - last_chunk_start_index; - for i in 0..e.data.len() / ENTRY_SIZE { - let index = i + start_index as usize; - if index >= self.last_chunk_merkle.leaves() { - // We revert the merkle tree before truncate the flow store, - // so last_chunk_data may include data that should have been truncated. - break; - } - self.last_chunk_merkle.fill_leaf( - index, - Sha3Algorithm::leaf(&e.data[i * ENTRY_SIZE..(i + 1) * ENTRY_SIZE]), - ); - } - } - } - Ok(()) - } - fn gen_proof(&self, flow_index: u64, maybe_root: Option) -> Result { + let merkle = self.merkle.read_recursive(); let chunk_index = flow_index / PORA_CHUNK_SIZE as u64; let top_proof = match maybe_root { - None => self.pora_chunks_merkle.gen_proof(chunk_index as usize)?, - Some(root) => self + None => merkle.pora_chunks_merkle.gen_proof(chunk_index as usize)?, + Some(root) => merkle .pora_chunks_merkle .at_root_version(&root)? .gen_proof(chunk_index as usize)?, @@ -649,17 +726,17 @@ impl LogManager { // and `flow_index` must be within a complete PoRA chunk. For possible future usages, // we'll need to find the flow length at the given root and load a partial chunk // if `flow_index` is in the last chunk. - let sub_proof = if chunk_index as usize != self.pora_chunks_merkle.leaves() - 1 - || self.last_chunk_merkle.leaves() == 0 + let sub_proof = if chunk_index as usize != merkle.pora_chunks_merkle.leaves() - 1 + || merkle.last_chunk_merkle.leaves() == 0 { self.flow_store .gen_proof_in_batch(chunk_index as usize, flow_index as usize % PORA_CHUNK_SIZE)? } else { match maybe_root { - None => self + None => merkle .last_chunk_merkle .gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?, - Some(root) => self + Some(root) => merkle .last_chunk_merkle .at_root_version(&root)? .gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?, @@ -668,45 +745,56 @@ impl LogManager { entry_proof(&top_proof, &sub_proof) } - #[instrument(skip(self))] - fn append_subtree_list(&mut self, merkle_list: Vec<(usize, DataRoot)>) -> Result<()> { + #[instrument(skip(self, merkle))] + fn append_subtree_list( + &self, + merkle_list: Vec<(usize, DataRoot)>, + merkle: &mut MerkleManager, + ) -> Result<()> { if merkle_list.is_empty() { return Ok(()); } - self.pad_tx(1 << (merkle_list[0].0 - 1))?; + self.pad_tx(1 << (merkle_list[0].0 - 1), &mut *merkle)?; let mut batch_root_map = BTreeMap::new(); for (subtree_depth, subtree_root) in merkle_list { let subtree_size = 1 << (subtree_depth - 1); - if self.last_chunk_merkle.leaves() + subtree_size <= PORA_CHUNK_SIZE { - self.last_chunk_merkle + if merkle.last_chunk_merkle.leaves() + subtree_size <= PORA_CHUNK_SIZE { + merkle + .last_chunk_merkle .append_subtree(subtree_depth, subtree_root)?; - if self.last_chunk_merkle.leaves() == subtree_size { + if merkle.last_chunk_merkle.leaves() == subtree_size { // `last_chunk_merkle` was empty, so this is a new leaf in the top_tree. - self.pora_chunks_merkle - .append_subtree(1, *self.last_chunk_merkle.root())?; + merkle + .pora_chunks_merkle + .append_subtree(1, *merkle.last_chunk_merkle.root())?; } else { - self.pora_chunks_merkle - .update_last(*self.last_chunk_merkle.root()); + merkle + .pora_chunks_merkle + .update_last(*merkle.last_chunk_merkle.root()); } - if self.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE { + if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE { batch_root_map.insert( - self.pora_chunks_merkle.leaves() - 1, - (*self.last_chunk_merkle.root(), 1), + merkle.pora_chunks_merkle.leaves() - 1, + (*merkle.last_chunk_merkle.root(), 1), ); - self.complete_last_chunk_merkle(self.pora_chunks_merkle.leaves() - 1)?; + self.complete_last_chunk_merkle( + merkle.pora_chunks_merkle.leaves() - 1, + &mut *merkle, + )?; } } else { // `last_chunk_merkle` has been padded here, so a subtree should not be across // the chunks boundary. - assert_eq!(self.last_chunk_merkle.leaves(), 0); + assert_eq!(merkle.last_chunk_merkle.leaves(), 0); assert!(subtree_size >= PORA_CHUNK_SIZE); batch_root_map.insert( - self.pora_chunks_merkle.leaves(), + merkle.pora_chunks_merkle.leaves(), (subtree_root, subtree_depth - log2_pow2(PORA_CHUNK_SIZE)), ); - self.pora_chunks_merkle + merkle + .pora_chunks_merkle .append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?; } } @@ -714,46 +802,50 @@ impl LogManager { Ok(()) } - #[instrument(skip(self))] - fn pad_tx(&mut self, first_subtree_size: u64) -> Result<()> { + #[instrument(skip(self, merkle))] + fn pad_tx(&self, first_subtree_size: u64, merkle: &mut MerkleManager) -> Result<()> { // Check if we need to pad the flow. let mut tx_start_flow_index = - self.last_chunk_start_index() + self.last_chunk_merkle.leaves() as u64; + merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64; let extra = tx_start_flow_index % first_subtree_size; trace!( "before pad_tx {} {}", - self.pora_chunks_merkle.leaves(), - self.last_chunk_merkle.leaves() + merkle.pora_chunks_merkle.leaves(), + merkle.last_chunk_merkle.leaves() ); if extra != 0 { for pad_data in Self::padding((first_subtree_size - extra) as usize) { let mut root_map = BTreeMap::new(); // Update the in-memory merkle tree. - let last_chunk_pad = if self.last_chunk_merkle.leaves() == 0 { + let last_chunk_pad = if merkle.last_chunk_merkle.leaves() == 0 { 0 } else { - (PORA_CHUNK_SIZE - self.last_chunk_merkle.leaves()) * ENTRY_SIZE + (PORA_CHUNK_SIZE - merkle.last_chunk_merkle.leaves()) * ENTRY_SIZE }; let mut completed_chunk_index = None; if pad_data.len() < last_chunk_pad { - self.last_chunk_merkle + merkle + .last_chunk_merkle .append_list(data_to_merkle_leaves(&pad_data)?); - self.pora_chunks_merkle - .update_last(*self.last_chunk_merkle.root()); + merkle + .pora_chunks_merkle + .update_last(*merkle.last_chunk_merkle.root()); } else { if last_chunk_pad != 0 { // Pad the last chunk. - self.last_chunk_merkle + merkle + .last_chunk_merkle .append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?); - self.pora_chunks_merkle - .update_last(*self.last_chunk_merkle.root()); + merkle + .pora_chunks_merkle + .update_last(*merkle.last_chunk_merkle.root()); root_map.insert( - self.pora_chunks_merkle.leaves() - 1, - (*self.last_chunk_merkle.root(), 1), + merkle.pora_chunks_merkle.leaves() - 1, + (*merkle.last_chunk_merkle.root(), 1), ); - completed_chunk_index = Some(self.pora_chunks_merkle.leaves() - 1); + completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1); } // Pad with more complete chunks. @@ -763,8 +855,8 @@ impl LogManager { ..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE] .to_vec(); let root = *Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root(); - self.pora_chunks_merkle.append(root); - root_map.insert(self.pora_chunks_merkle.leaves() - 1, (root, 1)); + merkle.pora_chunks_merkle.append(root); + root_map.insert(merkle.pora_chunks_merkle.leaves() - 1, (root, 1)); start_index += PORA_CHUNK_SIZE; } assert_eq!(pad_data.len(), start_index * ENTRY_SIZE); @@ -782,20 +874,24 @@ impl LogManager { })?; tx_start_flow_index += data_size as u64; if let Some(index) = completed_chunk_index { - self.complete_last_chunk_merkle(index)?; + self.complete_last_chunk_merkle(index, &mut *merkle)?; } } } trace!( "after pad_tx {} {}", - self.pora_chunks_merkle.leaves(), - self.last_chunk_merkle.leaves() + merkle.pora_chunks_merkle.leaves(), + merkle.last_chunk_merkle.leaves() ); Ok(()) } - fn append_entries(&mut self, flow_entry_array: ChunkArray) -> Result<()> { - let last_chunk_start_index = self.last_chunk_start_index(); + fn append_entries( + &self, + flow_entry_array: ChunkArray, + merkle: &mut MerkleManager, + ) -> Result<()> { + let last_chunk_start_index = merkle.last_chunk_start_index(); if flow_entry_array.start_index + bytes_to_chunks(flow_entry_array.data.len()) as u64 > last_chunk_start_index { @@ -822,14 +918,16 @@ impl LogManager { .chunks_exact(ENTRY_SIZE) .enumerate() { - self.last_chunk_merkle + merkle + .last_chunk_merkle .fill_leaf(chunk_start_index + local_index, Sha3Algorithm::leaf(entry)); } } let chunk_roots = self.flow_store.append_entries(flow_entry_array)?; for (chunk_index, chunk_root) in chunk_roots { - if chunk_index < self.pora_chunks_merkle.leaves() as u64 { - self.pora_chunks_merkle + if chunk_index < merkle.pora_chunks_merkle.leaves() as u64 { + merkle + .pora_chunks_merkle .fill_leaf(chunk_index as usize, chunk_root); } else { // TODO(zz): This assumption may be false in the future. @@ -857,56 +955,6 @@ impl LogManager { vec![0; len * ENTRY_SIZE] } - fn last_chunk_start_index(&self) -> u64 { - if self.pora_chunks_merkle.leaves() == 0 { - 0 - } else { - PORA_CHUNK_SIZE as u64 - * if self.last_chunk_merkle.leaves() == 0 { - // The last chunk is empty and its root hash is not in `pora_chunk_merkle`, - // so all chunks in `pora_chunk_merkle` is complete. - self.pora_chunks_merkle.leaves() - } else { - // The last chunk has data, so we need to exclude it from `pora_chunks_merkle`. - self.pora_chunks_merkle.leaves() - 1 - } as u64 - } - } - - #[instrument(skip(self))] - fn commit_merkle(&mut self, tx_seq: u64) -> Result<()> { - self.pora_chunks_merkle.commit(Some(tx_seq)); - self.last_chunk_merkle.commit(Some(tx_seq)); - Ok(()) - } - - fn revert_merkle_tree(&mut self, tx_seq: u64) -> Result<()> { - // Special case for reverting tx_seq == 0 - if tx_seq == u64::MAX { - self.pora_chunks_merkle.reset(); - self.last_chunk_merkle.reset(); - self.try_initialize()?; - return Ok(()); - } - let old_leaves = self.pora_chunks_merkle.leaves(); - self.pora_chunks_merkle.revert_to(tx_seq)?; - if old_leaves == self.pora_chunks_merkle.leaves() { - self.last_chunk_merkle.revert_to(tx_seq)?; - } else { - // We are reverting to a position before the current last_chunk. - self.last_chunk_merkle = self - .tx_store - .rebuild_last_chunk_merkle(self.pora_chunks_merkle.leaves() - 1, tx_seq)?; - self.try_initialize()?; - assert_eq!( - Some(*self.last_chunk_merkle.root()), - self.pora_chunks_merkle - .leaf_at(self.pora_chunks_merkle.leaves() - 1)? - ); - } - Ok(()) - } - #[cfg(test)] pub fn flow_store(&self) -> &FlowStore { &self.flow_store @@ -960,6 +1008,7 @@ impl LogManager { } fn copy_tx_data(&mut self, from_tx_seq: u64, to_tx_seq_list: Vec) -> Result<()> { + let mut merkle = self.merkle.write(); // We have all the data need for this tx, so just copy them. let old_tx = self .get_tx_by_seq_number(from_tx_seq)? @@ -992,7 +1041,7 @@ impl LogManager { for (_, offset) in &to_tx_offset_list { let mut data = batch_data.clone(); data.start_index += offset; - self.append_entries(data)?; + self.append_entries(data, &mut *merkle)?; } } // num_entries() includes the rear padding data, so no need for more padding. @@ -1007,9 +1056,9 @@ impl LogManager { /// we can still provide proof for known data in it. /// Another choice is to insert these subtrees earlier in `put_tx`. To insert them here can /// batch them and avoid inserting for the subtrees with all data known. - fn complete_last_chunk_merkle(&mut self, index: usize) -> Result<()> { - let subtree_list = self.last_chunk_merkle.get_subtrees(); - self.last_chunk_merkle = + fn complete_last_chunk_merkle(&self, index: usize, merkle: &mut MerkleManager) -> Result<()> { + let subtree_list = merkle.last_chunk_merkle.get_subtrees(); + merkle.last_chunk_merkle = Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None); // Only insert non-leave subtrees. The leave data should have been available. diff --git a/node/storage/src/log_store/mod.rs b/node/storage/src/log_store/mod.rs index 0c5022d..d47f411 100644 --- a/node/storage/src/log_store/mod.rs +++ b/node/storage/src/log_store/mod.rs @@ -206,11 +206,11 @@ pub trait FlowWrite { /// Append data to the flow. `start_index` is included in `ChunkArray`, so /// it's possible to append arrays in any place. /// Return the list of completed chunks. - fn append_entries(&mut self, data: ChunkArray) -> Result>; + fn append_entries(&self, data: ChunkArray) -> Result>; /// Remove all the entries after `start_index`. /// This is used to remove deprecated data in case of chain reorg. - fn truncate(&mut self, start_index: u64) -> Result<()>; + fn truncate(&self, start_index: u64) -> Result<()>; /// Update the shard config. fn update_shard_config(&mut self, shard_config: ShardConfig);