Generate sync proof with the given tx seq. (#123)

* Generate sync proof with the given tx seq.

* Fill HistoryTree with new data.

* Fix clippy.
This commit is contained in:
peilun-conflux 2024-07-12 15:55:57 +08:00 committed by GitHub
parent 2bc402f94b
commit ba8d065e73
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 125 additions and 53 deletions

View File

@ -4,7 +4,7 @@ mod sha3;
use anyhow::{anyhow, bail, Result};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::marker::PhantomData;
use tracing::{trace, warn};
@ -20,7 +20,7 @@ pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
layers: Vec<Vec<E>>,
/// Keep the delta nodes that can be used to construct a history tree.
/// The key is the root node of that version.
delta_nodes_map: HashMap<u64, DeltaNodes<E>>,
delta_nodes_map: BTreeMap<u64, DeltaNodes<E>>,
root_to_tx_seq_map: HashMap<E, u64>,
/// For `last_chunk_merkle` after the first chunk, this is set to `Some(10)` so that
@ -36,7 +36,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
pub fn new(leaves: Vec<E>, leaf_height: usize, start_tx_seq: Option<u64>) -> Self {
let mut merkle = Self {
layers: vec![leaves],
delta_nodes_map: HashMap::new(),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
min_depth: None,
leaf_height,
@ -68,7 +68,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
) -> Result<Self> {
let mut merkle = Self {
layers: vec![vec![]],
delta_nodes_map: HashMap::new(),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
min_depth: None,
leaf_height,
@ -103,7 +103,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
// Create an empty merkle tree with `depth`.
let mut merkle = Self {
layers: vec![vec![]; depth],
delta_nodes_map: HashMap::new(),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
min_depth: Some(depth),
leaf_height: 0,
@ -123,7 +123,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
layers[0] = leaves;
let mut merkle = Self {
layers,
delta_nodes_map: HashMap::new(),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
min_depth: Some(depth),
leaf_height: 0,
@ -288,23 +288,6 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
Ok(updated_nodes)
}
/// Builds a `RangeProof` from the proofs of the two boundary leaves of the range.
///
/// The left proof is generated for `start_index` and the right proof for `end_index - 1`.
///
/// # Errors
///
/// Fails when `end_index <= start_index`, or when either boundary proof cannot be generated.
pub fn gen_range_proof(&self, start_index: usize, end_index: usize) -> Result<RangeProof<E>> {
    // Reject empty and inverted ranges up front; this also keeps `end_index - 1` from underflowing.
    if start_index >= end_index {
        bail!(
            "invalid proof range: start={} end={}",
            start_index,
            end_index
        );
    }
    // TODO(zz): Optimize range proof.
    // Fields are evaluated in the written order, so the left proof is still generated first.
    Ok(RangeProof {
        left_proof: self.gen_proof(start_index)?,
        right_proof: self.gen_proof(end_index - 1)?,
    })
}
/// Returns `true` when `root` is present as a key in `root_to_tx_seq_map`.
pub fn check_root(&self, root: &E) -> bool {
self.root_to_tx_seq_map.contains_key(root)
}
@ -531,14 +514,17 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
Ok(())
}
pub fn at_root_version(&self, root_hash: &E) -> Result<HistoryTree<E>> {
let tx_seq = self
.root_to_tx_seq_map
pub fn tx_seq_at_root(&self, root_hash: &E) -> Result<u64> {
self.root_to_tx_seq_map
.get(root_hash)
.ok_or_else(|| anyhow!("old root unavailable, root={:?}", root_hash))?;
.cloned()
.ok_or_else(|| anyhow!("old root unavailable, root={:?}", root_hash))
}
pub fn at_version(&self, tx_seq: u64) -> Result<HistoryTree<E>> {
let delta_nodes = self
.delta_nodes_map
.get(tx_seq)
.get(&tx_seq)
.ok_or_else(|| anyhow!("tx_seq unavailable, tx_seq={:?}", tx_seq))?;
if delta_nodes.height() == 0 {
bail!("empty tree");
@ -654,8 +640,8 @@ impl<'a, E: HashElement> MerkleTreeRead for HistoryTree<'a, E> {
type E = E;
fn node(&self, layer: usize, index: usize) -> &Self::E {
match self.delta_nodes.get(layer, index).expect("range checked") {
Some(node) => node,
None => &self.layers[layer][index],
Some(node) if *node != E::null() => node,
_ => &self.layers[layer][index],
}
}
@ -746,7 +732,8 @@ mod tests {
new_data.push(H256::random());
}
merkle.append_list(new_data);
merkle.commit(Some(i as u64 / 6 + 1));
let seq = i as u64 / 6 + 1;
merkle.commit(Some(seq));
let r = range_proof.validate::<Sha3Algorithm>(&data[i..end], i + 1);
assert!(r.is_ok(), "{:?}", r);
merkle.fill_with_range_proof(range_proof).unwrap();
@ -754,6 +741,35 @@ mod tests {
}
}
/// Checks that range proofs generated from the tree returned by `at_version` validate against
/// the appended data and can be filled back into the merkle tree.
///
/// Several batches of random leaves are appended, each committed under its own tx seq, and
/// proofs are requested at that tx seq for small windows of the batch.
#[test]
fn test_proof_at_version() {
    let n = [2, 255, 256, 257];
    let mut merkle = AppendMerkleTree::<H256, Sha3Algorithm>::new(vec![H256::zero()], 0, None);
    // Number of data leaves appended by the previous batches.
    let mut start_pos = 0;

    for (tx_seq, &entry_len) in n.iter().enumerate() {
        let data: Vec<H256> = (0..entry_len).map(|_| H256::random()).collect();
        merkle.append_list(data.clone());
        merkle.commit(Some(tx_seq as u64));

        for i in (0..data.len()).step_by(6) {
            // `i` and `end` are offsets inside `data`, so the window bound must not include
            // `start_pos`. Adding it made the window grow with every batch instead of
            // covering at most three leaves.
            let end = std::cmp::min(i + 3, data.len());
            // The tree was created with one initial leaf, hence the `+ 1` when translating
            // batch offsets into tree indices.
            let range_proof = merkle
                .at_version(tx_seq as u64)
                .unwrap()
                .gen_range_proof(start_pos + i + 1, start_pos + end + 1)
                .unwrap();
            let r = range_proof.validate::<Sha3Algorithm>(&data[i..end], start_pos + i + 1);
            assert!(r.is_ok(), "{:?}", r);
            merkle.fill_with_range_proof(range_proof).unwrap();
        }

        start_pos += entry_len;
    }
}
fn verify(data: &[H256], merkle: &mut AppendMerkleTree<H256, Sha3Algorithm>) {
for (i, item) in data.iter().enumerate() {
let proof = merkle.gen_proof(i + 1).unwrap();

View File

@ -1,5 +1,5 @@
use crate::sha3::Sha3Algorithm;
use crate::Proof;
use crate::{Proof, RangeProof};
use anyhow::{bail, Result};
use ethereum_types::H256;
use once_cell::sync::Lazy;
@ -114,6 +114,23 @@ pub trait MerkleTreeRead {
}
Ok(Proof::new(lemma, path))
}
/// Builds a `RangeProof` from the proofs of the two boundary leaves of the range.
///
/// The left proof is generated for `start_index` and the right proof for `end_index - 1`.
///
/// # Errors
///
/// Fails when `end_index <= start_index`, or when either boundary proof cannot be generated.
fn gen_range_proof(&self, start_index: usize, end_index: usize) -> Result<RangeProof<Self::E>> {
    // Reject empty and inverted ranges up front; this also keeps `end_index - 1` from underflowing.
    if start_index >= end_index {
        bail!(
            "invalid proof range: start={} end={}",
            start_index,
            end_index
        );
    }
    // TODO(zz): Optimize range proof.
    // Fields are evaluated in the written order, so the left proof is still generated first.
    Ok(RangeProof {
        left_proof: self.gen_proof(start_index)?,
        right_proof: self.gen_proof(end_index - 1)?,
    })
}
}
/// This includes the data to reconstruct an `AppendMerkleTree` root where some nodes

View File

@ -184,6 +184,7 @@ pub struct GetChunksRequest {
pub tx_id: TxID,
pub index_start: u64,
pub index_end: u64,
pub merkle_tx_seq: u64,
}
/* RPC Handling and Grouping */

View File

@ -856,6 +856,7 @@ mod tests {
tx_id: TxID::random_hash(7),
index_start: 66,
index_end: 99,
merkle_tx_seq: 7,
};
handler
.on_rpc_request(alice, id, Request::GetChunks(raw_request.clone()))

View File

@ -121,7 +121,7 @@ impl RpcServer for RpcServerImpl {
let segment = try_option!(
self.ctx
.log_store
.get_chunks_with_proof_by_tx_and_index_range(tx.seq, start_index, end_index)
.get_chunks_with_proof_by_tx_and_index_range(tx.seq, start_index, end_index, None)
.await?
);

View File

@ -47,7 +47,7 @@ impl Store {
delegate!(fn check_tx_completed(tx_seq: u64) -> Result<bool>);
delegate!(fn get_chunk_by_tx_and_index(tx_seq: u64, index: usize) -> Result<Option<Chunk>>);
delegate!(fn get_chunks_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result<Option<ChunkArray>>);
delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize) -> Result<Option<ChunkArrayWithProof>>);
delegate!(fn get_chunks_with_proof_by_tx_and_index_range(tx_seq: u64, index_start: usize, index_end: usize, merkle_tx_seq: Option<u64>) -> Result<Option<ChunkArrayWithProof>>);
delegate!(fn get_tx_by_seq_number(seq: u64) -> Result<Option<Transaction>>);
delegate!(fn put_chunks(tx_seq: u64, chunks: ChunkArray) -> Result<()>);
delegate!(fn put_chunks_with_tx_hash(tx_seq: u64, tx_hash: H256, chunks: ChunkArray, maybe_file_proof: Option<FlowProof>) -> Result<bool>);

View File

@ -174,7 +174,7 @@ fn read_performance(c: &mut Criterion) {
store
.read()
.unwrap()
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, index_start, index_end)
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, index_start, index_end, None)
.unwrap();
})
});

View File

@ -245,7 +245,7 @@ impl LogStoreWrite for LogManager {
fn put_tx(&self, tx: Transaction) -> Result<()> {
let mut merkle = self.merkle.write();
debug!("put_tx: tx={:?}", tx);
let expected_seq = self.next_tx_seq();
let expected_seq = self.tx_store.next_tx_seq();
if tx.seq != expected_seq {
if tx.seq + 1 == expected_seq && !self.check_tx_completed(tx.seq)? {
// special case for rerun the last tx during recovery.
@ -474,7 +474,8 @@ impl LogStoreRead for LogManager {
let single_chunk_array = try_option!(self.get_chunks_with_proof_by_tx_and_index_range(
tx_seq,
index,
index + 1
index + 1,
None
)?);
Ok(Some(ChunkWithProof {
chunk: Chunk(single_chunk_array.chunks.data.as_slice().try_into()?),
@ -487,12 +488,15 @@ impl LogStoreRead for LogManager {
tx_seq: u64,
index_start: usize,
index_end: usize,
merkle_tx_seq: Option<u64>,
) -> crate::error::Result<Option<ChunkArrayWithProof>> {
let tx = try_option!(self.tx_store.get_tx_by_seq_number(tx_seq)?);
let chunks =
try_option!(self.get_chunks_by_tx_and_index_range(tx_seq, index_start, index_end)?);
let left_proof = self.gen_proof(tx.start_entry_index + index_start as u64, None)?;
let right_proof = self.gen_proof(tx.start_entry_index + index_end as u64 - 1, None)?;
let left_proof =
self.gen_proof_at_version(tx.start_entry_index + index_start as u64, merkle_tx_seq)?;
let right_proof =
self.gen_proof_at_version(tx.start_entry_index + index_end as u64 - 1, merkle_tx_seq)?;
Ok(Some(ChunkArrayWithProof {
chunks,
proof: FlowRangeProof {
@ -709,13 +713,28 @@ impl LogManager {
}
/// Generates a proof for `flow_index`, optionally against the version identified by a root.
///
/// With no root, this delegates to `gen_proof_at_version` with no tx seq. With a root, the
/// root is first mapped to a tx seq through `pora_chunks_merkle.tx_seq_at_root`, and the proof
/// is generated at that tx seq.
///
/// # Errors
///
/// Propagates the failure of the root lookup or of `gen_proof_at_version`.
fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
    if let Some(root) = maybe_root {
        // The read guard stays alive until the end of this branch, i.e. across the nested call.
        let merkle_guard = self.merkle.read_recursive();
        let version = merkle_guard.pora_chunks_merkle.tx_seq_at_root(&root)?;
        self.gen_proof_at_version(flow_index, Some(version))
    } else {
        self.gen_proof_at_version(flow_index, None)
    }
}
fn gen_proof_at_version(
&self,
flow_index: u64,
maybe_tx_seq: Option<u64>,
) -> Result<FlowProof> {
let merkle = self.merkle.read_recursive();
let chunk_index = flow_index / PORA_CHUNK_SIZE as u64;
let top_proof = match maybe_root {
let top_proof = match maybe_tx_seq {
None => merkle.pora_chunks_merkle.gen_proof(chunk_index as usize)?,
Some(root) => merkle
Some(tx_seq) => merkle
.pora_chunks_merkle
.at_root_version(&root)?
.at_version(tx_seq)?
.gen_proof(chunk_index as usize)?,
};
@ -732,13 +751,13 @@ impl LogManager {
self.flow_store
.gen_proof_in_batch(chunk_index as usize, flow_index as usize % PORA_CHUNK_SIZE)?
} else {
match maybe_root {
match maybe_tx_seq {
None => merkle
.last_chunk_merkle
.gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
Some(root) => merkle
Some(tx_version) => merkle
.last_chunk_merkle
.at_root_version(&root)?
.at_version(tx_version)?
.gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
}
};
@ -922,6 +941,9 @@ impl LogManager {
.last_chunk_merkle
.fill_leaf(chunk_start_index + local_index, Sha3Algorithm::leaf(entry));
}
merkle
.pora_chunks_merkle
.update_last(*merkle.last_chunk_merkle.root());
}
let chunk_roots = self.flow_store.append_entries(flow_entry_array)?;
for (chunk_index, chunk_root) in chunk_roots {

View File

@ -48,6 +48,7 @@ pub trait LogStoreRead: LogStoreChunkRead {
tx_seq: u64,
index_start: usize,
index_end: usize,
merkle_tx_seq: Option<u64>,
) -> Result<Option<ChunkArrayWithProof>>;
fn check_tx_completed(&self, tx_seq: u64) -> Result<bool>;

View File

@ -92,7 +92,7 @@ fn test_put_get() {
for i in (0..chunk_count).step_by(PORA_CHUNK_SIZE / 3) {
let end = std::cmp::min(i + PORA_CHUNK_SIZE, chunk_count);
let chunk_array_with_proof = store
.get_chunks_with_proof_by_tx_and_index_range(tx.seq, i, end)
.get_chunks_with_proof_by_tx_and_index_range(tx.seq, i, end, None)
.unwrap()
.unwrap();
assert_eq!(

View File

@ -263,10 +263,16 @@ impl SerialSyncController {
let from_chunk = self.next_chunk;
let to_chunk = std::cmp::min(from_chunk + PORA_CHUNK_SIZE as u64, self.goal.index_end);
let request_id = network::RequestId::Sync(RequestId::SerialSync { tx_id: self.tx_id });
// TODO: It's possible that we read it while `next_tx_seq - 1` is still being committed.
// We can wait for its commitment, but this will slow down this state machine.
// Or we can use `next_tx_seq - 2`, but for a restarted node without receiving new
// files, this tx seq is also unavailable.
let committed_tx_seq = self.store.get_store().next_tx_seq().saturating_sub(1);
let request = GetChunksRequest {
tx_id: self.tx_id,
index_start: from_chunk,
index_end: to_chunk,
merkle_tx_seq: committed_tx_seq,
};
// select a random peer
@ -865,6 +871,7 @@ mod tests {
tx_id: controller.tx_id,
index_start: 0,
index_end: 123,
merkle_tx_seq: controller.tx_id.seq,
})
);
@ -1148,7 +1155,7 @@ mod tests {
);
let chunks = peer_store
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count, None)
.unwrap()
.unwrap();
@ -1180,7 +1187,7 @@ mod tests {
);
let mut chunks = peer_store
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count, None)
.unwrap()
.unwrap();
@ -1247,7 +1254,7 @@ mod tests {
);
let chunks = peer_store
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count, None)
.unwrap()
.unwrap();
@ -1310,7 +1317,7 @@ mod tests {
);
let chunks = peer_store
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count, None)
.unwrap()
.unwrap();
@ -1382,7 +1389,7 @@ mod tests {
);
let chunks = peer_store
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count, None)
.unwrap()
.unwrap();
@ -1425,7 +1432,7 @@ mod tests {
);
let chunks = peer_store
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, 1024)
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, 1024, None)
.unwrap()
.unwrap();
@ -1471,7 +1478,7 @@ mod tests {
);
let chunks = peer_store
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count, None)
.unwrap()
.unwrap();

View File

@ -452,6 +452,7 @@ impl SyncService {
request.tx_id.seq,
request.index_start as usize,
request.index_end as usize,
Some(request.merkle_tx_seq),
)
.await?;
@ -918,6 +919,7 @@ mod tests {
tx_id: runtime.txs[0].id(),
index_start: 0,
index_end: runtime.chunk_count as u64,
merkle_tx_seq: 0,
};
sync_send
@ -978,6 +980,7 @@ mod tests {
tx_id: runtime.txs[0].id(),
index_start: 0,
index_end: 0_u64,
merkle_tx_seq: 0,
};
sync_send
@ -1031,6 +1034,7 @@ mod tests {
},
index_start: 0,
index_end: runtime.chunk_count as u64,
merkle_tx_seq: 1,
};
sync_send
@ -1081,6 +1085,7 @@ mod tests {
tx_id: runtime.txs[0].id(),
index_start: 0,
index_end: runtime.chunk_count as u64 + 1,
merkle_tx_seq: 0,
};
sync_send
@ -1133,6 +1138,7 @@ mod tests {
tx_id: runtime.txs[0].id(),
index_start: 0,
index_end: runtime.chunk_count as u64,
merkle_tx_seq: 0,
};
sync_send
@ -1664,6 +1670,7 @@ mod tests {
tx_seq,
req.index_start as usize,
req.index_end as usize,
None,
)
.unwrap()
.unwrap();