mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-11-20 15:05:19 +00:00
Compare commits
2 Commits
814bc35b3b
...
e58f6ca101
Author | SHA1 | Date | |
---|---|---|---|
|
e58f6ca101 | ||
|
f29af9c872 |
@ -110,8 +110,7 @@ impl Store {
|
||||
}
|
||||
|
||||
pub async fn get_num_entries(&self) -> Result<u64> {
|
||||
self.spawn(move |store| store.get_num_entries())
|
||||
.await
|
||||
self.spawn(move |store| store.get_num_entries()).await
|
||||
}
|
||||
|
||||
pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> {
|
||||
|
@ -3,7 +3,8 @@ use super::{MineLoadChunk, SealAnswer, SealTask};
|
||||
use crate::config::ShardConfig;
|
||||
use crate::error::Error;
|
||||
use crate::log_store::log_manager::{
|
||||
bytes_to_entries, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE,
|
||||
bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
|
||||
COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
|
||||
};
|
||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
||||
use crate::{try_option, ZgsKeyValueDB};
|
||||
@ -11,7 +12,7 @@ use anyhow::{anyhow, bail, Result};
|
||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
|
||||
use itertools::Itertools;
|
||||
use parking_lot::RwLock;
|
||||
use shared_types::{ChunkArray, DataRoot, FlowProof};
|
||||
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
|
||||
use std::cmp::Ordering;
|
||||
@ -441,6 +442,10 @@ impl FlowDBStore {
|
||||
// and they will be updated in the merkle tree with `fill_leaf` by the caller.
|
||||
let mut leaf_list = Vec::new();
|
||||
let mut expected_index = 0;
|
||||
|
||||
let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
|
||||
let empty_root = *Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
|
||||
|
||||
for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
|
||||
let (index_bytes, root_bytes) = r?;
|
||||
let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?;
|
||||
@ -475,12 +480,14 @@ impl FlowDBStore {
|
||||
expected_index += 1;
|
||||
}
|
||||
Ordering::Greater => {
|
||||
bail!(
|
||||
"unexpected chunk leaf in range, expected={}, get={}, range={:?}",
|
||||
expected_index,
|
||||
batch_index,
|
||||
range_root,
|
||||
);
|
||||
while batch_index > expected_index {
|
||||
// Fill the gap with empty leaves.
|
||||
root_list.push((1, empty_root));
|
||||
expected_index += 1;
|
||||
}
|
||||
range_root = None;
|
||||
root_list.push((1, root));
|
||||
expected_index += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,9 +21,9 @@ use shared_types::{
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::Path;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::sync::mpsc;
|
||||
use tracing::{debug, error, info, instrument, trace, warn};
|
||||
|
||||
use super::tx_store::BlockHashAndSubmissionIndex;
|
||||
@ -732,7 +732,7 @@ impl LogManager {
|
||||
tx_store,
|
||||
flow_store,
|
||||
merkle,
|
||||
sender
|
||||
sender,
|
||||
};
|
||||
|
||||
log_manager.start_receiver(receiver);
|
||||
@ -768,10 +768,12 @@ impl LogManager {
|
||||
// Update the flow database.
|
||||
// This should be called before `complete_last_chunk_merkle` so that we do not save
|
||||
// subtrees with data known.
|
||||
flow_store.append_entries(ChunkArray {
|
||||
flow_store
|
||||
.append_entries(ChunkArray {
|
||||
data: data.pad_data,
|
||||
start_index: data.tx_start_flow_index,
|
||||
}).unwrap();
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
std::result::Result::Err(_) => {
|
||||
bail!("Receiver error");
|
||||
@ -917,6 +919,7 @@ impl LogManager {
|
||||
);
|
||||
if extra != 0 {
|
||||
for pad_data in Self::padding((first_subtree_size - extra) as usize) {
|
||||
let mut is_full_empty = true;
|
||||
let mut root_map = BTreeMap::new();
|
||||
|
||||
// Update the in-memory merkle tree.
|
||||
@ -928,6 +931,7 @@ impl LogManager {
|
||||
|
||||
let mut completed_chunk_index = None;
|
||||
if pad_data.len() < last_chunk_pad {
|
||||
is_full_empty = false;
|
||||
merkle
|
||||
.last_chunk_merkle
|
||||
.append_list(data_to_merkle_leaves(&pad_data)?);
|
||||
@ -936,6 +940,7 @@ impl LogManager {
|
||||
.update_last(*merkle.last_chunk_merkle.root());
|
||||
} else {
|
||||
if last_chunk_pad != 0 {
|
||||
is_full_empty = false;
|
||||
// Pad the last chunk.
|
||||
merkle
|
||||
.last_chunk_merkle
|
||||
@ -964,13 +969,26 @@ impl LogManager {
|
||||
assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
|
||||
}
|
||||
|
||||
|
||||
let data_size = pad_data.len() / ENTRY_SIZE;
|
||||
if is_full_empty {
|
||||
self.sender.send(UpdateFlowMessage {
|
||||
root_map,
|
||||
pad_data: pad_data.to_vec(),
|
||||
tx_start_flow_index,
|
||||
})?;
|
||||
} else {
|
||||
self.flow_store.put_batch_root_list(root_map).unwrap();
|
||||
// Update the flow database.
|
||||
// This should be called before `complete_last_chunk_merkle` so that we do not save
|
||||
// subtrees with data known.
|
||||
self.flow_store
|
||||
.append_entries(ChunkArray {
|
||||
data: pad_data.to_vec(),
|
||||
start_index: tx_start_flow_index,
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
tx_start_flow_index += data_size as u64;
|
||||
if let Some(index) = completed_chunk_index {
|
||||
self.complete_last_chunk_merkle(index, &mut *merkle)?;
|
||||
|
@ -183,11 +183,7 @@ pub trait Store:
|
||||
LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static
|
||||
{
|
||||
}
|
||||
impl<
|
||||
T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static,
|
||||
> Store for T
|
||||
{
|
||||
}
|
||||
impl<T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static> Store for T {}
|
||||
|
||||
pub struct MineLoadChunk {
|
||||
// Use `Vec` instead of array to avoid thread stack overflow.
|
||||
|
Loading…
Reference in New Issue
Block a user