mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-12 16:15:17 +00:00
Save updated mpt nodes from proof to DB. (#20)
* Save updated mpt nodes from proof to DB. * Free disk space for Github Actions. * Use an alternative Rust cache action. * Fix action usage. * Do not free large packages.
This commit is contained in:
parent
01c2dd1135
commit
310bf1c8dd
13
.github/actions/setup-rust/action.yml
vendored
13
.github/actions/setup-rust/action.yml
vendored
@ -2,20 +2,11 @@ name: Setup Rust (cache & toolchain)
|
|||||||
runs:
|
runs:
|
||||||
using: composite
|
using: composite
|
||||||
steps:
|
steps:
|
||||||
- name: Cargo cache
|
|
||||||
uses: actions/cache@v3
|
|
||||||
with:
|
|
||||||
path: |
|
|
||||||
~/.cargo/bin/
|
|
||||||
~/.cargo/registry/index/
|
|
||||||
~/.cargo/registry/cache/
|
|
||||||
~/.cargo/git/db/
|
|
||||||
target/
|
|
||||||
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
|
|
||||||
|
|
||||||
- name: Install toolchain 1.75.0
|
- name: Install toolchain 1.75.0
|
||||||
uses: actions-rs/toolchain@v1
|
uses: actions-rs/toolchain@v1
|
||||||
with:
|
with:
|
||||||
profile: minimal
|
profile: minimal
|
||||||
toolchain: 1.75.0
|
toolchain: 1.75.0
|
||||||
components: rustfmt, clippy
|
components: rustfmt, clippy
|
||||||
|
|
||||||
|
- uses: Swatinem/rust-cache@v2
|
13
.github/workflows/cc.yml
vendored
13
.github/workflows/cc.yml
vendored
@ -15,6 +15,19 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
|
- name: Free Disk Space (Ubuntu)
|
||||||
|
uses: jlumbroso/free-disk-space@main
|
||||||
|
with:
|
||||||
|
# this might remove tools that are actually needed,
|
||||||
|
# if set to "true" but frees about 6 GB
|
||||||
|
tool-cache: false
|
||||||
|
android: true
|
||||||
|
dotnet: true
|
||||||
|
haskell: true
|
||||||
|
large-packages: false
|
||||||
|
docker-images: true
|
||||||
|
swap-storage: true
|
||||||
|
|
||||||
- name: Checkout sources
|
- name: Checkout sources
|
||||||
uses: actions/checkout@v3
|
uses: actions/checkout@v3
|
||||||
with:
|
with:
|
||||||
|
@ -88,6 +88,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
for (index, h) in initial_data.known_leaves {
|
for (index, h) in initial_data.known_leaves {
|
||||||
merkle.fill_leaf(index, h);
|
merkle.fill_leaf(index, h);
|
||||||
}
|
}
|
||||||
|
for (layer_index, position, h) in initial_data.extra_mpt_nodes {
|
||||||
|
// TODO: Delete duplicate nodes from DB.
|
||||||
|
merkle.layers[layer_index][position] = h;
|
||||||
|
}
|
||||||
Ok(merkle)
|
Ok(merkle)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,7 +196,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
/// This requires that the proof is built against this tree.
|
/// This requires that the proof is built against this tree.
|
||||||
/// This should only be called after validating the proof (including checking root existence).
|
/// This should only be called after validating the proof (including checking root existence).
|
||||||
/// Returns `Error` if the data is conflict with existing ones.
|
/// Returns `Error` if the data is conflict with existing ones.
|
||||||
pub fn fill_with_range_proof(&mut self, proof: RangeProof<E>) -> Result<()> {
|
pub fn fill_with_range_proof(
|
||||||
|
&mut self,
|
||||||
|
proof: RangeProof<E>,
|
||||||
|
) -> Result<Vec<(usize, usize, E)>> {
|
||||||
self.fill_with_proof(
|
self.fill_with_proof(
|
||||||
proof
|
proof
|
||||||
.left_proof
|
.left_proof
|
||||||
@ -212,7 +219,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
proof: Proof<E>,
|
proof: Proof<E>,
|
||||||
mut tx_merkle_nodes: Vec<(usize, E)>,
|
mut tx_merkle_nodes: Vec<(usize, E)>,
|
||||||
start_index: u64,
|
start_index: u64,
|
||||||
) -> Result<()> {
|
) -> Result<Vec<(usize, usize, E)>> {
|
||||||
let tx_merkle_nodes_size = tx_merkle_nodes.len();
|
let tx_merkle_nodes_size = tx_merkle_nodes.len();
|
||||||
if self.leaf_height != 0 {
|
if self.leaf_height != 0 {
|
||||||
tx_merkle_nodes = tx_merkle_nodes
|
tx_merkle_nodes = tx_merkle_nodes
|
||||||
@ -227,7 +234,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
.collect();
|
.collect();
|
||||||
}
|
}
|
||||||
if tx_merkle_nodes.is_empty() {
|
if tx_merkle_nodes.is_empty() {
|
||||||
return Ok(());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
let mut position_and_data =
|
let mut position_and_data =
|
||||||
proof.file_proof_nodes_in_tree(tx_merkle_nodes, tx_merkle_nodes_size);
|
proof.file_proof_nodes_in_tree(tx_merkle_nodes, tx_merkle_nodes_size);
|
||||||
@ -239,7 +246,12 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof.
|
/// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof.
|
||||||
fn fill_with_proof(&mut self, position_and_data: Vec<(usize, E)>) -> Result<()> {
|
/// Return the inserted nodes and position.
|
||||||
|
fn fill_with_proof(
|
||||||
|
&mut self,
|
||||||
|
position_and_data: Vec<(usize, E)>,
|
||||||
|
) -> Result<Vec<(usize, usize, E)>> {
|
||||||
|
let mut updated_nodes = Vec::new();
|
||||||
// A valid proof should not fail the following checks.
|
// A valid proof should not fail the following checks.
|
||||||
for (i, (position, data)) in position_and_data.into_iter().enumerate() {
|
for (i, (position, data)) in position_and_data.into_iter().enumerate() {
|
||||||
let layer = &mut self.layers[i];
|
let layer = &mut self.layers[i];
|
||||||
@ -254,7 +266,10 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
// skip padding node.
|
// skip padding node.
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if layer[position] != E::null() && layer[position] != data {
|
if layer[position] == E::null() {
|
||||||
|
layer[position] = data.clone();
|
||||||
|
updated_nodes.push((i, position, data))
|
||||||
|
} else if layer[position] != data {
|
||||||
bail!(
|
bail!(
|
||||||
"conflict data layer={} position={} tree_data={:?} proof_data={:?}",
|
"conflict data layer={} position={} tree_data={:?} proof_data={:?}",
|
||||||
i,
|
i,
|
||||||
@ -263,9 +278,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
|
|||||||
data
|
data
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
layer[position] = data;
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(updated_nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn gen_range_proof(&self, start_index: usize, end_index: usize) -> Result<RangeProof<E>> {
|
pub fn gen_range_proof(&self, start_index: usize, end_index: usize) -> Result<RangeProof<E>> {
|
||||||
|
@ -127,6 +127,11 @@ pub struct MerkleTreeInitialData<E: HashElement> {
|
|||||||
/// These leaves are in some large subtrees of `subtree_list`. 1-node subtrees are also leaves,
|
/// These leaves are in some large subtrees of `subtree_list`. 1-node subtrees are also leaves,
|
||||||
/// but they will not be duplicated in `known_leaves`.
|
/// but they will not be duplicated in `known_leaves`.
|
||||||
pub known_leaves: Vec<(usize, E)>,
|
pub known_leaves: Vec<(usize, E)>,
|
||||||
|
|
||||||
|
/// A list of `(layer_index, position, hash)`.
|
||||||
|
/// These are the nodes known from proofs.
|
||||||
|
/// They should only be inserted after constructing the tree.
|
||||||
|
pub extra_mpt_nodes: Vec<(usize, usize, E)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E: HashElement> MerkleTreeInitialData<E> {
|
impl<E: HashElement> MerkleTreeInitialData<E> {
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
use super::load_chunk::EntryBatch;
|
use super::load_chunk::EntryBatch;
|
||||||
use super::{MineLoadChunk, SealAnswer, SealTask};
|
use super::{MineLoadChunk, SealAnswer, SealTask};
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::log_store::log_manager::{bytes_to_entries, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT};
|
use crate::log_store::log_manager::{
|
||||||
|
bytes_to_entries, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT, COL_FLOW_MPT_NODES,
|
||||||
|
};
|
||||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
||||||
use crate::{try_option, ZgsKeyValueDB};
|
use crate::{try_option, ZgsKeyValueDB};
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
@ -70,6 +72,10 @@ impl FlowStore {
|
|||||||
})?;
|
})?;
|
||||||
merkle.gen_proof(sector_index)
|
merkle.gen_proof(sector_index)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
|
||||||
|
self.db.put_mpt_node_list(node_list)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
@ -441,9 +447,11 @@ impl FlowDBStore {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
let extra_node_list = self.get_mpt_node_list()?;
|
||||||
Ok(MerkleTreeInitialData {
|
Ok(MerkleTreeInitialData {
|
||||||
subtree_list: root_list,
|
subtree_list: root_list,
|
||||||
known_leaves: leaf_list,
|
known_leaves: leaf_list,
|
||||||
|
extra_mpt_nodes: extra_node_list,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -492,6 +500,32 @@ impl FlowDBStore {
|
|||||||
self.kvdb.write(tx)?;
|
self.kvdb.write(tx)?;
|
||||||
Ok(index_to_reseal)
|
Ok(index_to_reseal)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn put_mpt_node_list(&self, mpt_node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
|
||||||
|
let mut tx = self.kvdb.transaction();
|
||||||
|
for (layer_index, position, data) in mpt_node_list {
|
||||||
|
tx.put(
|
||||||
|
COL_FLOW_MPT_NODES,
|
||||||
|
&encode_mpt_node_key(layer_index, position),
|
||||||
|
data.as_bytes(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(self.kvdb.write(tx)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_mpt_node_list(&self) -> Result<Vec<(usize, usize, DataRoot)>> {
|
||||||
|
let mut node_list = Vec::new();
|
||||||
|
for r in self.kvdb.iter(COL_FLOW_MPT_NODES) {
|
||||||
|
let (index_bytes, node_bytes) = r?;
|
||||||
|
let (layer_index, position) = decode_mpt_node_key(index_bytes.as_ref())?;
|
||||||
|
node_list.push((
|
||||||
|
layer_index,
|
||||||
|
position,
|
||||||
|
DataRoot::from_slice(node_bytes.as_ref()),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Ok(node_list)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(DeriveEncode, DeriveDecode, Clone, Debug)]
|
#[derive(DeriveEncode, DeriveDecode, Clone, Debug)]
|
||||||
@ -537,3 +571,18 @@ fn decode_batch_root_key(data: &[u8]) -> Result<(usize, usize)> {
|
|||||||
let subtree_depth = usize::MAX - try_decode_usize(&data[mem::size_of::<u64>()..])?;
|
let subtree_depth = usize::MAX - try_decode_usize(&data[mem::size_of::<u64>()..])?;
|
||||||
Ok((batch_index, subtree_depth))
|
Ok((batch_index, subtree_depth))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn encode_mpt_node_key(layer_index: usize, position: usize) -> Vec<u8> {
|
||||||
|
let mut key = layer_index.to_be_bytes().to_vec();
|
||||||
|
key.extend_from_slice(&position.to_be_bytes());
|
||||||
|
key
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
|
||||||
|
if data.len() != mem::size_of::<usize>() * 2 {
|
||||||
|
bail!("invalid data length");
|
||||||
|
}
|
||||||
|
let layer_index = try_decode_usize(&data[..mem::size_of::<u64>()])?;
|
||||||
|
let position = try_decode_usize(&data[mem::size_of::<u64>()..])?;
|
||||||
|
Ok((layer_index, position))
|
||||||
|
}
|
||||||
|
@ -36,7 +36,8 @@ pub const COL_ENTRY_BATCH_ROOT: u32 = 3;
|
|||||||
pub const COL_TX_COMPLETED: u32 = 4;
|
pub const COL_TX_COMPLETED: u32 = 4;
|
||||||
pub const COL_MISC: u32 = 5;
|
pub const COL_MISC: u32 = 5;
|
||||||
pub const COL_SEAL_CONTEXT: u32 = 6;
|
pub const COL_SEAL_CONTEXT: u32 = 6;
|
||||||
pub const COL_NUM: u32 = 7;
|
pub const COL_FLOW_MPT_NODES: u32 = 7;
|
||||||
|
pub const COL_NUM: u32 = 8;
|
||||||
|
|
||||||
pub struct LogManager {
|
pub struct LogManager {
|
||||||
pub(crate) db: Arc<dyn ZgsKeyValueDB>,
|
pub(crate) db: Arc<dyn ZgsKeyValueDB>,
|
||||||
@ -119,11 +120,12 @@ impl LogStoreChunkWrite for LogManager {
|
|||||||
self.append_entries(flow_entry_array)?;
|
self.append_entries(flow_entry_array)?;
|
||||||
|
|
||||||
if let Some(file_proof) = maybe_file_proof {
|
if let Some(file_proof) = maybe_file_proof {
|
||||||
self.pora_chunks_merkle.fill_with_file_proof(
|
let updated_node_list = self.pora_chunks_merkle.fill_with_file_proof(
|
||||||
file_proof,
|
file_proof,
|
||||||
tx.merkle_nodes,
|
tx.merkle_nodes,
|
||||||
tx.start_entry_index,
|
tx.start_entry_index,
|
||||||
)?;
|
)?;
|
||||||
|
self.flow_store.put_mpt_node_list(updated_node_list)?;
|
||||||
}
|
}
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
@ -273,8 +275,10 @@ impl LogStoreWrite for LogManager {
|
|||||||
) -> Result<bool> {
|
) -> Result<bool> {
|
||||||
let valid = self.validate_range_proof(tx_seq, data)?;
|
let valid = self.validate_range_proof(tx_seq, data)?;
|
||||||
if valid {
|
if valid {
|
||||||
self.pora_chunks_merkle
|
let updated_nodes = self
|
||||||
|
.pora_chunks_merkle
|
||||||
.fill_with_range_proof(data.proof.clone())?;
|
.fill_with_range_proof(data.proof.clone())?;
|
||||||
|
self.flow_store.put_mpt_node_list(updated_nodes)?;
|
||||||
}
|
}
|
||||||
Ok(valid)
|
Ok(valid)
|
||||||
}
|
}
|
||||||
|
@ -89,6 +89,11 @@ class SyncTest(TestFramework):
|
|||||||
assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False)
|
assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False)
|
||||||
assert(client2.zgs_download_segment_decoded(data_root, 1024, 2048) is None)
|
assert(client2.zgs_download_segment_decoded(data_root, 1024, 2048) is None)
|
||||||
|
|
||||||
|
# Restart node 1 to check if the proof nodes are persisted.
|
||||||
|
self.stop_storage_node(0)
|
||||||
|
self.start_storage_node(0)
|
||||||
|
self.nodes[0].wait_for_rpc_connection()
|
||||||
|
|
||||||
# Trigger chunks sync by rpc
|
# Trigger chunks sync by rpc
|
||||||
assert(client2.admin_start_sync_chunks(1, 1024, 2048) is None)
|
assert(client2.admin_start_sync_chunks(1, 1024, 2048) is None)
|
||||||
wait_until(lambda: client2.sycn_status_is_completed_or_unknown(1))
|
wait_until(lambda: client2.sycn_status_is_completed_or_unknown(1))
|
||||||
|
Loading…
Reference in New Issue
Block a user