From 9c2f6e9d7d2c59da9b3ff6ec8f7b33a912b8f551 Mon Sep 17 00:00:00 2001 From: Peilun Li Date: Mon, 14 Oct 2024 23:35:59 +0800 Subject: [PATCH] Initialize LogManager with NodeManager. --- common/append_merkle/src/lib.rs | 55 +++++++++++---------- common/append_merkle/src/node_manager.rs | 14 ++++-- node/storage/src/log_store/log_manager.rs | 58 +++++------------------ 3 files changed, 51 insertions(+), 76 deletions(-) diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index 70faf2e..12f8fcf 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -71,49 +71,33 @@ impl> AppendMerkleTree { pub fn new_with_subtrees( node_db: Arc>, node_cache_capacity: usize, - initial_data: MerkleTreeInitialData, leaf_height: usize, - start_tx_seq: Option, ) -> Result { let mut merkle = Self { - node_manager: NodeManager::new(node_db, node_cache_capacity), + node_manager: NodeManager::new(node_db, node_cache_capacity)?, delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: None, leaf_height, _a: Default::default(), }; - merkle.node_manager.add_layer(); - if initial_data.subtree_list.is_empty() { - if let Some(seq) = start_tx_seq { - merkle.delta_nodes_map.insert( - seq, - DeltaNodes { - right_most_nodes: vec![], - }, - ); - } - return Ok(merkle); - } - merkle.append_subtree_list(initial_data.subtree_list)?; - merkle.commit(start_tx_seq); - for (index, h) in initial_data.known_leaves { - merkle.fill_leaf(index, h); - } - for (layer_index, position, h) in initial_data.extra_mpt_nodes { - // TODO: Delete duplicate nodes from DB. - merkle.update_node(layer_index, position, h); + if merkle.height() == 0 { + merkle.node_manager.start_transaction(); + merkle.node_manager.add_layer(); + merkle.node_manager.commit(); } Ok(merkle) } /// This is only used for the last chunk, so `leaf_height` is always 0 so far. pub fn new_with_depth(leaves: Vec, depth: usize, start_tx_seq: Option) -> Self { + let mut node_manager = NodeManager::new_dummy(); + node_manager.start_transaction(); if leaves.is_empty() { // Create an empty merkle tree with `depth`. let mut merkle = Self { // dummy node manager for the last chunk. - node_manager: NodeManager::new_dummy(), + node_manager, delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: Some(depth), @@ -135,7 +119,7 @@ impl> AppendMerkleTree { } else { let mut merkle = Self { // dummy node manager for the last chunk. - node_manager: NodeManager::new_dummy(), + node_manager, delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: Some(depth), @@ -569,6 +553,7 @@ impl> AppendMerkleTree { // Any previous state of an empty tree is always empty. return Ok(()); } + self.node_manager.start_transaction(); let delta_nodes = self .delta_nodes_map .get(&tx_seq) @@ -585,6 +570,24 @@ impl> AppendMerkleTree { self.update_node(height, *last_index, right_most_node.clone()) } self.clear_after(tx_seq); + self.node_manager.commit(); + Ok(()) + } + + // Revert to a tx_seq not in `delta_nodes_map`. + // This is needed to revert the last unfinished tx after restart. + pub fn revert_to_leaves(&mut self, leaves: usize) -> Result<()> { + self.node_manager.start_transaction(); + for height in (0..self.height()).rev() { + let kept_nodes = leaves >> height; + if kept_nodes == 0 { + self.node_manager.truncate_layer(height); + } else { + self.node_manager.truncate_nodes(height, kept_nodes + 1); + } + } + self.recompute_after_append_leaves(leaves); + self.node_manager.commit(); Ok(()) } @@ -611,6 +614,7 @@ impl> AppendMerkleTree { } pub fn reset(&mut self) { + self.node_manager.start_transaction(); for height in (0..self.height()).rev() { self.node_manager.truncate_layer(height); } @@ -621,6 +625,7 @@ impl> AppendMerkleTree { } else { self.node_manager.add_layer(); } + self.node_manager.commit(); } fn clear_after(&mut self, tx_seq: u64) { diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs index a583467..50c30f1 100644 --- a/common/append_merkle/src/node_manager.rs +++ b/common/append_merkle/src/node_manager.rs @@ -13,13 +13,19 @@ pub struct NodeManager { } impl NodeManager { - pub fn new(db: Arc>, capacity: usize) -> Self { - Self { + pub fn new(db: Arc>, capacity: usize) -> Result { + let mut layer = 0; + let mut layer_size = Vec::new(); + while let Some(size) = db.get_layer_size(layer)? { + layer_size.push(size); + layer += 1; + } + Ok(Self { cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")), - layer_size: vec![], + layer_size, db, db_tx: None, - } + }) } pub fn new_dummy() -> Self { diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index d81f500..a33ad45 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -629,12 +629,8 @@ impl LogManager { let tx_store = TransactionStore::new(db.clone())?; let flow_db = Arc::new(FlowDBStore::new(db.clone())); let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone())); - let mut initial_data = flow_store.get_chunk_root_list()?; - // If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list` - // first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves` - // and inserted later. - let mut extra_leaves = Vec::new(); - + // If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle` + // first and call `put_tx` later. let next_tx_seq = tx_store.next_tx_seq(); let mut start_tx_seq = if next_tx_seq > 0 { Some(next_tx_seq - 1) @@ -642,13 +638,19 @@ impl LogManager { None }; let mut last_tx_to_insert = None; + + let mut pora_chunks_merkle = Merkle::new_with_subtrees( + flow_db, + config.flow.merkle_node_cache_capacity, + log2_pow2(PORA_CHUNK_SIZE), + )?; if let Some(last_tx_seq) = start_tx_seq { if !tx_store.check_tx_completed(last_tx_seq)? { // Last tx not finalized, we need to check if its `put_tx` is completed. let last_tx = tx_store .get_tx_by_seq_number(last_tx_seq)? .expect("tx missing"); - let mut current_len = initial_data.leaves(); + let current_len = pora_chunks_merkle.leaves(); let expected_len = sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64); match expected_len.cmp(&(current_len)) { @@ -678,42 +680,15 @@ impl LogManager { previous_tx.start_entry_index + previous_tx.num_entries() as u64, ); if current_len > expected_len { - while let Some((subtree_depth, _)) = initial_data.subtree_list.pop() - { - current_len -= 1 << (subtree_depth - 1); - if current_len == expected_len { - break; - } - } - } else { - warn!( - "revert last tx with no-op: {} {}", - current_len, expected_len - ); + pora_chunks_merkle.revert_to_leaves(expected_len)?; } - assert_eq!(current_len, expected_len); - while let Some((index, h)) = initial_data.known_leaves.pop() { - if index < current_len { - initial_data.known_leaves.push((index, h)); - break; - } else { - extra_leaves.push((index, h)); - } - } - start_tx_seq = Some(last_tx_seq - 1); + start_tx_seq = Some(previous_tx.seq); }; } } } } - let mut pora_chunks_merkle = Merkle::new_with_subtrees( - flow_db, - config.flow.merkle_node_cache_capacity, - initial_data, - log2_pow2(PORA_CHUNK_SIZE), - start_tx_seq, - )?; let last_chunk_merkle = match start_tx_seq { Some(tx_seq) => { tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)? @@ -751,18 +726,7 @@ impl LogManager { log_manager.start_receiver(receiver, executor); if let Some(tx) = last_tx_to_insert { - log_manager.revert_to(tx.seq - 1)?; log_manager.put_tx(tx)?; - let mut merkle = log_manager.merkle.write(); - for (index, h) in extra_leaves { - if index < merkle.pora_chunks_merkle.leaves() { - merkle.pora_chunks_merkle.fill_leaf(index, h); - } else { - error!("out of range extra leaf: index={} hash={:?}", index, h); - } - } - } else { - assert!(extra_leaves.is_empty()); } log_manager .merkle