mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-12-26 08:15:17 +00:00
Compare commits
3 Commits
cfe05b6f00
...
40104de891
Author | SHA1 | Date | |
---|---|---|---|
|
40104de891 | ||
|
0da3c374db | ||
|
2a24bbde18 |
@ -96,7 +96,8 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F
|
|||||||
/// Defines the current P2P protocol version.
|
/// Defines the current P2P protocol version.
|
||||||
/// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused network too heavey.
|
/// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused network too heavey.
|
||||||
/// - v2: Publish NewFile to neighbors only and announce file via RPC message.
|
/// - v2: Publish NewFile to neighbors only and announce file via RPC message.
|
||||||
pub const PROTOCOL_VERSION: [u8; 3] = [0, 2, 0];
|
pub const PROTOCOL_VERSION_V1: [u8; 3] = [0, 1, 0];
|
||||||
|
pub const PROTOCOL_VERSION_V2: [u8; 3] = [0, 2, 0];
|
||||||
|
|
||||||
/// Application level requests sent to the network.
|
/// Application level requests sent to the network.
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
@ -246,12 +246,7 @@ impl RpcServerImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult<FileInfo> {
|
async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult<FileInfo> {
|
||||||
let (finalized, pruned) = match self
|
let (finalized, pruned) = match self.ctx.log_store.get_store().get_tx_status(tx.seq)? {
|
||||||
.ctx
|
|
||||||
.log_store
|
|
||||||
.get_store()
|
|
||||||
.get_tx_status(TxSeqOrRoot::TxSeq(tx.seq))?
|
|
||||||
{
|
|
||||||
Some(TxStatus::Finalized) => (true, false),
|
Some(TxStatus::Finalized) => (true, false),
|
||||||
Some(TxStatus::Pruned) => (false, true),
|
Some(TxStatus::Pruned) => (false, true),
|
||||||
None => (false, false),
|
None => (false, false),
|
||||||
|
@ -39,13 +39,18 @@ impl ZgsConfig {
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| format!("Unable to get chain id: {:?}", e))?
|
.map_err(|e| format!("Unable to get chain id: {:?}", e))?
|
||||||
.as_u64();
|
.as_u64();
|
||||||
|
let network_protocol_version = if self.sync.neighbors_only {
|
||||||
|
network::PROTOCOL_VERSION_V2
|
||||||
|
} else {
|
||||||
|
network::PROTOCOL_VERSION_V1
|
||||||
|
};
|
||||||
let local_network_id = NetworkIdentity {
|
let local_network_id = NetworkIdentity {
|
||||||
chain_id,
|
chain_id,
|
||||||
flow_address,
|
flow_address,
|
||||||
p2p_protocol_version: ProtocolVersion {
|
p2p_protocol_version: ProtocolVersion {
|
||||||
major: network::PROTOCOL_VERSION[0],
|
major: network_protocol_version[0],
|
||||||
minor: network::PROTOCOL_VERSION[1],
|
minor: network_protocol_version[1],
|
||||||
build: network::PROTOCOL_VERSION[2],
|
build: network_protocol_version[2],
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
network_config.network_id = local_network_id.clone();
|
network_config.network_id = local_network_id.clone();
|
||||||
|
@ -21,7 +21,6 @@ use rayon::prelude::ParallelSlice;
|
|||||||
use shared_types::{
|
use shared_types::{
|
||||||
bytes_to_chunks, compute_padded_chunk_size, compute_segment_size, Chunk, ChunkArray,
|
bytes_to_chunks, compute_padded_chunk_size, compute_segment_size, Chunk, ChunkArray,
|
||||||
ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
|
ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
|
||||||
TxSeqOrRoot,
|
|
||||||
};
|
};
|
||||||
use std::cmp::Ordering;
|
use std::cmp::Ordering;
|
||||||
|
|
||||||
@ -538,7 +537,15 @@ impl LogStoreRead for LogManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result<Option<u64>> {
|
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result<Option<u64>> {
|
||||||
self.tx_store.get_first_tx_seq_by_data_root(data_root)
|
let seq_list = self.tx_store.get_tx_seq_list_by_data_root(data_root)?;
|
||||||
|
for tx_seq in &seq_list {
|
||||||
|
if self.tx_store.check_tx_completed(*tx_seq)? {
|
||||||
|
// Return the first finalized tx if possible.
|
||||||
|
return Ok(Some(*tx_seq));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// No tx is finalized, return the first one.
|
||||||
|
Ok(seq_list.first().cloned())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_chunk_with_proof_by_tx_and_index(
|
fn get_chunk_with_proof_by_tx_and_index(
|
||||||
@ -582,14 +589,7 @@ impl LogStoreRead for LogManager {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>> {
|
fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> {
|
||||||
let tx_seq = match tx_seq_or_data_root {
|
|
||||||
TxSeqOrRoot::TxSeq(v) => v,
|
|
||||||
TxSeqOrRoot::Root(root) => {
|
|
||||||
try_option!(self.tx_store.get_first_tx_seq_by_data_root(&root)?)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
self.tx_store.get_tx_status(tx_seq)
|
self.tx_store.get_tx_status(tx_seq)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,7 +4,7 @@ use ethereum_types::H256;
|
|||||||
use flow_store::PadPair;
|
use flow_store::PadPair;
|
||||||
use shared_types::{
|
use shared_types::{
|
||||||
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
|
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
|
||||||
Transaction, TxSeqOrRoot,
|
Transaction,
|
||||||
};
|
};
|
||||||
use zgs_spec::{BYTES_PER_SEAL, SEALS_PER_LOAD};
|
use zgs_spec::{BYTES_PER_SEAL, SEALS_PER_LOAD};
|
||||||
|
|
||||||
@ -31,8 +31,12 @@ pub trait LogStoreRead: LogStoreChunkRead {
|
|||||||
fn get_tx_by_seq_number(&self, seq: u64) -> Result<Option<Transaction>>;
|
fn get_tx_by_seq_number(&self, seq: u64) -> Result<Option<Transaction>>;
|
||||||
|
|
||||||
/// Get a transaction by the data root of its data.
|
/// Get a transaction by the data root of its data.
|
||||||
|
/// If all txs are not finalized, return the first one.
|
||||||
|
/// Otherwise, return the first finalized tx.
|
||||||
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>>;
|
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>>;
|
||||||
|
|
||||||
|
/// If all txs are not finalized, return the first one.
|
||||||
|
/// Otherwise, return the first finalized tx.
|
||||||
fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result<Option<Transaction>> {
|
fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result<Option<Transaction>> {
|
||||||
match self.get_tx_seq_by_data_root(data_root)? {
|
match self.get_tx_seq_by_data_root(data_root)? {
|
||||||
Some(seq) => self.get_tx_by_seq_number(seq),
|
Some(seq) => self.get_tx_by_seq_number(seq),
|
||||||
@ -58,7 +62,7 @@ pub trait LogStoreRead: LogStoreChunkRead {
|
|||||||
|
|
||||||
fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
|
fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
|
||||||
|
|
||||||
fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>>;
|
fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>>;
|
||||||
|
|
||||||
fn next_tx_seq(&self) -> u64;
|
fn next_tx_seq(&self) -> u64;
|
||||||
|
|
||||||
|
@ -181,14 +181,6 @@ impl TransactionStore {
|
|||||||
Ok(Vec::<u64>::from_ssz_bytes(&value).map_err(Error::from)?)
|
Ok(Vec::<u64>::from_ssz_bytes(&value).map_err(Error::from)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_first_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>> {
|
|
||||||
let value = try_option!(self
|
|
||||||
.kvdb
|
|
||||||
.get(COL_TX_DATA_ROOT_INDEX, data_root.as_bytes())?);
|
|
||||||
let seq_list = Vec::<u64>::from_ssz_bytes(&value).map_err(Error::from)?;
|
|
||||||
Ok(seq_list.first().cloned())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[instrument(skip(self))]
|
#[instrument(skip(self))]
|
||||||
pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
|
pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
|
||||||
Ok(self.kvdb.put(
|
Ok(self.kvdb.put(
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender};
|
use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender};
|
||||||
use anyhow::{bail, Result};
|
use anyhow::{bail, Result};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use shared_types::TxSeqOrRoot;
|
|
||||||
use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration};
|
use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration};
|
||||||
use storage_async::Store;
|
use storage_async::Store;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
@ -86,11 +85,7 @@ impl Batcher {
|
|||||||
|
|
||||||
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
|
async fn poll_tx(&self, tx_seq: u64) -> Result<Option<SyncResult>> {
|
||||||
// file already finalized or even pruned
|
// file already finalized or even pruned
|
||||||
if let Some(tx_status) = self
|
if let Some(tx_status) = self.store.get_store().get_tx_status(tx_seq)? {
|
||||||
.store
|
|
||||||
.get_store()
|
|
||||||
.get_tx_status(TxSeqOrRoot::TxSeq(tx_seq))?
|
|
||||||
{
|
|
||||||
let num_terminated: usize = self.terminate_file_sync(tx_seq, false).await;
|
let num_terminated: usize = self.terminate_file_sync(tx_seq, false).await;
|
||||||
if num_terminated > 0 {
|
if num_terminated > 0 {
|
||||||
info!(%tx_seq, %num_terminated, ?tx_status, "Terminate file sync due to file already completed in db");
|
info!(%tx_seq, %num_terminated, ?tx_status, "Terminate file sync due to file already completed in db");
|
||||||
|
@ -87,12 +87,7 @@ impl HistoricalTxWriter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// write tx in sync store if not finalized or pruned
|
// write tx in sync store if not finalized or pruned
|
||||||
if self
|
if self.store.get_store().get_tx_status(next_tx_seq)?.is_none() {
|
||||||
.store
|
|
||||||
.get_store()
|
|
||||||
.get_tx_status(shared_types::TxSeqOrRoot::TxSeq(next_tx_seq))?
|
|
||||||
.is_none()
|
|
||||||
{
|
|
||||||
self.sync_store.insert(next_tx_seq, Queue::Ready).await?;
|
self.sync_store.insert(next_tx_seq, Queue::Ready).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
jsonrpcclient==4.0.3
|
jsonrpcclient==4.0.3
|
||||||
pyyaml==6.0.1
|
pyyaml==6.0.1
|
||||||
pysha3==1.0.2
|
safe-pysha3==1.0.4
|
||||||
coincurve==18.0.0
|
coincurve==20.0.0
|
||||||
eth-utils==3.0.0
|
eth-utils==5.1.0
|
||||||
py-ecc==7.0.0
|
py-ecc==7.0.0
|
||||||
web3==6.14.0
|
web3==7.5.0
|
||||||
eth_tester
|
eth_tester
|
||||||
cffi==1.16.0
|
cffi==1.16.0
|
||||||
rtoml==0.10.0
|
rtoml==0.11.0
|
41
tests/snapshot_test.py
Normal file
41
tests/snapshot_test.py
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
from test_framework.test_framework import TestFramework
|
||||||
|
from utility.utils import wait_until
|
||||||
|
|
||||||
|
class SnapshotTask(TestFramework):
|
||||||
|
def setup_params(self):
|
||||||
|
self.num_nodes = 2
|
||||||
|
|
||||||
|
# Enable random auto sync only
|
||||||
|
for i in range(self.num_nodes):
|
||||||
|
self.zgs_node_configs[i] = {
|
||||||
|
"sync": {
|
||||||
|
"auto_sync_enabled": True,
|
||||||
|
"max_sequential_workers": 3,
|
||||||
|
"max_random_workers": 3,
|
||||||
|
"neighbors_only": True,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def run_test(self):
|
||||||
|
# Submit and upload files on node 0
|
||||||
|
data_root_1 = self.__upload_file__(0, 256 * 1024)
|
||||||
|
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None)
|
||||||
|
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"])
|
||||||
|
|
||||||
|
# Start the last node to verify historical file sync
|
||||||
|
self.nodes[1].shutdown()
|
||||||
|
shutil.rmtree(os.path.join(self.nodes[1].data_dir, 'db/data_db'))
|
||||||
|
|
||||||
|
self.start_storage_node(1)
|
||||||
|
self.nodes[1].wait_for_rpc_connection()
|
||||||
|
|
||||||
|
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None)
|
||||||
|
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
SnapshotTask().main()
|
@ -4,7 +4,7 @@ import tempfile
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from web3 import Web3, HTTPProvider
|
from web3 import Web3, HTTPProvider
|
||||||
from web3.middleware import construct_sign_and_send_raw_middleware
|
from web3.middleware import SignAndSendRawMiddlewareBuilder
|
||||||
from enum import Enum, unique
|
from enum import Enum, unique
|
||||||
from config.node_config import (
|
from config.node_config import (
|
||||||
GENESIS_PRIV_KEY,
|
GENESIS_PRIV_KEY,
|
||||||
@ -262,7 +262,7 @@ class BlockchainNode(TestNode):
|
|||||||
account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY)
|
account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY)
|
||||||
account2 = w3.eth.account.from_key(GENESIS_PRIV_KEY1)
|
account2 = w3.eth.account.from_key(GENESIS_PRIV_KEY1)
|
||||||
w3.middleware_onion.add(
|
w3.middleware_onion.add(
|
||||||
construct_sign_and_send_raw_middleware([account1, account2])
|
SignAndSendRawMiddlewareBuilder.build([account1, account2])
|
||||||
)
|
)
|
||||||
# account = w3.eth.account.from_key(GENESIS_PRIV_KEY1)
|
# account = w3.eth.account.from_key(GENESIS_PRIV_KEY1)
|
||||||
# w3.middleware_onion.add(construct_sign_and_send_raw_middleware(account))
|
# w3.middleware_onion.add(construct_sign_and_send_raw_middleware(account))
|
||||||
@ -365,7 +365,7 @@ class BlockchainNode(TestNode):
|
|||||||
account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY)
|
account1 = w3.eth.account.from_key(GENESIS_PRIV_KEY)
|
||||||
account2 = w3.eth.account.from_key(GENESIS_PRIV_KEY1)
|
account2 = w3.eth.account.from_key(GENESIS_PRIV_KEY1)
|
||||||
w3.middleware_onion.add(
|
w3.middleware_onion.add(
|
||||||
construct_sign_and_send_raw_middleware([account1, account2])
|
SignAndSendRawMiddlewareBuilder.build([account1, account2])
|
||||||
)
|
)
|
||||||
|
|
||||||
contract_interface = load_contract_metadata(self.contract_path, "Flow")
|
contract_interface = load_contract_metadata(self.contract_path, "Flow")
|
||||||
|
@ -44,7 +44,7 @@ class TestFramework:
|
|||||||
self.num_blockchain_nodes = 1
|
self.num_blockchain_nodes = 1
|
||||||
self.num_nodes = 1
|
self.num_nodes = 1
|
||||||
self.blockchain_nodes = []
|
self.blockchain_nodes = []
|
||||||
self.nodes = []
|
self.nodes: list[ZgsNode] = []
|
||||||
self.contract = None
|
self.contract = None
|
||||||
self.blockchain_node_configs = {}
|
self.blockchain_node_configs = {}
|
||||||
self.zgs_node_configs = {}
|
self.zgs_node_configs = {}
|
||||||
|
Loading…
Reference in New Issue
Block a user