mirror of
				https://github.com/0glabs/0g-storage-node.git
				synced 2025-11-04 08:37:27 +00:00 
			
		
		
		
	Compare commits
	
		
			24 Commits
		
	
	
		
			f47ac0e03a
			...
			e31dc973f2
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					e31dc973f2 | ||
| 
						 | 
					275a43e032 | ||
| 
						 | 
					d5541abdd8 | ||
| 
						 | 
					3cd3a336c2 | ||
| 
						 | 
					0f3cca9140 | ||
| 
						 | 
					cc6cf0fdb2 | ||
| 
						 | 
					51a2305c2d | ||
| 
						 | 
					98c62b314e | ||
| 
						 | 
					da903fefe7 | ||
| 
						 | 
					98438fd0e5 | ||
| 
						 | 
					247b1aaf8f | ||
| 
						 | 
					3e46e10a72 | ||
| 
						 | 
					43afc79a4a | ||
| 
						 | 
					0f93b035f1 | ||
| 
						 | 
					cebc3c8247 | ||
| 
						 | 
					25524bd9d2 | ||
| 
						 | 
					35c4760724 | ||
| 
						 | 
					09b34fbf07 | ||
| 
						 | 
					ceb165d79b | ||
| 
						 | 
					98ec24b863 | ||
| 
						 | 
					245ee0d903 | ||
| 
						 | 
					364640adc8 | ||
| 
						 | 
					d12bf66129 | ||
| 
						 | 
					e06936f381 | 
							
								
								
									
										3
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										3
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							@ -224,6 +224,7 @@ dependencies = [
 | 
			
		||||
 "eth2_ssz_derive",
 | 
			
		||||
 "ethereum-types 0.14.1",
 | 
			
		||||
 "lazy_static",
 | 
			
		||||
 "metrics",
 | 
			
		||||
 "once_cell",
 | 
			
		||||
 "serde",
 | 
			
		||||
 "tiny-keccak",
 | 
			
		||||
@ -7272,8 +7273,10 @@ dependencies = [
 | 
			
		||||
 "kvdb",
 | 
			
		||||
 "kvdb-memorydb",
 | 
			
		||||
 "kvdb-rocksdb",
 | 
			
		||||
 "lazy_static",
 | 
			
		||||
 "merkle_light",
 | 
			
		||||
 "merkle_tree",
 | 
			
		||||
 "metrics",
 | 
			
		||||
 "parking_lot 0.12.3",
 | 
			
		||||
 "rand 0.8.5",
 | 
			
		||||
 "rayon",
 | 
			
		||||
 | 
			
		||||
@ -13,3 +13,5 @@ serde = { version = "1.0.137", features = ["derive"] }
 | 
			
		||||
lazy_static = "1.4.0"
 | 
			
		||||
tracing = "0.1.36"
 | 
			
		||||
once_cell = "1.19.0"
 | 
			
		||||
 | 
			
		||||
metrics = { workspace = true }
 | 
			
		||||
 | 
			
		||||
@ -1,4 +1,5 @@
 | 
			
		||||
mod merkle_tree;
 | 
			
		||||
mod metrics;
 | 
			
		||||
mod proof;
 | 
			
		||||
mod sha3;
 | 
			
		||||
 | 
			
		||||
@ -7,6 +8,7 @@ use std::cmp::Ordering;
 | 
			
		||||
use std::collections::{BTreeMap, HashMap};
 | 
			
		||||
use std::fmt::Debug;
 | 
			
		||||
use std::marker::PhantomData;
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
use tracing::{trace, warn};
 | 
			
		||||
 | 
			
		||||
pub use crate::merkle_tree::{
 | 
			
		||||
@ -138,15 +140,18 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn append(&mut self, new_leaf: E) {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        if new_leaf == E::null() {
 | 
			
		||||
            // appending null is not allowed.
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        self.layers[0].push(new_leaf);
 | 
			
		||||
        self.recompute_after_append_leaves(self.leaves() - 1);
 | 
			
		||||
        metrics::APPEND.update_since(start_time);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn append_list(&mut self, mut leaf_list: Vec<E>) {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        if leaf_list.contains(&E::null()) {
 | 
			
		||||
            // appending null is not allowed.
 | 
			
		||||
            return;
 | 
			
		||||
@ -154,6 +159,8 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
        let start_index = self.leaves();
 | 
			
		||||
        self.layers[0].append(&mut leaf_list);
 | 
			
		||||
        self.recompute_after_append_leaves(start_index);
 | 
			
		||||
 | 
			
		||||
        metrics::APPEND_LIST.update_since(start_time);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Append a leaf list by providing their intermediate node hash.
 | 
			
		||||
@ -162,6 +169,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
    /// Other nodes in the subtree will be set to `null` nodes.
 | 
			
		||||
    /// TODO: Optimize to avoid storing the `null` nodes?
 | 
			
		||||
    pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        if subtree_root == E::null() {
 | 
			
		||||
            // appending null is not allowed.
 | 
			
		||||
            bail!("subtree_root is null");
 | 
			
		||||
@ -169,10 +177,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
        let start_index = self.leaves();
 | 
			
		||||
        self.append_subtree_inner(subtree_depth, subtree_root)?;
 | 
			
		||||
        self.recompute_after_append_subtree(start_index, subtree_depth - 1);
 | 
			
		||||
 | 
			
		||||
        metrics::APPEND_SUBTREE.update_since(start_time);
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        if subtree_list.iter().any(|(_, root)| root == &E::null()) {
 | 
			
		||||
            // appending null is not allowed.
 | 
			
		||||
            bail!("subtree_list contains null");
 | 
			
		||||
@ -182,12 +193,14 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
            self.append_subtree_inner(subtree_depth, subtree_root)?;
 | 
			
		||||
            self.recompute_after_append_subtree(start_index, subtree_depth - 1);
 | 
			
		||||
        }
 | 
			
		||||
        metrics::APPEND_SUBTREE_LIST.update_since(start_time);
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Change the value of the last leaf and return the new merkle root.
 | 
			
		||||
    /// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves.
 | 
			
		||||
    pub fn update_last(&mut self, updated_leaf: E) {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        if updated_leaf == E::null() {
 | 
			
		||||
            // updating to null is not allowed.
 | 
			
		||||
            return;
 | 
			
		||||
@ -199,6 +212,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
 | 
			
		||||
            *self.layers[0].last_mut().unwrap() = updated_leaf;
 | 
			
		||||
        }
 | 
			
		||||
        self.recompute_after_append_leaves(self.leaves() - 1);
 | 
			
		||||
        metrics::UPDATE_LAST.update_since(start_time);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Fill an unknown `null` leaf with its real value.
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										11
									
								
								common/append_merkle/src/metrics.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										11
									
								
								common/append_merkle/src/metrics.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,11 @@
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
 | 
			
		||||
use metrics::{register_timer, Timer};
 | 
			
		||||
 | 
			
		||||
lazy_static::lazy_static! {
 | 
			
		||||
    pub static ref APPEND: Arc<dyn Timer> = register_timer("append_merkle_append");
 | 
			
		||||
    pub static ref APPEND_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_list");
 | 
			
		||||
    pub static ref APPEND_SUBTREE: Arc<dyn Timer> = register_timer("append_merkle_append_subtree");
 | 
			
		||||
    pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_subtree_list");
 | 
			
		||||
    pub static ref UPDATE_LAST: Arc<dyn Timer> = register_timer("append_merkle_update_last");
 | 
			
		||||
}
 | 
			
		||||
@ -20,6 +20,8 @@ pub struct GossipCache {
 | 
			
		||||
    topic_msgs: HashMap<GossipTopic, HashMap<Vec<u8>, Key>>,
 | 
			
		||||
    /// Timeout for Example messages.
 | 
			
		||||
    example: Option<Duration>,
 | 
			
		||||
    /// Timeout for NewFile messages.
 | 
			
		||||
    new_file: Option<Duration>,
 | 
			
		||||
    /// Timeout for FindFile messages.
 | 
			
		||||
    find_file: Option<Duration>,
 | 
			
		||||
    /// Timeout for FindChunks messages.
 | 
			
		||||
@ -37,6 +39,8 @@ pub struct GossipCacheBuilder {
 | 
			
		||||
    default_timeout: Option<Duration>,
 | 
			
		||||
    /// Timeout for Example messages.
 | 
			
		||||
    example: Option<Duration>,
 | 
			
		||||
    /// Timeout for NewFile messages.
 | 
			
		||||
    new_file: Option<Duration>,
 | 
			
		||||
    /// Timeout for FindFile messages.
 | 
			
		||||
    find_file: Option<Duration>,
 | 
			
		||||
    /// Timeout for FindChunks messages.
 | 
			
		||||
@ -64,6 +68,12 @@ impl GossipCacheBuilder {
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Timeout for NewFile messages.
 | 
			
		||||
    pub fn new_file_timeout(mut self, timeout: Duration) -> Self {
 | 
			
		||||
        self.new_file = Some(timeout);
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Timeout for FindFile messages.
 | 
			
		||||
    pub fn find_file_timeout(mut self, timeout: Duration) -> Self {
 | 
			
		||||
        self.find_file = Some(timeout);
 | 
			
		||||
@ -98,6 +108,7 @@ impl GossipCacheBuilder {
 | 
			
		||||
        let GossipCacheBuilder {
 | 
			
		||||
            default_timeout,
 | 
			
		||||
            example,
 | 
			
		||||
            new_file,
 | 
			
		||||
            find_file,
 | 
			
		||||
            find_chunks,
 | 
			
		||||
            announce_file,
 | 
			
		||||
@ -109,6 +120,7 @@ impl GossipCacheBuilder {
 | 
			
		||||
            expirations: DelayQueue::default(),
 | 
			
		||||
            topic_msgs: HashMap::default(),
 | 
			
		||||
            example: example.or(default_timeout),
 | 
			
		||||
            new_file: new_file.or(default_timeout),
 | 
			
		||||
            find_file: find_file.or(default_timeout),
 | 
			
		||||
            find_chunks: find_chunks.or(default_timeout),
 | 
			
		||||
            announce_file: announce_file.or(default_timeout),
 | 
			
		||||
@ -129,6 +141,7 @@ impl GossipCache {
 | 
			
		||||
    pub fn insert(&mut self, topic: GossipTopic, data: Vec<u8>) {
 | 
			
		||||
        let expire_timeout = match topic.kind() {
 | 
			
		||||
            GossipKind::Example => self.example,
 | 
			
		||||
            GossipKind::NewFile => self.new_file,
 | 
			
		||||
            GossipKind::FindFile => self.find_file,
 | 
			
		||||
            GossipKind::FindChunks => self.find_chunks,
 | 
			
		||||
            GossipKind::AnnounceFile => self.announce_file,
 | 
			
		||||
 | 
			
		||||
@ -6,6 +6,7 @@ use crate::peer_manager::{
 | 
			
		||||
    ConnectionDirection, PeerManager, PeerManagerEvent,
 | 
			
		||||
};
 | 
			
		||||
use crate::rpc::methods::DataByHashRequest;
 | 
			
		||||
use crate::rpc::methods::FileAnnouncement;
 | 
			
		||||
use crate::rpc::methods::GetChunksRequest;
 | 
			
		||||
use crate::rpc::*;
 | 
			
		||||
use crate::service::Context as ServiceContext;
 | 
			
		||||
@ -232,6 +233,9 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
 | 
			
		||||
            let topic: Topic = GossipTopic::new(kind, GossipEncoding::default()).into();
 | 
			
		||||
            topic.hash()
 | 
			
		||||
        };
 | 
			
		||||
        params
 | 
			
		||||
            .topics
 | 
			
		||||
            .insert(get_hash(GossipKind::NewFile), TopicScoreParams::default());
 | 
			
		||||
        params
 | 
			
		||||
            .topics
 | 
			
		||||
            .insert(get_hash(GossipKind::FindFile), TopicScoreParams::default());
 | 
			
		||||
@ -543,6 +547,9 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
 | 
			
		||||
            Request::DataByHash { .. } => {
 | 
			
		||||
                metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["data_by_hash"])
 | 
			
		||||
            }
 | 
			
		||||
            Request::AnnounceFile { .. } => {
 | 
			
		||||
                metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["announce_file"])
 | 
			
		||||
            }
 | 
			
		||||
            Request::GetChunks { .. } => {
 | 
			
		||||
                metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["get_chunks"])
 | 
			
		||||
            }
 | 
			
		||||
@ -755,6 +762,9 @@ where
 | 
			
		||||
                    InboundRequest::DataByHash(req) => {
 | 
			
		||||
                        self.propagate_request(peer_request_id, peer_id, Request::DataByHash(req))
 | 
			
		||||
                    }
 | 
			
		||||
                    InboundRequest::AnnounceFile(req) => {
 | 
			
		||||
                        self.propagate_request(peer_request_id, peer_id, Request::AnnounceFile(req))
 | 
			
		||||
                    }
 | 
			
		||||
                    InboundRequest::GetChunks(req) => {
 | 
			
		||||
                        self.propagate_request(peer_request_id, peer_id, Request::GetChunks(req))
 | 
			
		||||
                    }
 | 
			
		||||
@ -969,6 +979,8 @@ pub enum Request {
 | 
			
		||||
    Status(StatusMessage),
 | 
			
		||||
    /// A data by hash request.
 | 
			
		||||
    DataByHash(DataByHashRequest),
 | 
			
		||||
    /// An AnnounceFile message.
 | 
			
		||||
    AnnounceFile(FileAnnouncement),
 | 
			
		||||
    /// A GetChunks request.
 | 
			
		||||
    GetChunks(GetChunksRequest),
 | 
			
		||||
}
 | 
			
		||||
@ -978,6 +990,7 @@ impl std::convert::From<Request> for OutboundRequest {
 | 
			
		||||
        match req {
 | 
			
		||||
            Request::Status(s) => OutboundRequest::Status(s),
 | 
			
		||||
            Request::DataByHash(r) => OutboundRequest::DataByHash(r),
 | 
			
		||||
            Request::AnnounceFile(r) => OutboundRequest::AnnounceFile(r),
 | 
			
		||||
            Request::GetChunks(r) => OutboundRequest::GetChunks(r),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -93,7 +93,10 @@ pub use peer_manager::{
 | 
			
		||||
};
 | 
			
		||||
pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_FILENAME};
 | 
			
		||||
 | 
			
		||||
pub const PROTOCOL_VERSION: [u8; 3] = [0, 1, 0];
 | 
			
		||||
/// Defines the current P2P protocol version.
 | 
			
		||||
/// - v1: Broadcast FindFile & AnnounceFile messages across the whole network, which made network traffic too heavy.
 | 
			
		||||
/// - v2: Publish NewFile to neighbors only and announce file via RPC message.
 | 
			
		||||
pub const PROTOCOL_VERSION: [u8; 3] = [0, 2, 0];
 | 
			
		||||
 | 
			
		||||
/// Application level requests sent to the network.
 | 
			
		||||
#[derive(Debug, Clone, Copy)]
 | 
			
		||||
 | 
			
		||||
@ -460,6 +460,7 @@ impl PeerManager {
 | 
			
		||||
                    Protocol::Goodbye => PeerAction::LowToleranceError,
 | 
			
		||||
                    Protocol::Status => PeerAction::LowToleranceError,
 | 
			
		||||
                    Protocol::DataByHash => PeerAction::MidToleranceError,
 | 
			
		||||
                    Protocol::AnnounceFile => PeerAction::MidToleranceError,
 | 
			
		||||
                    Protocol::GetChunks => PeerAction::MidToleranceError,
 | 
			
		||||
                },
 | 
			
		||||
            },
 | 
			
		||||
@ -474,6 +475,7 @@ impl PeerManager {
 | 
			
		||||
                    Protocol::Goodbye => return,
 | 
			
		||||
                    Protocol::Status => PeerAction::LowToleranceError,
 | 
			
		||||
                    Protocol::DataByHash => return,
 | 
			
		||||
                    Protocol::AnnounceFile => return,
 | 
			
		||||
                    Protocol::GetChunks => return,
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
@ -488,6 +490,7 @@ impl PeerManager {
 | 
			
		||||
                    Protocol::Goodbye => return,
 | 
			
		||||
                    Protocol::Status => return,
 | 
			
		||||
                    Protocol::DataByHash => PeerAction::MidToleranceError,
 | 
			
		||||
                    Protocol::AnnounceFile => PeerAction::MidToleranceError,
 | 
			
		||||
                    Protocol::GetChunks => PeerAction::MidToleranceError,
 | 
			
		||||
                },
 | 
			
		||||
            },
 | 
			
		||||
 | 
			
		||||
@ -159,6 +159,7 @@ impl Encoder<OutboundRequest> for SSZSnappyOutboundCodec {
 | 
			
		||||
            OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
 | 
			
		||||
            OutboundRequest::Ping(req) => req.as_ssz_bytes(),
 | 
			
		||||
            OutboundRequest::DataByHash(req) => req.hashes.as_ssz_bytes(),
 | 
			
		||||
            OutboundRequest::AnnounceFile(req) => req.as_ssz_bytes(),
 | 
			
		||||
            OutboundRequest::GetChunks(req) => req.as_ssz_bytes(),
 | 
			
		||||
        };
 | 
			
		||||
        // SSZ encoded bytes should be within `max_packet_size`
 | 
			
		||||
@ -346,6 +347,9 @@ fn handle_v1_request(
 | 
			
		||||
        Protocol::DataByHash => Ok(Some(InboundRequest::DataByHash(DataByHashRequest {
 | 
			
		||||
            hashes: VariableList::from_ssz_bytes(decoded_buffer)?,
 | 
			
		||||
        }))),
 | 
			
		||||
        Protocol::AnnounceFile => Ok(Some(InboundRequest::AnnounceFile(
 | 
			
		||||
            FileAnnouncement::from_ssz_bytes(decoded_buffer)?,
 | 
			
		||||
        ))),
 | 
			
		||||
        Protocol::GetChunks => Ok(Some(InboundRequest::GetChunks(
 | 
			
		||||
            GetChunksRequest::from_ssz_bytes(decoded_buffer)?,
 | 
			
		||||
        ))),
 | 
			
		||||
@ -373,6 +377,10 @@ fn handle_v1_response(
 | 
			
		||||
        Protocol::DataByHash => Ok(Some(RPCResponse::DataByHash(Box::new(
 | 
			
		||||
            ZgsData::from_ssz_bytes(decoded_buffer)?,
 | 
			
		||||
        )))),
 | 
			
		||||
        // This case should be unreachable as `AnnounceFile` has no response.
 | 
			
		||||
        Protocol::AnnounceFile => Err(RPCError::InvalidData(
 | 
			
		||||
            "AnnounceFile RPC message has no valid response".to_string(),
 | 
			
		||||
        )),
 | 
			
		||||
        Protocol::GetChunks => Ok(Some(RPCResponse::Chunks(
 | 
			
		||||
            ChunkArrayWithProof::from_ssz_bytes(decoded_buffer)?,
 | 
			
		||||
        ))),
 | 
			
		||||
 | 
			
		||||
@ -178,6 +178,14 @@ pub struct DataByHashRequest {
 | 
			
		||||
    pub hashes: VariableList<Hash256, MaxRequestBlocks>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// The payload of the `AnnounceFile` RPC message.
 | 
			
		||||
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
 | 
			
		||||
pub struct FileAnnouncement {
 | 
			
		||||
    pub tx_id: TxID,
 | 
			
		||||
    pub num_shard: usize,
 | 
			
		||||
    pub shard_id: usize,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Request a chunk array from a peer.
 | 
			
		||||
#[derive(Encode, Decode, Clone, Debug, PartialEq, Eq)]
 | 
			
		||||
pub struct GetChunksRequest {
 | 
			
		||||
 | 
			
		||||
@ -118,6 +118,7 @@ impl<Id: ReqId> RPC<Id> {
 | 
			
		||||
            .n_every(Protocol::Status, 5, Duration::from_secs(15))
 | 
			
		||||
            .one_every(Protocol::Goodbye, Duration::from_secs(10))
 | 
			
		||||
            .n_every(Protocol::DataByHash, 128, Duration::from_secs(10))
 | 
			
		||||
            .n_every(Protocol::AnnounceFile, 256, Duration::from_secs(10))
 | 
			
		||||
            .n_every(Protocol::GetChunks, 4096, Duration::from_secs(10))
 | 
			
		||||
            .build()
 | 
			
		||||
            .expect("Configuration parameters are valid");
 | 
			
		||||
 | 
			
		||||
@ -34,6 +34,7 @@ pub enum OutboundRequest {
 | 
			
		||||
    Goodbye(GoodbyeReason),
 | 
			
		||||
    Ping(Ping),
 | 
			
		||||
    DataByHash(DataByHashRequest),
 | 
			
		||||
    AnnounceFile(FileAnnouncement),
 | 
			
		||||
    GetChunks(GetChunksRequest),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -72,6 +73,11 @@ impl OutboundRequest {
 | 
			
		||||
                Version::V1,
 | 
			
		||||
                Encoding::SSZSnappy,
 | 
			
		||||
            )],
 | 
			
		||||
            OutboundRequest::AnnounceFile(_) => vec![ProtocolId::new(
 | 
			
		||||
                Protocol::AnnounceFile,
 | 
			
		||||
                Version::V1,
 | 
			
		||||
                Encoding::SSZSnappy,
 | 
			
		||||
            )],
 | 
			
		||||
            OutboundRequest::GetChunks(_) => vec![ProtocolId::new(
 | 
			
		||||
                Protocol::GetChunks,
 | 
			
		||||
                Version::V1,
 | 
			
		||||
@ -89,6 +95,7 @@ impl OutboundRequest {
 | 
			
		||||
            OutboundRequest::Goodbye(_) => 0,
 | 
			
		||||
            OutboundRequest::Ping(_) => 1,
 | 
			
		||||
            OutboundRequest::DataByHash(req) => req.hashes.len() as u64,
 | 
			
		||||
            OutboundRequest::AnnounceFile(_) => 0,
 | 
			
		||||
            OutboundRequest::GetChunks(_) => 1,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -100,6 +107,7 @@ impl OutboundRequest {
 | 
			
		||||
            OutboundRequest::Goodbye(_) => Protocol::Goodbye,
 | 
			
		||||
            OutboundRequest::Ping(_) => Protocol::Ping,
 | 
			
		||||
            OutboundRequest::DataByHash(_) => Protocol::DataByHash,
 | 
			
		||||
            OutboundRequest::AnnounceFile(_) => Protocol::AnnounceFile,
 | 
			
		||||
            OutboundRequest::GetChunks(_) => Protocol::GetChunks,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -114,6 +122,7 @@ impl OutboundRequest {
 | 
			
		||||
            OutboundRequest::Status(_) => unreachable!(),
 | 
			
		||||
            OutboundRequest::Goodbye(_) => unreachable!(),
 | 
			
		||||
            OutboundRequest::Ping(_) => unreachable!(),
 | 
			
		||||
            OutboundRequest::AnnounceFile(_) => unreachable!(),
 | 
			
		||||
            OutboundRequest::GetChunks(_) => unreachable!(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -170,6 +179,9 @@ impl std::fmt::Display for OutboundRequest {
 | 
			
		||||
            OutboundRequest::DataByHash(req) => {
 | 
			
		||||
                write!(f, "Data by hash: {:?}", req)
 | 
			
		||||
            }
 | 
			
		||||
            OutboundRequest::AnnounceFile(req) => {
 | 
			
		||||
                write!(f, "AnnounceFile: {:?}", req)
 | 
			
		||||
            }
 | 
			
		||||
            OutboundRequest::GetChunks(req) => {
 | 
			
		||||
                write!(f, "GetChunks: {:?}", req)
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
@ -91,6 +91,8 @@ pub enum Protocol {
 | 
			
		||||
    /// TODO
 | 
			
		||||
    DataByHash,
 | 
			
		||||
 | 
			
		||||
    /// The file announce protocol.
 | 
			
		||||
    AnnounceFile,
 | 
			
		||||
    /// The Chunk sync protocol.
 | 
			
		||||
    GetChunks,
 | 
			
		||||
}
 | 
			
		||||
@ -115,6 +117,7 @@ impl std::fmt::Display for Protocol {
 | 
			
		||||
            Protocol::Goodbye => "goodbye",
 | 
			
		||||
            Protocol::Ping => "ping",
 | 
			
		||||
            Protocol::DataByHash => "data_by_hash",
 | 
			
		||||
            Protocol::AnnounceFile => "announce_file",
 | 
			
		||||
            Protocol::GetChunks => "get_chunks",
 | 
			
		||||
        };
 | 
			
		||||
        f.write_str(repr)
 | 
			
		||||
@ -155,6 +158,7 @@ impl UpgradeInfo for RPCProtocol {
 | 
			
		||||
            ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy),
 | 
			
		||||
            ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy),
 | 
			
		||||
            ProtocolId::new(Protocol::DataByHash, Version::V1, Encoding::SSZSnappy),
 | 
			
		||||
            ProtocolId::new(Protocol::AnnounceFile, Version::V1, Encoding::SSZSnappy),
 | 
			
		||||
            ProtocolId::new(Protocol::GetChunks, Version::V1, Encoding::SSZSnappy),
 | 
			
		||||
        ]
 | 
			
		||||
    }
 | 
			
		||||
@ -216,6 +220,10 @@ impl ProtocolId {
 | 
			
		||||
                // TODO
 | 
			
		||||
                RpcLimits::new(1, *DATA_BY_HASH_REQUEST_MAX)
 | 
			
		||||
            }
 | 
			
		||||
            Protocol::AnnounceFile => RpcLimits::new(
 | 
			
		||||
                <FileAnnouncement as Encode>::ssz_fixed_len(),
 | 
			
		||||
                <FileAnnouncement as Encode>::ssz_fixed_len(),
 | 
			
		||||
            ),
 | 
			
		||||
            Protocol::GetChunks => RpcLimits::new(
 | 
			
		||||
                <GetChunksRequest as Encode>::ssz_fixed_len(),
 | 
			
		||||
                <GetChunksRequest as Encode>::ssz_fixed_len(),
 | 
			
		||||
@ -243,6 +251,7 @@ impl ProtocolId {
 | 
			
		||||
                <ZgsData as Encode>::ssz_fixed_len(),
 | 
			
		||||
            ),
 | 
			
		||||
 | 
			
		||||
            Protocol::AnnounceFile => RpcLimits::new(0, 0), // AnnounceFile request has no response
 | 
			
		||||
            Protocol::GetChunks => RpcLimits::new(*CHUNKS_RESPONSE_MIN, *CHUNKS_RESPONSE_MAX),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -325,6 +334,7 @@ pub enum InboundRequest {
 | 
			
		||||
    Goodbye(GoodbyeReason),
 | 
			
		||||
    Ping(Ping),
 | 
			
		||||
    DataByHash(DataByHashRequest),
 | 
			
		||||
    AnnounceFile(FileAnnouncement),
 | 
			
		||||
    GetChunks(GetChunksRequest),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -363,6 +373,11 @@ impl InboundRequest {
 | 
			
		||||
                Version::V1,
 | 
			
		||||
                Encoding::SSZSnappy,
 | 
			
		||||
            )],
 | 
			
		||||
            InboundRequest::AnnounceFile(_) => vec![ProtocolId::new(
 | 
			
		||||
                Protocol::AnnounceFile,
 | 
			
		||||
                Version::V1,
 | 
			
		||||
                Encoding::SSZSnappy,
 | 
			
		||||
            )],
 | 
			
		||||
            InboundRequest::GetChunks(_) => vec![ProtocolId::new(
 | 
			
		||||
                Protocol::GetChunks,
 | 
			
		||||
                Version::V1,
 | 
			
		||||
@ -380,6 +395,7 @@ impl InboundRequest {
 | 
			
		||||
            InboundRequest::Goodbye(_) => 0,
 | 
			
		||||
            InboundRequest::DataByHash(req) => req.hashes.len() as u64,
 | 
			
		||||
            InboundRequest::Ping(_) => 1,
 | 
			
		||||
            InboundRequest::AnnounceFile(_) => 0,
 | 
			
		||||
            InboundRequest::GetChunks(_) => 1,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -391,6 +407,7 @@ impl InboundRequest {
 | 
			
		||||
            InboundRequest::Goodbye(_) => Protocol::Goodbye,
 | 
			
		||||
            InboundRequest::Ping(_) => Protocol::Ping,
 | 
			
		||||
            InboundRequest::DataByHash(_) => Protocol::DataByHash,
 | 
			
		||||
            InboundRequest::AnnounceFile(_) => Protocol::AnnounceFile,
 | 
			
		||||
            InboundRequest::GetChunks(_) => Protocol::GetChunks,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -405,6 +422,7 @@ impl InboundRequest {
 | 
			
		||||
            InboundRequest::Status(_) => unreachable!(),
 | 
			
		||||
            InboundRequest::Goodbye(_) => unreachable!(),
 | 
			
		||||
            InboundRequest::Ping(_) => unreachable!(),
 | 
			
		||||
            InboundRequest::AnnounceFile(_) => unreachable!(),
 | 
			
		||||
            InboundRequest::GetChunks(_) => unreachable!(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -523,6 +541,9 @@ impl std::fmt::Display for InboundRequest {
 | 
			
		||||
            InboundRequest::DataByHash(req) => {
 | 
			
		||||
                write!(f, "Data by hash: {:?}", req)
 | 
			
		||||
            }
 | 
			
		||||
            InboundRequest::AnnounceFile(req) => {
 | 
			
		||||
                write!(f, "Announce File: {:?}", req)
 | 
			
		||||
            }
 | 
			
		||||
            InboundRequest::GetChunks(req) => {
 | 
			
		||||
                write!(f, "Get Chunks: {:?}", req)
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
@ -68,6 +68,8 @@ pub struct RPCRateLimiter {
 | 
			
		||||
    status_rl: Limiter<PeerId>,
 | 
			
		||||
    /// DataByHash rate limiter.
 | 
			
		||||
    data_by_hash_rl: Limiter<PeerId>,
 | 
			
		||||
    /// AnnounceFile rate limiter.
 | 
			
		||||
    announce_file_rl: Limiter<PeerId>,
 | 
			
		||||
    /// GetChunks rate limiter.
 | 
			
		||||
    get_chunks_rl: Limiter<PeerId>,
 | 
			
		||||
}
 | 
			
		||||
@ -91,6 +93,8 @@ pub struct RPCRateLimiterBuilder {
 | 
			
		||||
    status_quota: Option<Quota>,
 | 
			
		||||
    /// Quota for the DataByHash protocol.
 | 
			
		||||
    data_by_hash_quota: Option<Quota>,
 | 
			
		||||
    /// Quota for the AnnounceFile protocol.
 | 
			
		||||
    announce_file_quota: Option<Quota>,
 | 
			
		||||
    /// Quota for the GetChunks protocol.
 | 
			
		||||
    get_chunks_quota: Option<Quota>,
 | 
			
		||||
}
 | 
			
		||||
@ -109,6 +113,7 @@ impl RPCRateLimiterBuilder {
 | 
			
		||||
            Protocol::Status => self.status_quota = q,
 | 
			
		||||
            Protocol::Goodbye => self.goodbye_quota = q,
 | 
			
		||||
            Protocol::DataByHash => self.data_by_hash_quota = q,
 | 
			
		||||
            Protocol::AnnounceFile => self.announce_file_quota = q,
 | 
			
		||||
            Protocol::GetChunks => self.get_chunks_quota = q,
 | 
			
		||||
        }
 | 
			
		||||
        self
 | 
			
		||||
@ -145,6 +150,9 @@ impl RPCRateLimiterBuilder {
 | 
			
		||||
        let data_by_hash_quota = self
 | 
			
		||||
            .data_by_hash_quota
 | 
			
		||||
            .ok_or("DataByHash quota not specified")?;
 | 
			
		||||
        let announce_file_quota = self
 | 
			
		||||
            .announce_file_quota
 | 
			
		||||
            .ok_or("AnnounceFile quota not specified")?;
 | 
			
		||||
        let get_chunks_quota = self
 | 
			
		||||
            .get_chunks_quota
 | 
			
		||||
            .ok_or("GetChunks quota not specified")?;
 | 
			
		||||
@ -154,6 +162,7 @@ impl RPCRateLimiterBuilder {
 | 
			
		||||
        let status_rl = Limiter::from_quota(status_quota)?;
 | 
			
		||||
        let goodbye_rl = Limiter::from_quota(goodbye_quota)?;
 | 
			
		||||
        let data_by_hash_rl = Limiter::from_quota(data_by_hash_quota)?;
 | 
			
		||||
        let announce_file_rl = Limiter::from_quota(announce_file_quota)?;
 | 
			
		||||
        let get_chunks_rl = Limiter::from_quota(get_chunks_quota)?;
 | 
			
		||||
 | 
			
		||||
        // check for peers to prune every 30 seconds, starting in 30 seconds
 | 
			
		||||
@ -166,6 +175,7 @@ impl RPCRateLimiterBuilder {
 | 
			
		||||
            status_rl,
 | 
			
		||||
            goodbye_rl,
 | 
			
		||||
            data_by_hash_rl,
 | 
			
		||||
            announce_file_rl,
 | 
			
		||||
            get_chunks_rl,
 | 
			
		||||
            init_time: Instant::now(),
 | 
			
		||||
        })
 | 
			
		||||
@ -210,6 +220,7 @@ impl RPCRateLimiter {
 | 
			
		||||
            Protocol::Status => &mut self.status_rl,
 | 
			
		||||
            Protocol::Goodbye => &mut self.goodbye_rl,
 | 
			
		||||
            Protocol::DataByHash => &mut self.data_by_hash_rl,
 | 
			
		||||
            Protocol::AnnounceFile => &mut self.announce_file_rl,
 | 
			
		||||
            Protocol::GetChunks => &mut self.get_chunks_rl,
 | 
			
		||||
        };
 | 
			
		||||
        check(limiter)
 | 
			
		||||
 | 
			
		||||
@ -7,7 +7,7 @@ pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
 | 
			
		||||
 | 
			
		||||
pub use globals::NetworkGlobals;
 | 
			
		||||
pub use pubsub::{
 | 
			
		||||
    AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature,
 | 
			
		||||
    AnnounceChunks, AnnounceFile, AnnounceShardConfig, FindChunks, FindFile, HasSignature, NewFile,
 | 
			
		||||
    PubsubMessage, SignedAnnounceChunks, SignedAnnounceFile, SignedAnnounceShardConfig,
 | 
			
		||||
    SignedMessage, SnappyTransform,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
@ -114,9 +114,22 @@ impl ssz::Decode for WrappedPeerId {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Published when a file is uploaded or has finished syncing from other peers.
 | 
			
		||||
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
 | 
			
		||||
pub struct NewFile {
 | 
			
		||||
    pub tx_id: TxID,
 | 
			
		||||
    pub num_shard: usize,
 | 
			
		||||
    pub shard_id: usize,
 | 
			
		||||
    pub timestamp: u32,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
 | 
			
		||||
pub struct FindFile {
 | 
			
		||||
    pub tx_id: TxID,
 | 
			
		||||
    pub num_shard: usize,
 | 
			
		||||
    pub shard_id: usize,
 | 
			
		||||
    /// Indicates whether to publish to neighbor nodes only.
 | 
			
		||||
    pub neighbors_only: bool,
 | 
			
		||||
    pub timestamp: u32,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -205,6 +218,7 @@ type SignedAnnounceFiles = Vec<SignedAnnounceFile>;
 | 
			
		||||
#[derive(Debug, Clone, PartialEq, Eq)]
 | 
			
		||||
pub enum PubsubMessage {
 | 
			
		||||
    ExampleMessage(u64),
 | 
			
		||||
    NewFile(NewFile),
 | 
			
		||||
    FindFile(FindFile),
 | 
			
		||||
    FindChunks(FindChunks),
 | 
			
		||||
    AnnounceFile(Vec<SignedAnnounceFile>),
 | 
			
		||||
@ -283,6 +297,7 @@ impl PubsubMessage {
 | 
			
		||||
    pub fn kind(&self) -> GossipKind {
 | 
			
		||||
        match self {
 | 
			
		||||
            PubsubMessage::ExampleMessage(_) => GossipKind::Example,
 | 
			
		||||
            PubsubMessage::NewFile(_) => GossipKind::NewFile,
 | 
			
		||||
            PubsubMessage::FindFile(_) => GossipKind::FindFile,
 | 
			
		||||
            PubsubMessage::FindChunks(_) => GossipKind::FindChunks,
 | 
			
		||||
            PubsubMessage::AnnounceFile(_) => GossipKind::AnnounceFile,
 | 
			
		||||
@ -309,6 +324,9 @@ impl PubsubMessage {
 | 
			
		||||
                    GossipKind::Example => Ok(PubsubMessage::ExampleMessage(
 | 
			
		||||
                        u64::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
 | 
			
		||||
                    )),
 | 
			
		||||
                    GossipKind::NewFile => Ok(PubsubMessage::NewFile(
 | 
			
		||||
                        NewFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
 | 
			
		||||
                    )),
 | 
			
		||||
                    GossipKind::FindFile => Ok(PubsubMessage::FindFile(
 | 
			
		||||
                        FindFile::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?,
 | 
			
		||||
                    )),
 | 
			
		||||
@ -341,6 +359,7 @@ impl PubsubMessage {
 | 
			
		||||
        // messages for us.
 | 
			
		||||
        match &self {
 | 
			
		||||
            PubsubMessage::ExampleMessage(data) => data.as_ssz_bytes(),
 | 
			
		||||
            PubsubMessage::NewFile(data) => data.as_ssz_bytes(),
 | 
			
		||||
            PubsubMessage::FindFile(data) => data.as_ssz_bytes(),
 | 
			
		||||
            PubsubMessage::FindChunks(data) => data.as_ssz_bytes(),
 | 
			
		||||
            PubsubMessage::AnnounceFile(data) => data.as_ssz_bytes(),
 | 
			
		||||
@ -356,6 +375,9 @@ impl std::fmt::Display for PubsubMessage {
 | 
			
		||||
            PubsubMessage::ExampleMessage(msg) => {
 | 
			
		||||
                write!(f, "Example message: {}", msg)
 | 
			
		||||
            }
 | 
			
		||||
            PubsubMessage::NewFile(msg) => {
 | 
			
		||||
                write!(f, "NewFile message: {:?}", msg)
 | 
			
		||||
            }
 | 
			
		||||
            PubsubMessage::FindFile(msg) => {
 | 
			
		||||
                write!(f, "FindFile message: {:?}", msg)
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
@ -8,13 +8,15 @@ use strum::AsRefStr;
 | 
			
		||||
pub const TOPIC_PREFIX: &str = "eth2";
 | 
			
		||||
pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy";
 | 
			
		||||
pub const EXAMPLE_TOPIC: &str = "example";
 | 
			
		||||
pub const NEW_FILE_TOPIC: &str = "new_file";
 | 
			
		||||
pub const FIND_FILE_TOPIC: &str = "find_file";
 | 
			
		||||
pub const FIND_CHUNKS_TOPIC: &str = "find_chunks";
 | 
			
		||||
pub const ANNOUNCE_FILE_TOPIC: &str = "announce_file";
 | 
			
		||||
pub const ANNOUNCE_CHUNKS_TOPIC: &str = "announce_chunks";
 | 
			
		||||
pub const ANNOUNCE_SHARD_CONFIG_TOPIC: &str = "announce_shard_config";
 | 
			
		||||
 | 
			
		||||
pub const CORE_TOPICS: [GossipKind; 4] = [
 | 
			
		||||
pub const CORE_TOPICS: [GossipKind; 5] = [
 | 
			
		||||
    GossipKind::NewFile,
 | 
			
		||||
    GossipKind::FindFile,
 | 
			
		||||
    GossipKind::FindChunks,
 | 
			
		||||
    GossipKind::AnnounceFile,
 | 
			
		||||
@ -37,6 +39,7 @@ pub struct GossipTopic {
 | 
			
		||||
#[strum(serialize_all = "snake_case")]
 | 
			
		||||
pub enum GossipKind {
 | 
			
		||||
    Example,
 | 
			
		||||
    NewFile,
 | 
			
		||||
    FindFile,
 | 
			
		||||
    FindChunks,
 | 
			
		||||
    AnnounceFile,
 | 
			
		||||
@ -77,6 +80,7 @@ impl GossipTopic {
 | 
			
		||||
 | 
			
		||||
            let kind = match topic_parts[2] {
 | 
			
		||||
                EXAMPLE_TOPIC => GossipKind::Example,
 | 
			
		||||
                NEW_FILE_TOPIC => GossipKind::NewFile,
 | 
			
		||||
                FIND_FILE_TOPIC => GossipKind::FindFile,
 | 
			
		||||
                FIND_CHUNKS_TOPIC => GossipKind::FindChunks,
 | 
			
		||||
                ANNOUNCE_FILE_TOPIC => GossipKind::AnnounceFile,
 | 
			
		||||
@ -106,6 +110,7 @@ impl From<GossipTopic> for String {
 | 
			
		||||
 | 
			
		||||
        let kind = match topic.kind {
 | 
			
		||||
            GossipKind::Example => EXAMPLE_TOPIC,
 | 
			
		||||
            GossipKind::NewFile => NEW_FILE_TOPIC,
 | 
			
		||||
            GossipKind::FindFile => FIND_FILE_TOPIC,
 | 
			
		||||
            GossipKind::FindChunks => FIND_CHUNKS_TOPIC,
 | 
			
		||||
            GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC,
 | 
			
		||||
@ -125,6 +130,7 @@ impl std::fmt::Display for GossipTopic {
 | 
			
		||||
 | 
			
		||||
        let kind = match self.kind {
 | 
			
		||||
            GossipKind::Example => EXAMPLE_TOPIC,
 | 
			
		||||
            GossipKind::NewFile => NEW_FILE_TOPIC,
 | 
			
		||||
            GossipKind::FindFile => FIND_FILE_TOPIC,
 | 
			
		||||
            GossipKind::FindChunks => FIND_CHUNKS_TOPIC,
 | 
			
		||||
            GossipKind::AnnounceFile => ANNOUNCE_FILE_TOPIC,
 | 
			
		||||
 | 
			
		||||
@ -5,7 +5,8 @@ use std::{ops::Neg, sync::Arc};
 | 
			
		||||
use chunk_pool::ChunkPoolMessage;
 | 
			
		||||
use file_location_cache::FileLocationCache;
 | 
			
		||||
use network::multiaddr::Protocol;
 | 
			
		||||
use network::types::{AnnounceShardConfig, SignedAnnounceShardConfig};
 | 
			
		||||
use network::rpc::methods::FileAnnouncement;
 | 
			
		||||
use network::types::{AnnounceShardConfig, NewFile, SignedAnnounceShardConfig};
 | 
			
		||||
use network::{
 | 
			
		||||
    rpc::StatusMessage,
 | 
			
		||||
    types::{
 | 
			
		||||
@ -29,6 +30,11 @@ use crate::peer_manager::PeerManager;
 | 
			
		||||
use crate::Config;
 | 
			
		||||
 | 
			
		||||
lazy_static::lazy_static! {
 | 
			
		||||
    /// Timeout to publish NewFile message to neighbor nodes.
 | 
			
		||||
    pub static ref NEW_FILE_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
 | 
			
		||||
    /// Timeout to publish FindFile message to neighbor nodes.
 | 
			
		||||
    pub static ref FIND_FILE_NEIGHBORS_TIMEOUT: chrono::Duration = chrono::Duration::seconds(30);
 | 
			
		||||
    /// Timeout to publish FindFile message in the whole network.
 | 
			
		||||
    pub static ref FIND_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
 | 
			
		||||
    pub static ref ANNOUNCE_FILE_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
 | 
			
		||||
    pub static ref ANNOUNCE_SHARD_CONFIG_TIMEOUT: chrono::Duration = chrono::Duration::minutes(5);
 | 
			
		||||
@ -219,6 +225,25 @@ impl Libp2pEventHandler {
 | 
			
		||||
                });
 | 
			
		||||
                metrics::LIBP2P_HANDLE_GET_CHUNKS_REQUEST.mark(1);
 | 
			
		||||
            }
 | 
			
		||||
            Request::AnnounceFile(announcement) => {
 | 
			
		||||
                match ShardConfig::new(announcement.shard_id, announcement.num_shard) {
 | 
			
		||||
                    Ok(v) => {
 | 
			
		||||
                        self.file_location_cache.insert_peer_config(peer_id, v);
 | 
			
		||||
 | 
			
		||||
                        self.send_to_sync(SyncMessage::AnnounceFile {
 | 
			
		||||
                            peer_id,
 | 
			
		||||
                            request_id,
 | 
			
		||||
                            announcement,
 | 
			
		||||
                        });
 | 
			
		||||
                    }
 | 
			
		||||
                    Err(_) => self.send_to_network(NetworkMessage::ReportPeer {
 | 
			
		||||
                        peer_id,
 | 
			
		||||
                        action: PeerAction::Fatal,
 | 
			
		||||
                        source: ReportSource::RPC,
 | 
			
		||||
                        msg: "Invalid shard config in AnnounceFile RPC message",
 | 
			
		||||
                    }),
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            Request::DataByHash(_) => {
 | 
			
		||||
                // ignore
 | 
			
		||||
            }
 | 
			
		||||
@ -316,9 +341,13 @@ impl Libp2pEventHandler {
 | 
			
		||||
 | 
			
		||||
        match message {
 | 
			
		||||
            PubsubMessage::ExampleMessage(_) => MessageAcceptance::Ignore,
 | 
			
		||||
            PubsubMessage::NewFile(msg) => {
 | 
			
		||||
                metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE.mark(1);
 | 
			
		||||
                self.on_new_file(propagation_source, msg).await
 | 
			
		||||
            }
 | 
			
		||||
            PubsubMessage::FindFile(msg) => {
 | 
			
		||||
                metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE.mark(1);
 | 
			
		||||
                self.on_find_file(msg).await
 | 
			
		||||
                self.on_find_file(propagation_source, msg).await
 | 
			
		||||
            }
 | 
			
		||||
            PubsubMessage::FindChunks(msg) => {
 | 
			
		||||
                metrics::LIBP2P_HANDLE_PUBSUB_FIND_CHUNKS.mark(1);
 | 
			
		||||
@ -348,6 +377,63 @@ impl Libp2pEventHandler {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Handle the NewFile pubsub message `msg` that was published by the `from` peer.
 | 
			
		||||
    async fn on_new_file(&self, from: PeerId, msg: NewFile) -> MessageAcceptance {
 | 
			
		||||
        // verify timestamp
 | 
			
		||||
        let d = duration_since(
 | 
			
		||||
            msg.timestamp,
 | 
			
		||||
            metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY.clone(),
 | 
			
		||||
        );
 | 
			
		||||
        if d < TOLERABLE_DRIFT.neg() || d > *NEW_FILE_TIMEOUT {
 | 
			
		||||
            debug!(?d, ?msg, "Invalid timestamp, ignoring NewFile message");
 | 
			
		||||
            metrics::LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT.mark(1);
 | 
			
		||||
            self.send_to_network(NetworkMessage::ReportPeer {
 | 
			
		||||
                peer_id: from,
 | 
			
		||||
                action: PeerAction::LowToleranceError,
 | 
			
		||||
                source: ReportSource::Gossipsub,
 | 
			
		||||
                msg: "Received out of date NewFile message",
 | 
			
		||||
            });
 | 
			
		||||
            return MessageAcceptance::Ignore;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // verify announced shard config
 | 
			
		||||
        let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) {
 | 
			
		||||
            Ok(v) => v,
 | 
			
		||||
            Err(_) => return MessageAcceptance::Reject,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        // ignore if shard config mismatch
 | 
			
		||||
        let my_shard_config = self.store.get_store().get_shard_config();
 | 
			
		||||
        if !my_shard_config.intersect(&announced_shard_config) {
 | 
			
		||||
            return MessageAcceptance::Ignore;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // ignore if already exists
 | 
			
		||||
        match self.store.check_tx_completed(msg.tx_id.seq).await {
 | 
			
		||||
            Ok(true) => return MessageAcceptance::Ignore,
 | 
			
		||||
            Ok(false) => {}
 | 
			
		||||
            Err(err) => {
 | 
			
		||||
                warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx completed");
 | 
			
		||||
                return MessageAcceptance::Ignore;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // ignore if already pruned
 | 
			
		||||
        match self.store.check_tx_pruned(msg.tx_id.seq).await {
 | 
			
		||||
            Ok(true) => return MessageAcceptance::Ignore,
 | 
			
		||||
            Ok(false) => {}
 | 
			
		||||
            Err(err) => {
 | 
			
		||||
                warn!(?err, tx_seq = %msg.tx_id.seq, "Failed to check tx pruned");
 | 
			
		||||
                return MessageAcceptance::Ignore;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // notify sync layer to handle in advance
 | 
			
		||||
        self.send_to_sync(SyncMessage::NewFile { from, msg });
 | 
			
		||||
 | 
			
		||||
        MessageAcceptance::Ignore
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn construct_announced_ip(&self) -> Option<Multiaddr> {
 | 
			
		||||
        // public address configured
 | 
			
		||||
        if let Some(ip) = self.config.public_address {
 | 
			
		||||
@ -485,27 +571,69 @@ impl Libp2pEventHandler {
 | 
			
		||||
        Some(PubsubMessage::AnnounceShardConfig(signed))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn on_find_file(&self, msg: FindFile) -> MessageAcceptance {
 | 
			
		||||
        let FindFile { tx_id, timestamp } = msg;
 | 
			
		||||
    async fn on_find_file(&self, from: PeerId, msg: FindFile) -> MessageAcceptance {
 | 
			
		||||
        let FindFile {
 | 
			
		||||
            tx_id, timestamp, ..
 | 
			
		||||
        } = msg;
 | 
			
		||||
 | 
			
		||||
        // verify timestamp
 | 
			
		||||
        let d = duration_since(
 | 
			
		||||
            timestamp,
 | 
			
		||||
            metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY.clone(),
 | 
			
		||||
        );
 | 
			
		||||
        if d < TOLERABLE_DRIFT.neg() || d > *FIND_FILE_TIMEOUT {
 | 
			
		||||
        let timeout = if msg.neighbors_only {
 | 
			
		||||
            *FIND_FILE_NEIGHBORS_TIMEOUT
 | 
			
		||||
        } else {
 | 
			
		||||
            *FIND_FILE_TIMEOUT
 | 
			
		||||
        };
 | 
			
		||||
        if d < TOLERABLE_DRIFT.neg() || d > timeout {
 | 
			
		||||
            debug!(%timestamp, ?d, "Invalid timestamp, ignoring FindFile message");
 | 
			
		||||
            metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_TIMEOUT.mark(1);
 | 
			
		||||
            if msg.neighbors_only {
 | 
			
		||||
                self.send_to_network(NetworkMessage::ReportPeer {
 | 
			
		||||
                    peer_id: from,
 | 
			
		||||
                    action: PeerAction::LowToleranceError,
 | 
			
		||||
                    source: ReportSource::Gossipsub,
 | 
			
		||||
                    msg: "Received out of date FindFile message",
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
            return MessageAcceptance::Ignore;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // verify announced shard config
 | 
			
		||||
        let announced_shard_config = match ShardConfig::new(msg.shard_id, msg.num_shard) {
 | 
			
		||||
            Ok(v) => v,
 | 
			
		||||
            Err(_) => return MessageAcceptance::Reject,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        // handle on shard config mismatch
 | 
			
		||||
        let my_shard_config = self.store.get_store().get_shard_config();
 | 
			
		||||
        if !my_shard_config.intersect(&announced_shard_config) {
 | 
			
		||||
            return if msg.neighbors_only {
 | 
			
		||||
                MessageAcceptance::Ignore
 | 
			
		||||
            } else {
 | 
			
		||||
                MessageAcceptance::Accept
 | 
			
		||||
            };
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // check if we have it
 | 
			
		||||
        if matches!(self.store.check_tx_completed(tx_id.seq).await, Ok(true)) {
 | 
			
		||||
            if let Ok(Some(tx)) = self.store.get_tx_by_seq_number(tx_id.seq).await {
 | 
			
		||||
                if tx.id() == tx_id {
 | 
			
		||||
                    trace!(?tx_id, "Found file locally, responding to FindFile query");
 | 
			
		||||
 | 
			
		||||
                    if self.publish_file(tx_id).await.is_some() {
 | 
			
		||||
                    if msg.neighbors_only {
 | 
			
		||||
                        // announce file via RPC to avoid flooding pubsub message
 | 
			
		||||
                        self.send_to_network(NetworkMessage::SendRequest {
 | 
			
		||||
                            peer_id: from,
 | 
			
		||||
                            request: Request::AnnounceFile(FileAnnouncement {
 | 
			
		||||
                                tx_id,
 | 
			
		||||
                                num_shard: my_shard_config.num_shard,
 | 
			
		||||
                                shard_id: my_shard_config.shard_id,
 | 
			
		||||
                            }),
 | 
			
		||||
                            request_id: RequestId::Router(Instant::now()),
 | 
			
		||||
                        });
 | 
			
		||||
                    } else if self.publish_file(tx_id).await.is_some() {
 | 
			
		||||
                        metrics::LIBP2P_HANDLE_PUBSUB_FIND_FILE_STORE.mark(1);
 | 
			
		||||
                        return MessageAcceptance::Ignore;
 | 
			
		||||
                    }
 | 
			
		||||
@ -513,6 +641,11 @@ impl Libp2pEventHandler {
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // do not forward to the whole network if the file is only requested from neighbor nodes
 | 
			
		||||
        if msg.neighbors_only {
 | 
			
		||||
            return MessageAcceptance::Ignore;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // try from cache
 | 
			
		||||
        if let Some(mut msg) = self.file_location_cache.get_one(tx_id) {
 | 
			
		||||
            trace!(?tx_id, "Found file in cache, responding to FindFile query");
 | 
			
		||||
@ -834,7 +967,7 @@ impl Libp2pEventHandler {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn publish_file(&self, tx_id: TxID) -> Option<bool> {
 | 
			
		||||
    async fn publish_file(&self, tx_id: TxID) -> Option<bool> {
 | 
			
		||||
        match self.file_batcher.write().await.add(tx_id) {
 | 
			
		||||
            Some(batch) => {
 | 
			
		||||
                let announcement = self.construct_announce_file_message(batch).await?;
 | 
			
		||||
@ -1203,7 +1336,13 @@ mod tests {
 | 
			
		||||
    ) -> MessageAcceptance {
 | 
			
		||||
        let (alice, bob) = (PeerId::random(), PeerId::random());
 | 
			
		||||
        let id = MessageId::new(b"dummy message");
 | 
			
		||||
        let message = PubsubMessage::FindFile(FindFile { tx_id, timestamp });
 | 
			
		||||
        let message = PubsubMessage::FindFile(FindFile {
 | 
			
		||||
            tx_id,
 | 
			
		||||
            num_shard: 1,
 | 
			
		||||
            shard_id: 0,
 | 
			
		||||
            neighbors_only: false,
 | 
			
		||||
            timestamp,
 | 
			
		||||
        });
 | 
			
		||||
        handler.on_pubsub_message(alice, bob, &id, message).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -44,6 +44,11 @@ lazy_static::lazy_static! {
 | 
			
		||||
    pub static ref LIBP2P_HANDLE_RESPONSE_ERROR: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_response_error", "qps");
 | 
			
		||||
    pub static ref LIBP2P_HANDLE_RESPONSE_ERROR_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_response_error", "latency", 1024);
 | 
			
		||||
 | 
			
		||||
    // libp2p_event_handler: new file
 | 
			
		||||
    pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "qps");
 | 
			
		||||
    pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_new_file", "latency", 1024);
 | 
			
		||||
    pub static ref LIBP2P_HANDLE_PUBSUB_NEW_FILE_TIMEOUT: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_new_file", "timeout");
 | 
			
		||||
 | 
			
		||||
    // libp2p_event_handler: find & announce file
 | 
			
		||||
    pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE: Arc<dyn Meter> = register_meter_with_group("router_libp2p_handle_pubsub_find_file", "qps");
 | 
			
		||||
    pub static ref LIBP2P_HANDLE_PUBSUB_FIND_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register_with_group("router_libp2p_handle_pubsub_find_file", "latency", 1024);
 | 
			
		||||
 | 
			
		||||
@ -5,11 +5,14 @@ use chunk_pool::ChunkPoolMessage;
 | 
			
		||||
use file_location_cache::FileLocationCache;
 | 
			
		||||
use futures::{channel::mpsc::Sender, prelude::*};
 | 
			
		||||
use miner::MinerMessage;
 | 
			
		||||
use network::types::NewFile;
 | 
			
		||||
use network::PubsubMessage;
 | 
			
		||||
use network::{
 | 
			
		||||
    BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, RequestId,
 | 
			
		||||
    Service as LibP2PService, Swarm,
 | 
			
		||||
};
 | 
			
		||||
use pruner::PrunerMessage;
 | 
			
		||||
use shared_types::timestamp_now;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use storage::log_store::Store as LogStore;
 | 
			
		||||
use storage_async::Store;
 | 
			
		||||
@ -44,6 +47,8 @@ pub struct RouterService {
 | 
			
		||||
    /// Stores potentially created UPnP mappings to be removed on shutdown. (TCP port and UDP
 | 
			
		||||
    /// port).
 | 
			
		||||
    upnp_mappings: (Option<u16>, Option<u16>),
 | 
			
		||||
 | 
			
		||||
    store: Arc<dyn LogStore>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl RouterService {
 | 
			
		||||
@ -63,7 +68,6 @@ impl RouterService {
 | 
			
		||||
        local_keypair: Keypair,
 | 
			
		||||
        config: Config,
 | 
			
		||||
    ) {
 | 
			
		||||
        let store = Store::new(store, executor.clone());
 | 
			
		||||
        let peers = Arc::new(RwLock::new(PeerManager::new(config.clone())));
 | 
			
		||||
 | 
			
		||||
        // create the network service and spawn the task
 | 
			
		||||
@ -81,11 +85,12 @@ impl RouterService {
 | 
			
		||||
                sync_send,
 | 
			
		||||
                chunk_pool_send,
 | 
			
		||||
                local_keypair,
 | 
			
		||||
                store,
 | 
			
		||||
                Store::new(store.clone(), executor.clone()),
 | 
			
		||||
                file_location_cache,
 | 
			
		||||
                peers,
 | 
			
		||||
            ),
 | 
			
		||||
            upnp_mappings: (None, None),
 | 
			
		||||
            store,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        // spawn service
 | 
			
		||||
@ -328,14 +333,15 @@ impl RouterService {
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            NetworkMessage::AnnounceLocalFile { tx_id } => {
 | 
			
		||||
                if self
 | 
			
		||||
                    .libp2p_event_handler
 | 
			
		||||
                    .publish_file(tx_id)
 | 
			
		||||
                    .await
 | 
			
		||||
                    .is_some()
 | 
			
		||||
                {
 | 
			
		||||
                    metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1);
 | 
			
		||||
                }
 | 
			
		||||
                let shard_config = self.store.get_shard_config();
 | 
			
		||||
                let msg = PubsubMessage::NewFile(NewFile {
 | 
			
		||||
                    tx_id,
 | 
			
		||||
                    num_shard: shard_config.num_shard,
 | 
			
		||||
                    shard_id: shard_config.shard_id,
 | 
			
		||||
                    timestamp: timestamp_now(),
 | 
			
		||||
                });
 | 
			
		||||
                self.libp2p.swarm.behaviour_mut().publish(vec![msg]);
 | 
			
		||||
                metrics::SERVICE_ROUTE_NETWORK_MESSAGE_ANNOUNCE_LOCAL_FILE.mark(1);
 | 
			
		||||
            }
 | 
			
		||||
            NetworkMessage::UPnPMappingEstablished {
 | 
			
		||||
                tcp_socket,
 | 
			
		||||
 | 
			
		||||
@ -31,6 +31,8 @@ parking_lot = "0.12.3"
 | 
			
		||||
serde_json = "1.0.127"
 | 
			
		||||
tokio = { version = "1.38.0", features = ["full"] }
 | 
			
		||||
task_executor = { path = "../../common/task_executor" }
 | 
			
		||||
lazy_static = "1.4.0"
 | 
			
		||||
metrics = { workspace = true }
 | 
			
		||||
 | 
			
		||||
[dev-dependencies]
 | 
			
		||||
rand = "0.8.5"
 | 
			
		||||
 | 
			
		||||
@ -1,13 +1,14 @@
 | 
			
		||||
use super::load_chunk::EntryBatch;
 | 
			
		||||
use super::seal_task_manager::SealTaskManager;
 | 
			
		||||
use super::{MineLoadChunk, SealAnswer, SealTask};
 | 
			
		||||
use crate::config::ShardConfig;
 | 
			
		||||
use crate::error::Error;
 | 
			
		||||
use crate::log_store::load_chunk::EntryBatch;
 | 
			
		||||
use crate::log_store::log_manager::{
 | 
			
		||||
    bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
 | 
			
		||||
    COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
 | 
			
		||||
};
 | 
			
		||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
 | 
			
		||||
use crate::log_store::seal_task_manager::SealTaskManager;
 | 
			
		||||
use crate::log_store::{
 | 
			
		||||
    metrics, FlowRead, FlowSeal, FlowWrite, MineLoadChunk, SealAnswer, SealTask,
 | 
			
		||||
};
 | 
			
		||||
use crate::{try_option, ZgsKeyValueDB};
 | 
			
		||||
use anyhow::{anyhow, bail, Result};
 | 
			
		||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
 | 
			
		||||
@ -20,6 +21,7 @@ use std::cmp::Ordering;
 | 
			
		||||
use std::collections::BTreeMap;
 | 
			
		||||
use std::fmt::Debug;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
use std::{cmp, mem};
 | 
			
		||||
use tracing::{debug, error, trace};
 | 
			
		||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
 | 
			
		||||
@ -40,7 +42,11 @@ impl FlowStore {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
 | 
			
		||||
        self.db.put_batch_root_list(root_map)
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let res = self.db.put_batch_root_list(root_map);
 | 
			
		||||
 | 
			
		||||
        metrics::PUT_BATCH_ROOT_LIST.update_since(start_time);
 | 
			
		||||
        res
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn insert_subtree_list_for_batch(
 | 
			
		||||
@ -48,6 +54,7 @@ impl FlowStore {
 | 
			
		||||
        batch_index: usize,
 | 
			
		||||
        subtree_list: Vec<(usize, usize, DataRoot)>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let mut batch = self
 | 
			
		||||
            .db
 | 
			
		||||
            .get_entry_batch(batch_index as u64)?
 | 
			
		||||
@ -55,6 +62,8 @@ impl FlowStore {
 | 
			
		||||
        batch.set_subtree_list(subtree_list);
 | 
			
		||||
        self.db.put_entry_raw(vec![(batch_index as u64, batch)])?;
 | 
			
		||||
 | 
			
		||||
        metrics::INSERT_SUBTREE_LIST.update_since(start_time);
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -73,7 +82,10 @@ impl FlowStore {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
 | 
			
		||||
        self.db.put_mpt_node_list(node_list)
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let res = self.db.put_mpt_node_list(node_list);
 | 
			
		||||
        metrics::PUT_MPT_NODE.update_since(start_time);
 | 
			
		||||
        res
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
 | 
			
		||||
@ -222,6 +234,7 @@ impl FlowWrite for FlowStore {
 | 
			
		||||
    /// Return the roots of completed chunks. The order is guaranteed to be increasing
 | 
			
		||||
    /// by chunk index.
 | 
			
		||||
    fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let mut to_seal_set = self.seal_manager.to_seal_set.write();
 | 
			
		||||
        trace!("append_entries: {} {}", data.start_index, data.data.len());
 | 
			
		||||
        if data.data.len() % BYTES_PER_SECTOR != 0 {
 | 
			
		||||
@ -264,6 +277,8 @@ impl FlowWrite for FlowStore {
 | 
			
		||||
 | 
			
		||||
            batch_list.push((chunk_index, batch));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        metrics::APPEND_ENTRIES.update_since(start_time);
 | 
			
		||||
        self.db.put_entry_batch_list(batch_list)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -373,6 +388,7 @@ impl FlowDBStore {
 | 
			
		||||
        &self,
 | 
			
		||||
        batch_list: Vec<(u64, EntryBatch)>,
 | 
			
		||||
    ) -> Result<Vec<(u64, DataRoot)>> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let mut completed_batches = Vec::new();
 | 
			
		||||
        let mut tx = self.kvdb.transaction();
 | 
			
		||||
        for (batch_index, batch) in batch_list {
 | 
			
		||||
@ -393,6 +409,7 @@ impl FlowDBStore {
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        self.kvdb.write(tx)?;
 | 
			
		||||
        metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time);
 | 
			
		||||
        Ok(completed_batches)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -23,10 +23,12 @@ use std::collections::BTreeMap;
 | 
			
		||||
use std::path::Path;
 | 
			
		||||
use std::sync::mpsc;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
use tracing::{debug, error, info, instrument, trace, warn};
 | 
			
		||||
 | 
			
		||||
use super::tx_store::BlockHashAndSubmissionIndex;
 | 
			
		||||
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
 | 
			
		||||
use crate::log_store::metrics;
 | 
			
		||||
use crate::log_store::tx_store::BlockHashAndSubmissionIndex;
 | 
			
		||||
use crate::log_store::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
 | 
			
		||||
 | 
			
		||||
/// 256 Bytes
 | 
			
		||||
pub const ENTRY_SIZE: usize = 256;
 | 
			
		||||
@ -880,6 +882,7 @@ impl LogManager {
 | 
			
		||||
        if merkle_list.is_empty() {
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        }
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
 | 
			
		||||
        self.pad_tx(tx_start_index, &mut *merkle)?;
 | 
			
		||||
 | 
			
		||||
@ -925,12 +928,15 @@ impl LogManager {
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        self.flow_store.put_batch_root_list(batch_root_map)?;
 | 
			
		||||
 | 
			
		||||
        metrics::APPEND_SUBTREE_LIST.update_since(start_time);
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[instrument(skip(self, merkle))]
 | 
			
		||||
    fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
 | 
			
		||||
        // Check if we need to pad the flow.
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let mut tx_start_flow_index =
 | 
			
		||||
            merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
 | 
			
		||||
        let pad_size = tx_start_index - tx_start_flow_index;
 | 
			
		||||
@ -1020,6 +1026,8 @@ impl LogManager {
 | 
			
		||||
            merkle.pora_chunks_merkle.leaves(),
 | 
			
		||||
            merkle.last_chunk_merkle.leaves()
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        metrics::PAD_TX.update_since(start_time);
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -1148,6 +1156,8 @@ impl LogManager {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
 | 
			
		||||
        let mut merkle = self.merkle.write();
 | 
			
		||||
        let shard_config = self.flow_store.get_shard_config();
 | 
			
		||||
        // We have all the data needed for this tx, so just copy it.
 | 
			
		||||
@ -1196,6 +1206,8 @@ impl LogManager {
 | 
			
		||||
        for (seq, _) in to_tx_offset_list {
 | 
			
		||||
            self.tx_store.finalize_tx(seq)?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        metrics::COPY_TX_AND_FINALIZE.update_since(start_time);
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -1268,6 +1280,7 @@ pub fn sub_merkle_tree(leaf_data: &[u8]) -> Result<FileMerkleTree> {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
 | 
			
		||||
    let start_time = Instant::now();
 | 
			
		||||
    if leaf_data.len() % ENTRY_SIZE != 0 {
 | 
			
		||||
        bail!("merkle_tree: mismatched data size");
 | 
			
		||||
    }
 | 
			
		||||
@ -1283,6 +1296,8 @@ pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
 | 
			
		||||
            .map(Sha3Algorithm::leaf)
 | 
			
		||||
            .collect()
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    metrics::DATA_TO_MERKLE_LEAVES.update_since(start_time);
 | 
			
		||||
    Ok(r)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										33
									
								
								node/storage/src/log_store/metrics.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								node/storage/src/log_store/metrics.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,33 @@
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
 | 
			
		||||
use metrics::{register_timer, Timer};
 | 
			
		||||
 | 
			
		||||
lazy_static::lazy_static! {
 | 
			
		||||
    pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");
 | 
			
		||||
 | 
			
		||||
    pub static ref CHECK_TX_COMPLETED: Arc<dyn Timer> =
 | 
			
		||||
        register_timer("log_store_log_manager_check_tx_completed");
 | 
			
		||||
 | 
			
		||||
    pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> =
 | 
			
		||||
        register_timer("log_store_log_manager_append_subtree_list");
 | 
			
		||||
 | 
			
		||||
    pub static ref DATA_TO_MERKLE_LEAVES: Arc<dyn Timer> =
 | 
			
		||||
        register_timer("log_store_log_manager_data_to_merkle_leaves");
 | 
			
		||||
 | 
			
		||||
    pub static ref COPY_TX_AND_FINALIZE: Arc<dyn Timer> =
 | 
			
		||||
        register_timer("log_store_log_manager_copy_tx_and_finalize");
 | 
			
		||||
 | 
			
		||||
    pub static ref PAD_TX: Arc<dyn Timer> = register_timer("log_store_log_manager_pad_tx");
 | 
			
		||||
 | 
			
		||||
    pub static ref PUT_BATCH_ROOT_LIST: Arc<dyn Timer> = register_timer("log_store_flow_store_put_batch_root_list");
 | 
			
		||||
 | 
			
		||||
    pub static ref INSERT_SUBTREE_LIST: Arc<dyn Timer> =
 | 
			
		||||
        register_timer("log_store_flow_store_insert_subtree_list");
 | 
			
		||||
 | 
			
		||||
    pub static ref PUT_MPT_NODE: Arc<dyn Timer> = register_timer("log_store_flow_store_put_mpt_node");
 | 
			
		||||
 | 
			
		||||
    pub static ref PUT_ENTRY_BATCH_LIST: Arc<dyn Timer> =
 | 
			
		||||
        register_timer("log_store_flow_store_put_entry_batch_list");
 | 
			
		||||
 | 
			
		||||
    pub static ref APPEND_ENTRIES: Arc<dyn Timer> = register_timer("log_store_flow_store_append_entries");
 | 
			
		||||
}
 | 
			
		||||
@ -15,6 +15,7 @@ pub mod config;
 | 
			
		||||
mod flow_store;
 | 
			
		||||
mod load_chunk;
 | 
			
		||||
pub mod log_manager;
 | 
			
		||||
mod metrics;
 | 
			
		||||
mod seal_task_manager;
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod tests;
 | 
			
		||||
 | 
			
		||||
@ -3,6 +3,7 @@ use crate::log_store::log_manager::{
 | 
			
		||||
    data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
 | 
			
		||||
    COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
 | 
			
		||||
};
 | 
			
		||||
use crate::log_store::metrics;
 | 
			
		||||
use crate::{try_option, LogManager, ZgsKeyValueDB};
 | 
			
		||||
use anyhow::{anyhow, Result};
 | 
			
		||||
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
 | 
			
		||||
@ -15,6 +16,7 @@ use std::collections::hash_map::Entry;
 | 
			
		||||
use std::collections::HashMap;
 | 
			
		||||
use std::sync::atomic::{AtomicU64, Ordering};
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::time::Instant;
 | 
			
		||||
use tracing::{error, instrument};
 | 
			
		||||
 | 
			
		||||
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
 | 
			
		||||
@ -51,6 +53,8 @@ impl TransactionStore {
 | 
			
		||||
    #[instrument(skip(self))]
 | 
			
		||||
    /// Return `Ok(tx_seq_list)` with the seqs of previous transactions that have the same data root.
 | 
			
		||||
    pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> {
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
 | 
			
		||||
        let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
 | 
			
		||||
        if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) {
 | 
			
		||||
            // The last tx is inserted again, so no need to process it.
 | 
			
		||||
@ -86,6 +90,7 @@ impl TransactionStore {
 | 
			
		||||
        );
 | 
			
		||||
        self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
 | 
			
		||||
        self.kvdb.write(db_tx)?;
 | 
			
		||||
        metrics::TX_STORE_PUT.update_since(start_time);
 | 
			
		||||
        Ok(old_tx_seq_list)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -175,8 +180,12 @@ impl TransactionStore {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
 | 
			
		||||
        Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
 | 
			
		||||
            == Some(vec![TX_STATUS_FINALIZED]))
 | 
			
		||||
        let start_time = Instant::now();
 | 
			
		||||
        let res = self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
 | 
			
		||||
            == Some(vec![TX_STATUS_FINALIZED]);
 | 
			
		||||
 | 
			
		||||
        metrics::CHECK_TX_COMPLETED.update_since(start_time);
 | 
			
		||||
        Ok(res)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
 | 
			
		||||
 | 
			
		||||
@ -59,14 +59,13 @@ impl RandomBatcher {
 | 
			
		||||
    pub async fn start(mut self, catched_up: Arc<AtomicBool>) {
 | 
			
		||||
        info!("Start to sync files");
 | 
			
		||||
 | 
			
		||||
        loop {
 | 
			
		||||
            // disable file sync until catched up
 | 
			
		||||
            if !catched_up.load(Ordering::Relaxed) {
 | 
			
		||||
                trace!("Cannot sync file in catch-up phase");
 | 
			
		||||
                sleep(self.config.auto_sync_idle_interval).await;
 | 
			
		||||
                continue;
 | 
			
		||||
            }
 | 
			
		||||
        // wait until log entry sync has caught up
 | 
			
		||||
        while !catched_up.load(Ordering::Relaxed) {
 | 
			
		||||
            trace!("Cannot sync file in catch-up phase");
 | 
			
		||||
            sleep(self.config.auto_sync_idle_interval).await;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok(state) = self.get_state().await {
 | 
			
		||||
                metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64);
 | 
			
		||||
                metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64);
 | 
			
		||||
 | 
			
		||||
@ -9,18 +9,23 @@ use storage_async::Store;
 | 
			
		||||
use task_executor::TaskExecutor;
 | 
			
		||||
use tokio::sync::{
 | 
			
		||||
    broadcast,
 | 
			
		||||
    mpsc::{unbounded_channel, UnboundedSender},
 | 
			
		||||
    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
 | 
			
		||||
    oneshot,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
use crate::{Config, SyncSender};
 | 
			
		||||
 | 
			
		||||
use super::{batcher_random::RandomBatcher, batcher_serial::SerialBatcher, sync_store::SyncStore};
 | 
			
		||||
use super::{
 | 
			
		||||
    batcher_random::RandomBatcher,
 | 
			
		||||
    batcher_serial::SerialBatcher,
 | 
			
		||||
    sync_store::{Queue, SyncStore},
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
pub struct AutoSyncManager {
 | 
			
		||||
    pub serial: SerialBatcher,
 | 
			
		||||
    pub serial: Option<SerialBatcher>,
 | 
			
		||||
    pub random: RandomBatcher,
 | 
			
		||||
    pub file_announcement_send: UnboundedSender<u64>,
 | 
			
		||||
    pub new_file_send: UnboundedSender<u64>,
 | 
			
		||||
    pub catched_up: Arc<AtomicBool>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -33,42 +38,80 @@ impl AutoSyncManager {
 | 
			
		||||
        log_sync_recv: broadcast::Receiver<LogSyncEvent>,
 | 
			
		||||
        catch_up_end_recv: oneshot::Receiver<()>,
 | 
			
		||||
    ) -> Result<Self> {
 | 
			
		||||
        let (send, recv) = unbounded_channel();
 | 
			
		||||
        let sync_store = Arc::new(SyncStore::new(store.clone()));
 | 
			
		||||
        let (file_announcement_send, file_announcement_recv) = unbounded_channel();
 | 
			
		||||
        let (new_file_send, new_file_recv) = unbounded_channel();
 | 
			
		||||
        let sync_store = if config.neighbors_only {
 | 
			
		||||
            // use the v2 db to avoid reading v1 files that were announced by the whole network instead of by neighbors
 | 
			
		||||
            Arc::new(SyncStore::new_with_name(
 | 
			
		||||
                store.clone(),
 | 
			
		||||
                "pendingv2",
 | 
			
		||||
                "readyv2",
 | 
			
		||||
            ))
 | 
			
		||||
        } else {
 | 
			
		||||
            Arc::new(SyncStore::new(store.clone()))
 | 
			
		||||
        };
 | 
			
		||||
        let catched_up = Arc::new(AtomicBool::new(false));
 | 
			
		||||
 | 
			
		||||
        // sync in sequence
 | 
			
		||||
        let serial =
 | 
			
		||||
            SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
 | 
			
		||||
                .await?;
 | 
			
		||||
        // handle new file
 | 
			
		||||
        executor.spawn(
 | 
			
		||||
            serial
 | 
			
		||||
                .clone()
 | 
			
		||||
                .start(recv, log_sync_recv, catched_up.clone()),
 | 
			
		||||
            "auto_sync_serial",
 | 
			
		||||
            Self::handle_new_file(new_file_recv, sync_store.clone()),
 | 
			
		||||
            "auto_sync_handle_new_file",
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        // sync in sequence
 | 
			
		||||
        let serial = if config.neighbors_only {
 | 
			
		||||
            None
 | 
			
		||||
        } else {
 | 
			
		||||
            let serial =
 | 
			
		||||
                SerialBatcher::new(config, store.clone(), sync_send.clone(), sync_store.clone())
 | 
			
		||||
                    .await?;
 | 
			
		||||
            executor.spawn(
 | 
			
		||||
                serial
 | 
			
		||||
                    .clone()
 | 
			
		||||
                    .start(file_announcement_recv, log_sync_recv, catched_up.clone()),
 | 
			
		||||
                "auto_sync_serial",
 | 
			
		||||
            );
 | 
			
		||||
 | 
			
		||||
            Some(serial)
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        // sync randomly
 | 
			
		||||
        let random = RandomBatcher::new(config, store, sync_send, sync_store);
 | 
			
		||||
        executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random");
 | 
			
		||||
 | 
			
		||||
        // handle the caught-up notification
 | 
			
		||||
        let catched_up_cloned = catched_up.clone();
 | 
			
		||||
        executor.spawn(
 | 
			
		||||
            async move {
 | 
			
		||||
                if catch_up_end_recv.await.is_ok() {
 | 
			
		||||
                    info!("log entry catched up");
 | 
			
		||||
                    catched_up_cloned.store(true, Ordering::Relaxed);
 | 
			
		||||
                }
 | 
			
		||||
            },
 | 
			
		||||
            Self::listen_catch_up(catch_up_end_recv, catched_up.clone()),
 | 
			
		||||
            "auto_sync_wait_for_catchup",
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        Ok(Self {
 | 
			
		||||
            serial,
 | 
			
		||||
            random,
 | 
			
		||||
            file_announcement_send: send,
 | 
			
		||||
            file_announcement_send,
 | 
			
		||||
            new_file_send,
 | 
			
		||||
            catched_up,
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn handle_new_file(
 | 
			
		||||
        mut new_file_recv: UnboundedReceiver<u64>,
 | 
			
		||||
        sync_store: Arc<SyncStore>,
 | 
			
		||||
    ) {
 | 
			
		||||
        while let Some(tx_seq) = new_file_recv.recv().await {
 | 
			
		||||
            if let Err(err) = sync_store.insert(tx_seq, Queue::Ready).await {
 | 
			
		||||
                warn!(?err, %tx_seq, "Failed to insert new file to ready queue");
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn listen_catch_up(
 | 
			
		||||
        catch_up_end_recv: oneshot::Receiver<()>,
 | 
			
		||||
        catched_up: Arc<AtomicBool>,
 | 
			
		||||
    ) {
 | 
			
		||||
        if catch_up_end_recv.await.is_ok() {
 | 
			
		||||
            info!("log entry catched up");
 | 
			
		||||
            catched_up.store(true, Ordering::Relaxed);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -42,6 +42,14 @@ impl SyncStore {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn new_with_name(store: Store, pending: &'static str, ready: &'static str) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            store: Arc::new(RwLock::new(store)),
 | 
			
		||||
            pending_txs: TxStore::new(pending),
 | 
			
		||||
            ready_txs: TxStore::new(ready),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Returns the number of pending txs and ready txs.
 | 
			
		||||
    pub async fn stat(&self) -> Result<(usize, usize)> {
 | 
			
		||||
        let async_store = self.store.read().await;
 | 
			
		||||
 | 
			
		||||
@ -14,12 +14,13 @@ use shared_types::{timestamp_now, ChunkArrayWithProof, TxID, CHUNK_SIZE};
 | 
			
		||||
use ssz::Encode;
 | 
			
		||||
use std::{sync::Arc, time::Instant};
 | 
			
		||||
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
 | 
			
		||||
use storage_async::Store;
 | 
			
		||||
use storage_async::{ShardConfig, Store};
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Debug, PartialEq, Eq)]
 | 
			
		||||
pub enum FailureReason {
 | 
			
		||||
    DBError(String),
 | 
			
		||||
    TxReverted(TxID),
 | 
			
		||||
    TimeoutFindFile,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Debug, PartialEq, Eq)]
 | 
			
		||||
@ -159,11 +160,14 @@ impl SerialSyncController {
 | 
			
		||||
 | 
			
		||||
    /// Find more peers to sync chunks, publishing a `FindChunks` or `FindFile` pubsub message as needed.
 | 
			
		||||
    fn try_find_peers(&mut self) {
 | 
			
		||||
        let (published, num_new_peers) = if self.goal.is_all_chunks() {
 | 
			
		||||
            self.publish_find_file()
 | 
			
		||||
        } else {
 | 
			
		||||
        let (published, num_new_peers) = if !self.goal.is_all_chunks() {
 | 
			
		||||
            self.publish_find_chunks();
 | 
			
		||||
            (true, 0)
 | 
			
		||||
        } else if self.config.neighbors_only {
 | 
			
		||||
            self.do_publish_find_file();
 | 
			
		||||
            (true, 0)
 | 
			
		||||
        } else {
 | 
			
		||||
            self.publish_find_file()
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        info!(%self.tx_seq, %published, %num_new_peers, "Finding peers");
 | 
			
		||||
@ -199,14 +203,23 @@ impl SerialSyncController {
 | 
			
		||||
            return (false, num_new_peers);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        self.ctx.publish(PubsubMessage::FindFile(FindFile {
 | 
			
		||||
            tx_id: self.tx_id,
 | 
			
		||||
            timestamp: timestamp_now(),
 | 
			
		||||
        }));
 | 
			
		||||
        self.do_publish_find_file();
 | 
			
		||||
 | 
			
		||||
        (true, num_new_peers)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn do_publish_find_file(&self) {
 | 
			
		||||
        let shard_config = self.store.get_store().get_shard_config();
 | 
			
		||||
 | 
			
		||||
        self.ctx.publish(PubsubMessage::FindFile(FindFile {
 | 
			
		||||
            tx_id: self.tx_id,
 | 
			
		||||
            num_shard: shard_config.num_shard,
 | 
			
		||||
            shard_id: shard_config.shard_id,
 | 
			
		||||
            neighbors_only: self.config.neighbors_only,
 | 
			
		||||
            timestamp: timestamp_now(),
 | 
			
		||||
        }));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn publish_find_chunks(&self) {
 | 
			
		||||
        self.ctx.publish(PubsubMessage::FindChunks(FindChunks {
 | 
			
		||||
            tx_id: self.tx_id,
 | 
			
		||||
@ -337,6 +350,14 @@ impl SerialSyncController {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Triggered when any peer (TCP connected) has announced a file via RPC message.
 | 
			
		||||
    pub fn on_peer_announced(&mut self, peer_id: PeerId, shard_config: ShardConfig) {
 | 
			
		||||
        self.peers
 | 
			
		||||
            .add_new_peer_with_config(peer_id, Multiaddr::empty(), shard_config);
 | 
			
		||||
        self.peers
 | 
			
		||||
            .update_state_force(&peer_id, PeerState::Connected);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn on_dail_failed(&mut self, peer_id: PeerId, err: &DialError) {
 | 
			
		||||
        match err {
 | 
			
		||||
            DialError::ConnectionLimit(_) => {
 | 
			
		||||
@ -545,6 +566,9 @@ impl SerialSyncController {
 | 
			
		||||
                info!(%self.tx_seq, "Succeeded to finalize file");
 | 
			
		||||
                self.state = SyncState::Completed;
 | 
			
		||||
                metrics::SERIAL_SYNC_FILE_COMPLETED.update_since(self.since.0);
 | 
			
		||||
                // notify neighbor nodes that a new file has completed syncing
 | 
			
		||||
                self.ctx
 | 
			
		||||
                    .send(NetworkMessage::AnnounceLocalFile { tx_id: self.tx_id });
 | 
			
		||||
            }
 | 
			
		||||
            Ok(false) => {
 | 
			
		||||
                warn!(?self.tx_id, %self.tx_seq, "Transaction reverted during finalize_tx");
 | 
			
		||||
@ -637,12 +661,19 @@ impl SerialSyncController {
 | 
			
		||||
                    {
 | 
			
		||||
                        self.state = SyncState::FoundPeers;
 | 
			
		||||
                    } else {
 | 
			
		||||
                        // storage node may not have the specific file when `FindFile`
 | 
			
		||||
                        // gossip message received. In this case, just broadcast the
 | 
			
		||||
                        // `FindFile` message again.
 | 
			
		||||
                        // FindFile timeout
 | 
			
		||||
                        if since.elapsed() >= self.config.peer_find_timeout {
 | 
			
		||||
                            debug!(%self.tx_seq, "Finding peer timeout and try to find peers again");
 | 
			
		||||
                            self.try_find_peers();
 | 
			
		||||
                            if self.config.neighbors_only {
 | 
			
		||||
                                self.state = SyncState::Failed {
 | 
			
		||||
                                    reason: FailureReason::TimeoutFindFile,
 | 
			
		||||
                                };
 | 
			
		||||
                            } else {
 | 
			
		||||
                                // storage node may not have the specific file when `FindFile`
 | 
			
		||||
                                // gossip message received. In this case, just broadcast the
 | 
			
		||||
                                // `FindFile` message again.
 | 
			
		||||
                                debug!(%self.tx_seq, "Finding peer timeout and try to find peers again");
 | 
			
		||||
                                self.try_find_peers();
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
 | 
			
		||||
                        completed = true;
 | 
			
		||||
@ -1513,6 +1544,10 @@ mod tests {
 | 
			
		||||
 | 
			
		||||
        controller.on_response(peer_id, chunks).await;
 | 
			
		||||
        assert_eq!(*controller.get_status(), SyncState::Completed);
 | 
			
		||||
        assert!(matches!(
 | 
			
		||||
            network_recv.try_recv().unwrap(),
 | 
			
		||||
            NetworkMessage::AnnounceLocalFile { .. }
 | 
			
		||||
        ));
 | 
			
		||||
        assert!(network_recv.try_recv().is_err());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,10 @@ use std::{
 | 
			
		||||
#[serde(default)]
 | 
			
		||||
pub struct Config {
 | 
			
		||||
    // sync service config
 | 
			
		||||
    /// Indicates whether to sync files from neighbor nodes only.
 | 
			
		||||
    /// This is to avoid flooding file announcements in the whole network,
 | 
			
		||||
    /// which leads to high latency or even timeouts when syncing files.
 | 
			
		||||
    pub neighbors_only: bool,
 | 
			
		||||
    #[serde(deserialize_with = "deserialize_duration")]
 | 
			
		||||
    pub heartbeat_interval: Duration,
 | 
			
		||||
    pub auto_sync_enabled: bool,
 | 
			
		||||
@ -64,6 +68,7 @@ impl Default for Config {
 | 
			
		||||
    fn default() -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            // sync service config
 | 
			
		||||
            neighbors_only: false,
 | 
			
		||||
            heartbeat_interval: Duration::from_secs(5),
 | 
			
		||||
            auto_sync_enabled: false,
 | 
			
		||||
            max_sync_files: 8,
 | 
			
		||||
 | 
			
		||||
@ -8,7 +8,8 @@ use anyhow::{anyhow, bail, Result};
 | 
			
		||||
use file_location_cache::FileLocationCache;
 | 
			
		||||
use libp2p::swarm::DialError;
 | 
			
		||||
use log_entry_sync::LogSyncEvent;
 | 
			
		||||
use network::types::{AnnounceChunks, FindFile};
 | 
			
		||||
use network::rpc::methods::FileAnnouncement;
 | 
			
		||||
use network::types::{AnnounceChunks, FindFile, NewFile};
 | 
			
		||||
use network::PubsubMessage;
 | 
			
		||||
use network::{
 | 
			
		||||
    rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
 | 
			
		||||
@ -70,6 +71,15 @@ pub enum SyncMessage {
 | 
			
		||||
    AnnounceChunksGossip {
 | 
			
		||||
        msg: AnnounceChunks,
 | 
			
		||||
    },
 | 
			
		||||
    NewFile {
 | 
			
		||||
        from: PeerId,
 | 
			
		||||
        msg: NewFile,
 | 
			
		||||
    },
 | 
			
		||||
    AnnounceFile {
 | 
			
		||||
        peer_id: PeerId,
 | 
			
		||||
        request_id: PeerRequestId,
 | 
			
		||||
        announcement: FileAnnouncement,
 | 
			
		||||
    },
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
@ -265,6 +275,12 @@ impl SyncService {
 | 
			
		||||
            SyncMessage::AnnounceShardConfig { .. } => {
 | 
			
		||||
                // FIXME: Check if controllers need to be reset?
 | 
			
		||||
            }
 | 
			
		||||
            SyncMessage::NewFile { from, msg } => self.on_new_file_gossip(from, msg).await,
 | 
			
		||||
            SyncMessage::AnnounceFile {
 | 
			
		||||
                peer_id,
 | 
			
		||||
                announcement,
 | 
			
		||||
                ..
 | 
			
		||||
            } => self.on_announce_file(peer_id, announcement).await,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -279,7 +295,10 @@ impl SyncService {
 | 
			
		||||
                    Some(manager) => SyncServiceState {
 | 
			
		||||
                        num_syncing: self.controllers.len(),
 | 
			
		||||
                        catched_up: Some(manager.catched_up.load(Ordering::Relaxed)),
 | 
			
		||||
                        auto_sync_serial: Some(manager.serial.get_state().await),
 | 
			
		||||
                        auto_sync_serial: match &manager.serial {
 | 
			
		||||
                            Some(v) => Some(v.get_state().await),
 | 
			
		||||
                            None => None,
 | 
			
		||||
                        },
 | 
			
		||||
                        auto_sync_random: manager.random.get_state().await.ok(),
 | 
			
		||||
                    },
 | 
			
		||||
                    None => SyncServiceState {
 | 
			
		||||
@ -577,8 +596,12 @@ impl SyncService {
 | 
			
		||||
            Some(tx) => tx,
 | 
			
		||||
            None => bail!("Transaction not found"),
 | 
			
		||||
        };
 | 
			
		||||
        let shard_config = self.store.get_store().get_shard_config();
 | 
			
		||||
        self.ctx.publish(PubsubMessage::FindFile(FindFile {
 | 
			
		||||
            tx_id: tx.id(),
 | 
			
		||||
            num_shard: shard_config.num_shard,
 | 
			
		||||
            shard_id: shard_config.shard_id,
 | 
			
		||||
            neighbors_only: false,
 | 
			
		||||
            timestamp: timestamp_now(),
 | 
			
		||||
        }));
 | 
			
		||||
        Ok(())
 | 
			
		||||
@ -642,7 +665,10 @@ impl SyncService {
 | 
			
		||||
                            Some(s) => s,
 | 
			
		||||
                            None => {
 | 
			
		||||
                                debug!(%tx.seq, "No more data needed");
 | 
			
		||||
                                self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await?;
 | 
			
		||||
                                if self.store.finalize_tx_with_hash(tx.seq, tx.hash()).await? {
 | 
			
		||||
                                    self.ctx
 | 
			
		||||
                                        .send(NetworkMessage::AnnounceLocalFile { tx_id: tx.id() });
 | 
			
		||||
                                }
 | 
			
		||||
                                return Ok(());
 | 
			
		||||
                            }
 | 
			
		||||
                        };
 | 
			
		||||
@ -748,6 +774,34 @@ impl SyncService {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Handle a received `NewFile` gossip message.
 | 
			
		||||
    async fn on_new_file_gossip(&mut self, from: PeerId, msg: NewFile) {
 | 
			
		||||
        debug!(%from, ?msg, "Received NewFile gossip");
 | 
			
		||||
 | 
			
		||||
        if let Some(controller) = self.controllers.get_mut(&msg.tx_id.seq) {
 | 
			
		||||
            // Notify new peer announced if file already in sync
 | 
			
		||||
            if let Ok(shard_config) = ShardConfig::new(msg.shard_id, msg.num_shard) {
 | 
			
		||||
                controller.on_peer_announced(from, shard_config);
 | 
			
		||||
                controller.transition();
 | 
			
		||||
            }
 | 
			
		||||
        } else if let Some(manager) = &self.auto_sync_manager {
 | 
			
		||||
            let _ = manager.new_file_send.send(msg.tx_id.seq);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Handle a received `AnnounceFile` RPC message.
 | 
			
		||||
    async fn on_announce_file(&mut self, from: PeerId, announcement: FileAnnouncement) {
 | 
			
		||||
        // Notify new peer announced if file already in sync
 | 
			
		||||
        if let Some(controller) = self.controllers.get_mut(&announcement.tx_id.seq) {
 | 
			
		||||
            if let Ok(shard_config) =
 | 
			
		||||
                ShardConfig::new(announcement.shard_id, announcement.num_shard)
 | 
			
		||||
            {
 | 
			
		||||
                controller.on_peer_announced(from, shard_config);
 | 
			
		||||
                controller.transition();
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Terminate file sync of `min_tx_seq`.
 | 
			
		||||
    /// If `is_reverted` is `true` (means confirmed transactions reverted),
 | 
			
		||||
    /// also terminate `tx_seq` greater than `min_tx_seq`
 | 
			
		||||
@ -1504,6 +1558,10 @@ mod tests {
 | 
			
		||||
        .await;
 | 
			
		||||
 | 
			
		||||
        wait_for_tx_finalized(runtime.store.clone(), tx_seq).await;
 | 
			
		||||
        assert!(matches!(
 | 
			
		||||
            runtime.network_recv.try_recv().unwrap(),
 | 
			
		||||
            NetworkMessage::AnnounceLocalFile { .. }
 | 
			
		||||
        ));
 | 
			
		||||
 | 
			
		||||
        assert!(!runtime.store.check_tx_completed(0).unwrap());
 | 
			
		||||
 | 
			
		||||
@ -1528,6 +1586,10 @@ mod tests {
 | 
			
		||||
        .await;
 | 
			
		||||
 | 
			
		||||
        wait_for_tx_finalized(runtime.store, tx_seq).await;
 | 
			
		||||
        assert!(matches!(
 | 
			
		||||
            runtime.network_recv.try_recv().unwrap(),
 | 
			
		||||
            NetworkMessage::AnnounceLocalFile { .. }
 | 
			
		||||
        ));
 | 
			
		||||
 | 
			
		||||
        sync_send
 | 
			
		||||
            .notify(SyncMessage::PeerDisconnected {
 | 
			
		||||
 | 
			
		||||
@ -232,6 +232,10 @@ batcher_announcement_capacity = 100
 | 
			
		||||
# all files, and sufficient disk space is required.
 | 
			
		||||
auto_sync_enabled = true
 | 
			
		||||
 | 
			
		||||
# Indicates whether to sync files from neighbor nodes only. This is to avoid flooding file
 | 
			
		||||
# announcements in the whole network, which leads to high latency or even timeouts when syncing files.
 | 
			
		||||
neighbors_only = true
 | 
			
		||||
 | 
			
		||||
#  Maximum number of files in sync from other peers simultaneously.
 | 
			
		||||
# max_sync_files = 8
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -244,6 +244,10 @@ batcher_announcement_capacity = 100
 | 
			
		||||
# all files, and sufficient disk space is required.
 | 
			
		||||
auto_sync_enabled = true
 | 
			
		||||
 | 
			
		||||
# Indicates whether to sync files from neighbor nodes only. This is to avoid flooding file
 | 
			
		||||
# announcements in the whole network, which leads to high latency or even timeouts when syncing files.
 | 
			
		||||
neighbors_only = true
 | 
			
		||||
 | 
			
		||||
# Maximum number of files being synced from other peers simultaneously.
 | 
			
		||||
# max_sync_files = 8
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -246,6 +246,10 @@
 | 
			
		||||
# all files, and sufficient disk space is required.
 | 
			
		||||
# auto_sync_enabled = false
 | 
			
		||||
 | 
			
		||||
# Indicates whether to sync files from neighbor nodes only. This is to avoid flooding file
 | 
			
		||||
# announcements in the whole network, which leads to high latency or even timeouts when syncing files.
 | 
			
		||||
neighbors_only = true
 | 
			
		||||
 | 
			
		||||
# Maximum number of files being synced from other peers simultaneously.
 | 
			
		||||
# max_sync_files = 8
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										34
									
								
								tests/sync_auto_random_v2_test.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										34
									
								
								tests/sync_auto_random_v2_test.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,34 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
from test_framework.test_framework import TestFramework
 | 
			
		||||
from utility.utils import wait_until
 | 
			
		||||
 | 
			
		||||
class AutoRandomSyncV2Test(TestFramework):
    """Checks that files uploaded on node 0 show up finalized on every other node.

    Each node is configured with auto sync enabled, ``max_sequential_workers``
    set to 0, ``max_random_workers`` set to 3 and ``neighbors_only`` turned on.
    """

    def setup_params(self):
        """Use a four-node network and give every node the same sync settings."""
        self.num_nodes = 4

        # Enable random auto sync v2
        for node_index in range(self.num_nodes):
            # Build a fresh dict per node so the node configs never share state.
            sync_settings = {
                "auto_sync_enabled": True,
                "max_sequential_workers": 0,
                "max_random_workers": 3,
                "neighbors_only": True,
            }
            self.zgs_node_configs[node_index] = {"sync": sync_settings}

    def run_test(self):
        """Upload two files on node 0 and wait until all other nodes finalize them."""
        # Size passed to the upload helper (presumably bytes -- confirm against helper).
        file_size = 256 * 1024

        # Submit and upload files on node 0
        first_root = self.__upload_file__(0, file_size)
        second_root = self.__upload_file__(0, file_size)

        def file_info(node_index, data_root):
            # Look the node up on every call, exactly like the polling predicate needs.
            return self.nodes[node_index].zgs_get_file_info(data_root)

        # Files should be available on other nodes via auto sync
        for node_index in range(1, self.num_nodes):
            for data_root in (first_root, second_root):
                # NOTE(review): the lambdas capture loop variables; this is only safe
                # if wait_until polls them before the loop advances -- confirm.
                wait_until(lambda: file_info(node_index, data_root) is not None)
                wait_until(lambda: file_info(node_index, data_root)["finalized"])
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    AutoRandomSyncV2Test().main()
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user