Compare commits


3 Commits

Author SHA1 Message Date
0xroy
c590949b1b
Merge fd9c033176 into 9b68a8b7d7 2024-10-28 14:56:32 +08:00
Bo QIU
9b68a8b7d7
Implement file sync protocol V2 (#249)
* Add new P2P protocol NewFile

* Publish NewFile message when any file finalized

* handle NewFile message in router

* handle NewFile in sync service to write in db

* use propagation source to handle NewFile message

* Disable sequential sync and store new file in v2 sync store

* Add shard config in FindFile

* Add AnnounceFile RPC message in network layer

* do not propagate FindFile to whole network

* Mark peer connected if FileAnnouncement RPC message received

* fix unit test failures

* Change P2P protocol version

* Ignore py tests of sequential auto sync

* Add py test for auto sync v2

* fmt code

* remove dummy code in py test

* fix random test failure

* Add comments

* Enable file sync protocol v2 in config file by default
2024-10-28 14:56:08 +08:00
Roy Lu
fd9c033176 Updated README 2024-10-23 08:52:56 -07:00
29 changed files with 552 additions and 120 deletions

View File

@ -2,69 +2,32 @@
## Overview
0G Storage is the storage layer for the ZeroGravity data availability (DA) system. The 0G Storage layer holds three important features:
0G Storage is a decentralized data storage system designed to address the challenges of high-throughput and low-latency data storage and retrieval, in areas such as AI and gaming.
* Built-in - It is natively built into the ZeroGravity DA system for data storage and retrieval.
* General purpose - It is designed to support atomic transactions, mutable kv stores as well as archive log systems to enable a wide range of applications with various data types.
* Incentive - Instead of being just a decentralized database, 0G Storage introduces a PoRA mining algorithm to incentivize storage network participants.
In addition, it forms the storage layer for the 0G data availability (DA) system, with the cross-layer integration abstracted away from Rollup and AppChain builders.
To dive deep into the technical details, continue reading [0G Storage Spec.](docs/)
## System Architecture
## Integration
0G Storage consists of two main components:
We provide a [SDK](https://github.com/0glabs/0g-js-storage-sdk) for users to easily integrate 0G Storage in their applications with the following features:
1. **Data Publishing Lane**: Ensures quick data availability and verification through the 0G Consensus network.
2. **Data Storage Lane**: Manages large data transfers and storage using an erasure-coding mechanism for redundancy and reliability.
* File Merkle Tree Class
* Flow Contract Types
* RPC methods support
* File upload
* Support browser environment
* Tests for different environments (In Progress)
* File download (In Progress)
Across the two lanes, 0G Storage supports the following features:
## Deployment
* **General Purpose Design**: Supports atomic transactions, mutable key-value stores, and archive log systems, enabling a wide range of applications with various data types.
* **Incentivized Participation**: Utilizes the PoRA (Proof of Random Access) mining algorithm to incentivize storage network participants.
Please refer to [Deployment](docs/run.md) page for detailed steps to compile and start a 0G Storage node.
For in-depth technical details about 0G Storage, please read our [Intro to 0G Storage](https://docs.0g.ai/og-storage).
## Test
## Documentation
### Prerequisites
- If you want to run a node, please refer to the [Running a Node](https://docs.0g.ai/run-a-node/storage-node) guide.
- If you want to build a project using 0G Storage, please refer to the [0G Storage SDK](https://docs.0g.ai/build-with-0g/storage-sdk) guide.
* Required python version: 3.8, 3.9 or 3.10; higher versions are not guaranteed to work (e.g. `pysha3` may fail to install).
* Install dependencies under root folder: `pip3 install -r requirements.txt`
## Support and Additional Resources
We want to do everything we can to help you be successful while working on your contribution and projects. Here you'll find various resources and communities that may help you complete a project or contribute to 0G.
### Dependencies
The Python test framework launches local blockchain fullnodes for the storage node to interact with. Two kinds of fullnodes are supported:
* Conflux eSpace node (by default).
* BSC node (geth).
For the Conflux eSpace node, the test framework automatically compiles the binary at runtime and copies it to the `tests/tmp` folder. For the BSC node, the test framework automatically downloads the latest release binary from [github](https://github.com/bnb-chain/bsc/releases) to the `tests/tmp` folder.
Alternatively, you could manually copy specific version binaries (conflux or geth) to the `tests/tmp` folder. Note: do **NOT** copy a released conflux binary from github, since the block heights of some CIPs are hardcoded.
Testing also depends on the following repos:
* [0G Storage Contract](https://github.com/0glabs/0g-storage-contracts): It essentially provides two abi interfaces for 0G Storage Node to interact with the on-chain contracts.
* ZgsFlow: It contains apis to submit chunk data.
* PoraMine: It contains apis to submit PoRA answers.
* [0G Storage Client](https://github.com/0glabs/0g-storage-client): It is used to interact with certain 0G Storage Nodes to upload/download files.
### Run Tests
Go to the `tests` folder and run the following command to run all tests:
```
python test_all.py
```
or, run any single test, e.g.
```
python sync_test.py
```
## Contributing
To make contributions to the project, please follow the guidelines [here](contributing.md).
### Communities
- [0G Telegram](https://t.me/web3_0glabs)
- [0G Discord](https://discord.com/invite/0glabs)

View File

@ -20,6 +20,8 @@ pub struct GossipCache {
topic_msgs: HashMap<GossipTopic, HashMap<Vec<u8>, Key>>,
/// Timeout for Example messages.
example: Option<Duration>,
/// Timeout for NewFile messages.
new_file: Option<Duration>,
/// Timeout for FindFile messages.
find_file: Option<Duration>,
/// Timeout for FindChunks messages.
@ -37,6 +39,8 @@ pub struct GossipCacheBuilder {
default_timeout: Option<Duration>,
/// Timeout for Example messages.
example: Option<Duration>,
/// Timeout for NewFile messages.
new_file: Option<Duration>,
/// Timeout for FindFile messages.
find_file: Option<Duration>,
/// Timeout for FindChunks messages.
@ -64,6 +68,12 @@ impl GossipCacheBuilder {
self
}
/// Timeout for NewFile messages.
pub fn new_file_timeout(mut self, timeout: Duration) -> Self {
self.new_file = Some(timeout);
self
}
/// Timeout for FindFile messages.
pub fn find_file_timeout(mut self, timeout: Duration) -> Self {
self.find_file = Some(timeout);
@ -98,6 +108,7 @@ impl GossipCacheBuilder {
let GossipCacheBuilder {
default_timeout,
example,
new_file,
find_file,
find_chunks,
announce_file,
@ -109,6 +120,7 @@ impl GossipCacheBuilder {
expirations: DelayQueue::default(),
topic_msgs: HashMap::default(),
example: example.or(default_timeout),
new_file: new_file.or(default_timeout),
find_file: find_file.or(default_timeout),
find_chunks: find_chunks.or(default_timeout),
announce_file: announce_file.or(default_timeout),
@ -129,6 +141,7 @@ impl GossipCache {
pub fn insert(&mut self, topic: GossipTopic, data: Vec<u8>) {
let expire_timeout = match topic.kind() {
GossipKind::Example => self.example,
GossipKind::NewFile => self.new_file,
GossipKind::FindFile => self.find_file,
GossipKind::FindChunks => self.find_chunks,
GossipKind::AnnounceFile => self.announce_file,
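
For illustration, the new timeout slots in through the builder like the existing ones; a minimal sketch, assuming the builder is obtained via `GossipCacheBuilder::default()` and that `default_timeout` is an existing builder method:

```
use std::time::Duration;

// Sketch only: `new_file_timeout` comes from the diff above; the other
// builder calls are assumed to exist per the surrounding code.
let cache = GossipCacheBuilder::default()
    .new_file_timeout(Duration::from_secs(2)) // expire NewFile entries quickly
    .find_file_timeout(Duration::from_secs(2))
    .default_timeout(Duration::from_secs(10)) // fallback for the remaining topics
    .build();
```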

View File

@ -6,6 +6,7 @@ use crate::peer_manager::{
ConnectionDirection, PeerManager, PeerManagerEvent,
};
use crate::rpc::methods::DataByHashRequest;
use crate::rpc::methods::FileAnnouncement;
use crate::rpc::methods::GetChunksRequest;
use crate::rpc::*;
use crate::service::Context as ServiceContext;
@ -232,6 +233,9 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
let topic: Topic = GossipTopic::new(kind, GossipEncoding::default()).into();
topic.hash()
};
params
.topics
.insert(get_hash(GossipKind::NewFile), TopicScoreParams::default());
params
.topics
.insert(get_hash(GossipKind::FindFile), TopicScoreParams::default());
@ -543,6 +547,9 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
Request::DataByHash { .. } => {
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["data_by_hash"])
}
Request::AnnounceFile { .. } => {
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["announce_file"])
}
Request::GetChunks { .. } => {
metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["get_chunks"])
}
@ -755,6 +762,9 @@ where
InboundRequest::DataByHash(req) => {
self.propagate_request(peer_request_id, peer_id, Request::DataByHash(req))
}
InboundRequest::AnnounceFile(req) => {
self.propagate_request(peer_request_id, peer_id, Request::AnnounceFile(req))
}
InboundRequest::GetChunks(req) => {
self.propagate_request(peer_request_id, peer_id, Request::GetChunks(req))
}
@ -969,6 +979,8 @@ pub enum Request {
Status(StatusMessage),
/// A data by hash request.
DataByHash(DataByHashRequest),
/// An AnnounceFile message.
AnnounceFile(FileAnnouncement),
/// A GetChunks request.
GetChunks(GetChunksRequest),
}
@ -978,6 +990,7 @@ impl std::convert::From<Request> for OutboundRequest {
match req {
Request::Status(s) => OutboundRequest::Status(s),
Request::DataByHash(r) => OutboundRequest::DataByHash(r),
Request::AnnounceFile(r) => OutboundRequest::AnnounceFile(r),
Request::GetChunks(r) => OutboundRequest::GetChunks(r),
}
}

View File

@ -93,7 +93,10 @@ pub use peer_manager::{
};
pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
pub const PROTOCOL_VERSION: [u8; 3] = [0, 1, 0];
/// Defines the current P2P protocol version.
/// - v1: Broadcast FindFile & AnnounceFile messages across the whole network, which caused too much network load.
/// - v2: Publish NewFile to neighbors only and announce files via RPC messages.
pub const PROTOCOL_VERSION: [u8; 3] = [0, 2, 0];
/// Application level requests sent to the network.
#[derive(Debug, Clone, Copy)]

View File

@ -460,6 +460,7 @@ impl PeerManager {
Protocol::Goodbye => PeerAction::LowToleranceError,
Protocol::Status => PeerAction::LowToleranceError,
Protocol::DataByHash => PeerAction::MidToleranceError,
Protocol::AnnounceFile => PeerAction::MidToleranceError,
Protocol::GetChunks => PeerAction::MidToleranceError,
},
},
@ -474,6 +475,7 @@ impl PeerManager {
Protocol::Goodbye => return,
Protocol::Status => PeerAction::LowToleranceError,
Protocol::DataByHash => return,
Protocol::AnnounceFile => return,
Protocol::GetChunks => return,
}
}
@ -488,6 +490,7 @@ impl PeerManager {
Protocol::Goodbye => return,
Protocol::Status => return,
Protocol::DataByHash => PeerAction::MidToleranceError,
Protocol::AnnounceFile => PeerAction::MidToleranceError,
Protocol::GetChunks => PeerAction::MidToleranceError,
},
},

View File

@ -159,6 +159,7 @@ impl Encoder<OutboundRequest> for SSZSnappyOutboundCodec {
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::DataByHash(req) => req.hashes.as_ssz_bytes(),
OutboundRequest::AnnounceFile(req) => req.as_ssz_bytes(),
OutboundRequest::GetChunks(req) => req.as_ssz_bytes(),
};
// SSZ encoded bytes should be within `max_packet_size`
@ -346,6 +347,9 @@ fn handle_v1_request(
Protocol::DataByHash => Ok(Some(InboundRequest::DataByHash(DataByHashRequest {
hashes: VariableList::from_ssz_bytes(decoded_buffer)?,
}))),
Protocol::AnnounceFile => Ok(Some(InboundRequest::AnnounceFile(
FileAnnouncement::from_ssz_bytes(decoded_buffer)?,
))),
Protocol::GetChunks => Ok(Some(InboundRequest::GetChunks(
GetChunksRequest::from_ssz_bytes(decoded_buffer)?,
))),
@ -373,6 +377,10 @@ fn handle_v1_response(
Protocol::DataByHash => Ok(Some(RPCResponse::DataByHash(Box::new(
ZgsData::from_ssz_bytes(decoded_buffer)?,
)))),
// This case should be unreachable as `AnnounceFile` has no response.
Protocol::AnnounceFile => Err(RPCError::InvalidData(
"AnnounceFile RPC message has no valid response".to_string(),
)),
Protocol::GetChunks => Ok(Some(RPCResponse::Chunks(
ChunkArrayWithProof::from_ssz_bytes(decoded_buffer)?,
))),

View File

@ -178,6 +178,14 @@ pub struct DataByHashRequest {
pub hashes: VariableList<Hash256, MaxRequestBlocks>,
}
/// The request body of the `AnnounceFile` RPC message.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct FileAnnouncement {
pub tx_id: TxID,
pub num_shard: usize,
pub shard_id: usize,
}
/// Request a chunk array from a peer.
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
pub struct GetChunksRequest {
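
Since `FileAnnouncement` derives `Encode` and `Decode`, it round-trips through SSZ like the other fixed-length requests; a minimal sketch with a placeholder `tx_id`:

```
use ssz::{Decode, Encode};

// Sketch only: `tx_id` is a placeholder TxID from shared_types.
let announcement = FileAnnouncement {
    tx_id,
    num_shard: 4, // the announcer stores 1 of 4 shards
    shard_id: 1,
};
let bytes = announcement.as_ssz_bytes();
assert_eq!(FileAnnouncement::from_ssz_bytes(&bytes).unwrap(), announcement);
```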

View File

@ -118,6 +118,7 @@ impl<Id: ReqId> RPC<Id> {
.n_every(Protocol::Status, 5, Duration::from_secs(15))
.one_every(Protocol::Goodbye, Duration::from_secs(10))
.n_every(Protocol::DataByHash, 128, Duration::from_secs(10))
.n_every(Protocol::AnnounceFile, 256, Duration::from_secs(10))
.n_every(Protocol::GetChunks, 4096, Duration::from_secs(10))
.build()
.expect("Configuration parameters are valid");
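
Reading the added quota under the builder's `n_every(protocol, n, interval)` semantics (an assumed interpretation based on the neighboring quotas): each peer may issue up to 256 AnnounceFile requests per 10-second window before being rate limited.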

View File

@ -34,6 +34,7 @@ pub enum OutboundRequest {
Goodbye(GoodbyeReason),
Ping(Ping),
DataByHash(DataByHashRequest),
AnnounceFile(FileAnnouncement),
GetChunks(GetChunksRequest),
}
@ -72,6 +73,11 @@ impl OutboundRequest {
Version::V1,
Encoding::SSZSnappy,
)],
OutboundRequest::AnnounceFile(_) => vec![ProtocolId::new(
Protocol::AnnounceFile,
Version::V1,
Encoding::SSZSnappy,
)],
OutboundRequest::GetChunks(_) => vec![ProtocolId::new(
Protocol::GetChunks,
Version::V1,
@ -89,6 +95,7 @@ impl OutboundRequest {
OutboundRequest::Goodbye(_) => 0,
OutboundRequest::Ping(_) => 1,
OutboundRequest::DataByHash(req) => req.hashes.len() as u64,
OutboundRequest::AnnounceFile(_) => 0,
OutboundRequest::GetChunks(_) => 1,
}
}
@ -100,6 +107,7 @@ impl OutboundRequest {
OutboundRequest::Goodbye(_) => Protocol::Goodbye,
OutboundRequest::Ping(_) => Protocol::Ping,
OutboundRequest::DataByHash(_) => Protocol::DataByHash,
OutboundRequest::AnnounceFile(_) => Protocol::AnnounceFile,
OutboundRequest::GetChunks(_) => Protocol::GetChunks,
}
}
@ -114,6 +122,7 @@ impl OutboundRequest {
OutboundRequest::Status(_) => unreachable!(),
OutboundRequest::Goodbye(_) => unreachable!(),
OutboundRequest::Ping(_) => unreachable!(),
OutboundRequest::AnnounceFile(_) => unreachable!(),
OutboundRequest::GetChunks(_) => unreachable!(),
}
}
@ -170,6 +179,9 @@ impl std::fmt::Display for OutboundRequest {
OutboundRequest::DataByHash(req) => {
write!(f, "Data by hash: {:?}", req)
}
OutboundRequest::AnnounceFile(req) => {
write!(f, "AnnounceFile: {:?}", req)
}
OutboundRequest::GetChunks(req) => {
write!(f, "GetChunks: {:?}", req)
}

View File

@ -91,6 +91,8 @@ pub enum Protocol {
/// TODO
DataByHash,
/// The file announce protocol.
AnnounceFile,
/// The Chunk sync protocol.
GetChunks,
}
@ -115,6 +117,7 @@ impl std::fmt::Display for Protocol {
Protocol::Goodbye => "goodbye",
Protocol::Ping => "ping",
Protocol::DataByHash => "data_by_hash",
Protocol::AnnounceFile => "announce_file",
Protocol::GetChunks => "get_chunks",
};
f.write_str(repr)
@ -155,6 +158,7 @@ impl UpgradeInfo for RPCProtocol {
ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(Protocol::DataByHash, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(Protocol::AnnounceFile, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(Protocol::GetChunks, Version::V1, Encoding::SSZSnappy),
]
}
@ -216,6 +220,10 @@ impl ProtocolId {
// TODO
RpcLimits::new(1, *DATA_BY_HASH_REQUEST_MAX)
}
Protocol::AnnounceFile => RpcLimits::new(
<FileAnnouncement as Encode>::ssz_fixed_len(),
<FileAnnouncement as Encode>::ssz_fixed_len(),
),
Protocol::GetChunks => RpcLimits::new(
<GetChunksRequest as Encode>::ssz_fixed_len(),
<GetChunksRequest as Encode>::ssz_fixed_len(),
@ -243,6 +251,7 @@ impl ProtocolId {
<ZgsData as Encode>::ssz_fixed_len(),
),
Protocol::AnnounceFile => RpcLimits::new(0, 0), // AnnounceFile request has no response
Protocol::GetChunks => RpcLimits::new(*CHUNKS_RESPONSE_MIN, *CHUNKS_RESPONSE_MAX),
}
}
@ -325,6 +334,7 @@ pub enum InboundRequest {
Goodbye(GoodbyeReason),
Ping(Ping),
DataByHash(DataByHashRequest),
AnnounceFile(FileAnnouncement),
GetChunks(GetChunksRequest),
}
@ -363,6 +373,11 @@ impl InboundRequest {
Version::V1,
Encoding::SSZSnappy,
)],
InboundRequest::AnnounceFile(_) => vec![ProtocolId::new(
Protocol::AnnounceFile,
Version::V1,
Encoding::SSZSnappy,
)],
InboundRequest::GetChunks(_) => vec![ProtocolId::new(
Protocol::GetChunks,
Version::V1,
@ -380,6 +395,7 @@ impl InboundRequest {
InboundRequest::Goodbye(_) => 0,
InboundRequest::DataByHash(req) => req.hashes.len() as u64,
InboundRequest::Ping(_) => 1,
InboundRequest::AnnounceFile(_) => 0,
InboundRequest::GetChunks(_) => 1,
}
}
@ -391,6 +407,7 @@ impl InboundRequest {
InboundRequest::Goodbye(_) => Protocol::Goodbye,
InboundRequest::Ping(_) => Protocol::Ping,
InboundRequest::DataByHash(_) => Protocol::DataByHash,
InboundRequest::AnnounceFile(_) => Protocol::AnnounceFile,
InboundRequest::GetChunks(_) => Protocol::GetChunks,
}
}
@ -405,6 +422,7 @@ impl InboundRequest {
InboundRequest::Status(_) => unreachable!(),
InboundRequest::Goodbye(_) => unreachable!(),
InboundRequest::Ping(_) => unreachable!(),
InboundRequest::AnnounceFile(_) => unreachable!(),
InboundRequest::GetChunks(_) => unreachable!(),
}
}
@ -523,6 +541,9 @@ impl std::fmt::Display for InboundRequest {
InboundRequest::DataByHash(req) => {
write!(f, "Data by hash: {:?}", req)
}
InboundRequest::AnnounceFile(req) => {
write!(f, "Announce File: {:?}", req)
}
InboundRequest::GetChunks(req) => {
write!(f, "Get Chunks: {:?}", req)
}

View File

@ -68,6 +68,8 @@ pub struct RPCRateLimiter {
status_rl: Limiter<PeerId>,
/// DataByHash rate limiter.
data_by_hash_rl: Limiter<PeerId>,
/// AnnounceFile rate limiter.
announce_file_rl: Limiter<PeerId>,
/// GetChunks rate limiter.
get_chunks_rl: Limiter<PeerId>,
}
@ -91,6 +93,8 @@ pub struct RPCRateLimiterBuilder {
status_quota: Option<Quota>,
/// Quota for the DataByHash protocol.
data_by_hash_quota: Option<Quota>,
/// Quota for the AnnounceFile protocol.
announce_file_quota: Option<Quota>,
/// Quota for the GetChunks protocol.
get_chunks_quota: Option<Quota>,
}
@ -109,6 +113,7 @@ impl RPCRateLimiterBuilder {
Protocol::Status => self.status_quota = q,
Protocol::Goodbye => self.goodbye_quota = q,
Protocol::DataByHash => self.data_by_hash_quota = q,
Protocol::AnnounceFile => self.announce_file_quota = q,
Protocol::GetChunks => self.get_chunks_quota = q,
}
self
@ -145,6 +150,9 @@ impl RPCRateLimiterBuilder {
let data_by_hash_quota = self
.data_by_hash_quota
.ok_or("DataByHash quota not specified")?;
let announce_file_quota = self
.announce_file_quota
.ok_or("AnnounceFile quota not specified")?;
let get_chunks_quota = self
.get_chunks_quota
.ok_or("GetChunks quota not specified")?;
@ -154,6 +162,7 @@ impl RPCRateLimiterBuilder {
let status_rl = Limiter::from_quota(status_quota)?;
let goodbye_rl = Limiter::from_quota(goodbye_quota)?;
let data_by_hash_rl = Limiter::from_quota(data_by_hash_quota)?;
let announce_file_rl = Limiter::from_quota(announce_file_quota)?;
let get_chunks_rl = Limiter::from_quota(get_chunks_quota)?;
// check for peers to prune every 30 seconds, starting in 30 seconds
@ -166,6 +175,7 @@ impl RPCRateLimiterBuilder {
status_rl,
goodbye_rl,
data_by_hash_rl,
announce_file_rl,
get_chunks_rl,
init_time: Instant::now(),
})
@ -210,6 +220,7 @@ impl RPCRateLimiter {
Protocol::Status => &mut self.status_rl,
Protocol::Goodbye => &mut self.goodbye_rl,
Protocol::DataByHash => &mut self.data_by_hash_rl,
Protocol::AnnounceFile => &mut self.announce_file_rl,
Protocol::GetChunks => &mut self.get_chunks_rl,
};
check(limiter)

View File

@ -7,7 +7,7 @@ pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
pub use globals::NetworkGlobals;
pub use pubsub::{
AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature,
AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature, NewFile,
PubsubMessage, SignedAnnounceChunks, SignedAnnounceFile, SignedAnnounceShardConfig,
SignedMessage, SnappyTransform,
};

View File

@ -114,9 +114,22 @@ impl ssz::Decode for WrappedPeerId {
}
}
/// Published when a file is uploaded or fully synced from other peers.
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct NewFile {
pub tx_id: TxID,
pub num_shard: usize,
pub shard_id: usize,
pub timestamp: u32,
}
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct FindFile {
pub tx_id: TxID,
pub num_shard: usize,
pub shard_id: usize,
/// Indicates whether to publish to neighbor nodes only.
pub neighbors_only: bool,
pub timestamp: u32,
}
@ -205,6 +218,7 @@ type SignedAnnounceFiles = Vec<SignedAnnounceFile>;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PubsubMessage {
ExampleMessage(u64),
NewFile(NewFile),
FindFile(FindFile),
FindChunks(FindChunks),
AnnounceFile(Vec<SignedAnnounceFile>),
@ -283,6 +297,7 @@ impl PubsubMessage {
pub fn kind(&self) -> GossipKind {
match self {
PubsubMessage::ExampleMessage(_) => GossipKind::Example,
PubsubMessage::NewFile(_) => GossipKind::NewFile,
PubsubMessage::FindFile(_) => GossipKind::FindFile,
PubsubMessage::FindChunks(_) => GossipKind::FindChunks,
PubsubMessage::AnnounceFile(_) => GossipKind::AnnounceFile,
@ -309,6 +324,9 @@ impl PubsubMessage {
GossipKind::Example => Ok(PubsubMessage::ExampleMessage(
u64::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
)),
GossipKind::NewFile => Ok(PubsubMessage::NewFile(
NewFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
)),
GossipKind::FindFile => Ok(PubsubMessage::FindFile(
FindFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
)),
@ -341,6 +359,7 @@ impl PubsubMessage {
// messages for us.
match &self {
PubsubMessage::ExampleMessage(data) => data.as_ssz_bytes(),
PubsubMessage::NewFile(data) => data.as_ssz_bytes(),
PubsubMessage::FindFile(data) => data.as_ssz_bytes(),
PubsubMessage::FindChunks(data) => data.as_ssz_bytes(),
PubsubMessage::AnnounceFile(data) => data.as_ssz_bytes(),
@ -356,6 +375,9 @@ impl std::fmt::Display for PubsubMessage {
PubsubMessage::ExampleMessage(msg) => {
write!(f, "Example message: {}", msg)
}
PubsubMessage::NewFile(msg) => {
write!(f, "NewFile message: {:?}", msg)
}
PubsubMessage::FindFile(msg) => {
write!(f, "FindFile message: {:?}", msg)
}

View File

@ -8,13 +8,15 @@ use strum::AsRefStr;
pub const TOPIC_PREFIX: &str = "eth2";
pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy";
pub const EXAMPLE_TOPIC: &str = "example";
pub const NEW_FILE_TOPIC: &str = "new_file";
pub const FIND_FILE_TOPIC: &str = "find_file";
pub const FIND_CHUNKS_TOPIC: &str = "find_chunks";
pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file";
pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks";
pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config";
pub const CORE_TOPICS: [GossipKind; 4] = [
pub const CORE_TOPICS: [GossipKind; 5] = [
GossipKind::NewFile,
GossipKind::FindFile,
GossipKind::FindChunks,
GossipKind::AnnounceFile,
@ -37,6 +39,7 @@ pub struct GossipTopic {
#[strum(serialize_all = "snake_case")]
pub enum GossipKind {
Example,
NewFile,
FindFile,
FindChunks,
AnnounceFile,
@ -77,6 +80,7 @@ impl GossipTopic {
let kind = match topic_parts[2] {
EXAMPLE_TOPIC => GossipKind::Example,
NEW_FILE_TOPIC => GossipKind::NewFile,
FIND_FILE_TOPIC => GossipKind::FindFile,
FIND_CHUNKS_TOPIC => GossipKind::FindChunks,
ANNOUNCE_FILE_TOPIC => GossipKind::AnnounceFile,
@ -106,6 +110,7 @@ impl From<GossipTopic> for String {
let kind = match topic.kind {
GossipKind::Example => EXAMPLE_TOPIC,
GossipKind::NewFile => NEW_FILE_TOPIC,
GossipKind::FindFile => FIND_FILE_TOPIC,
GossipKind::FindChunks => FIND_CHUNKS_TOPIC,
GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC,
@ -125,6 +130,7 @@ impl std::fmt::Display for GossipTopic {
let kind = match self.kind {
GossipKind::Example => EXAMPLE_TOPIC,
GossipKind::NewFile => NEW_FILE_TOPIC,
GossipKind::FindFile => FIND_FILE_TOPIC,
GossipKind::FindChunks => FIND_CHUNKS_TOPIC,
GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC,
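
For reference, the topic string for the new kind follows the same convention as the others; a short sketch, assuming the `/<prefix>/<topic>/<postfix>` rendering implied by the constants above:

```
let topic = GossipTopic::new(GossipKind::NewFile, GossipEncoding::default());
// With TOPIC_PREFIX = "eth2" and SSZ_SNAPPY_ENCODING_POSTFIX = "ssz_snappy",
// this is expected to render as "/eth2/new_file/ssz_snappy".
println!("{}", String::from(topic));
```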

View File

@ -5,7 +5,8 @@ use std::{ops::Neg, sync::Arc};
use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use network::multiaddr::Protocol;
use network::types::{AnnounceShardConfig, SignedAnnounceShardConfig};
use network::rpc::methods::FileAnnouncement;
use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig};
use network::{
rpc::StatusMessage,
types::{
@ -29,6 +30,11 @@ use crate::peer_manager::PeerManager;
use crate::Config;
lazy_static::lazy_static! {
/// Timeout to publish NewFile message to neighbor nodes.
pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
/// Timeout to publish FindFile message to neighbor nodes.
pub static ref FIND_FILE_NEIGHBORS_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
/// Timeout to publish FindFile message in the whole network.
pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
@ -219,6 +225,25 @@ impl Libp2pEventHandler {
});
metrics::LIBP2P_HANDLE_GET_CHUNKS_REQUEST.mark(1);
}
Request::AnnounceFile(announcement) => {
match ShardConfig::new(announcement.shard_id, announcement.num_shard) {
Ok(v) => {
self.file_location_cache.insert_peer_config(peer_id, v);
self.send_to_sync(SyncMessage::AnnounceFile {
peer_id,
request_id,
announcement,
});
}
Err(_) => self.send_to_network(NetworkMessage::ReportPeer {
peer_id,
action: PeerAction::Fatal,
source: ReportSource::RPC,
msg: "Invalid shard config in AnnounceFile RPC message",
}),
}
}
Request::DataByHash(_) => {
// ignore
}
@ -316,9 +341,13 @@ impl Libp2pEventHandler {
match message {
PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
PubsubMessage::NewFile(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1);
self.on_new_file(propagation_source, msg).await
}
PubsubMessage::FindFile(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1);
self.on_find_file(msg).await
self.on_find_file(propagation_source, msg).await
}
PubsubMessage::FindChunks(msg) => {
metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1);
@ -348,6 +377,63 @@ impl Libp2pEventHandler {
}
}
/// Handle a NewFile pubsub message `msg` published by the `from` peer.
async fn on_new_file(&self, from: PeerId, msg: NewFile) -> MessageAcceptance {
// verify timestamp
let d = duration_since(
msg.timestamp,
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY.clone(),
);
if d < TOLERABLE_DRIFT.neg() || d > *NEW_FILE_TIMEOUT {
debug!(?d, ?msg, "Invalid timestamp, ignoring NewFile message");
metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT.mark(1);
self.send_to_network(NetworkMessage::ReportPeer {
peer_id: from,
action: PeerAction::LowToleranceError,
source: ReportSource::Gossipsub,
msg: "Received out of date NewFile message",
});
return MessageAcceptance::Ignore;
}
// verify announced shard config
let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) {
Ok(v) => v,
Err(_) => return MessageAcceptance::Reject,
};
// ignore if shard config mismatch
let my_shard_config = self.store.get_store().get_shard_config();
if !my_shard_config.intersect(&announced_shard_config) {
return MessageAcceptance::Ignore;
}
// ignore if already exists
match self.store.check_tx_completed(msg.tx_id.seq).await {
Ok(true) => return MessageAcceptance::Ignore,
Ok(false) => {}
Err(err) => {
warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx completed");
return MessageAcceptance::Ignore;
}
}
// ignore if already pruned
match self.store.check_tx_pruned(msg.tx_id.seq).await {
Ok(true) => return MessageAcceptance::Ignore,
Ok(false) => {}
Err(err) => {
warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx pruned");
return MessageAcceptance::Ignore;
}
}
// notify sync layer to handle in advance
self.send_to_sync(SyncMessage::NewFile { from, msg });
MessageAcceptance::Ignore
}
async fn construct_announced_ip(&self) -> Option<Multiaddr> {
// public address configured
if let Some(ip) = self.config.public_address {
@ -485,27 +571,69 @@ impl Libp2pEventHandler {
Some(PubsubMessage::AnnounceShardConfig(signed))
}
async fn on_find_file(&self, msg: FindFile) -> MessageAcceptance {
let FindFile { tx_id, timestamp } = msg;
async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance {
let FindFile {
tx_id, timestamp, ..
} = msg;
// verify timestamp
let d = duration_since(
timestamp,
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(),
);
if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
let timeout = if msg.neighbors_only {
*FIND_FILE_NEIGHBORS_TIMEOUT
} else {
*FIND_FILE_TIMEOUT
};
if d < TOLERABLE_DRIFT.neg() || d > timeout {
debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message");
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1);
if msg.neighbors_only {
self.send_to_network(NetworkMessage::ReportPeer {
peer_id: from,
action: PeerAction::LowToleranceError,
source: ReportSource::Gossipsub,
msg: "Received out of date FindFile message",
});
}
return MessageAcceptance::Ignore;
}
// verify announced shard config
let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) {
Ok(v) => v,
Err(_) => return MessageAcceptance::Reject,
};
// handle on shard config mismatch
let my_shard_config = self.store.get_store().get_shard_config();
if !my_shard_config.intersect(&announced_shard_config) {
return if msg.neighbors_only {
MessageAcceptance::Ignore
} else {
MessageAcceptance::Accept
};
}
// check if we have it
if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) {
if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await {
if tx.id() == tx_id {
trace!(?tx_id, "Found file locally, responding to FindFile query");
if self.publish_file(tx_id).await.is_some() {
if msg.neighbors_only {
// announce the file via RPC to avoid flooding pubsub messages
self.send_to_network(NetworkMessage::SendRequest {
peer_id: from,
request: Request::AnnounceFile(FileAnnouncement {
tx_id,
num_shard: my_shard_config.num_shard,
shard_id: my_shard_config.shard_id,
}),
request_id: RequestId::Router(Instant::now()),
});
} else if self.publish_file(tx_id).await.is_some() {
metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE.mark(1);
return MessageAcceptance::Ignore;
}
@ -513,6 +641,11 @@ impl Libp2pEventHandler {
}
}
// do not forward to the whole network if the file is to be found from neighbor nodes only
if msg.neighbors_only {
return MessageAcceptance::Ignore;
}
// try from cache
if let Some(mut msg) = self.file_location_cache.get_one(tx_id) {
trace!(?tx_id, "Found file in cache, responding to FindFile query");
@ -834,7 +967,7 @@ impl Libp2pEventHandler {
}
}
pub async fn publish_file(&self, tx_id: TxID) -> Option<bool> {
async fn publish_file(&self, tx_id: TxID) -> Option<bool> {
match self.file_batcher.write().await.add(tx_id) {
Some(batch) => {
let announcement = self.construct_announce_file_message(batch).await?;
@ -1203,7 +1336,13 @@ mod tests {
) -> MessageAcceptance {
let (alice, bob) = (PeerId::random(), PeerId::random());
let id = MessageId::new(b"dummy message");
let message = PubsubMessage::FindFile(FindFile { tx_id, timestamp });
let message = PubsubMessage::FindFile(FindFile {
tx_id,
num_shard: 1,
shard_id: 0,
neighbors_only: false,
timestamp,
});
handler.on_pubsub_message(alice, bob, &id, message).await
}
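
Both `on_new_file` and `on_find_file` gate on `my_shard_config.intersect(&announced_shard_config)`. A minimal sketch of that overlap test, assuming power-of-two shard counts (illustrative only, not the repo's implementation):

```
// Two (num_shard, shard_id) configs overlap iff the finer-grained config
// falls in the same residue class as the coarser one.
fn intersect(a: (usize, usize), b: (usize, usize)) -> bool {
    let ((n_small, id_small), (_, id_large)) = if a.0 <= b.0 { (a, b) } else { (b, a) };
    id_large % n_small == id_small
}

assert!(intersect((2, 1), (4, 3)));  // shard 3 of 4 lies inside shard 1 of 2
assert!(!intersect((2, 0), (4, 3)));
```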

View File

@ -44,6 +44,11 @@ lazy_static::lazy_static! {
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_error", "qps");
pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024);
// libp2p_event_handler: new file
pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "qps");
pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_new_file", "latency", 1024);
pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "timeout");
// libp2p_event_handler: find & announce file
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "qps");
pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_find_file", "latency", 1024);

View File

@ -5,11 +5,14 @@ use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use futures::{channel::mpsc::Sender, prelude::*};
use miner::MinerMessage;
use network::types::NewFile;
use network::PubsubMessage;
use network::{
BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, RequestId,
Service as LibP2PService, Swarm,
};
use pruner::PrunerMessage;
use shared_types::timestamp_now;
use std::sync::Arc;
use storage::log_store::Store as LogStore;
use storage_async::Store;
@ -44,6 +47,8 @@ pub struct RouterService {
/// Stores potentially created UPnP mappings to be removed on shutdown. (TCP port and UDP
/// port).
upnp_mappings: (Option<u16>, Option<u16>),
store: Arc<dyn LogStore>,
}
impl RouterService {
@ -63,7 +68,6 @@ impl RouterService {
local_keypair: Keypair,
config: Config,
) {
let store = Store::new(store, executor.clone());
let peers = Arc::new(RwLock::new(PeerManager::new(config.clone())));
// create the network service and spawn the task
@ -81,11 +85,12 @@ impl RouterService {
sync_send,
chunk_pool_send,
local_keypair,
store,
Store::new(store.clone(), executor.clone()),
file_location_cache,
peers,
),
upnp_mappings: (None, None),
store,
};
// spawn service
@ -328,14 +333,15 @@ impl RouterService {
}
}
NetworkMessage::AnnounceLocalFile { tx_id } => {
if self
.libp2p_event_handler
.publish_file(tx_id)
.await
.is_some()
{
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1);
}
let shard_config = self.store.get_shard_config();
let msg = PubsubMessage::NewFile(NewFile {
tx_id,
num_shard: shard_config.num_shard,
shard_id: shard_config.shard_id,
timestamp: timestamp_now(),
});
self.libp2p.swarm.behaviour_mut().publish(vec![msg]);
metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1);
}
NetworkMessage::UPnPMappingEstablished {
tcp_socket,

View File

@ -59,14 +59,13 @@ impl RandomBatcher {
pub async fn start(mut self, catched_up: Arc<AtomicBool>) {
info!("Start to sync files");
loop {
// disable file sync until catched up
if !catched_up.load(Ordering::Relaxed) {
trace!("Cannot sync file in catch-up phase");
sleep(self.config.auto_sync_idle_interval).await;
continue;
}
// wait for log entry sync to catch up
while !catched_up.load(Ordering::Relaxed) {
trace!("Cannot sync file in catch-up phase");
sleep(self.config.auto_sync_idle_interval).await;
}
loop {
if let Ok(state) = self.get_state().await {
metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64);
metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64);

View File

@ -9,18 +9,23 @@ use storage_async::Store;
use task_executor::TaskExecutor;
use tokio::sync::{
broadcast,
mpsc::{unbounded_channel, UnboundedSender},
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
};
use crate::{Config, SyncSender};
use super::{batcher_random::RandomBatcher, batcher_serial::SerialBatcher, sync_store::SyncStore};
use super::{
batcher_random::RandomBatcher,
batcher_serial::SerialBatcher,
sync_store::{Queue, SyncStore},
};
pub struct AutoSyncManager {
pub serial: SerialBatcher,
pub serial: Option<SerialBatcher>,
pub random: RandomBatcher,
pub file_announcement_send: UnboundedSender<u64>,
pub new_file_send: UnboundedSender<u64>,
pub catched_up: Arc<AtomicBool>,
}
@ -33,42 +38,80 @@ impl AutoSyncManager {
log_sync_recv: broadcast::Receiver<LogSyncEvent>,
catch_up_end_recv: oneshot::Receiver<()>,
) -> Result<Self> {
let (send, recv) = unbounded_channel();
let sync_store = Arc::new(SyncStore::new(store.clone()));
let (file_announcement_send, file_announcement_recv) = unbounded_channel();
let (new_file_send, new_file_recv) = unbounded_channel();
let sync_store = if config.neighbors_only {
// use v2 db to avoid reading v1 files that were announced from the whole network instead of neighbors
Arc::new(SyncStore::new_with_name(
store.clone(),
"pendingv2",
"readyv2",
))
} else {
Arc::new(SyncStore::new(store.clone()))
};
let catched_up = Arc::new(AtomicBool::new(false));
// sync in sequence
let serial =
SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
.await?;
// handle new file
executor.spawn(
serial
.clone()
.start(recv, log_sync_recv, catched_up.clone()),
"auto_sync_serial",
Self::handle_new_file(new_file_recv, sync_store.clone()),
"auto_sync_handle_new_file",
);
// sync in sequence
let serial = if config.neighbors_only {
None
} else {
let serial =
SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
.await?;
executor.spawn(
serial
.clone()
.start(file_announcement_recv, log_sync_recv, catched_up.clone()),
"auto_sync_serial",
);
Some(serial)
};
// sync randomly
let random = RandomBatcher::new(config, store, sync_send, sync_store);
executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random");
// handle the catch-up notification
let catched_up_cloned = catched_up.clone();
executor.spawn(
async move {
if catch_up_end_recv.await.is_ok() {
info!("log entry catched up");
catched_up_cloned.store(true, Ordering::Relaxed);
}
},
Self::listen_catch_up(catch_up_end_recv, catched_up.clone()),
"auto_sync_wait_for_catchup",
);
Ok(Self {
serial,
random,
file_announcement_send: send,
file_announcement_send,
new_file_send,
catched_up,
})
}
async fn handle_new_file(
mut new_file_recv: UnboundedReceiver<u64>,
sync_store: Arc<SyncStore>,
) {
while let Some(tx_seq) = new_file_recv.recv().await {
if let Err(err) = sync_store.insert(tx_seq, Queue::Ready).await {
warn!(?err, %tx_seq, "Failed to insert new file to ready queue");
}
}
}
async fn listen_catch_up(
catch_up_end_recv: oneshot::Receiver<()>,
catched_up: Arc<AtomicBool>,
) {
if catch_up_end_recv.await.is_ok() {
info!("log entry catched up");
catched_up.store(true, Ordering::Relaxed);
}
}
}

View File

@ -42,6 +42,14 @@ impl SyncStore {
}
}
pub fn new_with_name(store: Store, pending: &'static str, ready: &'static str) -> Self {
Self {
store: Arc::new(RwLock::new(store)),
pending_txs: TxStore::new(pending),
ready_txs: TxStore::new(ready),
}
}
/// Returns the number of pending txs and ready txs.
pub async fn stat(&self) -> Result<(usize, usize)> {
let async_store = self.store.read().await;
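
Usage mirrors the call site in `AutoSyncManager` above; a sketch:

```
// Distinct column names keep v2 queues separate from v1 entries that were
// announced network-wide (names taken from the manager diff above).
let sync_store = Arc::new(SyncStore::new_with_name(store.clone(), "pendingv2", "readyv2"));
```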

View File

@ -14,12 +14,13 @@ use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE};
use ssz::Encode;
use std::{sync::Arc, time::Instant};
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
use storage_async::Store;
use storage_async::{ShardConfig, Store};
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FailureReason {
DBError(String),
TxReverted(TxID),
TimeoutFindFile,
}
#[derive(Clone, Debug, PartialEq, Eq)]
@ -159,11 +160,14 @@ impl SerialSyncController {
/// Find more peers to sync chunks. Returns whether a `FindFile` pubsub message was published.
fn try_find_peers(&mut self) {
let (published, num_new_peers) = if self.goal.is_all_chunks() {
self.publish_find_file()
} else {
let (published, num_new_peers) = if !self.goal.is_all_chunks() {
self.publish_find_chunks();
(true, 0)
} else if self.config.neighbors_only {
self.do_publish_find_file();
(true, 0)
} else {
self.publish_find_file()
};
info!(%self.tx_seq, %published, %num_new_peers, "Finding peers");
@ -199,14 +203,23 @@ impl SerialSyncController {
return (false, num_new_peers);
}
self.ctx.publish(PubsubMessage::FindFile(FindFile {
tx_id: self.tx_id,
timestamp: timestamp_now(),
}));
self.do_publish_find_file();
(true, num_new_peers)
}
fn do_publish_find_file(&self) {
let shard_config = self.store.get_store().get_shard_config();
self.ctx.publish(PubsubMessage::FindFile(FindFile {
tx_id: self.tx_id,
num_shard: shard_config.num_shard,
shard_id: shard_config.shard_id,
neighbors_only: self.config.neighbors_only,
timestamp: timestamp_now(),
}));
}
fn publish_find_chunks(&self) {
self.ctx.publish(PubsubMessage::FindChunks(FindChunks {
tx_id: self.tx_id,
@ -337,6 +350,14 @@ impl SerialSyncController {
}
}
/// Triggered when any peer (TCP connected) announces a file via RPC message.
pub fn on_peer_announced(&mut self, peer_id: PeerId, shard_config: ShardConfig) {
self.peers
.add_new_peer_with_config(peer_id, Multiaddr::empty(), shard_config);
self.peers
.update_state_force(&peer_id, PeerState::Connected);
}
pub fn on_dail_failed(&mut self, peer_id: PeerId, err: &DialError) {
match err {
DialError::ConnectionLimit(_) => {
@ -545,6 +566,9 @@ impl SerialSyncController {
info!(%self.tx_seq, "Succeeded to finalize file");
self.state = SyncState::Completed;
metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0);
// notify neighbor nodes that a new file has completed sync
self.ctx
.send(NetworkMessage::AnnounceLocalFile { tx_id: self.tx_id });
}
Ok(false) => {
warn!(?self.tx_id, %self.tx_seq, "Transaction reverted during finalize_tx");
@ -637,12 +661,19 @@ impl SerialSyncController {
{
self.state = SyncState::FoundPeers;
} else {
// storage node may not have the specific file when `FindFile`
// gossip message received. In this case, just broadcast the
// `FindFile` message again.
// FindFile timeout
if since.elapsed() >= self.config.peer_find_timeout {
debug!(%self.tx_seq, "Finding peer timeout and try to find peers again");
self.try_find_peers();
if self.config.neighbors_only {
self.state = SyncState::Failed {
reason: FailureReason::TimeoutFindFile,
};
} else {
// a storage node may not have the specific file when the `FindFile`
// gossip message is received. In this case, just broadcast the
// `FindFile` message again.
debug!(%self.tx_seq, "Finding peer timeout and try to find peers again");
self.try_find_peers();
}
}
completed = true;
@ -1513,6 +1544,10 @@ mod tests {
controller.on_response(peer_id, chunks).await;
assert_eq!(*controller.get_status(), SyncState::Completed);
assert!(matches!(
network_recv.try_recv().unwrap(),
NetworkMessage::AnnounceLocalFile { .. }
));
assert!(network_recv.try_recv().is_err());
}

View File

@ -21,6 +21,10 @@ use std::{
#[serde(default)]
pub struct Config {
// sync service config
/// Indicates whether to sync files from neighbor nodes only.
/// This is to avoid flooding file announcements in the whole network,
/// which leads to high latency or even timeouts when syncing files.
pub neighbors_only: bool,
#[serde(deserialize_with = "deserialize_duration")]
pub heartbeat_interval: Duration,
pub auto_sync_enabled: bool,
@ -64,6 +68,7 @@ impl Default for Config {
fn default() -> Self {
Self {
// sync service config
neighbors_only: false,
heartbeat_interval: Duration::from_secs(5),
auto_sync_enabled: false,
max_sync_files: 8,

View File

@ -8,7 +8,8 @@ use anyhow::{anyhow, bail, Result};
use file_location_cache::FileLocationCache;
use libp2p::swarm::DialError;
use log_entry_sync::LogSyncEvent;
use network::types::{AnnounceChunks, FindFile};
use network::rpc::methods::FileAnnouncement;
use network::types::{AnnounceChunks, FindFile, NewFile};
use network::PubsubMessage;
use network::{
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
@ -70,6 +71,15 @@ pub enum SyncMessage {
AnnounceChunksGossip {
msg: AnnounceChunks,
},
NewFile {
from: PeerId,
msg: NewFile,
},
AnnounceFile {
peer_id: PeerId,
request_id: PeerRequestId,
announcement: FileAnnouncement,
},
}
#[derive(Debug)]
@ -265,6 +275,12 @@ impl SyncService {
SyncMessage::AnnounceShardConfig { .. } => {
// FIXME: Check if controllers need to be reset?
}
SyncMessage::NewFile { from, msg } => self.on_new_file_gossip(from, msg).await,
SyncMessage::AnnounceFile {
peer_id,
announcement,
..
} => self.on_announce_file(peer_id, announcement).await,
}
}
@ -279,7 +295,10 @@ impl SyncService {
Some(manager) => SyncServiceState {
num_syncing: self.controllers.len(),
catched_up: Some(manager.catched_up.load(Ordering::Relaxed)),
auto_sync_serial: Some(manager.serial.get_state().await),
auto_sync_serial: match &manager.serial {
Some(v) => Some(v.get_state().await),
None => None,
},
auto_sync_random: manager.random.get_state().await.ok(),
},
None => SyncServiceState {
@ -577,8 +596,12 @@ impl SyncService {
Some(tx) => tx,
None => bail!("Transaction not found"),
};
let shard_config = self.store.get_store().get_shard_config();
self.ctx.publish(PubsubMessage::FindFile(FindFile {
tx_id: tx.id(),
num_shard: shard_config.num_shard,
shard_id: shard_config.shard_id,
neighbors_only: false,
timestamp: timestamp_now(),
}));
Ok(())
@ -642,7 +665,10 @@ impl SyncService {
Some(s) => s,
None => {
debug!(%tx.seq, "No more data needed");
self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await?;
if self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await? {
self.ctx
.send(NetworkMessage::AnnounceLocalFile { tx_id: tx.id() });
}
return Ok(());
}
};
@ -748,6 +774,34 @@ impl SyncService {
}
}
/// Handle a received `NewFile` gossip message.
async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) {
debug!(%from, ?msg, "Received NewFile gossip");
if let Some(controller) = self.controllers.get_mut(&msg.tx_id.seq) {
// Notify the controller of the newly announced peer if the file is already in sync
if let Ok(shard_config) = ShardConfig::new(msg.shard_id, msg.num_shard) {
controller.on_peer_announced(from, shard_config);
controller.transition();
}
} else if let Some(manager) = &self.auto_sync_manager {
let _ = manager.new_file_send.send(msg.tx_id.seq);
}
}
/// Handle a received `AnnounceFile` RPC message.
async fn on_announce_file(&mut self, from: PeerId, announcement: FileAnnouncement) {
// Notify the controller of the newly announced peer if the file is already in sync
if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) {
if let Ok(shard_config) =
ShardConfig::new(announcement.shard_id, announcement.num_shard)
{
controller.on_peer_announced(from, shard_config);
controller.transition();
}
}
}
/// Terminate file sync of `min_tx_seq`.
/// If `is_reverted` is `true` (means confirmed transactions reverted),
/// also terminate `tx_seq` greater than `min_tx_seq`
@ -1504,6 +1558,10 @@ mod tests {
.await;
wait_for_tx_finalized(runtime.store.clone(), tx_seq).await;
assert!(matches!(
runtime.network_recv.try_recv().unwrap(),
NetworkMessage::AnnounceLocalFile { .. }
));
assert!(!runtime.store.check_tx_completed(0).unwrap());
@ -1528,6 +1586,10 @@ mod tests {
.await;
wait_for_tx_finalized(runtime.store, tx_seq).await;
assert!(matches!(
runtime.network_recv.try_recv().unwrap(),
NetworkMessage::AnnounceLocalFile { .. }
));
sync_send
.notify(SyncMessage::PeerDisconnected {

View File

@ -232,6 +232,10 @@ batcher_announcement_capacity = 100
# all files, and sufficient disk space is required.
auto_sync_enabled = true
# Indicates whether to sync files from neighbor nodes only. This is to avoid flooding file
# announcements in the whole network, which leads to high latency or even timeouts when syncing files.
neighbors_only = true
# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 8

View File

@ -244,6 +244,10 @@ batcher_announcement_capacity = 100
# all files, and sufficient disk space is required.
auto_sync_enabled = true
# Indicates whether to sync files from neighbor nodes only. This is to avoid flooding file
# announcements in the whole network, which leads to high latency or even timeouts when syncing files.
neighbors_only = true
# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 8

View File

@ -246,6 +246,10 @@
# all files, and sufficient disk space is required.
# auto_sync_enabled = false
# Indicates whether to sync files from neighbor nodes only. This is to avoid flooding file
# announcements in the whole network, which leads to high latency or even timeouts when syncing files.
neighbors_only = true
# Maximum number of files in sync from other peers simultaneously.
# max_sync_files = 8

View File

@ -0,0 +1,34 @@
#!/usr/bin/env python3
from test_framework.test_framework import TestFramework
from utility.utils import wait_until
class AutoRandomSyncV2Test(TestFramework):
def setup_params(self):
self.num_nodes = 4
# Enable random auto sync v2
for i in range(self.num_nodes):
self.zgs_node_configs[i] = {
"sync": {
"auto_sync_enabled": True,
"max_sequential_workers": 0,
"max_random_workers": 3,
"neighbors_only": True,
}
}
def run_test(self):
# Submit and upload files on node 0
data_root_1 = self.__upload_file__(0, 256 * 1024)
data_root_2 = self.__upload_file__(0, 256 * 1024)
# Files should be available on other nodes via auto sync
for i in range(1, self.num_nodes):
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_1)["finalized"])
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root_2)["finalized"])
if __name__ == "__main__":
AutoRandomSyncV2Test().main()
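
Like the other functional tests, this file presumably lives under `tests/` and can be run standalone or picked up by `python test_all.py`; it launches four local nodes with sequential sync disabled (`max_sequential_workers = 0`) and waits until both uploaded files are finalized on every peer.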