This commit is contained in:
0g-peterzhb 2024-09-20 19:17:19 +08:00 committed by GitHub
commit f7f5b0be07
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 139 additions and 60 deletions

1
Cargo.lock generated
View File

@ -7280,6 +7280,7 @@ dependencies = [
"shared_types", "shared_types",
"static_assertions", "static_assertions",
"tiny-keccak", "tiny-keccak",
"tokio",
"tracing", "tracing",
"typenum", "typenum",
"zgs_seal", "zgs_seal",

View File

@ -423,7 +423,7 @@ impl Libp2pEventHandler {
let addr = self.get_listen_addr_or_add().await?; let addr = self.get_listen_addr_or_add().await?;
let timestamp = timestamp_now(); let timestamp = timestamp_now();
let shard_config = self.store.get_store().flow().get_shard_config(); let shard_config = self.store.get_store().get_shard_config();
let msg = AnnounceFile { let msg = AnnounceFile {
tx_ids, tx_ids,
@ -699,7 +699,7 @@ impl Libp2pEventHandler {
} }
// notify sync layer if shard config matches // notify sync layer if shard config matches
let my_shard_config = self.store.get_store().flow().get_shard_config(); let my_shard_config = self.store.get_store().get_shard_config();
if my_shard_config.intersect(&announced_shard_config) { if my_shard_config.intersect(&announced_shard_config) {
for tx_id in msg.tx_ids.iter() { for tx_id in msg.tx_ids.iter() {
self.send_to_sync(SyncMessage::AnnounceFileGossip { self.send_to_sync(SyncMessage::AnnounceFileGossip {

View File

@ -183,7 +183,7 @@ impl RpcServer for RpcServerImpl {
async fn get_shard_config(&self) -> RpcResult<ShardConfig> { async fn get_shard_config(&self) -> RpcResult<ShardConfig> {
debug!("zgs_getShardConfig"); debug!("zgs_getShardConfig");
let shard_config = self.ctx.log_store.get_store().flow().get_shard_config(); let shard_config = self.ctx.log_store.get_store().get_shard_config();
Ok(shard_config) Ok(shard_config)
} }

View File

@ -95,23 +95,22 @@ impl Store {
&self, &self,
seal_index_max: usize, seal_index_max: usize,
) -> anyhow::Result<Option<Vec<SealTask>>> { ) -> anyhow::Result<Option<Vec<SealTask>>> {
self.spawn(move |store| store.flow().pull_seal_chunk(seal_index_max)) self.spawn(move |store| store.pull_seal_chunk(seal_index_max))
.await .await
} }
pub async fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> anyhow::Result<()> { pub async fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> anyhow::Result<()> {
self.spawn(move |store| store.flow().submit_seal_result(answers)) self.spawn(move |store| store.submit_seal_result(answers))
.await .await
} }
pub async fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> { pub async fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
self.spawn(move |store| store.flow().load_sealed_data(chunk_index)) self.spawn(move |store| store.load_sealed_data(chunk_index))
.await .await
} }
pub async fn get_num_entries(&self) -> Result<u64> { pub async fn get_num_entries(&self) -> Result<u64> {
self.spawn(move |store| store.flow().get_num_entries()) self.spawn(move |store| store.get_num_entries()).await
.await
} }
pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> { pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> {
@ -122,7 +121,7 @@ impl Store {
pub async fn update_shard_config(&self, shard_config: ShardConfig) { pub async fn update_shard_config(&self, shard_config: ShardConfig) {
self.spawn(move |store| { self.spawn(move |store| {
store.flow().update_shard_config(shard_config); store.update_shard_config(shard_config);
Ok(()) Ok(())
}) })
.await .await

View File

@ -29,6 +29,7 @@ itertools = "0.13.0"
serde = { version = "1.0.197", features = ["derive"] } serde = { version = "1.0.197", features = ["derive"] }
parking_lot = "0.12.3" parking_lot = "0.12.3"
serde_json = "1.0.127" serde_json = "1.0.127"
tokio = { version = "1.10.0", features = ["sync"] }
[dev-dependencies] [dev-dependencies]
rand = "0.8.5" rand = "0.8.5"

View File

@ -3,7 +3,8 @@ use super::{MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig; use crate::config::ShardConfig;
use crate::error::Error; use crate::error::Error;
use crate::log_store::log_manager::{ use crate::log_store::log_manager::{
bytes_to_entries, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE, bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
}; };
use crate::log_store::{FlowRead, FlowSeal, FlowWrite}; use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
use crate::{try_option, ZgsKeyValueDB}; use crate::{try_option, ZgsKeyValueDB};
@ -11,7 +12,7 @@ use anyhow::{anyhow, bail, Result};
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead}; use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
use itertools::Itertools; use itertools::Itertools;
use parking_lot::RwLock; use parking_lot::RwLock;
use shared_types::{ChunkArray, DataRoot, FlowProof}; use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode}; use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
use std::cmp::Ordering; use std::cmp::Ordering;
@ -441,6 +442,10 @@ impl FlowDBStore {
// and they will be updated in the merkle tree with `fill_leaf` by the caller. // and they will be updated in the merkle tree with `fill_leaf` by the caller.
let mut leaf_list = Vec::new(); let mut leaf_list = Vec::new();
let mut expected_index = 0; let mut expected_index = 0;
let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
let empty_root = *Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) { for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
let (index_bytes, root_bytes) = r?; let (index_bytes, root_bytes) = r?;
let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?; let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?;
@ -475,25 +480,26 @@ impl FlowDBStore {
expected_index += 1; expected_index += 1;
} }
Ordering::Greater => { Ordering::Greater => {
bail!( while batch_index > expected_index {
"unexpected chunk leaf in range, expected={}, get={}, range={:?}", // Fill the gap with empty leaves.
expected_index, root_list.push((1, empty_root));
batch_index, expected_index += 1;
range_root, }
); range_root = None;
root_list.push((1, root));
expected_index += 1;
} }
} }
} }
} else if expected_index == batch_index { } else {
while batch_index > expected_index {
// Fill the gap with empty leaves.
root_list.push((1, empty_root));
expected_index += 1;
}
range_root = Some(BatchRoot::Multiple((subtree_depth, root))); range_root = Some(BatchRoot::Multiple((subtree_depth, root)));
root_list.push((subtree_depth, root)); root_list.push((subtree_depth, root));
expected_index += 1 << (subtree_depth - 1); expected_index += 1 << (subtree_depth - 1);
} else {
bail!(
"unexpected range root: expected={} get={}",
expected_index,
batch_index
);
} }
} }
let extra_node_list = self.get_mpt_node_list()?; let extra_node_list = self.get_mpt_node_list()?;

View File

@ -1,3 +1,4 @@
use crate::config::ShardConfig;
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowStore}; use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowStore};
use crate::log_store::tx_store::TransactionStore; use crate::log_store::tx_store::TransactionStore;
use crate::log_store::{ use crate::log_store::{
@ -20,11 +21,13 @@ use shared_types::{
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::path::Path; use std::path::Path;
use std::sync::mpsc;
use std::sync::Arc; use std::sync::Arc;
use std::thread;
use tracing::{debug, error, info, instrument, trace, warn}; use tracing::{debug, error, info, instrument, trace, warn};
use super::tx_store::BlockHashAndSubmissionIndex; use super::tx_store::BlockHashAndSubmissionIndex;
use super::LogStoreInner; use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
/// 256 Bytes /// 256 Bytes
pub const ENTRY_SIZE: usize = 256; pub const ENTRY_SIZE: usize = 256;
@ -45,11 +48,18 @@ pub const COL_NUM: u32 = 9;
// Process at most 1M entries (256MB) pad data at a time. // Process at most 1M entries (256MB) pad data at a time.
const PAD_MAX_SIZE: usize = 1 << 20; const PAD_MAX_SIZE: usize = 1 << 20;
pub struct UpdateFlowMessage {
pub root_map: BTreeMap<usize, (H256, usize)>,
pub pad_data: usize,
pub tx_start_flow_index: u64,
}
pub struct LogManager { pub struct LogManager {
pub(crate) db: Arc<dyn ZgsKeyValueDB>, pub(crate) db: Arc<dyn ZgsKeyValueDB>,
tx_store: TransactionStore, tx_store: TransactionStore,
flow_store: FlowStore, flow_store: Arc<FlowStore>,
merkle: RwLock<MerkleManager>, merkle: RwLock<MerkleManager>,
sender: mpsc::Sender<UpdateFlowMessage>,
} }
struct MerkleManager { struct MerkleManager {
@ -139,16 +149,6 @@ pub struct LogConfig {
pub flow: FlowConfig, pub flow: FlowConfig,
} }
impl LogStoreInner for LogManager {
fn flow(&self) -> &dyn super::Flow {
&self.flow_store
}
fn flow_mut(&mut self) -> &mut dyn super::Flow {
&mut self.flow_store
}
}
impl LogStoreChunkWrite for LogManager { impl LogStoreChunkWrite for LogManager {
fn put_chunks(&self, tx_seq: u64, chunks: ChunkArray) -> Result<()> { fn put_chunks(&self, tx_seq: u64, chunks: ChunkArray) -> Result<()> {
let mut merkle = self.merkle.write(); let mut merkle = self.merkle.write();
@ -389,6 +389,14 @@ impl LogStoreWrite for LogManager {
fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> { fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> {
self.tx_store.delete_block_hash_by_number(block_number) self.tx_store.delete_block_hash_by_number(block_number)
} }
fn update_shard_config(&self, shard_config: ShardConfig) {
self.flow_store.update_shard_config(shard_config)
}
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
self.flow_store.submit_seal_result(answers)
}
} }
impl LogStoreChunkRead for LogManager { impl LogStoreChunkRead for LogManager {
@ -579,6 +587,22 @@ impl LogStoreRead for LogManager {
fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result<bool> { fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result<bool> {
self.tx_store.check_tx_pruned(tx_seq) self.tx_store.check_tx_pruned(tx_seq)
} }
fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> {
self.flow_store.pull_seal_chunk(seal_index_max)
}
fn get_num_entries(&self) -> Result<u64> {
self.flow_store.get_num_entries()
}
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
self.flow_store.load_sealed_data(chunk_index)
}
fn get_shard_config(&self) -> ShardConfig {
self.flow_store.get_shard_config()
}
} }
impl LogManager { impl LogManager {
@ -596,7 +620,7 @@ impl LogManager {
fn new(db: Arc<dyn ZgsKeyValueDB>, config: LogConfig) -> Result<Self> { fn new(db: Arc<dyn ZgsKeyValueDB>, config: LogConfig) -> Result<Self> {
let tx_store = TransactionStore::new(db.clone())?; let tx_store = TransactionStore::new(db.clone())?;
let flow_store = FlowStore::new(db.clone(), config.flow); let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow));
let mut initial_data = flow_store.get_chunk_root_list()?; let mut initial_data = flow_store.get_chunk_root_list()?;
// If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list` // If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
// first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves` // first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
@ -700,13 +724,19 @@ impl LogManager {
pora_chunks_merkle, pora_chunks_merkle,
last_chunk_merkle, last_chunk_merkle,
}); });
let log_manager = Self {
let (sender, receiver) = mpsc::channel();
let mut log_manager = Self {
db, db,
tx_store, tx_store,
flow_store, flow_store,
merkle, merkle,
sender,
}; };
log_manager.start_receiver(receiver);
if let Some(tx) = last_tx_to_insert { if let Some(tx) = last_tx_to_insert {
log_manager.put_tx(tx)?; log_manager.put_tx(tx)?;
let mut merkle = log_manager.merkle.write(); let mut merkle = log_manager.merkle.write();
@ -727,6 +757,32 @@ impl LogManager {
Ok(log_manager) Ok(log_manager)
} }
fn start_receiver(&mut self, rx: mpsc::Receiver<UpdateFlowMessage>) {
let flow_store = self.flow_store.clone();
thread::spawn(move || -> Result<(), anyhow::Error> {
loop {
match rx.recv() {
std::result::Result::Ok(data) => {
// Update the root index.
flow_store.put_batch_root_list(data.root_map).unwrap();
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
flow_store
.append_entries(ChunkArray {
data: vec![0; data.pad_data],
start_index: data.tx_start_flow_index,
})
.unwrap();
}
std::result::Result::Err(_) => {
bail!("Receiver error");
}
}
}
});
}
fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> { fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
match maybe_root { match maybe_root {
None => self.gen_proof_at_version(flow_index, None), None => self.gen_proof_at_version(flow_index, None),
@ -863,6 +919,7 @@ impl LogManager {
); );
if extra != 0 { if extra != 0 {
for pad_data in Self::padding((first_subtree_size - extra) as usize) { for pad_data in Self::padding((first_subtree_size - extra) as usize) {
let mut is_full_empty = true;
let mut root_map = BTreeMap::new(); let mut root_map = BTreeMap::new();
// Update the in-memory merkle tree. // Update the in-memory merkle tree.
@ -874,6 +931,7 @@ impl LogManager {
let mut completed_chunk_index = None; let mut completed_chunk_index = None;
if pad_data.len() < last_chunk_pad { if pad_data.len() < last_chunk_pad {
is_full_empty = false;
merkle merkle
.last_chunk_merkle .last_chunk_merkle
.append_list(data_to_merkle_leaves(&pad_data)?); .append_list(data_to_merkle_leaves(&pad_data)?);
@ -882,6 +940,7 @@ impl LogManager {
.update_last(*merkle.last_chunk_merkle.root()); .update_last(*merkle.last_chunk_merkle.root());
} else { } else {
if last_chunk_pad != 0 { if last_chunk_pad != 0 {
is_full_empty = false;
// Pad the last chunk. // Pad the last chunk.
merkle merkle
.last_chunk_merkle .last_chunk_merkle
@ -910,16 +969,26 @@ impl LogManager {
assert_eq!(pad_data.len(), start_index * ENTRY_SIZE); assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
} }
// Update the root index.
self.flow_store.put_batch_root_list(root_map)?;
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
let data_size = pad_data.len() / ENTRY_SIZE; let data_size = pad_data.len() / ENTRY_SIZE;
self.flow_store.append_entries(ChunkArray { if is_full_empty {
data: pad_data, self.sender.send(UpdateFlowMessage {
start_index: tx_start_flow_index, root_map,
})?; pad_data: pad_data.len(),
tx_start_flow_index,
})?;
} else {
self.flow_store.put_batch_root_list(root_map).unwrap();
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
self.flow_store
.append_entries(ChunkArray {
data: pad_data.to_vec(),
start_index: tx_start_flow_index,
})
.unwrap();
}
tx_start_flow_index += data_size as u64; tx_start_flow_index += data_size as u64;
if let Some(index) = completed_chunk_index { if let Some(index) = completed_chunk_index {
self.complete_last_chunk_merkle(index, &mut *merkle)?; self.complete_last_chunk_merkle(index, &mut *merkle)?;

View File

@ -76,6 +76,14 @@ pub trait LogStoreRead: LogStoreChunkRead {
/// Return flow root and length. /// Return flow root and length.
fn get_context(&self) -> Result<(DataRoot, u64)>; fn get_context(&self) -> Result<(DataRoot, u64)>;
fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>>;
fn get_num_entries(&self) -> Result<u64>;
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
fn get_shard_config(&self) -> ShardConfig;
} }
pub trait LogStoreChunkRead { pub trait LogStoreChunkRead {
@ -145,6 +153,10 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
) -> Result<bool>; ) -> Result<bool>;
fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()>; fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()>;
fn update_shard_config(&self, shard_config: ShardConfig);
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
} }
pub trait LogStoreChunkWrite { pub trait LogStoreChunkWrite {
@ -168,19 +180,10 @@ pub trait LogChunkStore: LogStoreChunkRead + LogStoreChunkWrite + Send + Sync +
impl<T: LogStoreChunkRead + LogStoreChunkWrite + Send + Sync + 'static> LogChunkStore for T {} impl<T: LogStoreChunkRead + LogStoreChunkWrite + Send + Sync + 'static> LogChunkStore for T {}
pub trait Store: pub trait Store:
LogStoreRead + LogStoreWrite + LogStoreInner + config::Configurable + Send + Sync + 'static LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static
{ {
} }
impl< impl<T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static> Store for T {}
T: LogStoreRead + LogStoreWrite + LogStoreInner + config::Configurable + Send + Sync + 'static,
> Store for T
{
}
pub trait LogStoreInner {
fn flow(&self) -> &dyn Flow;
fn flow_mut(&mut self) -> &mut dyn Flow;
}
pub struct MineLoadChunk { pub struct MineLoadChunk {
// Use `Vec` instead of array to avoid thread stack overflow. // Use `Vec` instead of array to avoid thread stack overflow.

View File

@ -492,7 +492,7 @@ impl SerialSyncController {
metrics::SERIAL_SYNC_SEGMENT_LATENCY.update_since(since.0); metrics::SERIAL_SYNC_SEGMENT_LATENCY.update_since(since.0);
let shard_config = self.store.get_store().flow().get_shard_config(); let shard_config = self.store.get_store().get_shard_config();
let next_chunk = segment_to_sector(shard_config.next_segment_index( let next_chunk = segment_to_sector(shard_config.next_segment_index(
sector_to_segment(from_chunk), sector_to_segment(from_chunk),
sector_to_segment(self.tx_start_chunk_in_flow), sector_to_segment(self.tx_start_chunk_in_flow),

View File

@ -807,7 +807,7 @@ impl SyncService {
} }
async fn tx_sync_start_index(store: &Store, tx: &Transaction) -> Result<Option<u64>> { async fn tx_sync_start_index(store: &Store, tx: &Transaction) -> Result<Option<u64>> {
let shard_config = store.get_store().flow().get_shard_config(); let shard_config = store.get_store().get_shard_config();
let start_segment = sector_to_segment(tx.start_entry_index()); let start_segment = sector_to_segment(tx.start_entry_index());
let end = let end =
bytes_to_chunks(usize::try_from(tx.size).map_err(|e| anyhow!("tx size e={}", e))?); bytes_to_chunks(usize::try_from(tx.size).map_err(|e| anyhow!("tx size e={}", e))?);