Process padding data in groups to avoid OOM. (#27)

peilun-conflux 2024-03-14 11:37:34 +08:00 committed by GitHub
parent ef3044fafc
commit 2ed6bca446
3 changed files with 70 additions and 51 deletions
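In short: instead of allocating one zero buffer for the entire pad length, the pad is now split into groups of at most PAD_MAX_SIZE entries, and each group is merklized and written to the flow store before the next one is allocated. A minimal standalone sketch of that grouping (Rust; the constant values mirror the diff below, ENTRY_SIZE = 256 bytes is implied by the "1M entries (256MB)" comment, and the helper name pad_group_lens is illustrative, not part of the codebase):

// Sketch only: `pad_group_lens` is an illustrative helper, not part of the repo.
const ENTRY_SIZE: usize = 256; // bytes per entry, as the "1M entries (256MB)" comment implies
const PAD_MAX_SIZE: usize = 1 << 20; // at most 1M entries of pad data per group

/// Split a pad length (in entries) into group lengths, remainder first,
/// mirroring the new `LogManager::padding` introduced by this commit.
fn pad_group_lens(len: usize) -> Vec<usize> {
    let remainder = len % PAD_MAX_SIZE;
    let mut lens = vec![PAD_MAX_SIZE; len / PAD_MAX_SIZE];
    if remainder != 0 {
        lens.insert(0, remainder);
    }
    lens
}

fn main() {
    let len = 3 * PAD_MAX_SIZE + 123; // an example pad length in entries
    let groups = pad_group_lens(len);
    // The groups cover the whole pad, and no single zero-buffer allocation
    // exceeds PAD_MAX_SIZE * ENTRY_SIZE = 256 MB, which is the point of the change.
    assert_eq!(groups.iter().sum::<usize>(), len);
    assert!(groups.iter().all(|&g| g <= PAD_MAX_SIZE));
    println!("{} groups, remainder group of {} entries first", groups.len(), groups[0]);
}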


@@ -39,6 +39,9 @@ pub const COL_SEAL_CONTEXT: u32 = 6;
 pub const COL_FLOW_MPT_NODES: u32 = 7;
 pub const COL_NUM: u32 = 8;
 
+// Process at most 1M entries (256MB) pad data at a time.
+const PAD_MAX_SIZE: usize = 1 << 20;
+
 pub struct LogManager {
     pub(crate) db: Arc<dyn ZgsKeyValueDB>,
     tx_store: TransactionStore,
@@ -697,7 +700,7 @@ impl LogManager {
     #[instrument(skip(self))]
     fn pad_tx(&mut self, first_subtree_size: u64) -> Result<()> {
         // Check if we need to pad the flow.
-        let tx_start_flow_index =
+        let mut tx_start_flow_index =
             self.last_chunk_start_index() + self.last_chunk_merkle.leaves() as u64;
         let extra = tx_start_flow_index % first_subtree_size;
         trace!(
@@ -706,61 +709,64 @@ impl LogManager {
             self.last_chunk_merkle.leaves()
         );
         if extra != 0 {
-            let pad_data = Self::padding((first_subtree_size - extra) as usize);
-            // Update the in-memory merkle tree.
-            let mut root_map = BTreeMap::new();
-            let last_chunk_pad = if self.last_chunk_merkle.leaves() == 0 {
-                0
-            } else {
-                (PORA_CHUNK_SIZE - self.last_chunk_merkle.leaves()) * ENTRY_SIZE
-            };
-
-            let mut completed_chunk_index = None;
-            if pad_data.len() < last_chunk_pad {
-                self.last_chunk_merkle
-                    .append_list(data_to_merkle_leaves(&pad_data)?);
-                self.pora_chunks_merkle
-                    .update_last(*self.last_chunk_merkle.root());
-            } else {
-                if last_chunk_pad != 0 {
-                    // Pad the last chunk.
-                    self.last_chunk_merkle
-                        .append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?);
-                    self.pora_chunks_merkle
-                        .update_last(*self.last_chunk_merkle.root());
-                    root_map.insert(
-                        self.pora_chunks_merkle.leaves() - 1,
-                        (*self.last_chunk_merkle.root(), 1),
-                    );
-                    completed_chunk_index = Some(self.pora_chunks_merkle.leaves() - 1);
-                }
-
-                // Pad with more complete chunks.
-                let mut start_index = last_chunk_pad / ENTRY_SIZE;
-                while pad_data.len() >= (start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE {
-                    let data = pad_data
-                        [start_index * ENTRY_SIZE..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE]
-                        .to_vec();
-                    let root = *Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
-                    self.pora_chunks_merkle.append(root);
-                    root_map.insert(self.pora_chunks_merkle.leaves() - 1, (root, 1));
-                    start_index += PORA_CHUNK_SIZE;
-                }
-                assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
-            }
-
-            // Update the root index.
-            self.flow_store.put_batch_root_list(root_map)?;
-            // Update the flow database.
-            // This should be called before `complete_last_chunk_merkle` so that we do not save
-            // subtrees with data known.
-            self.flow_store.append_entries(ChunkArray {
-                data: pad_data,
-                start_index: tx_start_flow_index,
-            })?;
-            if let Some(index) = completed_chunk_index {
-                self.complete_last_chunk_merkle(index)?;
-            }
+            for pad_data in Self::padding((first_subtree_size - extra) as usize) {
+                let mut root_map = BTreeMap::new();
+                // Update the in-memory merkle tree.
+                let last_chunk_pad = if self.last_chunk_merkle.leaves() == 0 {
+                    0
+                } else {
+                    (PORA_CHUNK_SIZE - self.last_chunk_merkle.leaves()) * ENTRY_SIZE
+                };
+
+                let mut completed_chunk_index = None;
+                if pad_data.len() < last_chunk_pad {
+                    self.last_chunk_merkle
+                        .append_list(data_to_merkle_leaves(&pad_data)?);
+                    self.pora_chunks_merkle
+                        .update_last(*self.last_chunk_merkle.root());
+                } else {
+                    if last_chunk_pad != 0 {
+                        // Pad the last chunk.
+                        self.last_chunk_merkle
+                            .append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?);
+                        self.pora_chunks_merkle
+                            .update_last(*self.last_chunk_merkle.root());
+                        root_map.insert(
+                            self.pora_chunks_merkle.leaves() - 1,
+                            (*self.last_chunk_merkle.root(), 1),
+                        );
+                        completed_chunk_index = Some(self.pora_chunks_merkle.leaves() - 1);
+                    }
+                    // Pad with more complete chunks.
+                    let mut start_index = last_chunk_pad / ENTRY_SIZE;
+                    while pad_data.len() >= (start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE {
+                        let data = pad_data[start_index * ENTRY_SIZE
+                            ..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE]
+                            .to_vec();
+                        let root = *Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
+                        self.pora_chunks_merkle.append(root);
+                        root_map.insert(self.pora_chunks_merkle.leaves() - 1, (root, 1));
+                        start_index += PORA_CHUNK_SIZE;
+                    }
+                    assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
+                }
+                // Update the root index.
+                self.flow_store.put_batch_root_list(root_map)?;
+                // Update the flow database.
+                // This should be called before `complete_last_chunk_merkle` so that we do not save
+                // subtrees with data known.
+                let data_size = pad_data.len() / ENTRY_SIZE;
+                self.flow_store.append_entries(ChunkArray {
+                    data: pad_data,
+                    start_index: tx_start_flow_index,
+                })?;
+                tx_start_flow_index += data_size as u64;
+                if let Some(index) = completed_chunk_index {
+                    self.complete_last_chunk_merkle(index)?;
+                }
+            }
         }
         trace!(
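The new per-group loop keeps one extra piece of state: the flow cursor. Each group is written at the current tx_start_flow_index, and the cursor then advances by the number of entries just written so the next group lands immediately after it. A minimal sketch of that cursor pattern (not the real LogManager; write_entries is a hypothetical stand-in for the merkle and flow-store updates above):

const ENTRY_SIZE: usize = 256; // bytes per entry, as implied by the 256MB comment above

fn write_entries(start_index: u64, data: &[u8]) {
    // Stand-in for flow_store.append_entries(ChunkArray { data, start_index }).
    println!("write {} entries at flow index {}", data.len() / ENTRY_SIZE, start_index);
}

fn main() {
    // Two illustrative pad groups: a 7-entry remainder followed by a 16-entry group.
    let groups = vec![vec![0u8; 7 * ENTRY_SIZE], vec![0u8; 16 * ENTRY_SIZE]];
    let mut tx_start_flow_index: u64 = 100;
    for pad_data in groups {
        let data_size = (pad_data.len() / ENTRY_SIZE) as u64;
        write_entries(tx_start_flow_index, &pad_data);
        tx_start_flow_index += data_size; // next group starts right after this one
    }
    assert_eq!(tx_start_flow_index, 100 + 7 + 16);
}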
@@ -817,7 +823,20 @@ impl LogManager {
     }
 
     // FIXME(zz): Implement padding.
-    pub fn padding(len: usize) -> Vec<u8> {
+    pub fn padding(len: usize) -> Vec<Vec<u8>> {
+        let remainder = len % PAD_MAX_SIZE;
+        let n = len / PAD_MAX_SIZE;
+        let mut pad_data = vec![Self::padding_raw(PAD_MAX_SIZE); n];
+        if remainder == 0 {
+            pad_data
+        } else {
+            // insert the remainder to the front, so the rest are processed with alignment.
+            pad_data.insert(0, Self::padding_raw(remainder));
+            pad_data
+        }
+    }
+
+    pub fn padding_raw(len: usize) -> Vec<u8> {
         vec![0; len * ENTRY_SIZE]
     }
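As a usage-level summary of the new contract: padding(len) now returns remainder-first groups whose bytes concatenate to exactly what the old single buffer (now padding_raw(len)) would hold, while padding_raw keeps the flat-buffer behavior for callers such as the test and TransactionStore code below. A hypothetical test, not part of this commit, that could sit beside the existing ones and check this:

#[test]
fn padding_groups_match_raw_padding() {
    let len = 2 * (1 << 20) + 7; // two full PAD_MAX_SIZE groups plus a 7-entry remainder
    let groups = LogManager::padding(len);
    // The remainder group comes first, followed by the full-size groups.
    assert_eq!(groups.len(), 3);
    assert_eq!(groups[0].len(), LogManager::padding_raw(7).len());
    // Concatenated, the groups cover exactly the bytes the old single buffer held.
    let total: usize = groups.iter().map(|g| g.len()).sum();
    assert_eq!(total, LogManager::padding_raw(len).len());
}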


@@ -23,7 +23,7 @@ fn test_put_get() {
     }
     let (padded_chunks, _) = compute_padded_chunk_size(data_size);
     let mut merkle = AppendMerkleTree::<H256, Sha3Algorithm>::new(vec![H256::zero()], 0, None);
-    merkle.append_list(data_to_merkle_leaves(&LogManager::padding(start_offset - 1)).unwrap());
+    merkle.append_list(data_to_merkle_leaves(&LogManager::padding_raw(start_offset - 1)).unwrap());
     let mut data_padded = data.clone();
     data_padded.append(&mut vec![0u8; CHUNK_SIZE]);
     merkle.append_list(data_to_merkle_leaves(&data_padded).unwrap());


@@ -263,7 +263,7 @@ impl TransactionStore {
         if merkle.leaves() % first_subtree != 0 {
             let pad_len =
                 cmp::min(first_subtree, PORA_CHUNK_SIZE) - (merkle.leaves() % first_subtree);
-            merkle.append_list(data_to_merkle_leaves(&LogManager::padding(pad_len))?);
+            merkle.append_list(data_to_merkle_leaves(&LogManager::padding_raw(pad_len))?);
         }
         // Since we are building the last merkle with a given last tx_seq, it's ensured
         // that appending subtrees will not go beyond the max size.