replace filter with eth_getLogs (#32)

* replace filter with eth_getLogs

* update log

* store block history into db

* fix lints error

* add free disk step

* set max worker to 3

* resolve comments and refact code

* resolve comments

* set max-workers to 4

* set max-workers to 3
This commit is contained in:
Joel Liu 2024-03-27 13:54:06 +08:00 committed by GitHub
parent 06a335add2
commit f9ce286909
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 685 additions and 219 deletions

View File

@ -15,6 +15,19 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Free Disk Space (Ubuntu)
uses: jlumbroso/free-disk-space@main
with:
# this might remove tools that are actually needed,
# if set to "true" but frees about 6 GB
tool-cache: false
android: true
dotnet: true
haskell: true
large-packages: false
docker-images: true
swap-storage: true
- name: Checkout sources
uses: actions/checkout@v3
with:
@ -40,4 +53,4 @@ jobs:
- name: Run tests
run: |
cd tests
python test_all.py
python test_all.py --max-workers 3

View File

@ -25,6 +25,13 @@ pub struct LogSyncConfig {
// the duration between each paginated getLogs RPC call, in ms.
// This is set to avoid triggering the throttling mechanism in the RPC server.
pub recover_query_delay: u64,
// the counter assumed the finalized block behind the latest block
pub default_finalized_block_count: u64,
// remove finalized block trigger interval
pub remove_finalized_block_interval_minutes: u64,
// watch_loop (eth_getLogs) trigger interval
pub watch_loop_wait_time_ms: u64,
}
#[derive(Clone)]
@ -48,6 +55,9 @@ impl LogSyncConfig {
timeout_retries: u32,
initial_backoff: u64,
recover_query_delay: u64,
default_finalized_block_count: u64,
remove_finalized_block_interval_minutes: u64,
watch_loop_wait_time_ms: u64,
) -> Self {
Self {
rpc_endpoint_url,
@ -60,6 +70,9 @@ impl LogSyncConfig {
timeout_retries,
initial_backoff,
recover_query_delay,
default_finalized_block_count,
remove_finalized_block_interval_minutes,
watch_loop_wait_time_ms,
}
}
}

View File

@ -1,22 +1,26 @@
use crate::rpc_proxy::ContractAddress;
use crate::sync_manager::log_query::LogQuery;
use crate::sync_manager::{repeat_run_and_log, RETRY_WAIT_MS};
use crate::sync_manager::RETRY_WAIT_MS;
use anyhow::{anyhow, bail, Result};
use append_merkle::{Algorithm, Sha3Algorithm};
use contract_interface::{SubmissionNode, SubmitFilter, ZgsFlow};
use ethers::abi::RawLog;
use ethers::prelude::{BlockNumber, EthLogDecode, Http, Log, Middleware, Provider, U256};
use ethers::providers::{FilterKind, HttpRateLimitRetryPolicy, RetryClient, RetryClientBuilder};
use ethers::prelude::{BlockNumber, EthLogDecode, Http, Middleware, Provider};
use ethers::providers::{HttpRateLimitRetryPolicy, RetryClient, RetryClientBuilder};
use ethers::types::H256;
use futures::StreamExt;
use jsonrpsee::tracing::{debug, error, info};
use jsonrpsee::tracing::{debug, error, info, warn};
use shared_types::{DataRoot, Transaction};
use std::collections::{BTreeMap, VecDeque};
use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
RwLock,
};
pub struct LogEntryFetcher {
contract_address: ContractAddress,
@ -52,6 +56,150 @@ impl LogEntryFetcher {
})
}
pub fn handle_reorg(
&self,
block_number: u64,
block_hash: H256,
executor: &TaskExecutor,
block_hash_cache: Arc<RwLock<BTreeMap<u64, BlockHashAndSubmissionIndex>>>,
) -> UnboundedReceiver<LogFetchProgress> {
let (reorg_tx, reorg_rx) = tokio::sync::mpsc::unbounded_channel();
let provider = self.provider.clone();
executor.spawn(
async move {
let mut block_number = block_number;
let mut block_hash = block_hash;
debug!(
"handle_reorg starts, block number={} hash={}",
block_number, block_hash
);
loop {
match provider.get_block(block_number).await {
Ok(Some(b)) => {
if b.hash == Some(block_hash) {
break;
} else {
warn!(
"log sync reorg check hash fails, \
block_number={:?} expect={:?} get={:?}",
block_number, block_hash, b.hash
);
match revert_one_block(
block_hash,
block_number,
&reorg_tx,
&block_hash_cache,
)
.await
{
Ok((parent_block_number, parent_block_hash)) => {
block_number = parent_block_number;
block_hash = parent_block_hash;
}
Err(e) => {
error!("revert block fails, e={:?}", e);
}
}
}
}
e => {
error!("handle reorg fails, e={:?}", e);
}
};
}
},
"handle reorg",
);
reorg_rx
}
pub fn start_remove_finalized_block_task(
&self,
executor: &TaskExecutor,
store: Arc<RwLock<dyn Store>>,
block_hash_cache: Arc<RwLock<BTreeMap<u64, BlockHashAndSubmissionIndex>>>,
default_finalized_block_count: u64,
remove_finalized_block_interval_minutes: u64,
) {
let provider = self.provider.clone();
executor.spawn(
async move {
loop {
debug!("processing finalized block");
let processed_block_number = match store.read().await.get_sync_progress() {
Ok(Some((processed_block_number, _))) => Some(processed_block_number),
Ok(None) => None,
Err(e) => {
error!("get sync progress error: e={:?}", e);
None
}
};
if let Some(processed_block_number) = processed_block_number {
let finalized_block_number =
match provider.get_block(BlockNumber::Finalized).await {
Ok(block) => match block {
Some(b) => match b.number {
Some(f) => Some(f.as_u64()),
None => {
error!("block number is none for finalized block");
None
}
},
None => {
error!("finalized block is none");
None
}
},
Err(e) => {
error!("get finalized block number: e={:?}", e);
Some(processed_block_number - default_finalized_block_count)
}
};
if let Some(finalized_block_number) = finalized_block_number {
if processed_block_number >= finalized_block_number {
let mut pending_keys = vec![];
for (key, _) in block_hash_cache.read().await.iter() {
if *key <= finalized_block_number {
pending_keys.push(*key);
} else {
break;
}
}
for key in pending_keys.into_iter() {
if let Err(e) =
store.write().await.delete_block_hash_by_number(key)
{
error!(
"remove block tx for number {} error: e={:?}",
key, e
);
} else {
block_hash_cache.write().await.remove(&key);
}
}
}
}
}
tokio::time::sleep(Duration::from_secs(
60 * remove_finalized_block_interval_minutes,
))
.await;
}
},
"handle reorg",
);
}
pub fn start_recover(
&self,
start_block_number: u64,
@ -87,6 +235,7 @@ impl LogEntryFetcher {
let synced_block = LogFetchProgress::SyncedBlock((
log.block_number.unwrap().as_u64(),
log.block_hash.unwrap(),
None,
));
progress = log.block_number.unwrap().as_u64();
Some(synced_block)
@ -133,43 +282,39 @@ impl LogEntryFetcher {
pub fn start_watch(
&self,
start_block_number: u64,
parent_block_hash: H256,
executor: &TaskExecutor,
block_hash_cache: Arc<RwLock<BTreeMap<u64, BlockHashAndSubmissionIndex>>>,
watch_loop_wait_time_ms: u64,
) -> UnboundedReceiver<LogFetchProgress> {
let (watch_tx, watch_rx) = tokio::sync::mpsc::unbounded_channel();
let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
let provider = self.provider.clone();
let mut log_confirmation_queue = LogConfirmationQueue::new(self.confirmation_delay);
let confirmation_delay = self.confirmation_delay;
executor.spawn(
async move {
let mut filter = contract
.submit_filter()
.from_block(start_block_number)
.address(contract.address().into())
.filter;
debug!("start_watch starts, start={}", start_block_number);
let mut filter_id =
repeat_run_and_log(|| provider.new_filter(FilterKind::Logs(&filter))).await;
let mut progress = start_block_number;
let mut parent_block_hash = parent_block_hash;
loop {
match Self::watch_loop(
provider.as_ref(),
filter_id,
progress,
parent_block_hash,
&watch_tx,
&mut log_confirmation_queue,
confirmation_delay,
&contract,
&block_hash_cache,
)
.await
{
Err(e) => {
error!("log sync watch error: e={:?}", e);
filter = filter.from_block(progress).address(contract.address());
filter_id = repeat_run_and_log(|| {
provider.new_filter(FilterKind::Logs(&filter))
})
.await;
}
Ok(Some(p)) => {
progress = p;
Ok(Some((p, h, _))) => {
progress = p.saturating_add(1);
parent_block_hash = h;
info!("log sync to block number {:?}", progress);
}
Ok(None) => {
@ -179,7 +324,7 @@ impl LogEntryFetcher {
)
}
}
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
tokio::time::sleep(Duration::from_millis(watch_loop_wait_time_ms)).await;
}
},
"log watch",
@ -187,36 +332,106 @@ impl LogEntryFetcher {
watch_rx
}
#[allow(clippy::too_many_arguments)]
async fn watch_loop(
provider: &Provider<RetryClient<Http>>,
filter_id: U256,
block_number: u64,
parent_block_hash: H256,
watch_tx: &UnboundedSender<LogFetchProgress>,
log_confirmation_queue: &mut LogConfirmationQueue,
) -> Result<Option<u64>> {
debug!("get block");
let latest_block = provider
.get_block(BlockNumber::Latest)
.await?
.ok_or_else(|| anyhow!("None for latest block"))?;
debug!("get filter changes");
let logs: Vec<Log> = provider.get_filter_changes(filter_id).await?;
if let Some(reverted) = log_confirmation_queue.push(logs)? {
watch_tx.send(LogFetchProgress::Reverted(reverted))?;
confirmation_delay: u64,
contract: &ZgsFlow<Provider<RetryClient<Http>>>,
block_hash_cache: &Arc<RwLock<BTreeMap<u64, BlockHashAndSubmissionIndex>>>,
) -> Result<Option<(u64, H256, Option<Option<u64>>)>> {
let latest_block_number = provider.get_block_number().await?.as_u64();
debug!(
"block number {}, latest block number {}, confirmation_delay {}",
block_number, latest_block_number, confirmation_delay
);
if block_number > latest_block_number.saturating_sub(confirmation_delay) {
return Ok(None);
}
debug!("get filter end");
for log in log_confirmation_queue.confirm_logs(latest_block.number.unwrap().as_u64()) {
assert!(!log.removed.unwrap_or(false));
// TODO(zz): Log parse error means logs might be lost here.
let block = provider
.get_block_with_txs(block_number)
.await?
.ok_or_else(|| anyhow!("None for block {}", block_number))?;
if block_number > 0 && block.parent_hash != parent_block_hash {
// reorg happened
let (parent_block_number, block_hash) = revert_one_block(
parent_block_hash,
block_number.saturating_sub(1),
watch_tx,
block_hash_cache,
)
.await?;
return Ok(Some((parent_block_number, block_hash, None)));
}
let txs_hm = block
.transactions
.iter()
.map(|tx| (tx.transaction_index, tx))
.collect::<HashMap<_, _>>();
let filter = contract
.submit_filter()
.from_block(block_number)
.to_block(block_number)
.address(contract.address().into())
.filter;
let mut logs = vec![];
let mut first_submission_index = None;
for log in provider.get_logs(&filter).await? {
if log.block_hash != block.hash {
bail!(
"log block hash mismatch, log block hash {:?}, block hash {:?}",
log.block_hash,
block.hash
);
}
if log.block_number != block.number {
bail!(
"log block num mismatch, log block number {:?}, block number {:?}",
log.block_number,
block.number
);
}
let tx = txs_hm[&log.transaction_index];
if log.transaction_hash != Some(tx.hash) {
bail!(
"log tx hash mismatch, log transaction {:?}, block transaction {:?}",
log.transaction_hash,
tx.hash
);
}
if log.transaction_index != tx.transaction_index {
bail!(
"log tx index mismatch, log tx index {:?}, block transaction index {:?}",
log.transaction_index,
tx.transaction_index
);
}
let tx = SubmitFilter::decode_log(&RawLog {
topics: log.topics,
data: log.data.to_vec(),
})?;
watch_tx.send(submission_event_to_transaction(tx))?;
if first_submission_index.is_none()
|| first_submission_index > Some(tx.submission_index.as_u64())
{
first_submission_index = Some(tx.submission_index.as_u64());
}
logs.push(submission_event_to_transaction(tx));
}
let progress = if latest_block.hash.is_some() && latest_block.number.is_some() {
let progress = if block.hash.is_some() && block.number.is_some() {
Some((
latest_block.number.unwrap().as_u64(),
latest_block.hash.unwrap(),
block.number.unwrap().as_u64(),
block.hash.unwrap(),
Some(first_submission_index),
))
} else {
None
@ -224,7 +439,12 @@ impl LogEntryFetcher {
if let Some(p) = &progress {
watch_tx.send(LogFetchProgress::SyncedBlock(*p))?;
}
Ok(progress.map(|p| p.0))
for log in logs.into_iter() {
watch_tx.send(log)?;
}
Ok(progress)
}
pub fn provider(&self) -> &Provider<RetryClient<Http>> {
@ -232,101 +452,44 @@ impl LogEntryFetcher {
}
}
struct LogConfirmationQueue {
/// Keep the unconfirmed new logs.
/// The key is the block number and the value is the set of needed logs in that block.
queue: VecDeque<(u64, Vec<Log>)>,
async fn revert_one_block(
block_hash: H256,
block_number: u64,
watch_tx: &UnboundedSender<LogFetchProgress>,
block_hash_cache: &Arc<RwLock<BTreeMap<u64, BlockHashAndSubmissionIndex>>>,
) -> Result<(u64, H256), anyhow::Error> {
debug!("revert block {}, block hash {:?}", block_number, block_hash);
let block = block_hash_cache
.read()
.await
.get(&block_number)
.ok_or_else(|| anyhow!("None for block {}", block_number))?
.clone();
latest_block_number: u64,
confirmation_delay: u64,
}
impl LogConfirmationQueue {
fn new(confirmation_delay: u64) -> Self {
Self {
queue: VecDeque::new(),
latest_block_number: 0,
confirmation_delay,
}
}
/// Push a set of new logs.
/// We assumes that these logs are in order, and removed logs are returned first.
///
/// Return `Ok(Some(tx_seq))` of the first reverted tx_seq if chain reorg happens.
/// `Err` is returned if assumptions are violated (like the log have missing fields).
fn push(&mut self, logs: Vec<Log>) -> Result<Option<u64>> {
let mut revert_to = None;
// First merge logs according to the block number.
let mut block_logs: BTreeMap<u64, Vec<Log>> = BTreeMap::new();
let mut removed_block_logs = BTreeMap::new();
for log in logs {
let set = if log.removed.unwrap_or(false) {
&mut removed_block_logs
} else {
&mut block_logs
};
let block_number = log
.block_number
.ok_or_else(|| anyhow!("block number missing"))?
.as_u64();
set.entry(block_number).or_default().push(log);
}
// Handle revert if it happens.
for (block_number, removed_logs) in &removed_block_logs {
if revert_to.is_none() {
let reverted_index = match self.queue.binary_search_by_key(block_number, |e| e.0) {
Ok(x) => x,
Err(x) => x,
};
self.queue.truncate(reverted_index);
let first = removed_logs.first().expect("not empty");
let first_reverted_tx_seq = SubmitFilter::decode_log(&RawLog {
topics: first.topics.clone(),
data: first.data.to_vec(),
})?
.submission_index
.as_u64();
revert_to = Some(first_reverted_tx_seq);
} else {
// Other removed logs should have larger tx seq, so no need to process them.
break;
}
}
// Add new logs to the queue.
for (block_number, new_logs) in block_logs {
if block_number <= self.queue.back().map(|e| e.0).unwrap_or(0) {
bail!("reverted without being notified");
}
self.queue.push_back((block_number, new_logs));
}
Ok(revert_to)
assert!(block_hash == block.block_hash);
if let Some(reverted) = block.first_submission_index {
watch_tx.send(LogFetchProgress::Reverted(reverted))?;
}
/// Pass in the latest block number and return the confirmed logs.
fn confirm_logs(&mut self, latest_block_number: u64) -> Vec<Log> {
self.latest_block_number = latest_block_number;
let mut confirmed_logs = Vec::new();
while let Some((block_number, _)) = self.queue.front() {
if *block_number
> self
.latest_block_number
.wrapping_sub(self.confirmation_delay)
{
break;
}
let (_, mut logs) = self.queue.pop_front().unwrap();
confirmed_logs.append(&mut logs);
}
confirmed_logs
}
let parent_block_number = block_number.saturating_sub(1);
let parent_block_hash = block_hash_cache
.read()
.await
.get(&parent_block_number)
.ok_or_else(|| anyhow!("None for block {}", parent_block_number))?
.clone()
.block_hash;
let synced_block =
LogFetchProgress::SyncedBlock((parent_block_number, parent_block_hash, None));
watch_tx.send(synced_block)?;
Ok((parent_block_number, parent_block_hash))
}
#[derive(Debug)]
pub enum LogFetchProgress {
SyncedBlock((u64, H256)),
SyncedBlock((u64, H256, Option<Option<u64>>)),
Transaction(Transaction),
Reverted(u64),
}

View File

@ -4,6 +4,7 @@ use jsonrpsee::tracing::trace;
use std::future::Future;
use std::time::Duration;
use std::{
cmp::min,
collections::VecDeque,
pin::Pin,
task::{Context, Poll},
@ -100,7 +101,12 @@ where
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
} else {
// if paginatable, load last block
let fut = self.provider.get_block_number();
let fut = match self.filter.get_to_block() {
// if to_block is not none in filter, getLogs from from_block to to_block
Some(number) => Box::pin(async move { Ok(number) }),
// if to_block is none in filter, getLogs from from_block to latest block
_ => self.provider.get_block_number(),
};
rewake_with_new_state!(ctx, self, LogQueryState::LoadLastBlock(fut));
}
}
@ -113,7 +119,7 @@ where
// this is okay because we will only enter this state when the filter is
// paginatable i.e. from block is set
let from_block = self.filter.get_from_block().unwrap();
let to_block = from_block + self.page_size;
let to_block = min(from_block + self.page_size, last_block);
self.from_block = Some(to_block + 1);
let filter = self
@ -150,7 +156,12 @@ where
// load new logs if there are still more pages to go through
// can safely assume this will always be set in this state
let from_block = self.from_block.unwrap();
let to_block = from_block + self.page_size;
let to_block = if let Some(l) = self.last_block {
// if last_block is not none, only getLogs from to_block to last_block
min(from_block + self.page_size, l)
} else {
from_block + self.page_size
};
// no more pages to load, and everything is consumed
// can safely assume this will always be set in this state

View File

@ -1,16 +1,17 @@
use crate::sync_manager::config::LogSyncConfig;
use crate::sync_manager::data_cache::DataCache;
use crate::sync_manager::log_entry_fetcher::{LogEntryFetcher, LogFetchProgress};
use anyhow::{bail, Result};
use ethers::prelude::Middleware;
use anyhow::{anyhow, bail, Result};
use ethers::{prelude::Middleware, types::BlockNumber};
use futures::FutureExt;
use jsonrpsee::tracing::{debug, error, trace, warn};
use shared_types::{ChunkArray, Transaction};
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use storage::log_store::Store;
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedReceiver;
@ -39,6 +40,8 @@ pub struct LogSyncManager {
/// To broadcast events to handle in advance.
event_send: broadcast::Sender<LogSyncEvent>,
block_hash_cache: Arc<RwLock<BTreeMap<u64, BlockHashAndSubmissionIndex>>>,
}
impl LogSyncManager {
@ -75,6 +78,15 @@ impl LogSyncManager {
)
.await?;
let data_cache = DataCache::new(config.cache_config.clone());
let block_hash_cache = Arc::new(RwLock::new(
store
.read()
.await
.get_block_hashes()?
.into_iter()
.collect::<BTreeMap<_, _>>(),
));
let mut log_sync_manager = Self {
config,
log_fetcher,
@ -82,63 +94,172 @@ impl LogSyncManager {
store,
data_cache,
event_send,
block_hash_cache,
};
let finalized_block = match log_sync_manager
.log_fetcher
.provider()
.get_block(BlockNumber::Finalized)
.await
{
Ok(Some(finalized_block)) => finalized_block,
e => {
warn!("unable to get finalized block: {:?}", e);
log_sync_manager
.log_fetcher
.provider()
.get_block(0)
.await?
.ok_or_else(|| anyhow!("None for block 0"))?
}
};
let finalized_block_number = finalized_block
.number
.ok_or_else(|| anyhow!("None block number for finalized block"))?
.as_u64();
// Load previous progress from db and check if chain reorg happens after restart.
// TODO(zz): Handle reorg instead of return.
let start_block_number =
let mut need_handle_reorg = false;
let (mut start_block_number, mut start_block_hash) =
match log_sync_manager.store.read().await.get_sync_progress()? {
// No previous progress, so just use config.
None => log_sync_manager.config.start_block_number,
Some((block_number, block_hash)) => {
match log_sync_manager
None => {
let block_number = log_sync_manager.config.start_block_number;
let block_hash = log_sync_manager
.log_fetcher
.provider()
.get_block(block_number)
.await
{
Ok(Some(b)) => {
if b.hash == Some(block_hash) {
block_number
} else {
warn!(
"log sync progress check hash fails, \
block_number={:?} expect={:?} get={:?}",
block_number, block_hash, b.hash
);
// Assume the blocks before this are not reverted.
block_number.saturating_sub(
log_sync_manager.config.confirmation_block_count,
)
}
}
e => {
error!("log sync progress check rpc fails, e={:?}", e);
bail!("log sync start error");
.await?
.ok_or_else(|| anyhow!("None for block {}", block_number))?
.hash
.ok_or_else(|| {
anyhow!("None block hash for block {}", block_number)
})?;
(block_number, block_hash)
}
Some((block_number, block_hash)) => {
if block_number <= finalized_block_number {
let expect_block_hash = log_sync_manager
.log_fetcher
.provider()
.get_block(block_number)
.await?
.ok_or_else(|| anyhow!("None for block {}", block_number))?
.hash
.ok_or_else(|| {
anyhow!("None block hash for block {}", block_number)
})?;
if expect_block_hash != block_hash {
need_handle_reorg = true;
}
}
(block_number, block_hash)
}
};
let latest_block_number = log_sync_manager
.log_fetcher
.provider()
.get_block_number()
.await?
.as_u64();
debug!(
"current start block number {}, block hash {}, finalized block number {}",
start_block_number, start_block_hash, finalized_block_number
);
if need_handle_reorg {
let reorg_rx = log_sync_manager.log_fetcher.handle_reorg(
start_block_number,
start_block_hash,
&executor_clone,
log_sync_manager.block_hash_cache.clone(),
);
log_sync_manager.handle_data(reorg_rx).await?;
if let Some((block_number, block_hash)) =
log_sync_manager.store.read().await.get_sync_progress()?
{
start_block_number = block_number;
start_block_hash = block_hash;
} else {
bail!("get log sync progress error");
}
}
// Start watching before recovery to ensure that no log is skipped.
// TODO(zz): Rate limit to avoid OOM during recovery.
let watch_rx = log_sync_manager
.log_fetcher
.start_watch(latest_block_number, &executor_clone);
let recover_rx = log_sync_manager.log_fetcher.start_recover(
start_block_number,
// -1 so the recover and watch ranges do not overlap.
latest_block_number.wrapping_sub(1),
let mut submission_idx = None;
let parent_block_hash = if start_block_number >= finalized_block_number {
if start_block_number > 0 {
let parent_block_number = start_block_number.saturating_sub(1);
match log_sync_manager
.block_hash_cache
.read()
.await
.get(&parent_block_number)
{
Some(b) => {
submission_idx = b.first_submission_index; // special case avoid reorg
b.block_hash
}
_ => log_sync_manager
.log_fetcher
.provider()
.get_block(parent_block_number)
.await?
.ok_or_else(|| {
anyhow!("None for block {}", parent_block_number)
})?
.hash
.ok_or_else(|| {
anyhow!("None block hash for block {}", parent_block_number)
})?,
}
} else {
start_block_hash
}
} else {
finalized_block
.hash
.ok_or_else(|| anyhow!("None for finalized block hash"))?
};
if let Some(submission_idx) = submission_idx {
log_sync_manager.process_reverted(submission_idx).await;
}
let watch_rx = log_sync_manager.log_fetcher.start_watch(
if start_block_number >= finalized_block_number {
start_block_number
} else {
finalized_block_number.saturating_add(1)
},
parent_block_hash,
&executor_clone,
Duration::from_millis(log_sync_manager.config.recover_query_delay),
log_sync_manager.block_hash_cache.clone(),
log_sync_manager.config.watch_loop_wait_time_ms,
);
log_sync_manager.handle_data(recover_rx).await?;
if start_block_number < finalized_block_number {
let recover_rx = log_sync_manager.log_fetcher.start_recover(
start_block_number,
finalized_block_number,
&executor_clone,
Duration::from_millis(log_sync_manager.config.recover_query_delay),
);
log_sync_manager.handle_data(recover_rx).await?;
}
log_sync_manager
.log_fetcher
.start_remove_finalized_block_task(
&executor_clone,
log_sync_manager.store.clone(),
log_sync_manager.block_hash_cache.clone(),
log_sync_manager.config.default_finalized_block_count,
log_sync_manager
.config
.remove_finalized_block_interval_minutes,
);
// Syncing `watch_rx` is supposed to block forever.
log_sync_manager.handle_data(watch_rx).await?;
Ok(())
@ -209,23 +330,35 @@ impl LogSyncManager {
while let Some(data) = rx.recv().await {
trace!("handle_data: data={:?}", data);
match data {
LogFetchProgress::SyncedBlock(progress) => {
match self
.log_fetcher
.provider()
.get_block(
progress
.0
.saturating_sub(self.config.confirmation_block_count),
)
.await
{
LogFetchProgress::SyncedBlock((
block_number,
block_hash,
first_submission_index,
)) => {
if first_submission_index.is_some() {
self.block_hash_cache.write().await.insert(
block_number,
BlockHashAndSubmissionIndex {
block_hash,
first_submission_index: first_submission_index.unwrap(),
},
);
}
self.store.write().await.put_sync_progress((
block_number,
block_hash,
first_submission_index,
))?;
match self.log_fetcher.provider().get_block(block_number).await {
Ok(Some(b)) => {
if let (Some(block_number), Some(block_hash)) = (b.number, b.hash) {
self.store
.write()
.await
.put_sync_progress((block_number.as_u64(), block_hash))?;
if b.number != Some(block_number.into()) {
error!(
"block number not match, reorg possible happened, block number {:?}, received {}", b.number, block_number
);
} else if b.hash != Some(block_hash) {
error!("block hash not match, reorg possible happened, block hash {:?}, received {}", b.hash, block_hash);
}
}
e => {
@ -301,19 +434,6 @@ where
}
}
async fn repeat_run_and_log<R, E, F>(f: impl Fn() -> F) -> R
where
E: Debug,
F: Future<Output = std::result::Result<R, E>> + Send,
{
loop {
if let Some(r) = run_and_log(|| {}, f()).await {
break r;
}
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
}
}
pub(crate) mod config;
mod data_cache;
mod log_entry_fetcher;

View File

@ -105,6 +105,9 @@ impl ZgsConfig {
self.timeout_retries,
self.initial_backoff,
self.recover_query_delay,
self.default_finalized_block_count,
self.remove_finalized_block_interval_minutes,
self.watch_loop_wait_time_ms,
))
}

View File

@ -34,6 +34,10 @@ build_config! {
(initial_backoff, (u64), 500)
(recover_query_delay, (u64), 50)
(default_finalized_block_count, (u64), 100)
(remove_finalized_block_interval_minutes, (u64), 30)
(watch_loop_wait_time_ms, (u64), 50)
// rpc
(rpc_enabled, (bool), true)
(rpc_listen_address, (String), "127.0.0.1:5678".to_string())

View File

@ -16,6 +16,14 @@ pub trait ZgsKeyValueDB: KeyValueDB {
self.write(tx)
}
fn puts(&self, items: Vec<(u32, Vec<u8>, Vec<u8>)>) -> std::io::Result<()> {
let mut tx = self.transaction();
items
.into_iter()
.for_each(|(col, key, val)| tx.put(col, &key, &val));
self.write(tx)
}
fn delete(&self, col: u32, key: &[u8]) -> std::io::Result<()> {
let mut tx = self.transaction();
tx.delete(col, key);

View File

@ -22,6 +22,7 @@ use std::path::Path;
use std::sync::Arc;
use tracing::{debug, error, info, instrument, trace, warn};
use super::tx_store::BlockHashAndSubmissionIndex;
use super::LogStoreInner;
/// 256 Bytes
@ -37,7 +38,8 @@ pub const COL_TX_COMPLETED: u32 = 4;
pub const COL_MISC: u32 = 5;
pub const COL_SEAL_CONTEXT: u32 = 6;
pub const COL_FLOW_MPT_NODES: u32 = 7;
pub const COL_NUM: u32 = 8;
pub const COL_BLOCK_PROGRESS: u32 = 8;
pub const COL_NUM: u32 = 9;
// Process at most 1M entries (256MB) pad data at a time.
const PAD_MAX_SIZE: usize = 1 << 20;
@ -255,7 +257,7 @@ impl LogStoreWrite for LogManager {
}
}
fn put_sync_progress(&self, progress: (u64, H256)) -> Result<()> {
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()> {
self.tx_store.put_progress(progress)
}
@ -285,6 +287,10 @@ impl LogStoreWrite for LogManager {
}
Ok(valid)
}
fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> {
self.tx_store.delete_block_hash_by_number(block_number)
}
}
impl LogStoreChunkRead for LogManager {
@ -426,6 +432,14 @@ impl LogStoreRead for LogManager {
self.tx_store.get_progress()
}
fn get_block_hash_by_number(&self, block_number: u64) -> Result<Option<(H256, Option<u64>)>> {
self.tx_store.get_block_hash_by_number(block_number)
}
fn get_block_hashes(&self) -> Result<Vec<(u64, BlockHashAndSubmissionIndex)>> {
self.tx_store.get_block_hashes()
}
fn next_tx_seq(&self) -> u64 {
self.tx_store.next_tx_seq()
}

View File

@ -8,13 +8,15 @@ use zgs_spec::{BYTES_PER_SEAL, SEALS_PER_LOAD};
use crate::error::Result;
use self::tx_store::BlockHashAndSubmissionIndex;
pub mod config;
mod flow_store;
mod load_chunk;
pub mod log_manager;
#[cfg(test)]
mod tests;
mod tx_store;
pub mod tx_store;
/// The trait to read the transactions already appended to the log.
///
@ -53,6 +55,10 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
fn get_block_hash_by_number(&self, block_number: u64) -> Result<Option<(H256, Option<u64>)>>;
fn get_block_hashes(&self) -> Result<Vec<(u64, BlockHashAndSubmissionIndex)>>;
fn validate_range_proof(&self, tx_seq: u64, data: &ChunkArrayWithProof) -> Result<bool>;
fn get_proof_at_root(&self, root: &DataRoot, index: u64, length: u64)
@ -108,7 +114,7 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
fn finalize_tx_with_hash(&mut self, tx_seq: u64, tx_hash: H256) -> Result<bool>;
/// Store the progress of synced block number and its hash.
fn put_sync_progress(&self, progress: (u64, H256)) -> Result<()>;
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>;
/// Revert the log state to a given tx seq.
/// This is needed when transactions are reverted because of chain reorg.
@ -122,6 +128,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
tx_seq: u64,
data: &ChunkArrayWithProof,
) -> Result<bool>;
fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()>;
}
pub trait LogStoreChunkWrite {

View File

@ -1,6 +1,6 @@
use crate::error::Error;
use crate::log_store::log_manager::{
data_to_merkle_leaves, sub_merkle_tree, COL_MISC, COL_TX, COL_TX_COMPLETED,
data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
};
use crate::{try_option, LogManager, ZgsKeyValueDB};
@ -20,6 +20,12 @@ use tracing::{error, instrument};
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
const NEXT_TX_KEY: &str = "next_tx_seq";
#[derive(Clone, Debug)]
pub struct BlockHashAndSubmissionIndex {
pub block_hash: H256,
pub first_submission_index: Option<u64>,
}
pub struct TransactionStore {
kvdb: Arc<dyn ZgsKeyValueDB>,
/// This is always updated before writing the database to ensure no intermediate states.
@ -164,12 +170,21 @@ impl TransactionStore {
}
#[instrument(skip(self))]
pub fn put_progress(&self, progress: (u64, H256)) -> Result<()> {
Ok(self.kvdb.put(
pub fn put_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()> {
let mut items = vec![(
COL_MISC,
LOG_SYNC_PROGRESS_KEY.as_bytes(),
&progress.as_ssz_bytes(),
)?)
LOG_SYNC_PROGRESS_KEY.as_bytes().to_vec(),
(progress.0, progress.1).as_ssz_bytes(),
)];
if let Some(p) = progress.2 {
items.push((
COL_BLOCK_PROGRESS,
progress.0.to_be_bytes().to_vec(),
(progress.1, p).as_ssz_bytes(),
));
}
Ok(self.kvdb.puts(items)?)
}
#[instrument(skip(self))]
@ -182,6 +197,44 @@ impl TransactionStore {
))
}
pub fn get_block_hash_by_number(
&self,
block_number: u64,
) -> Result<Option<(H256, Option<u64>)>> {
Ok(Some(
<(H256, Option<u64>)>::from_ssz_bytes(&try_option!(self
.kvdb
.get(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?))
.map_err(Error::from)?,
))
}
pub fn get_block_hashes(&self) -> Result<Vec<(u64, BlockHashAndSubmissionIndex)>> {
let mut block_numbers = vec![];
for r in self.kvdb.iter(COL_BLOCK_PROGRESS) {
let (key, val) = r?;
let block_number =
u64::from_be_bytes(key.as_ref().try_into().map_err(|e| anyhow!("{:?}", e))?);
let val = <(H256, Option<u64>)>::from_ssz_bytes(val.as_ref()).map_err(Error::from)?;
block_numbers.push((
block_number,
BlockHashAndSubmissionIndex {
block_hash: val.0,
first_submission_index: val.1,
},
));
}
Ok(block_numbers)
}
pub fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> {
Ok(self
.kvdb
.delete(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?)
}
/// Build the merkle tree at `pora_chunk_index` with the data before (including) `tx_seq`.
/// This first rebuild the tree with the tx root nodes lists by repeatedly checking previous
/// until we reach the start of this chunk.

View File

@ -52,7 +52,7 @@ class RandomTest(TestFramework):
for node_index in range(len(self.nodes)):
if node_index != chosen_node:
self.log.debug(f"check {node_index}")
wait_until(lambda: self.nodes[node_index].zgs_get_file_info(data_root) is not None, timeout=120)
wait_until(lambda: self.nodes[node_index].zgs_get_file_info(data_root) is not None, timeout=300)
wait_until(lambda: self.nodes[node_index].zgs_get_file_info(data_root)["finalized"], timeout=300)
# TODO(zz): This is a temp solution to trigger auto sync after all nodes started.
if i >= tx_count - 2:

View File

@ -443,6 +443,20 @@ pub fn decode_list_of_variable_length_items<T: Decode>(
Ok(values)
}
impl<T: Decode> Decode for Option<T> {
fn is_ssz_fixed_len() -> bool {
false
}
fn from_ssz_bytes(bytes: &[u8]) -> Result<Self, DecodeError> {
let (selector, body) = split_union_bytes(bytes)?;
match selector.into() {
0u8 => Ok(None),
1u8 => <T as Decode>::from_ssz_bytes(body).map(Option::Some),
other => Err(DecodeError::UnionSelectorInvalid(other)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -694,4 +708,10 @@ mod tests {
Ok((65535, 0))
);
}
#[test]
fn option_u64() {
assert_eq!(<Option<u64>>::from_ssz_bytes(&[0]), Ok(None));
assert_eq!(<Option<u64>>::from_ssz_bytes(&[1, 2, 0, 0, 0, 0, 0, 0, 0]), Ok(Some(2)));
}
}

View File

@ -392,6 +392,34 @@ macro_rules! impl_encodable_for_u8_array {
impl_encodable_for_u8_array!(4);
impl_encodable_for_u8_array!(32);
impl<T: Encode> Encode for Option<T> {
fn is_ssz_fixed_len() -> bool {
false
}
fn ssz_append(&self, buf: &mut Vec<u8>) {
match self {
None => buf.push(0u8),
Some(t) => {
buf.push(1u8);
t.ssz_append(buf);
}
}
}
fn ssz_bytes_len(&self) -> usize {
match self {
Option::None => 1usize,
Option::Some(ref inner) => inner
.ssz_bytes_len()
.checked_add(1)
.expect("encoded length must be less than usize::max_value"),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -501,4 +529,12 @@ mod tests {
assert_eq!((10u32, 11u8).as_ssz_bytes(), vec![10, 0, 0, 0, 11]);
assert_eq!((10u8, 11u8, 12u8).as_ssz_bytes(), vec![10, 11, 12]);
}
#[test]
fn ssz_encode_option_u64() {
let opt: Option<u64> = None;
assert_eq!(opt.as_ssz_bytes(), vec![0]);
let opt: Option<u64> = Some(2);
assert_eq!(opt.as_ssz_bytes(), vec![1, 2, 0, 0, 0, 0, 0, 0, 0]);
}
}