From 2ed6bca446644da4799e987e7f344634697bb55b Mon Sep 17 00:00:00 2001
From: peilun-conflux <48905552+peilun-conflux@users.noreply.github.com>
Date: Thu, 14 Mar 2024 11:37:34 +0800
Subject: [PATCH] Process padding data in groups to avoid OOM. (#27)

---
 node/storage/src/log_store/log_manager.rs | 117 +++++++++++++---------
 node/storage/src/log_store/tests.rs       |   2 +-
 node/storage/src/log_store/tx_store.rs    |   2 +-
 3 files changed, 70 insertions(+), 51 deletions(-)

diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs
index eec1558..8dd0783 100644
--- a/node/storage/src/log_store/log_manager.rs
+++ b/node/storage/src/log_store/log_manager.rs
@@ -39,6 +39,9 @@ pub const COL_SEAL_CONTEXT: u32 = 6;
 pub const COL_FLOW_MPT_NODES: u32 = 7;
 pub const COL_NUM: u32 = 8;
 
+// Process at most 1M entries (256MB) pad data at a time.
+const PAD_MAX_SIZE: usize = 1 << 20;
+
 pub struct LogManager {
     pub(crate) db: Arc<dyn ZgsKeyValueDB>,
     tx_store: TransactionStore,
@@ -697,7 +700,7 @@ impl LogManager {
     #[instrument(skip(self))]
     fn pad_tx(&mut self, first_subtree_size: u64) -> Result<()> {
         // Check if we need to pad the flow.
-        let tx_start_flow_index =
+        let mut tx_start_flow_index =
             self.last_chunk_start_index() + self.last_chunk_merkle.leaves() as u64;
         let extra = tx_start_flow_index % first_subtree_size;
         trace!(
@@ -706,61 +709,64 @@ impl LogManager {
             self.last_chunk_merkle.leaves()
         );
         if extra != 0 {
-            let pad_data = Self::padding((first_subtree_size - extra) as usize);
+            for pad_data in Self::padding((first_subtree_size - extra) as usize) {
+                let mut root_map = BTreeMap::new();
 
-            // Update the in-memory merkle tree.
-            let mut root_map = BTreeMap::new();
-            let last_chunk_pad = if self.last_chunk_merkle.leaves() == 0 {
-                0
-            } else {
-                (PORA_CHUNK_SIZE - self.last_chunk_merkle.leaves()) * ENTRY_SIZE
-            };
+                // Update the in-memory merkle tree.
+                let last_chunk_pad = if self.last_chunk_merkle.leaves() == 0 {
+                    0
+                } else {
+                    (PORA_CHUNK_SIZE - self.last_chunk_merkle.leaves()) * ENTRY_SIZE
+                };
 
-            let mut completed_chunk_index = None;
-            if pad_data.len() < last_chunk_pad {
-                self.last_chunk_merkle
-                    .append_list(data_to_merkle_leaves(&pad_data)?);
-                self.pora_chunks_merkle
-                    .update_last(*self.last_chunk_merkle.root());
-            } else {
-                if last_chunk_pad != 0 {
-                    // Pad the last chunk.
+                let mut completed_chunk_index = None;
+                if pad_data.len() < last_chunk_pad {
                     self.last_chunk_merkle
-                        .append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?);
+                        .append_list(data_to_merkle_leaves(&pad_data)?);
                     self.pora_chunks_merkle
                         .update_last(*self.last_chunk_merkle.root());
-                    root_map.insert(
-                        self.pora_chunks_merkle.leaves() - 1,
-                        (*self.last_chunk_merkle.root(), 1),
-                    );
-                    completed_chunk_index = Some(self.pora_chunks_merkle.leaves() - 1);
+                } else {
+                    if last_chunk_pad != 0 {
+                        // Pad the last chunk.
+                        self.last_chunk_merkle
+                            .append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?);
+                        self.pora_chunks_merkle
+                            .update_last(*self.last_chunk_merkle.root());
+                        root_map.insert(
+                            self.pora_chunks_merkle.leaves() - 1,
+                            (*self.last_chunk_merkle.root(), 1),
+                        );
+                        completed_chunk_index = Some(self.pora_chunks_merkle.leaves() - 1);
+                    }
+
+                    // Pad with more complete chunks.
+                    let mut start_index = last_chunk_pad / ENTRY_SIZE;
+                    while pad_data.len() >= (start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE {
+                        let data = pad_data[start_index * ENTRY_SIZE
+                            ..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE]
+                            .to_vec();
+                        let root = *Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
+                        self.pora_chunks_merkle.append(root);
+                        root_map.insert(self.pora_chunks_merkle.leaves() - 1, (root, 1));
+                        start_index += PORA_CHUNK_SIZE;
+                    }
+                    assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
                 }
 
-                // Pad with more complete chunks.
-                let mut start_index = last_chunk_pad / ENTRY_SIZE;
-                while pad_data.len() >= (start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE {
-                    let data = pad_data
-                        [start_index * ENTRY_SIZE..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE]
-                        .to_vec();
-                    let root = *Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
-                    self.pora_chunks_merkle.append(root);
-                    root_map.insert(self.pora_chunks_merkle.leaves() - 1, (root, 1));
-                    start_index += PORA_CHUNK_SIZE;
+                // Update the root index.
+                self.flow_store.put_batch_root_list(root_map)?;
+                // Update the flow database.
+                // This should be called before `complete_last_chunk_merkle` so that we do not save
+                // subtrees with data known.
+                let data_size = pad_data.len() / ENTRY_SIZE;
+                self.flow_store.append_entries(ChunkArray {
+                    data: pad_data,
+                    start_index: tx_start_flow_index,
+                })?;
+                tx_start_flow_index += data_size as u64;
+                if let Some(index) = completed_chunk_index {
+                    self.complete_last_chunk_merkle(index)?;
                 }
-                assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
-            }
-
-            // Update the root index.
-            self.flow_store.put_batch_root_list(root_map)?;
-            // Update the flow database.
-            // This should be called before `complete_last_chunk_merkle` so that we do not save
-            // subtrees with data known.
-            self.flow_store.append_entries(ChunkArray {
-                data: pad_data,
-                start_index: tx_start_flow_index,
-            })?;
-            if let Some(index) = completed_chunk_index {
-                self.complete_last_chunk_merkle(index)?;
-            }
             }
         }
         trace!(
@@ -817,7 +823,20 @@ impl LogManager {
     }
 
     // FIXME(zz): Implement padding.
-    pub fn padding(len: usize) -> Vec<u8> {
+    pub fn padding(len: usize) -> Vec<Vec<u8>> {
+        let remainder = len % PAD_MAX_SIZE;
+        let n = len / PAD_MAX_SIZE;
+        let mut pad_data = vec![Self::padding_raw(PAD_MAX_SIZE); n];
+        if remainder == 0 {
+            pad_data
+        } else {
+            // insert the remainder to the front, so the rest are processed with alignment.
+            pad_data.insert(0, Self::padding_raw(remainder));
+            pad_data
+        }
+    }
+
+    pub fn padding_raw(len: usize) -> Vec<u8> {
         vec![0; len * ENTRY_SIZE]
     }
 
diff --git a/node/storage/src/log_store/tests.rs b/node/storage/src/log_store/tests.rs
index 8e3636b..6928540 100644
--- a/node/storage/src/log_store/tests.rs
+++ b/node/storage/src/log_store/tests.rs
@@ -23,7 +23,7 @@ fn test_put_get() {
     }
     let (padded_chunks, _) = compute_padded_chunk_size(data_size);
     let mut merkle = AppendMerkleTree::<H256, Sha3Algorithm>::new(vec![H256::zero()], 0, None);
-    merkle.append_list(data_to_merkle_leaves(&LogManager::padding(start_offset - 1)).unwrap());
+    merkle.append_list(data_to_merkle_leaves(&LogManager::padding_raw(start_offset - 1)).unwrap());
     let mut data_padded = data.clone();
     data_padded.append(&mut vec![0u8; CHUNK_SIZE]);
     merkle.append_list(data_to_merkle_leaves(&data_padded).unwrap());
diff --git a/node/storage/src/log_store/tx_store.rs b/node/storage/src/log_store/tx_store.rs
index 054caf3..d52fb76 100644
--- a/node/storage/src/log_store/tx_store.rs
+++ b/node/storage/src/log_store/tx_store.rs
@@ -263,7 +263,7 @@ impl TransactionStore {
         if merkle.leaves() % first_subtree != 0 {
             let pad_len = cmp::min(first_subtree, PORA_CHUNK_SIZE)
                 - (merkle.leaves() % first_subtree);
-            merkle.append_list(data_to_merkle_leaves(&LogManager::padding(pad_len))?);
+            merkle.append_list(data_to_merkle_leaves(&LogManager::padding_raw(pad_len))?);
         }
         // Since we are building the last merkle with a given last tx_seq, it's ensured
         // that appending subtrees will not go beyond the max size.
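A minimal standalone sketch (not part of the patch) of the grouping rule that the new `padding` implements: the requested pad length is split into groups of at most PAD_MAX_SIZE entries, and any remainder is placed first so that every following group is full-sized and no single allocation exceeds the bound. ENTRY_SIZE (256 bytes per entry) and the 1M-entry group bound are taken from the patch comment; the free-function name `padding_in_groups` and the tiny group size used in `main` are illustrative choices so the example runs cheaply.

    // Illustrative sketch only; mirrors LogManager::padding / padding_raw from the patch.
    const ENTRY_SIZE: usize = 256; // bytes per entry (1M entries == 256MB, per the patch comment)

    /// All-zero padding for `entries` entries, as one contiguous buffer.
    fn padding_raw(entries: usize) -> Vec<u8> {
        vec![0u8; entries * ENTRY_SIZE]
    }

    /// Split `entries` of zero padding into groups of at most `group_entries` entries.
    /// The remainder (if any) comes first, so all later groups are exactly `group_entries` long.
    fn padding_in_groups(entries: usize, group_entries: usize) -> Vec<Vec<u8>> {
        let remainder = entries % group_entries;
        let full_groups = entries / group_entries;
        let mut pad_data = vec![padding_raw(group_entries); full_groups];
        if remainder != 0 {
            pad_data.insert(0, padding_raw(remainder));
        }
        pad_data
    }

    fn main() {
        // The patch uses group_entries = PAD_MAX_SIZE = 1 << 20; use 8 here to keep the demo small.
        let sizes: Vec<usize> = padding_in_groups(21, 8)
            .iter()
            .map(|group| group.len() / ENTRY_SIZE)
            .collect();
        // 21 entries -> 5 (remainder first) + 8 + 8; the total padding is unchanged,
        // only the allocation granularity differs.
        assert_eq!(sizes, vec![5, 8, 8]);
        assert_eq!(sizes.iter().sum::<usize>(), 21);
        println!("pad groups (entries): {:?}", sizes);
    }

Putting the remainder first is what the patch's own comment relies on: the later groups stay aligned to full-size boundaries while pad_tx processes one bounded buffer at a time, advancing tx_start_flow_index after each group.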