Mirror of https://github.com/0glabs/0g-storage-node.git (synced 2024-11-20 15:05:19 +00:00)

Compare commits: 28f190a178...f3656cca18 (7 commits)
Commits:
  f3656cca18
  5849e9c2ba
  b7badcda5e
  93885bb25c
  e58f6ca101
  f29af9c872
  814bc35b3b
Cargo.lock (generated):
@@ -7280,6 +7280,7 @@ dependencies = [
  "shared_types",
  "static_assertions",
  "tiny-keccak",
+ "tokio",
  "tracing",
  "typenum",
  "zgs_seal",
@@ -6,12 +6,13 @@ use ethereum_types::H256;
 use ethers::{prelude::Middleware, types::BlockNumber};
 use futures::FutureExt;
 use jsonrpsee::tracing::{debug, error, warn};
-use shared_types::{ChunkArray, Transaction};
+use shared_types::{bytes_to_chunks, ChunkArray, Transaction};
 use std::collections::BTreeMap;
 use std::fmt::Debug;
 use std::future::Future;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
+use storage::log_store::log_manager::sector_to_segment;
 use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
 use task_executor::{ShutdownReason, TaskExecutor};
 use thiserror::Error;
@@ -477,6 +478,17 @@ impl LogSyncManager {
                     error!("put_tx data error: e={:?}", e);
                     return false;
                 }
+            } else {
+                let store = self.store.clone();
+                let shard_config = store.flow().get_shard_config();
+                if sector_to_segment(bytes_to_chunks(tx.size as usize) as u64)
+                    < shard_config.shard_id
+                {
+                    if let Err(e) = store.finalize_tx_with_hash(tx.seq, tx.hash()) {
+                        error!("finalize file that does not need to store: e={:?}", e);
+                        return false;
+                    }
+                }
             }
             self.data_cache.garbage_collect(self.next_tx_seq);
             self.next_tx_seq += 1;
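Note on the new branch: `bytes_to_chunks` and `sector_to_segment` convert the file size into a segment count, which is compared against the node's `shard_id`; a file too small to ever reach a segment this shard is responsible for is finalized immediately instead of waiting for data the node will never store. A hedged sketch of the unit conversions, using ENTRY_SIZE = 256 as defined later in this diff and assuming PORA_CHUNK_SIZE = 1024 sectors per segment (that value is not shown here):

const ENTRY_SIZE: usize = 256; // bytes per sector, per the log_manager hunk below
const PORA_CHUNK_SIZE: usize = 1024; // sectors per segment; an assumption, not shown in this diff

// Round up: a trailing partial sector still occupies a whole sector.
fn bytes_to_chunks(size: usize) -> usize {
    (size + ENTRY_SIZE - 1) / ENTRY_SIZE
}

fn sector_to_segment(sector_index: u64) -> u64 {
    sector_index / PORA_CHUNK_SIZE as u64
}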
@@ -198,11 +198,11 @@ impl Pruner {
             ))),
             Ordering::Equal => Ok(None),
             Ordering::Greater => {
-                bail!(
+                error!(
                     "Unexpected first_rewardable_chunk revert: old={} new={}",
-                    self.first_rewardable_chunk,
-                    new_first_rewardable
+                    self.first_rewardable_chunk, new_first_rewardable
                 );
+                Ok(None)
             }
         }
     }
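This demotes an unexpected `first_rewardable_chunk` revert from a task-aborting `bail!` to a logged `error!` followed by `Ok(None)`, so the pruner task keeps running; presumably such a revert can now occur benignly (for example after a shard config change) and should not bring the node down.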
@@ -423,7 +423,7 @@ impl Libp2pEventHandler {
         let addr = self.get_listen_addr_or_add().await?;

         let timestamp = timestamp_now();
-        let shard_config = self.store.get_store().flow().get_shard_config();
+        let shard_config = self.store.get_store().get_shard_config();

         let msg = AnnounceFile {
             tx_ids,
@@ -699,7 +699,7 @@ impl Libp2pEventHandler {
         }

         // notify sync layer if shard config matches
-        let my_shard_config = self.store.get_store().flow().get_shard_config();
+        let my_shard_config = self.store.get_store().get_shard_config();
         if my_shard_config.intersect(&announced_shard_config) {
             for tx_id in msg.tx_ids.iter() {
                 self.send_to_sync(SyncMessage::AnnounceFileGossip {
@@ -183,7 +183,7 @@ impl RpcServer for RpcServerImpl {

     async fn get_shard_config(&self) -> RpcResult<ShardConfig> {
         debug!("zgs_getShardConfig");
-        let shard_config = self.ctx.log_store.get_store().flow().get_shard_config();
+        let shard_config = self.ctx.log_store.get_store().get_shard_config();
         Ok(shard_config)
     }

@@ -95,23 +95,22 @@ impl Store {
         &self,
         seal_index_max: usize,
     ) -> anyhow::Result<Option<Vec<SealTask>>> {
-        self.spawn(move |store| store.flow().pull_seal_chunk(seal_index_max))
+        self.spawn(move |store| store.pull_seal_chunk(seal_index_max))
             .await
     }

     pub async fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> anyhow::Result<()> {
-        self.spawn(move |store| store.flow().submit_seal_result(answers))
+        self.spawn(move |store| store.submit_seal_result(answers))
             .await
     }

     pub async fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
-        self.spawn(move |store| store.flow().load_sealed_data(chunk_index))
+        self.spawn(move |store| store.load_sealed_data(chunk_index))
             .await
     }

     pub async fn get_num_entries(&self) -> Result<u64> {
-        self.spawn(move |store| store.flow().get_num_entries())
-            .await
+        self.spawn(move |store| store.get_num_entries()).await
     }

     pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> {
@@ -122,7 +121,7 @@ impl Store {

     pub async fn update_shard_config(&self, shard_config: ShardConfig) {
         self.spawn(move |store| {
-            store.flow().update_shard_config(shard_config);
+            store.update_shard_config(shard_config);
             Ok(())
         })
         .await
@@ -29,6 +29,7 @@ itertools = "0.13.0"
 serde = { version = "1.0.197", features = ["derive"] }
 parking_lot = "0.12.3"
 serde_json = "1.0.127"
+tokio = { version = "1.10.0", features = ["sync"] }

 [dev-dependencies]
 rand = "0.8.5"
@@ -261,30 +261,30 @@ mod tests {
     #[test]
     fn test_shard_intersect() {
         // 1 shard
-        assert_eq!(new_config(0, 1).intersect(&new_config(0, 1)), true);
+        assert!(new_config(0, 1).intersect(&new_config(0, 1)));

         // either is 1 shard
-        assert_eq!(new_config(0, 1).intersect(&new_config(0, 2)), true);
-        assert_eq!(new_config(0, 1).intersect(&new_config(1, 2)), true);
-        assert_eq!(new_config(0, 2).intersect(&new_config(0, 1)), true);
-        assert_eq!(new_config(1, 2).intersect(&new_config(0, 1)), true);
+        assert!(new_config(0, 1).intersect(&new_config(0, 2)));
+        assert!(new_config(0, 1).intersect(&new_config(1, 2)));
+        assert!(new_config(0, 2).intersect(&new_config(0, 1)));
+        assert!(new_config(1, 2).intersect(&new_config(0, 1)));

         // same shards
-        assert_eq!(new_config(1, 4).intersect(&new_config(0, 4)), false);
-        assert_eq!(new_config(1, 4).intersect(&new_config(1, 4)), true);
-        assert_eq!(new_config(1, 4).intersect(&new_config(2, 4)), false);
-        assert_eq!(new_config(1, 4).intersect(&new_config(3, 4)), false);
+        assert!(!new_config(1, 4).intersect(&new_config(0, 4)));
+        assert!(new_config(1, 4).intersect(&new_config(1, 4)));
+        assert!(!new_config(1, 4).intersect(&new_config(2, 4)));
+        assert!(!new_config(1, 4).intersect(&new_config(3, 4)));

         // left shards is less
-        assert_eq!(new_config(1, 2).intersect(&new_config(0, 4)), false);
-        assert_eq!(new_config(1, 2).intersect(&new_config(1, 4)), false);
-        assert_eq!(new_config(1, 2).intersect(&new_config(2, 4)), true);
-        assert_eq!(new_config(1, 2).intersect(&new_config(3, 4)), true);
+        assert!(!new_config(1, 2).intersect(&new_config(0, 4)));
+        assert!(!new_config(1, 2).intersect(&new_config(1, 4)));
+        assert!(new_config(1, 2).intersect(&new_config(2, 4)));
+        assert!(new_config(1, 2).intersect(&new_config(3, 4)));

         // right shards is less
-        assert_eq!(new_config(1, 4).intersect(&new_config(0, 2)), true);
-        assert_eq!(new_config(1, 4).intersect(&new_config(1, 2)), false);
-        assert_eq!(new_config(2, 4).intersect(&new_config(0, 2)), false);
-        assert_eq!(new_config(2, 4).intersect(&new_config(1, 2)), true);
+        assert!(new_config(1, 4).intersect(&new_config(0, 2)));
+        assert!(!new_config(1, 4).intersect(&new_config(1, 2)));
+        assert!(!new_config(2, 4).intersect(&new_config(0, 2)));
+        assert!(new_config(2, 4).intersect(&new_config(1, 2)));
     }
 }
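For readers of the truth table above: one consistent reading is that a config (shard_id, num_shard) covers the contiguous fraction [shard_id/num_shard, (shard_id+1)/num_shard) of the flow. The following illustrative implementation is an inference from the tests, not code from this repository, but it reproduces every assertion:

#[derive(Clone, Copy)]
struct ShardConfig {
    shard_id: u64,
    num_shard: u64,
}

fn new_config(shard_id: u64, num_shard: u64) -> ShardConfig {
    ShardConfig { shard_id, num_shard }
}

impl ShardConfig {
    // Two half-open ranges [a/n, (a+1)/n) and [b/m, (b+1)/m) overlap iff
    // a*m < (b+1)*n and b*n < (a+1)*m (cross-multiplied to avoid fractions).
    fn intersect(&self, other: &ShardConfig) -> bool {
        self.shard_id * other.num_shard < (other.shard_id + 1) * self.num_shard
            && other.shard_id * self.num_shard < (self.shard_id + 1) * other.num_shard
    }
}

// Example from the tests: new_config(1, 2) covers [1/2, 1) and
// new_config(2, 4) covers [1/2, 3/4), so they intersect.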
@@ -3,7 +3,8 @@ use super::{MineLoadChunk, SealAnswer, SealTask};
 use crate::config::ShardConfig;
 use crate::error::Error;
 use crate::log_store::log_manager::{
-    bytes_to_entries, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE,
+    bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
+    COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
 };
 use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
 use crate::{try_option, ZgsKeyValueDB};
@@ -11,7 +12,7 @@ use anyhow::{anyhow, bail, Result};
 use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
 use itertools::Itertools;
 use parking_lot::RwLock;
-use shared_types::{ChunkArray, DataRoot, FlowProof};
+use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
 use ssz::{Decode, Encode};
 use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
 use std::cmp::Ordering;
@@ -441,6 +442,10 @@ impl FlowDBStore {
         // and they will be updated in the merkle tree with `fill_leaf` by the caller.
         let mut leaf_list = Vec::new();
         let mut expected_index = 0;
+
+        let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
+        let empty_root = *Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
+
         for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
             let (index_bytes, root_bytes) = r?;
             let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?;
@@ -475,25 +480,26 @@ impl FlowDBStore {
                         expected_index += 1;
                     }
                     Ordering::Greater => {
-                        bail!(
-                            "unexpected chunk leaf in range, expected={}, get={}, range={:?}",
-                            expected_index,
-                            batch_index,
-                            range_root,
-                        );
+                        while batch_index > expected_index {
+                            // Fill the gap with empty leaves.
+                            root_list.push((1, empty_root));
+                            expected_index += 1;
+                        }
+                        range_root = None;
+                        root_list.push((1, root));
+                        expected_index += 1;
                     }
                 }
-            } else if expected_index == batch_index {
+            } else {
+                while batch_index > expected_index {
+                    // Fill the gap with empty leaves.
+                    root_list.push((1, empty_root));
+                    expected_index += 1;
+                }
                 range_root = Some(BatchRoot::Multiple((subtree_depth, root)));
                 root_list.push((subtree_depth, root));
                 expected_index += 1 << (subtree_depth - 1);
-            } else {
-                bail!(
-                    "unexpected range root: expected={} get={}",
-                    expected_index,
-                    batch_index
-                );
             }
         }
         let extra_node_list = self.get_mpt_node_list()?;
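The two `while` loops above do the same thing; a sketch of the shared idea (the `fill_gaps` helper is illustrative, not in the diff): a batch that a sharded node never stored is represented in the reconstructed root list by the Merkle root of an all-zero batch, so the flow root can still be rebuilt without the missing data.

use ethereum_types::H256;

// Pad `root_list` with depth-1 empty-batch roots until `expected_index`
// catches up with `batch_index`. `empty_root` is computed once from
// PORA_CHUNK_SIZE * ENTRY_SIZE zero bytes, as in the earlier hunk.
fn fill_gaps(
    root_list: &mut Vec<(usize, H256)>,
    expected_index: &mut u64,
    batch_index: u64,
    empty_root: H256,
) {
    while batch_index > *expected_index {
        root_list.push((1, empty_root));
        *expected_index += 1;
    }
}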
@@ -1,3 +1,4 @@
+use crate::config::ShardConfig;
 use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowStore};
 use crate::log_store::tx_store::TransactionStore;
 use crate::log_store::{
@@ -20,11 +21,13 @@ use shared_types::{
 use std::cmp::Ordering;
 use std::collections::BTreeMap;
 use std::path::Path;
+use std::sync::mpsc;
 use std::sync::Arc;
+use std::thread;
 use tracing::{debug, error, info, instrument, trace, warn};

 use super::tx_store::BlockHashAndSubmissionIndex;
-use super::LogStoreInner;
+use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};

 /// 256 Bytes
 pub const ENTRY_SIZE: usize = 256;
@@ -45,11 +48,18 @@ pub const COL_NUM: u32 = 9;
 // Process at most 1M entries (256MB) pad data at a time.
 const PAD_MAX_SIZE: usize = 1 << 20;

+pub struct UpdateFlowMessage {
+    pub root_map: BTreeMap<usize, (H256, usize)>,
+    pub pad_data: usize,
+    pub tx_start_flow_index: u64,
+}
+
 pub struct LogManager {
     pub(crate) db: Arc<dyn ZgsKeyValueDB>,
     tx_store: TransactionStore,
-    flow_store: FlowStore,
+    flow_store: Arc<FlowStore>,
     merkle: RwLock<MerkleManager>,
+    sender: mpsc::Sender<UpdateFlowMessage>,
 }

 struct MerkleManager {
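The shape of this change: `FlowStore` moves behind an `Arc` so a background thread can share it, and `LogManager` keeps the sending half of a std `mpsc` channel; pad-data writes are shipped to the worker as `UpdateFlowMessage`s (see `start_receiver` below). A minimal, self-contained sketch of the pattern, with placeholder types standing in for `FlowStore` and `UpdateFlowMessage`:

use std::sync::{mpsc, Arc};
use std::thread;

struct Flow; // stands in for FlowStore
impl Flow {
    fn apply(&self, padding: usize) {
        println!("wrote {padding} bytes of padding");
    }
}

struct Msg { pad_data: usize } // stands in for UpdateFlowMessage

fn main() {
    let flow = Arc::new(Flow);
    let (sender, receiver) = mpsc::channel::<Msg>();

    // The worker owns a clone of the Arc, like LogManager::start_receiver.
    let worker_flow = flow.clone();
    let worker = thread::spawn(move || {
        // recv() returns Err once every sender is dropped; exit then.
        while let Ok(msg) = receiver.recv() {
            worker_flow.apply(msg.pad_data);
        }
    });

    // Producer side: enqueue the write and return immediately.
    sender.send(Msg { pad_data: 1024 }).unwrap();
    drop(sender); // closes the channel so the worker loop ends
    worker.join().unwrap();
}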
@@ -139,16 +149,6 @@ pub struct LogConfig {
     pub flow: FlowConfig,
 }

-impl LogStoreInner for LogManager {
-    fn flow(&self) -> &dyn super::Flow {
-        &self.flow_store
-    }
-
-    fn flow_mut(&mut self) -> &mut dyn super::Flow {
-        &mut self.flow_store
-    }
-}
-
 impl LogStoreChunkWrite for LogManager {
     fn put_chunks(&self, tx_seq: u64, chunks: ChunkArray) -> Result<()> {
         let mut merkle = self.merkle.write();
@@ -389,6 +389,14 @@ impl LogStoreWrite for LogManager {
     fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> {
         self.tx_store.delete_block_hash_by_number(block_number)
     }
+
+    fn update_shard_config(&self, shard_config: ShardConfig) {
+        self.flow_store.update_shard_config(shard_config)
+    }
+
+    fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
+        self.flow_store.submit_seal_result(answers)
+    }
 }

 impl LogStoreChunkRead for LogManager {
@@ -579,6 +587,22 @@ impl LogStoreRead for LogManager {
     fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result<bool> {
         self.tx_store.check_tx_pruned(tx_seq)
     }
+
+    fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> {
+        self.flow_store.pull_seal_chunk(seal_index_max)
+    }
+
+    fn get_num_entries(&self) -> Result<u64> {
+        self.flow_store.get_num_entries()
+    }
+
+    fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
+        self.flow_store.load_sealed_data(chunk_index)
+    }
+
+    fn get_shard_config(&self) -> ShardConfig {
+        self.flow_store.get_shard_config()
+    }
 }

 impl LogManager {
@@ -596,7 +620,7 @@ impl LogManager {

     fn new(db: Arc<dyn ZgsKeyValueDB>, config: LogConfig) -> Result<Self> {
         let tx_store = TransactionStore::new(db.clone())?;
-        let flow_store = FlowStore::new(db.clone(), config.flow);
+        let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow));
         let mut initial_data = flow_store.get_chunk_root_list()?;
         // If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
         // first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
@@ -700,13 +724,19 @@ impl LogManager {
             pora_chunks_merkle,
             last_chunk_merkle,
         });
-        let log_manager = Self {
+
+        let (sender, receiver) = mpsc::channel();
+
+        let mut log_manager = Self {
             db,
             tx_store,
             flow_store,
             merkle,
+            sender,
         };
+
+        log_manager.start_receiver(receiver);
+
         if let Some(tx) = last_tx_to_insert {
             log_manager.put_tx(tx)?;
             let mut merkle = log_manager.merkle.write();
@@ -727,6 +757,32 @@ impl LogManager {
         Ok(log_manager)
     }

+    fn start_receiver(&mut self, rx: mpsc::Receiver<UpdateFlowMessage>) {
+        let flow_store = self.flow_store.clone();
+        thread::spawn(move || -> Result<(), anyhow::Error> {
+            loop {
+                match rx.recv() {
+                    std::result::Result::Ok(data) => {
+                        // Update the root index.
+                        flow_store.put_batch_root_list(data.root_map).unwrap();
+                        // Update the flow database.
+                        // This should be called before `complete_last_chunk_merkle` so that we do not save
+                        // subtrees with data known.
+                        flow_store
+                            .append_entries(ChunkArray {
+                                data: vec![0; data.pad_data],
+                                start_index: data.tx_start_flow_index,
+                            })
+                            .unwrap();
+                    }
+                    std::result::Result::Err(_) => {
+                        bail!("Receiver error");
+                    }
+                }
+            }
+        });
+    }
+
     fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
         match maybe_root {
             None => self.gen_proof_at_version(flow_index, None),
@@ -863,6 +919,7 @@ impl LogManager {
         );
         if extra != 0 {
             for pad_data in Self::padding((first_subtree_size - extra) as usize) {
+                let mut is_full_empty = true;
                 let mut root_map = BTreeMap::new();

                 // Update the in-memory merkle tree.
@@ -874,6 +931,7 @@ impl LogManager {

                 let mut completed_chunk_index = None;
                 if pad_data.len() < last_chunk_pad {
+                    is_full_empty = false;
                     merkle
                         .last_chunk_merkle
                         .append_list(data_to_merkle_leaves(&pad_data)?);
@@ -882,6 +940,7 @@ impl LogManager {
                         .update_last(*merkle.last_chunk_merkle.root());
                 } else {
                     if last_chunk_pad != 0 {
+                        is_full_empty = false;
                         // Pad the last chunk.
                         merkle
                             .last_chunk_merkle
@@ -910,16 +969,26 @@ impl LogManager {
                     assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
                 }

-                // Update the root index.
-                self.flow_store.put_batch_root_list(root_map)?;
-                // Update the flow database.
-                // This should be called before `complete_last_chunk_merkle` so that we do not save
-                // subtrees with data known.
                 let data_size = pad_data.len() / ENTRY_SIZE;
-                self.flow_store.append_entries(ChunkArray {
-                    data: pad_data,
-                    start_index: tx_start_flow_index,
-                })?;
+                if is_full_empty {
+                    self.sender.send(UpdateFlowMessage {
+                        root_map,
+                        pad_data: pad_data.len(),
+                        tx_start_flow_index,
+                    })?;
+                } else {
+                    self.flow_store.put_batch_root_list(root_map).unwrap();
+                    // Update the flow database.
+                    // This should be called before `complete_last_chunk_merkle` so that we do not save
+                    // subtrees with data known.
+                    self.flow_store
+                        .append_entries(ChunkArray {
+                            data: pad_data.to_vec(),
+                            start_index: tx_start_flow_index,
+                        })
+                        .unwrap();
+                }

                 tx_start_flow_index += data_size as u64;
                 if let Some(index) = completed_chunk_index {
                     self.complete_last_chunk_merkle(index, &mut *merkle)?;
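Why `is_full_empty` matters here: padding that is entirely zeros carries no information beyond its length, so the fast path sends only `pad_data.len()` through the channel and the receiver re-materializes the zeros as `vec![0; data.pad_data]` (see `start_receiver` above). Padding that completes a partially filled last chunk still takes the synchronous path, keeping the flow database consistent before `complete_last_chunk_merkle` runs.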
@@ -76,6 +76,14 @@ pub trait LogStoreRead: LogStoreChunkRead {

     /// Return flow root and length.
     fn get_context(&self) -> Result<(DataRoot, u64)>;
+
+    fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>>;
+
+    fn get_num_entries(&self) -> Result<u64>;
+
+    fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
+
+    fn get_shard_config(&self) -> ShardConfig;
 }

 pub trait LogStoreChunkRead {
@@ -145,6 +153,10 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
     ) -> Result<bool>;

     fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()>;
+
+    fn update_shard_config(&self, shard_config: ShardConfig);
+
+    fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
 }

 pub trait LogStoreChunkWrite {
@@ -168,19 +180,10 @@ pub trait LogChunkStore: LogStoreChunkRead + LogStoreChunkWrite + Send + Sync +
 impl<T: LogStoreChunkRead + LogStoreChunkWrite + Send + Sync + 'static> LogChunkStore for T {}

 pub trait Store:
-    LogStoreRead + LogStoreWrite + LogStoreInner + config::Configurable + Send + Sync + 'static
+    LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static
 {
 }
-impl<
-    T: LogStoreRead + LogStoreWrite + LogStoreInner + config::Configurable + Send + Sync + 'static,
-> Store for T
-{
-}
-
-pub trait LogStoreInner {
-    fn flow(&self) -> &dyn Flow;
-    fn flow_mut(&mut self) -> &mut dyn Flow;
-}
+impl<T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static> Store for T {}

 pub struct MineLoadChunk {
     // Use `Vec` instead of array to avoid thread stack overflow.
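Taken together with the hunks above, this removes the `store.flow()` escape hatch: flow-level operations (`get_shard_config`, `pull_seal_chunk`, seal submission, and so on) become ordinary trait methods that `LogManager` implements by delegating to its private `flow_store`, and every call site drops one `.flow()` hop. A toy sketch of the resulting shape, with placeholder types standing in for `FlowStore`, `LogManager`, and `ShardConfig`:

struct Flow;
impl Flow {
    fn get_shard_config(&self) -> u64 { 0 }
}

trait Read {
    fn get_shard_config(&self) -> u64;
}

struct Manager { flow_store: Flow }

impl Read for Manager {
    // Delegation: callers write manager.get_shard_config() instead of the
    // old manager.flow().get_shard_config(), and Flow stays an internal detail.
    fn get_shard_config(&self) -> u64 {
        self.flow_store.get_shard_config()
    }
}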
@@ -492,7 +492,7 @@ impl SerialSyncController {

         metrics::SERIAL_SYNC_SEGMENT_LATENCY.update_since(since.0);

-        let shard_config = self.store.get_store().flow().get_shard_config();
+        let shard_config = self.store.get_store().get_shard_config();
         let next_chunk = segment_to_sector(shard_config.next_segment_index(
             sector_to_segment(from_chunk),
             sector_to_segment(self.tx_start_chunk_in_flow),
@@ -807,7 +807,7 @@ impl SyncService {
 }

 async fn tx_sync_start_index(store: &Store, tx: &Transaction) -> Result<Option<u64>> {
-    let shard_config = store.get_store().flow().get_shard_config();
+    let shard_config = store.get_store().get_shard_config();
     let start_segment = sector_to_segment(tx.start_entry_index());
     let end =
         bytes_to_chunks(usize::try_from(tx.size).map_err(|e| anyhow!("tx size e={}", e))?);
tests/config/0gchain-init-genesis-mac.sh (new file, 95 lines):

@@ -0,0 +1,95 @@
+#!/bin/bash
+
+set -e
+
+BINARY=$(cd $(dirname ${BASH_SOURCE[0]})/../tmp; pwd)/0gchaind
+ROOT_DIR=${1:-.}
+NUM_NODES=${2:-3}
+P2P_PORT_START=${3:-26656}
+CHAIN_ID=zgchainpy_9000-777
+
+# install jq if not unavailable
+jq --version >/dev/null 2>&1 || sudo snap install jq -y
+
+mkdir -p $ROOT_DIR
+
+# Init configs
+for ((i=0; i<$NUM_NODES; i++)) do
+    $BINARY init node$i --home $ROOT_DIR/node$i --chain-id $CHAIN_ID
+
+    # Change genesis.json
+    GENESIS=$ROOT_DIR/node$i/config/genesis.json
+    TMP_GENESIS=$ROOT_DIR/node$i/config/tmp_genesis.json
+
+    # Replace stake with neuron
+    sed -in-place='' 's/stake/ua0gi/g' "$GENESIS"
+
+    # Replace the default evm denom of aphoton with neuron
+    sed -in-place='' 's/aphoton/neuron/g' "$GENESIS"
+
+    cat $GENESIS | jq '.consensus_params.block.max_gas = "25000000"' >$TMP_GENESIS && mv $TMP_GENESIS $GENESIS
+
+    # Zero out the total supply so it gets recalculated during InitGenesis
+    cat $GENESIS | jq '.app_state.bank.supply = []' >$TMP_GENESIS && mv $TMP_GENESIS $GENESIS
+
+    # Disable fee market
+    cat $GENESIS | jq '.app_state.feemarket.params.no_base_fee = true' >$TMP_GENESIS && mv $TMP_GENESIS $GENESIS
+
+    # Disable london fork
+    cat $GENESIS | jq '.app_state.evm.params.chain_config.london_block = null' >$TMP_GENESIS && mv $TMP_GENESIS $GENESIS
+    cat $GENESIS | jq '.app_state.evm.params.chain_config.arrow_glacier_block = null' >$TMP_GENESIS && mv $TMP_GENESIS $GENESIS
+    cat $GENESIS | jq '.app_state.evm.params.chain_config.gray_glacier_block = null' >$TMP_GENESIS && mv $TMP_GENESIS $GENESIS
+    cat $GENESIS | jq '.app_state.evm.params.chain_config.merge_netsplit_block = null' >$TMP_GENESIS && mv $TMP_GENESIS $GENESIS
+    cat $GENESIS | jq '.app_state.evm.params.chain_config.shanghai_block = null' >$TMP_GENESIS && mv $TMP_GENESIS $GENESIS
+    cat $GENESIS | jq '.app_state.evm.params.chain_config.cancun_block = null' >$TMP_GENESIS && mv $TMP_GENESIS $GENESIS
+
+    # cat $GENESIS | jq '.app_state["staking"]["params"]["bond_denom"]="a0gi"' >$TMP_GENESIS && mv $TMP_GENESIS $GENESIS
+    # cat $GENESIS | jq '.app_state["gov"]["params"]["min_deposit"][0]["denom"]="a0gi"' >$TMP_GENESIS && mv $TMP_GENESIS $GENESIS
+
+    cat "$GENESIS" | jq '.app_state["staking"]["params"]["max_validators"]=125' >"$TMP_GENESIS" && mv "$TMP_GENESIS" "$GENESIS"
+    cat "$GENESIS" | jq '.app_state["slashing"]["params"]["signed_blocks_window"]="1000"' >"$TMP_GENESIS" && mv "$TMP_GENESIS" "$GENESIS"
+
+    cat "$GENESIS" | jq '.app_state["consensus_params"]["block"]["time_iota_ms"]="3000"' >"$TMP_GENESIS" && mv "$TMP_GENESIS" "$GENESIS"
+
+    # Change app.toml
+    APP_TOML=$ROOT_DIR/node$i/config/app.toml
+    sed -i '' 's/minimum-gas-prices = "0ua0gi"/minimum-gas-prices = "1000000000neuron"/' $APP_TOML
+    sed -i '' '/\[grpc\]/,/^\[/ s/enable = true/enable = false/' $APP_TOML
+    sed -i '' '/\[grpc-web\]/,/^\[/ s/enable = true/enable = false/' $APP_TOML
+    sed -i '' '/\[json-rpc\]/,/^\[/ s/enable = false/enable = true/' $APP_TOML
+
+    # Change config.toml
+    CONFIG_TOML=$ROOT_DIR/node$i/config/config.toml
+    sed -i '' '/seeds = /c\
+seeds = ""' $CONFIG_TOML
+    sed -i '' 's/addr_book_strict = true/addr_book_strict = false/' $CONFIG_TOML
+done
+
+# Update persistent_peers in config.toml
+for ((i=1; i<$NUM_NODES; i++)) do
+    PERSISTENT_NODES=""
+    for ((j=0; j<$i; j++)) do
+        if [[ $j -gt 0 ]]; then PERSISTENT_NODES=$PERSISTENT_NODES,; fi
+        NODE_ID=`$BINARY tendermint show-node-id --home $ROOT_DIR/node$j`
+        P2P_PORT=$(($P2P_PORT_START+$j))
+        PERSISTENT_NODES=$PERSISTENT_NODES$NODE_ID@127.0.0.1:$P2P_PORT
+    done
+    sed -i '' "/persistent_peers = /c\
+persistent_peers = \"$PERSISTENT_NODES\"" $ROOT_DIR/node$i/config/config.toml
+done
+
+# Create genesis with a single validator
+$BINARY keys add val0 --keyring-backend test --home $ROOT_DIR/node0
+$BINARY add-genesis-account val0 15000000000000000000ua0gi --keyring-backend test --home $ROOT_DIR/node0
+
+# add genesis account for tests, see GENESIS_PRIV_KEY and GENESIS_PRIV_KEY1 in node_config.py
+$BINARY add-genesis-account 0g1l0j9dqdvd3fatfqywhm4y6avrln4jracmt6ztf 40000000000000000000ua0gi --home $ROOT_DIR/node0
+$BINARY add-genesis-account 0g1pemg6y3etj9tlhkl0vdwkrw36f74u2nl8sjw7g 40000000000000000000ua0gi --home $ROOT_DIR/node0
+
+mkdir -p $ROOT_DIR/gentxs
+$BINARY gentx val0 10000000000000000000ua0gi --keyring-backend test --home $ROOT_DIR/node0 --output-document $ROOT_DIR/gentxs/node0.json
+$BINARY collect-gentxs --home $ROOT_DIR/node0 --gentx-dir $ROOT_DIR/gentxs
+$BINARY validate-genesis --home $ROOT_DIR/node0
+for ((i=1; i<$NUM_NODES; i++)) do
+    cp $ROOT_DIR/node0/config/genesis.json $ROOT_DIR/node$i/config/genesis.json
+done
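The reason a separate -mac script exists: macOS ships BSD sed, where -i requires an explicit (possibly empty) backup suffix, as in the `sed -i '' ...` calls above, while GNU sed on Linux accepts a bare `sed -i`. The darwin branch added to node.py below selects this variant at runtime.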
@@ -1,6 +1,7 @@
 import os
 import subprocess
 import tempfile
+import platform

 from test_framework.blockchain_node import BlockChainNodeType, BlockchainNode
 from utility.utils import blockchain_rpc_port, arrange_port
@@ -21,6 +22,13 @@ def zg_node_init_genesis(binary: str, root_dir: str, num_nodes: int):
         os.path.dirname(os.path.realpath(__file__)),  # test_framework folder
         "..", "config", "0gchain-init-genesis.sh"
     )
+    sys = platform.system().lower()
+    if sys == "darwin":
+        shell_script = os.path.join(
+            os.path.dirname(os.path.realpath(__file__)),  # test_framework folder
+            "..", "config", "0gchain-init-genesis-mac.sh"
+        )
+

     zgchaind_dir = os.path.join(root_dir, "0gchaind")
     os.mkdir(zgchaind_dir)