mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00
Initialize LogManager with NodeManager.
This commit is contained in:
parent
19829f1def
commit
9c2f6e9d7d
@ -71,49 +71,33 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
pub fn new_with_subtrees(
|
pub fn new_with_subtrees(
|
||||||
node_db: Arc<dyn NodeDatabase<E>>,
|
node_db: Arc<dyn NodeDatabase<E>>,
|
||||||
node_cache_capacity: usize,
|
node_cache_capacity: usize,
|
||||||
initial_data: MerkleTreeInitialData<E>,
|
|
||||||
leaf_height: usize,
|
leaf_height: usize,
|
||||||
start_tx_seq: Option<u64>,
|
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let mut merkle = Self {
|
let mut merkle = Self {
|
||||||
node_manager: NodeManager::new(node_db, node_cache_capacity),
|
node_manager: NodeManager::new(node_db, node_cache_capacity)?,
|
||||||
delta_nodes_map: BTreeMap::new(),
|
delta_nodes_map: BTreeMap::new(),
|
||||||
root_to_tx_seq_map: HashMap::new(),
|
root_to_tx_seq_map: HashMap::new(),
|
||||||
min_depth: None,
|
min_depth: None,
|
||||||
leaf_height,
|
leaf_height,
|
||||||
_a: Default::default(),
|
_a: Default::default(),
|
||||||
};
|
};
|
||||||
merkle.node_manager.add_layer();
|
if merkle.height() == 0 {
|
||||||
if initial_data.subtree_list.is_empty() {
|
merkle.node_manager.start_transaction();
|
||||||
if let Some(seq) = start_tx_seq {
|
merkle.node_manager.add_layer();
|
||||||
merkle.delta_nodes_map.insert(
|
merkle.node_manager.commit();
|
||||||
seq,
|
|
||||||
DeltaNodes {
|
|
||||||
right_most_nodes: vec![],
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return Ok(merkle);
|
|
||||||
}
|
|
||||||
merkle.append_subtree_list(initial_data.subtree_list)?;
|
|
||||||
merkle.commit(start_tx_seq);
|
|
||||||
for (index, h) in initial_data.known_leaves {
|
|
||||||
merkle.fill_leaf(index, h);
|
|
||||||
}
|
|
||||||
for (layer_index, position, h) in initial_data.extra_mpt_nodes {
|
|
||||||
// TODO: Delete duplicate nodes from DB.
|
|
||||||
merkle.update_node(layer_index, position, h);
|
|
||||||
}
|
}
|
||||||
Ok(merkle)
|
Ok(merkle)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This is only used for the last chunk, so `leaf_height` is always 0 so far.
|
/// This is only used for the last chunk, so `leaf_height` is always 0 so far.
|
||||||
pub fn new_with_depth(leaves: Vec<E>, depth: usize, start_tx_seq: Option<u64>) -> Self {
|
pub fn new_with_depth(leaves: Vec<E>, depth: usize, start_tx_seq: Option<u64>) -> Self {
|
||||||
|
let mut node_manager = NodeManager::new_dummy();
|
||||||
|
node_manager.start_transaction();
|
||||||
if leaves.is_empty() {
|
if leaves.is_empty() {
|
||||||
// Create an empty merkle tree with `depth`.
|
// Create an empty merkle tree with `depth`.
|
||||||
let mut merkle = Self {
|
let mut merkle = Self {
|
||||||
// dummy node manager for the last chunk.
|
// dummy node manager for the last chunk.
|
||||||
node_manager: NodeManager::new_dummy(),
|
node_manager,
|
||||||
delta_nodes_map: BTreeMap::new(),
|
delta_nodes_map: BTreeMap::new(),
|
||||||
root_to_tx_seq_map: HashMap::new(),
|
root_to_tx_seq_map: HashMap::new(),
|
||||||
min_depth: Some(depth),
|
min_depth: Some(depth),
|
||||||
@ -135,7 +119,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
} else {
|
} else {
|
||||||
let mut merkle = Self {
|
let mut merkle = Self {
|
||||||
// dummy node manager for the last chunk.
|
// dummy node manager for the last chunk.
|
||||||
node_manager: NodeManager::new_dummy(),
|
node_manager,
|
||||||
delta_nodes_map: BTreeMap::new(),
|
delta_nodes_map: BTreeMap::new(),
|
||||||
root_to_tx_seq_map: HashMap::new(),
|
root_to_tx_seq_map: HashMap::new(),
|
||||||
min_depth: Some(depth),
|
min_depth: Some(depth),
|
||||||
@ -569,6 +553,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// Any previous state of an empty tree is always empty.
|
// Any previous state of an empty tree is always empty.
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
self.node_manager.start_transaction();
|
||||||
let delta_nodes = self
|
let delta_nodes = self
|
||||||
.delta_nodes_map
|
.delta_nodes_map
|
||||||
.get(&tx_seq)
|
.get(&tx_seq)
|
||||||
@ -585,6 +570,24 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
self.update_node(height, *last_index, right_most_node.clone())
|
self.update_node(height, *last_index, right_most_node.clone())
|
||||||
}
|
}
|
||||||
self.clear_after(tx_seq);
|
self.clear_after(tx_seq);
|
||||||
|
self.node_manager.commit();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Revert to a tx_seq not in `delta_nodes_map`.
|
||||||
|
// This is needed to revert the last unfinished tx after restart.
|
||||||
|
pub fn revert_to_leaves(&mut self, leaves: usize) -> Result<()> {
|
||||||
|
self.node_manager.start_transaction();
|
||||||
|
for height in (0..self.height()).rev() {
|
||||||
|
let kept_nodes = leaves >> height;
|
||||||
|
if kept_nodes == 0 {
|
||||||
|
self.node_manager.truncate_layer(height);
|
||||||
|
} else {
|
||||||
|
self.node_manager.truncate_nodes(height, kept_nodes + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.recompute_after_append_leaves(leaves);
|
||||||
|
self.node_manager.commit();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -611,6 +614,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn reset(&mut self) {
|
pub fn reset(&mut self) {
|
||||||
|
self.node_manager.start_transaction();
|
||||||
for height in (0..self.height()).rev() {
|
for height in (0..self.height()).rev() {
|
||||||
self.node_manager.truncate_layer(height);
|
self.node_manager.truncate_layer(height);
|
||||||
}
|
}
|
||||||
@ -621,6 +625,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
} else {
|
} else {
|
||||||
self.node_manager.add_layer();
|
self.node_manager.add_layer();
|
||||||
}
|
}
|
||||||
|
self.node_manager.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn clear_after(&mut self, tx_seq: u64) {
|
fn clear_after(&mut self, tx_seq: u64) {
|
||||||
|
@ -13,13 +13,19 @@ pub struct NodeManager<E: HashElement> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<E: HashElement> NodeManager<E> {
|
impl<E: HashElement> NodeManager<E> {
|
||||||
pub fn new(db: Arc<dyn NodeDatabase<E>>, capacity: usize) -> Self {
|
pub fn new(db: Arc<dyn NodeDatabase<E>>, capacity: usize) -> Result<Self> {
|
||||||
Self {
|
let mut layer = 0;
|
||||||
|
let mut layer_size = Vec::new();
|
||||||
|
while let Some(size) = db.get_layer_size(layer)? {
|
||||||
|
layer_size.push(size);
|
||||||
|
layer += 1;
|
||||||
|
}
|
||||||
|
Ok(Self {
|
||||||
cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
|
cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
|
||||||
layer_size: vec![],
|
layer_size,
|
||||||
db,
|
db,
|
||||||
db_tx: None,
|
db_tx: None,
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_dummy() -> Self {
|
pub fn new_dummy() -> Self {
|
||||||
|
@ -629,12 +629,8 @@ impl LogManager {
|
|||||||
let tx_store = TransactionStore::new(db.clone())?;
|
let tx_store = TransactionStore::new(db.clone())?;
|
||||||
let flow_db = Arc::new(FlowDBStore::new(db.clone()));
|
let flow_db = Arc::new(FlowDBStore::new(db.clone()));
|
||||||
let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
|
let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
|
||||||
let mut initial_data = flow_store.get_chunk_root_list()?;
|
// If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
|
||||||
// If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
|
// first and call `put_tx` later.
|
||||||
// first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
|
|
||||||
// and inserted later.
|
|
||||||
let mut extra_leaves = Vec::new();
|
|
||||||
|
|
||||||
let next_tx_seq = tx_store.next_tx_seq();
|
let next_tx_seq = tx_store.next_tx_seq();
|
||||||
let mut start_tx_seq = if next_tx_seq > 0 {
|
let mut start_tx_seq = if next_tx_seq > 0 {
|
||||||
Some(next_tx_seq - 1)
|
Some(next_tx_seq - 1)
|
||||||
@ -642,13 +638,19 @@ impl LogManager {
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
let mut last_tx_to_insert = None;
|
let mut last_tx_to_insert = None;
|
||||||
|
|
||||||
|
let mut pora_chunks_merkle = Merkle::new_with_subtrees(
|
||||||
|
flow_db,
|
||||||
|
config.flow.merkle_node_cache_capacity,
|
||||||
|
log2_pow2(PORA_CHUNK_SIZE),
|
||||||
|
)?;
|
||||||
if let Some(last_tx_seq) = start_tx_seq {
|
if let Some(last_tx_seq) = start_tx_seq {
|
||||||
if !tx_store.check_tx_completed(last_tx_seq)? {
|
if !tx_store.check_tx_completed(last_tx_seq)? {
|
||||||
// Last tx not finalized, we need to check if its `put_tx` is completed.
|
// Last tx not finalized, we need to check if its `put_tx` is completed.
|
||||||
let last_tx = tx_store
|
let last_tx = tx_store
|
||||||
.get_tx_by_seq_number(last_tx_seq)?
|
.get_tx_by_seq_number(last_tx_seq)?
|
||||||
.expect("tx missing");
|
.expect("tx missing");
|
||||||
let mut current_len = initial_data.leaves();
|
let current_len = pora_chunks_merkle.leaves();
|
||||||
let expected_len =
|
let expected_len =
|
||||||
sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64);
|
sector_to_segment(last_tx.start_entry_index + last_tx.num_entries() as u64);
|
||||||
match expected_len.cmp(&(current_len)) {
|
match expected_len.cmp(&(current_len)) {
|
||||||
@ -678,42 +680,15 @@ impl LogManager {
|
|||||||
previous_tx.start_entry_index + previous_tx.num_entries() as u64,
|
previous_tx.start_entry_index + previous_tx.num_entries() as u64,
|
||||||
);
|
);
|
||||||
if current_len > expected_len {
|
if current_len > expected_len {
|
||||||
while let Some((subtree_depth, _)) = initial_data.subtree_list.pop()
|
pora_chunks_merkle.revert_to_leaves(expected_len)?;
|
||||||
{
|
|
||||||
current_len -= 1 << (subtree_depth - 1);
|
|
||||||
if current_len == expected_len {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
warn!(
|
|
||||||
"revert last tx with no-op: {} {}",
|
|
||||||
current_len, expected_len
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
assert_eq!(current_len, expected_len);
|
start_tx_seq = Some(previous_tx.seq);
|
||||||
while let Some((index, h)) = initial_data.known_leaves.pop() {
|
|
||||||
if index < current_len {
|
|
||||||
initial_data.known_leaves.push((index, h));
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
extra_leaves.push((index, h));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
start_tx_seq = Some(last_tx_seq - 1);
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut pora_chunks_merkle = Merkle::new_with_subtrees(
|
|
||||||
flow_db,
|
|
||||||
config.flow.merkle_node_cache_capacity,
|
|
||||||
initial_data,
|
|
||||||
log2_pow2(PORA_CHUNK_SIZE),
|
|
||||||
start_tx_seq,
|
|
||||||
)?;
|
|
||||||
let last_chunk_merkle = match start_tx_seq {
|
let last_chunk_merkle = match start_tx_seq {
|
||||||
Some(tx_seq) => {
|
Some(tx_seq) => {
|
||||||
tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
|
tx_store.rebuild_last_chunk_merkle(pora_chunks_merkle.leaves(), tx_seq)?
|
||||||
@ -751,18 +726,7 @@ impl LogManager {
|
|||||||
log_manager.start_receiver(receiver, executor);
|
log_manager.start_receiver(receiver, executor);
|
||||||
|
|
||||||
if let Some(tx) = last_tx_to_insert {
|
if let Some(tx) = last_tx_to_insert {
|
||||||
log_manager.revert_to(tx.seq - 1)?;
|
|
||||||
log_manager.put_tx(tx)?;
|
log_manager.put_tx(tx)?;
|
||||||
let mut merkle = log_manager.merkle.write();
|
|
||||||
for (index, h) in extra_leaves {
|
|
||||||
if index < merkle.pora_chunks_merkle.leaves() {
|
|
||||||
merkle.pora_chunks_merkle.fill_leaf(index, h);
|
|
||||||
} else {
|
|
||||||
error!("out of range extra leaf: index={} hash={:?}", index, h);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
assert!(extra_leaves.is_empty());
|
|
||||||
}
|
}
|
||||||
log_manager
|
log_manager
|
||||||
.merkle
|
.merkle
|
||||||
|
Loading…
Reference in New Issue
Block a user