Update the merkle tree with proof data. (#9)

* Add proof data for chunk proof.

* Support file proof.

* Update with segment proof and fix issues.

* Fix more issues.

* Fix the process of file proof.

* Merge branch 'main' into fix_proof

* Enable sync_test.

* Fix wrongly updated submodule.

* Fix bsc node version.
This commit is contained in:
peilun-conflux 2024-01-26 10:29:09 +08:00 committed by GitHub
parent 2c2dba8730
commit 0c123500c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 236 additions and 34 deletions

View File

@ -7,7 +7,7 @@ use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::Debug;
use std::marker::PhantomData;
use tracing::warn;
use tracing::{debug, warn};
pub use crate::merkle_tree::{Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead};
pub use proof::{Proof, RangeProof};
@ -188,6 +188,81 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
}
/// Fill tree nodes using the two boundary proofs of a range proof.
///
/// The proof must have been built against this tree, and it must already have been
/// validated (including checking that its root exists) before calling this.
/// For each of `left_proof` and `right_proof`, the first `leaf_height` proof nodes are
/// discarded and the remaining ones are handed to `fill_with_proof`.
///
/// # Errors
/// Propagates the error from `fill_with_proof`, i.e. when a proof node position lies
/// beyond the end of its layer. Nodes that merely differ from already-stored data are
/// skipped by `fill_with_proof` rather than reported.
pub fn fill_with_range_proof(&mut self, proof: RangeProof<E>) -> Result<()> {
    let skipped_layers = self.leaf_height;

    let left_nodes = proof
        .left_proof
        .proof_nodes_in_tree()
        .split_off(skipped_layers);
    self.fill_with_proof(left_nodes)?;

    let right_nodes = proof
        .right_proof
        .proof_nodes_in_tree()
        .split_off(skipped_layers);
    self.fill_with_proof(right_nodes)
}
/// Fill tree nodes using a proof that was built against a single file's merkle tree.
///
/// `tx_merkle_nodes` holds `(height, root)` pairs and `start_index` is the entry index
/// at which the file starts. When this tree's `leaf_height` is non-zero, pairs whose
/// height is not above `leaf_height` are dropped and the remaining heights are re-based
/// by subtracting `leaf_height + 1`. If nothing remains, the call is a no-op.
///
/// The positions returned by `Proof::file_proof_nodes_in_tree` are relative to the
/// file, so each one is shifted by the file's start offset in the corresponding layer
/// before being passed to `fill_with_proof`.
pub fn fill_with_file_proof(
    &mut self,
    proof: Proof<E>,
    mut tx_merkle_nodes: Vec<(usize, E)>,
    start_index: u64,
) -> Result<()> {
    let leaf_height = self.leaf_height;
    if leaf_height != 0 {
        tx_merkle_nodes = tx_merkle_nodes
            .into_iter()
            .filter_map(|(height, data)| {
                // `checked_sub` yields `None` exactly when `height < leaf_height + 1`.
                height
                    .checked_sub(leaf_height + 1)
                    .map(|rebased| (rebased, data))
            })
            .collect();
    }
    if tx_merkle_nodes.is_empty() {
        return Ok(());
    }

    let mut nodes = proof.file_proof_nodes_in_tree(tx_merkle_nodes);
    let layer_offset = (start_index >> leaf_height) as usize;
    // The offset halves with every layer we go up.
    nodes
        .iter_mut()
        .enumerate()
        .for_each(|(depth, node)| node.0 += layer_offset >> depth);
    self.fill_with_proof(nodes)
}
/// Write proof nodes into the tree, one node per layer starting from layer 0.
///
/// `position_and_data[i]` is `(index_in_layer, node)` for `self.layers[i]`.
/// This assumes that the proof leaf is no lower than the tree leaf. It holds for both
/// SegmentProof and ChunkProof.
///
/// # Errors
/// * The proof has more nodes than the tree has layers. This is checked before any
///   layer is modified.
/// * A node position is beyond the end of its layer. Layers below the offending one
///   may already have been filled when this is reported.
fn fill_with_proof(&mut self, position_and_data: Vec<(usize, E)>) -> Result<()> {
    // Reject over-long proofs up front instead of panicking on an out-of-bounds layer
    // index in the loop below.
    if position_and_data.len() > self.layers.len() {
        bail!(
            "proof has more nodes than tree layers, proof_nodes={} layers.len()={}",
            position_and_data.len(),
            self.layers.len()
        );
    }
    // A valid proof should not fail the following checks.
    for (layer, (position, data)) in self.layers.iter_mut().zip(position_and_data) {
        if position > layer.len() {
            bail!(
                "proof position out of range, position={} layer.len()={}",
                position,
                layer.len()
            );
        }
        if position == layer.len() {
            // skip padding node.
            continue;
        }
        if layer[position] != E::null() && layer[position] != data {
            // This is possible for a valid file proof only when the file proof node is an intermediate node,
            // so the correct proof node in the flow merkle tree must have been computed as we pad rear data.
            // Thus, it's okay to skip this case directly.
            continue;
        }
        layer[position] = data;
    }
    Ok(())
}
pub fn gen_range_proof(&self, start_index: usize, end_index: usize) -> Result<RangeProof<E>> {
if end_index <= start_index {
bail!(
@ -609,23 +684,23 @@ mod tests {
AppendMerkleTree::<H256, Sha3Algorithm>::new(vec![H256::zero()], 0, None);
merkle.append_list(data.clone());
merkle.commit(Some(0));
verify(&data, &merkle);
verify(&data, &mut merkle);
data.push(H256::random());
merkle.append(*data.last().unwrap());
merkle.commit(Some(1));
verify(&data, &merkle);
verify(&data, &mut merkle);
for _ in 0..6 {
data.push(H256::random());
}
merkle.append_list(data[data.len() - 6..].to_vec());
merkle.commit(Some(2));
verify(&data, &merkle);
verify(&data, &mut merkle);
}
}
fn verify(data: &Vec<H256>, merkle: &AppendMerkleTree<H256, Sha3Algorithm>) {
fn verify(data: &Vec<H256>, merkle: &mut AppendMerkleTree<H256, Sha3Algorithm>) {
for i in 0..data.len() {
let proof = merkle.gen_proof(i + 1).unwrap();
let r = merkle.validate(&proof, &data[i], i + 1);
@ -636,6 +711,7 @@ mod tests {
let range_proof = merkle.gen_range_proof(i + 1, end + 1).unwrap();
let r = range_proof.validate::<Sha3Algorithm>(&data[i..end], i + 1);
assert!(r.is_ok(), "{:?}", r);
merkle.fill_with_range_proof(range_proof).unwrap();
}
}
}

View File

@ -106,6 +106,13 @@ pub trait MerkleTreeRead {
index_in_layer >>= 1;
}
lemma.push(self.root().clone());
if lemma.contains(&Self::E::null()) {
bail!(
"Not enough data to generate proof, lemma={:?} path={:?}",
lemma,
path
);
}
Ok(Proof::new(lemma, path))
}
}

View File

@ -85,6 +85,56 @@ impl<T: HashElement> Proof<T> {
}
pos
}
/// Return `Vec<(index_in_layer, data)>`, ordered from the lowest proof layer up to the
/// root (the root entry, with index 0, is last).
///
/// The path is walked from the root downwards. At every step the index of the node on
/// the proven path is derived from its parent, and the sibling stored in `lemma` is
/// recorded at the neighbouring index.
pub fn proof_nodes_in_tree(&self) -> Vec<(usize, T)> {
    let mut nodes = Vec::with_capacity(self.lemma.len());
    nodes.push((0, self.root()));

    let mut path_index = 0usize;
    for (depth, &is_left) in self.path.iter().rev().enumerate() {
        // Left child is `2 * parent`, right child is `2 * parent + 1`.
        path_index = (path_index << 1) | usize::from(!is_left);
        // The sibling differs from the path node only in the lowest bit.
        let sibling_index = path_index ^ 1;
        let sibling = self.lemma[self.lemma.len() - 2 - depth].clone();
        nodes.push((sibling_index, sibling));
    }

    nodes.reverse();
    nodes
}
/// Return `Vec<(index, data)>` for the lemma nodes of a file proof, ordered from the
/// lowest layer upwards.
///
/// `tx_merkle_nodes` holds `(height, data)` pairs. The path is walked from the root
/// downwards in two phases:
/// * Before the subtree containing the proven leaf is entered, every right turn adds
///   `1 << height` of the corresponding `tx_merkle_nodes` entry to the subtree's start
///   offset; the first left turn enters the subtree. If the entries run out first, the
///   walk stops. No node is emitted in this phase.
/// * Inside the subtree, the index on the proven path is tracked and each sibling is
///   recorded at the neighbouring index, shifted by the subtree's start offset.
///
/// When `tx_merkle_nodes` has exactly one entry the walk starts inside the subtree.
pub fn file_proof_nodes_in_tree(&self, tx_merkle_nodes: Vec<(usize, T)>) -> Vec<(usize, T)> {
    let mut nodes = Vec::with_capacity(self.lemma.len());
    let mut offset_in_subtree = 0usize;
    let mut subtree_start = 0usize;
    let mut inside = tx_merkle_nodes.len() == 1;

    for (depth, &is_left) in self.path.iter().rev().enumerate() {
        if inside {
            offset_in_subtree <<= 1;
            if !is_left {
                offset_in_subtree += 1;
            }
            let own_index = subtree_start + offset_in_subtree;
            let sibling_index = if is_left { own_index + 1 } else { own_index - 1 };
            nodes.push((
                sibling_index,
                self.lemma[self.lemma.len() - 2 - depth].clone(),
            ));
            continue;
        }

        if is_left {
            inside = true;
            continue;
        }

        match tx_merkle_nodes.get(depth) {
            Some((height, _)) => subtree_start += 1 << *height,
            None => break,
        }
    }

    nodes.reverse();
    nodes
}
}
#[derive(Clone, Debug, Eq, PartialEq, DeriveEncode, DeriveDecode, Deserialize, Serialize)]

View File

@ -2,7 +2,7 @@ use super::mem_pool::MemoryChunkPool;
use crate::mem_pool::FileID;
use anyhow::Result;
use network::NetworkMessage;
use shared_types::ChunkArray;
use shared_types::{ChunkArray, FileProof};
use std::sync::Arc;
use storage_async::Store;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
@ -45,11 +45,16 @@ impl ChunkPoolHandler {
// when store support to write chunks with reference.
if let Some(file) = self.mem_pool.remove_cached_file(&id.root).await {
// If there is still cache of chunks, write them into store
let mut segments: Vec<ChunkArray> = file.segments.into_values().collect();
while let Some(seg) = segments.pop() {
let mut segments: Vec<(ChunkArray, FileProof)> = file.segments.into_values().collect();
while let Some((seg, proof)) = segments.pop() {
if !self
.log_store
.put_chunks_with_tx_hash(id.tx_id.seq, id.tx_id.hash, seg)
.put_chunks_with_tx_hash(
id.tx_id.seq,
id.tx_id.hash,
seg,
Some(proof.try_into()?),
)
.await?
{
return Ok(false);

View File

@ -2,7 +2,7 @@ use super::FileID;
use crate::{Config, SegmentInfo};
use anyhow::{bail, Result};
use hashlink::LinkedHashMap;
use shared_types::{bytes_to_chunks, ChunkArray, DataRoot, Transaction, CHUNK_SIZE};
use shared_types::{bytes_to_chunks, ChunkArray, DataRoot, FileProof, Transaction, CHUNK_SIZE};
use std::collections::HashMap;
use std::ops::Add;
use std::time::{Duration, Instant};
@ -13,7 +13,7 @@ pub struct MemoryCachedFile {
pub id: FileID,
pub chunks_per_segment: usize,
/// Window to control the cache of each file
pub segments: HashMap<usize, ChunkArray>,
pub segments: HashMap<usize, (ChunkArray, FileProof)>,
/// Total number of chunks for the cache file, which is updated from log entry.
pub total_chunks: usize,
/// Used for garbage collection. It is updated when new segment uploaded.

View File

@ -6,7 +6,7 @@ use anyhow::{bail, Result};
use async_lock::Mutex;
use log_entry_sync::LogSyncEvent;
use shared_types::{
bytes_to_chunks, compute_segment_size, ChunkArray, DataRoot, Transaction, CHUNK_SIZE,
bytes_to_chunks, compute_segment_size, ChunkArray, DataRoot, FileProof, Transaction, CHUNK_SIZE,
};
use std::sync::Arc;
use storage_async::Store;
@ -37,7 +37,7 @@ impl Inner {
fn get_all_cached_segments_to_write(
&mut self,
root: &DataRoot,
) -> Result<(FileID, Vec<ChunkArray>)> {
) -> Result<(FileID, Vec<(ChunkArray, FileProof)>)> {
// Limits the number of writing threads.
if self.write_control.total_writings >= self.config.max_writings {
bail!("too many data writing: {}", self.config.max_writings);
@ -59,17 +59,21 @@ impl Inner {
/// One uploaded segment of a file together with the proof that accompanies it.
///
/// Converted into a `(ChunkArray, FileProof)` pair via `From<SegmentInfo>`.
pub struct SegmentInfo {
/// Data root of the file this segment belongs to.
pub root: DataRoot,
/// Raw segment bytes; becomes `ChunkArray::data` on conversion.
pub seg_data: Vec<u8>,
/// Proof supplied with the segment; carried through unchanged on conversion.
pub seg_proof: FileProof,
/// Index of the segment; the first chunk index is `seg_index * chunks_per_segment`.
pub seg_index: usize,
/// Number of chunks per segment, used to turn `seg_index` into a chunk index.
pub chunks_per_segment: usize,
}
impl From<SegmentInfo> for ChunkArray {
impl From<SegmentInfo> for (ChunkArray, FileProof) {
fn from(seg_info: SegmentInfo) -> Self {
let start_index = seg_info.seg_index * seg_info.chunks_per_segment;
ChunkArray {
data: seg_info.seg_data,
start_index: start_index as u64,
}
(
ChunkArray {
data: seg_info.seg_data,
start_index: start_index as u64,
},
seg_info.seg_proof,
)
}
}
@ -155,7 +159,12 @@ impl MemoryChunkPool {
match self
.log_store
.put_chunks_with_tx_hash(file_id.tx_id.seq, file_id.tx_id.hash, seg)
.put_chunks_with_tx_hash(
file_id.tx_id.seq,
file_id.tx_id.hash,
seg,
Some(seg_info.seg_proof.try_into()?),
)
.await
{
Ok(true) => {}
@ -212,11 +221,12 @@ impl MemoryChunkPool {
.remove_file(&tx.data_merkle_root);
if let Some(mut file) = maybe_file {
file.update_with_tx(tx);
for (seg_index, seg) in file.segments.into_iter() {
for (seg_index, (seg, proof)) in file.segments.into_iter() {
self.write_chunks(
SegmentInfo {
root: tx.data_merkle_root,
seg_data: seg.data.clone(),
seg_proof: proof,
seg_index,
chunks_per_segment: file.chunks_per_segment,
},
@ -276,19 +286,24 @@ impl MemoryChunkPool {
}
async fn write_all_cached_chunks_and_finalize(&self, root: DataRoot) -> Result<()> {
let (file, mut segments) = self
let (file, mut segments_with_proof) = self
.inner
.lock()
.await
.get_all_cached_segments_to_write(&root)?;
while let Some(seg) = segments.pop() {
while let Some((seg, proof)) = segments_with_proof.pop() {
// TODO(qhz): error handling
// 1. Push the failed segment back to front. (enhance store to return Err(ChunkArray))
// 2. Put the incompleted segments back to memory pool.
match self
.log_store
.put_chunks_with_tx_hash(file.tx_id.seq, file.tx_id.hash, seg)
.put_chunks_with_tx_hash(
file.tx_id.seq,
file.tx_id.hash,
seg,
Some(proof.try_into()?),
)
.await
{
Ok(true) => {}

View File

@ -269,6 +269,7 @@ impl LogSyncManager {
data,
start_index: 0,
},
None,
)
.and_then(|_| store.finalize_tx_with_hash(tx.seq, tx.hash()))
{

View File

@ -63,6 +63,7 @@ impl RpcServer for RpcServerImpl {
let seg_info = SegmentInfo {
root: segment.root,
seg_data: segment.data,
seg_proof: segment.proof,
seg_index: segment.index,
chunks_per_segment: self.ctx.config.chunks_per_segment,
};

View File

@ -1,6 +1,6 @@
mod proof;
use anyhow::bail;
use anyhow::{anyhow, bail, Error};
use append_merkle::{
AppendMerkleTree, Proof as RawProof, RangeProof as RawRangeProof, Sha3Algorithm,
};
@ -346,3 +346,19 @@ pub fn compute_segment_merkle_root(data: &[u8], segment_chunks: usize) -> [u8; 3
MerkleTree::<_, RawLeafSha3Algorithm>::new(hashes).root()
}
/// Convert a `FileProof` into a `FlowProof`.
///
/// A proof with an empty `path` has its first lemma element appended once more before
/// the length check. The lemma must then contain exactly `path.len() + 2` entries.
///
/// # Errors
/// * `"empty file proof"` when `path` is empty and `lemma` has no element.
/// * `"invalid file proof"` when the lemma length does not match the path length.
impl TryFrom<FileProof> for FlowProof {
    type Error = Error;

    fn try_from(value: FileProof) -> Result<Self, Self::Error> {
        let mut lemma = value.lemma;
        if value.path.is_empty() {
            // Build the error lazily so the success path does not allocate it.
            let only = *lemma
                .first()
                .ok_or_else(|| anyhow!("empty file proof"))?;
            lemma.push(only);
        }
        if lemma.len() != value.path.len() + 2 {
            return Err(anyhow!("invalid file proof"));
        }
        Ok(Self::new(lemma, value.path))
    }
}

View File

@ -2,7 +2,7 @@
extern crate tracing;
use anyhow::bail;
use shared_types::{Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, Transaction};
use shared_types::{Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, Transaction};
use std::sync::Arc;
use storage::{error, error::Result, log_store::Store as LogStore, H256};
use task_executor::TaskExecutor;
@ -43,7 +43,7 @@ impl Store {
delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result<Option<ChunkArrayWithProof>>);
delegate!(fn get_tx_by_seq_number(seq: u64) -> Result<Option<Transaction>>);
delegate!(fn put_chunks(tx_seq: u64, chunks: ChunkArray) -> Result<()>);
delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray) -> Result<bool>);
delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray, maybe_file_proof: Option<FlowProof>) -> Result<bool>);
delegate!(fn get_chunk_by_flow_index(index: u64, length: u64) -> Result<Option<ChunkArray>>);
delegate!(fn finalize_tx(tx_seq: u64) -> Result<()>);
delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result<bool>);

View File

@ -93,6 +93,7 @@ impl LogStoreChunkWrite for LogManager {
tx_seq: u64,
tx_hash: H256,
chunks: ChunkArray,
maybe_file_proof: Option<FlowProof>,
) -> Result<bool> {
let tx = self
.tx_store
@ -116,6 +117,14 @@ impl LogStoreChunkWrite for LogManager {
let mut flow_entry_array = chunks;
flow_entry_array.start_index += tx.start_entry_index;
self.append_entries(flow_entry_array)?;
if let Some(file_proof) = maybe_file_proof {
self.pora_chunks_merkle.fill_with_file_proof(
file_proof,
tx.merkle_nodes,
tx.start_entry_index,
)?;
}
Ok(true)
}
@ -256,6 +265,19 @@ impl LogStoreWrite for LogManager {
let start = if tx_seq != u64::MAX { tx_seq + 1 } else { 0 };
self.tx_store.remove_tx_after(start)
}
/// Validate `data`'s range proof for `tx_seq` and, only if it is valid, fill the
/// chunk merkle tree with the proof nodes.
///
/// Returns `Ok(false)` without touching the tree when validation fails. Errors from
/// validation or from filling the tree are propagated.
fn validate_and_insert_range_proof(
    &mut self,
    tx_seq: u64,
    data: &ChunkArrayWithProof,
) -> Result<bool> {
    if !self.validate_range_proof(tx_seq, data)? {
        return Ok(false);
    }
    self.pora_chunks_merkle
        .fill_with_range_proof(data.proof.clone())?;
    Ok(true)
}
}
impl LogStoreChunkRead for LogManager {
@ -886,6 +908,7 @@ impl LogManager {
+ last_segment_size_for_file)
as u64,
},
None,
)?;
}

View File

@ -1,7 +1,8 @@
use append_merkle::MerkleTreeInitialData;
use ethereum_types::H256;
use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowRangeProof, Transaction,
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
Transaction,
};
use zgs_spec::{BYTES_PER_SEAL, SEALS_PER_LOAD};
@ -114,6 +115,13 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
///
/// Reverted transactions are returned in order.
fn revert_to(&mut self, tx_seq: u64) -> Result<Vec<Transaction>>;
/// If the proof is valid, fill the tree nodes with the new data.
fn validate_and_insert_range_proof(
&mut self,
tx_seq: u64,
data: &ChunkArrayWithProof,
) -> Result<bool>;
}
pub trait LogStoreChunkWrite {
@ -125,6 +133,7 @@ pub trait LogStoreChunkWrite {
tx_seq: u64,
tx_hash: H256,
chunks: ChunkArray,
maybe_file_proof: Option<FlowProof>,
) -> Result<bool>;
/// Delete all chunks of a tx.

View File

@ -385,9 +385,9 @@ impl SerialSyncController {
let validation_result = self
.store
.get_store()
.read()
.write()
.await
.validate_range_proof(self.tx_seq, &response);
.validate_and_insert_range_proof(self.tx_seq, &response);
match validation_result {
Ok(true) => {}
@ -411,7 +411,7 @@ impl SerialSyncController {
// store in db
match self
.store
.put_chunks_with_tx_hash(self.tx_id.seq, self.tx_id.hash, response.chunks)
.put_chunks_with_tx_hash(self.tx_id.seq, self.tx_id.hash, response.chunks, None)
.await
{
Ok(true) => self.next_chunk = to_chunk,

View File

@ -97,8 +97,7 @@ def run():
]
slow_tests = {"random_test.py", "same_root_test.py"}
# TODO(zz): enable sync_test if proof validation issue fixed
long_manual_tests = {"fuzz_test.py", "sync_test.py"}
long_manual_tests = {"fuzz_test.py"}
for subdir in test_subdirs:
subdir_path = os.path.join(test_dir, subdir)

View File

@ -79,7 +79,7 @@ class BSCNode(BlockchainNode):
)
def __try_download_node(self, f, log):
url = "https://api.github.com/repos/{}/{}/releases/latest".format(
url = "https://api.github.com/repos/{}/{}/releases/79485895".format(
"bnb-chain", "bsc"
)
req = requests.get(url)

@ -1 +1 @@
Subproject commit 5d32ed0df9bd0c6e95ffe4d11e06ceed233a23af
Subproject commit 76a9eae4804be6d51357fd2de2beb2f921287ffe