mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-01-23 13:36:08 +00:00
feat: file location admin rpc; refactor: all_shards_available (#134)
* feat: find file rpc * refactor: all_shards_available * fix: fmt * chore: remove rpc trace
This commit is contained in:
parent
a5f95e2e7b
commit
085c34beb0
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -2353,6 +2353,7 @@ dependencies = [
|
||||
"parking_lot 0.12.3",
|
||||
"priority-queue",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"shared_types",
|
||||
"storage",
|
||||
"tracing",
|
||||
@ -6308,6 +6309,7 @@ dependencies = [
|
||||
"append_merkle",
|
||||
"base64 0.13.1",
|
||||
"chunk_pool",
|
||||
"file_location_cache",
|
||||
"futures",
|
||||
"futures-channel",
|
||||
"jsonrpsee",
|
||||
|
@ -12,3 +12,4 @@ rand = "0.8.5"
|
||||
tracing = "0.1.35"
|
||||
priority-queue = "1.2.3"
|
||||
shared_types = { path = "../shared_types" }
|
||||
serde = { version = "1.0.137", features = ["derive"] }
|
||||
|
@ -276,6 +276,13 @@ impl Default for FileLocationCache {
|
||||
}
|
||||
|
||||
impl FileLocationCache {
|
||||
pub fn new(config: Config) -> Self {
|
||||
FileLocationCache {
|
||||
cache: Mutex::new(FileCache::new(config)),
|
||||
peer_cache: Mutex::new(Default::default()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&self, announcement: SignedAnnounceFile) {
|
||||
let peer_id = *announcement.peer_id;
|
||||
// FIXME: Check validity.
|
||||
|
@ -1,8 +1,12 @@
|
||||
mod file_location_cache;
|
||||
pub mod test_util;
|
||||
|
||||
use serde::Deserialize;
|
||||
|
||||
pub use crate::file_location_cache::FileLocationCache;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct Config {
|
||||
pub max_entries_total: usize,
|
||||
pub max_entries_per_file: usize,
|
||||
|
@ -11,6 +11,7 @@ miner = {path = "../miner"}
|
||||
futures = "0.3.21"
|
||||
jsonrpsee = { version = "0.14.0", features = ["full"] }
|
||||
network = { path = "../network" }
|
||||
file_location_cache = { path = "../file_location_cache" }
|
||||
serde = { version = "1.0.137", features = ["derive"] }
|
||||
serde_json = "1.0.82"
|
||||
base64 = "0.13.0"
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::types::{NetworkInfo, PeerInfo};
|
||||
use crate::types::{LocationInfo, NetworkInfo, PeerInfo};
|
||||
use jsonrpsee::core::RpcResult;
|
||||
use jsonrpsee::proc_macros::rpc;
|
||||
use std::collections::HashMap;
|
||||
@ -6,6 +6,9 @@ use sync::FileSyncInfo;
|
||||
|
||||
#[rpc(server, client, namespace = "admin")]
|
||||
pub trait Rpc {
|
||||
#[method(name = "findFile")]
|
||||
async fn find_file(&self, tx_seq: u64) -> RpcResult<()>;
|
||||
|
||||
#[method(name = "shutdown")]
|
||||
async fn shutdown(&self) -> RpcResult<()>;
|
||||
|
||||
@ -35,4 +38,7 @@ pub trait Rpc {
|
||||
|
||||
#[method(name = "getPeers")]
|
||||
async fn get_peers(&self) -> RpcResult<HashMap<String, PeerInfo>>;
|
||||
|
||||
#[method(name = "getFileLocation")]
|
||||
async fn get_file_location(&self, tx_seq: u64) -> RpcResult<Option<Vec<LocationInfo>>>;
|
||||
}
|
||||
|
@ -1,10 +1,13 @@
|
||||
use super::api::RpcServer;
|
||||
use crate::types::{NetworkInfo, PeerInfo};
|
||||
use crate::types::{LocationInfo, NetworkInfo, PeerInfo};
|
||||
use crate::{error, Context};
|
||||
use futures::prelude::*;
|
||||
use jsonrpsee::core::async_trait;
|
||||
use jsonrpsee::core::RpcResult;
|
||||
use network::{multiaddr::Protocol, Multiaddr};
|
||||
use std::collections::HashMap;
|
||||
use std::net::IpAddr;
|
||||
use storage::config::all_shards_available;
|
||||
use sync::{FileSyncInfo, SyncRequest, SyncResponse};
|
||||
use task_executor::ShutdownReason;
|
||||
|
||||
@ -14,6 +17,27 @@ pub struct RpcServerImpl {
|
||||
|
||||
#[async_trait]
|
||||
impl RpcServer for RpcServerImpl {
|
||||
#[tracing::instrument(skip(self), err)]
|
||||
async fn find_file(&self, tx_seq: u64) -> RpcResult<()> {
|
||||
info!("admin_findFile({tx_seq})");
|
||||
|
||||
let response = self
|
||||
.ctx
|
||||
.request_sync(SyncRequest::FindFile { tx_seq })
|
||||
.await?;
|
||||
|
||||
match response {
|
||||
SyncResponse::FindFile { err } => {
|
||||
if err.is_empty() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(error::internal_error(err))
|
||||
}
|
||||
}
|
||||
_ => Err(error::internal_error("unexpected response type")),
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self), err)]
|
||||
async fn shutdown(&self) -> RpcResult<()> {
|
||||
info!("admin_shutdown()");
|
||||
@ -160,4 +184,47 @@ impl RpcServer for RpcServerImpl {
|
||||
.map(|(peer_id, info)| (peer_id.to_base58(), info.into()))
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn get_file_location(&self, tx_seq: u64) -> RpcResult<Option<Vec<LocationInfo>>> {
|
||||
let tx = match self.ctx.log_store.get_tx_by_seq_number(tx_seq).await? {
|
||||
Some(tx) => tx,
|
||||
None => {
|
||||
return Err(error::internal_error("tx not found"));
|
||||
}
|
||||
};
|
||||
let info: Vec<LocationInfo> = self
|
||||
.ctx
|
||||
.file_location_cache
|
||||
.get_all(tx.id())
|
||||
.iter()
|
||||
.map(|announcement| {
|
||||
let multiaddr: Multiaddr = announcement.at.clone().into();
|
||||
let found_ip: Option<IpAddr> =
|
||||
multiaddr
|
||||
.iter()
|
||||
.fold(None, |found_ip, protocol| match protocol {
|
||||
Protocol::Ip4(ip) => Some(ip.into()),
|
||||
Protocol::Ip6(ip) => Some(ip.into()),
|
||||
Protocol::Tcp(_port) => found_ip,
|
||||
_ => found_ip,
|
||||
});
|
||||
(
|
||||
found_ip,
|
||||
self.ctx
|
||||
.file_location_cache
|
||||
.get_peer_config(&announcement.peer_id.clone().into()),
|
||||
)
|
||||
})
|
||||
.filter(|(found_ip, shard_config)| shard_config.is_some() && found_ip.is_some())
|
||||
.map(|(found_ip, shard_config)| LocationInfo {
|
||||
ip: found_ip.unwrap(),
|
||||
shard_config: shard_config.unwrap(),
|
||||
})
|
||||
.collect();
|
||||
if all_shards_available(info.iter().map(|info| info.shard_config).collect()) {
|
||||
Ok(Some(info))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ mod zgs;
|
||||
use crate::miner::RpcServer as MinerRpcServer;
|
||||
use admin::RpcServer as AdminRpcServer;
|
||||
use chunk_pool::MemoryChunkPool;
|
||||
use file_location_cache::FileLocationCache;
|
||||
use futures::channel::mpsc::Sender;
|
||||
use jsonrpsee::core::RpcResult;
|
||||
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
|
||||
@ -40,6 +41,7 @@ pub use zgs::RpcClient as ZgsRPCClient;
|
||||
#[derive(Clone)]
|
||||
pub struct Context {
|
||||
pub config: RPCConfig,
|
||||
pub file_location_cache: Arc<FileLocationCache>,
|
||||
pub network_globals: Arc<NetworkGlobals>,
|
||||
pub network_send: UnboundedSender<NetworkMessage>,
|
||||
pub sync_send: SyncSender,
|
||||
|
@ -13,6 +13,7 @@ use std::collections::HashSet;
|
||||
use std::hash::Hasher;
|
||||
use std::net::IpAddr;
|
||||
use std::time::Instant;
|
||||
use storage::config::ShardConfig;
|
||||
use storage::log_store::log_manager::bytes_to_entries;
|
||||
use storage::H256;
|
||||
|
||||
@ -287,6 +288,13 @@ impl From<&network::PeerInfo> for PeerInfo {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct LocationInfo {
|
||||
pub ip: IpAddr,
|
||||
pub shard_config: ShardConfig,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Client {
|
||||
|
@ -126,8 +126,8 @@ impl ClientBuilder {
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub fn with_file_location_cache(mut self) -> Self {
|
||||
let file_location_cache = Default::default();
|
||||
pub fn with_file_location_cache(mut self, config: file_location_cache::Config) -> Self {
|
||||
let file_location_cache = FileLocationCache::new(config);
|
||||
self.file_location_cache = Some(Arc::new(file_location_cache));
|
||||
self
|
||||
}
|
||||
@ -263,6 +263,7 @@ impl ClientBuilder {
|
||||
let network_send = require!("rpc", self, network).send.clone();
|
||||
let mine_send = self.miner.as_ref().map(|x| x.send.clone());
|
||||
let synced_tx_recv = require!("rpc", self, log_sync).send.subscribe();
|
||||
let file_location_cache = require!("rpc", self, file_location_cache).clone();
|
||||
|
||||
let (chunk_pool, chunk_pool_handler) =
|
||||
chunk_pool::unbounded(chunk_pool_config, async_store.clone(), network_send.clone());
|
||||
@ -273,6 +274,7 @@ impl ClientBuilder {
|
||||
let chunk_pool_clone = chunk_pool.clone();
|
||||
let ctx = rpc::Context {
|
||||
config: rpc_config,
|
||||
file_location_cache,
|
||||
network_globals: require!("rpc", self, network).globals.clone(),
|
||||
network_send,
|
||||
sync_send: require!("rpc", self, sync).send.clone(),
|
||||
|
@ -92,6 +92,9 @@ pub struct ZgsConfig {
|
||||
|
||||
// sync config, configured by [sync] section by `config` crate.
|
||||
pub sync: sync::Config,
|
||||
|
||||
// file location cache config, configured by [file_location_cache] section by `config` crate.
|
||||
pub file_location_cache: file_location_cache::Config,
|
||||
}
|
||||
|
||||
impl Deref for ZgsConfig {
|
||||
|
@ -24,7 +24,7 @@ async fn start_node(context: RuntimeContext, config: ZgsConfig) -> Result<Client
|
||||
.with_rocksdb_store(&storage_config)?
|
||||
.with_log_sync(log_sync_config)
|
||||
.await?
|
||||
.with_file_location_cache()
|
||||
.with_file_location_cache(config.file_location_cache)
|
||||
.with_network(&network_config)
|
||||
.await?
|
||||
.with_sync(config.sync)
|
||||
|
@ -1,6 +1,6 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use std::path::PathBuf;
|
||||
use std::{cell::RefCell, path::PathBuf, rc::Rc};
|
||||
|
||||
pub const SHARD_CONFIG_KEY: &str = "shard_config";
|
||||
|
||||
@ -60,6 +60,10 @@ impl ShardConfig {
|
||||
self.shard_id as u64
|
||||
}
|
||||
|
||||
pub fn is_valid(&self) -> bool {
|
||||
self.num_shard > 0 && self.num_shard.is_power_of_two() && self.shard_id < self.num_shard
|
||||
}
|
||||
|
||||
pub fn in_range(&self, segment_index: u64) -> bool {
|
||||
segment_index as usize % self.num_shard == self.shard_id
|
||||
}
|
||||
@ -87,3 +91,123 @@ impl ShardConfig {
|
||||
current + self.num_shard - shift
|
||||
}
|
||||
}
|
||||
|
||||
/// A node of a lazily built binary tree recording which shards have been
/// inserted.
///
/// A node created with `num_shard = n` has children created with
/// `num_shard = 2n`. While descending, the lowest bit of the remaining shard
/// id selects the child and is then shifted away.
struct ShardSegmentTreeNode {
    /// Shard count this node's level corresponds to.
    pub num_shard: usize,
    /// Whether this node was inserted directly or both of its children are covered.
    pub covered: bool,
    /// Children, both absent until the first descent through this node.
    pub childs: [Option<Rc<RefCell<ShardSegmentTreeNode>>>; 2],
}

impl ShardSegmentTreeNode {
    /// Creates an uncovered node without children for the given shard count.
    pub fn new(num_shard: usize) -> Self {
        ShardSegmentTreeNode {
            num_shard,
            covered: false,
            childs: [None, None],
        }
    }

    /// Creates both children (at twice this node's shard count) if they do not
    /// exist yet.
    fn push_down(&mut self) {
        if self.childs[0].is_none() {
            let child_num_shard = self.num_shard << 1;
            for child in self.childs.iter_mut() {
                *child = Some(Rc::new(RefCell::new(ShardSegmentTreeNode::new(
                    child_num_shard,
                ))));
            }
        }
    }

    /// Recomputes `covered` as "every existing child is covered".
    fn update(&mut self) {
        self.covered = self
            .childs
            .iter()
            .flatten()
            .all(|child| child.borrow().covered);
    }

    /// Records shard `shard_id` of a `num_shard`-way split in the subtree
    /// rooted at this node.
    ///
    /// Requests that cannot be placed in this subtree are ignored: `num_shard`
    /// (and this node's own `num_shard`) must be a power of two, `num_shard`
    /// must not be smaller than this node's `num_shard`, and `shard_id` must be
    /// within the range addressable below this node. Without this check such a
    /// request would never reach a node with a matching shard count and the
    /// descent would not terminate.
    pub fn insert(&mut self, num_shard: usize, shard_id: usize) {
        let placeable = self.num_shard.is_power_of_two()
            && num_shard.is_power_of_two()
            && num_shard >= self.num_shard
            && shard_id < num_shard / self.num_shard;
        if !placeable {
            return;
        }
        self.insert_checked(num_shard, shard_id);
    }

    /// Recursive part of [`Self::insert`]; expects an already validated request.
    fn insert_checked(&mut self, num_shard: usize, shard_id: usize) {
        // Nothing below an already covered node can change the outcome.
        if self.covered {
            return;
        }
        if num_shard == self.num_shard {
            self.covered = true;
            return;
        }
        self.push_down();
        if let Some(child) = &self.childs[shard_id % 2] {
            child.borrow_mut().insert_checked(num_shard, shard_id >> 1);
        }
        self.update();
    }
}
|
||||
|
||||
pub fn all_shards_available(shard_configs: Vec<ShardConfig>) -> bool {
|
||||
let mut root = ShardSegmentTreeNode::new(1);
|
||||
for shard_config in shard_configs.iter() {
|
||||
if !shard_config.is_valid() {
|
||||
continue;
|
||||
}
|
||||
root.insert(shard_config.num_shard, shard_config.shard_id);
|
||||
if root.covered {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::ShardConfig;
    use crate::config::all_shards_available;

    /// Shorthand for building a `ShardConfig` in the cases below.
    fn shard(shard_id: usize, num_shard: usize) -> ShardConfig {
        ShardConfig {
            shard_id,
            num_shard,
        }
    }

    #[test]
    fn test_all_shards_available() {
        // Shards 3/8 and 7/8 combine into 3/4; with 0/4, 1/4 and 0/2 every
        // shard is covered before the last entry is even looked at.
        let covering = vec![
            shard(3, 8),
            shard(7, 8),
            shard(0, 4),
            shard(1, 4),
            shard(0, 2),
            shard(0, 1 << 25),
        ];
        assert!(all_shards_available(covering));

        // Shard 7/8 is missing here, so coverage stays incomplete.
        let incomplete = vec![shard(0, 4), shard(1, 4), shard(3, 8), shard(0, 2)];
        assert!(!all_shards_available(incomplete));
    }
}
|
||||
|
@ -3,13 +3,13 @@ use network::{Multiaddr, PeerAction, PeerId};
|
||||
use rand::seq::IteratorRandom;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use shared_types::TxID;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::vec;
|
||||
use storage::config::ShardConfig;
|
||||
use storage::config::{all_shards_available, ShardConfig};
|
||||
|
||||
use crate::context::SyncNetworkContext;
|
||||
use crate::InstantWrapper;
|
||||
@ -169,39 +169,12 @@ impl SyncPeers {
|
||||
}
|
||||
|
||||
pub fn all_shards_available(&self, state: Vec<PeerState>) -> bool {
|
||||
let mut missing_shards = BTreeSet::new();
|
||||
missing_shards.insert(0);
|
||||
let mut num_shards = 1usize;
|
||||
for peer_id in &self.filter_peers(state) {
|
||||
let shard_config = self.peers.get(peer_id).unwrap().shard_config;
|
||||
match shard_config.num_shard.cmp(&num_shards) {
|
||||
Ordering::Equal => {
|
||||
missing_shards.remove(&shard_config.shard_id);
|
||||
}
|
||||
Ordering::Less => {
|
||||
let multi = num_shards / shard_config.num_shard;
|
||||
for i in 0..multi {
|
||||
let shard_id = shard_config.shard_id + i * shard_config.num_shard;
|
||||
missing_shards.remove(&shard_id);
|
||||
}
|
||||
}
|
||||
Ordering::Greater => {
|
||||
let multi = shard_config.num_shard / num_shards;
|
||||
let mut new_missing_shards = BTreeSet::new();
|
||||
for shard_id in &missing_shards {
|
||||
for i in 0..multi {
|
||||
new_missing_shards.insert(*shard_id + i * num_shards);
|
||||
}
|
||||
}
|
||||
new_missing_shards.remove(&shard_config.shard_id);
|
||||
|
||||
missing_shards = new_missing_shards;
|
||||
num_shards = shard_config.num_shard;
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!("all_shards_available: {} {:?}", num_shards, missing_shards);
|
||||
missing_shards.is_empty()
|
||||
let shard_configs = self
|
||||
.filter_peers(state)
|
||||
.iter()
|
||||
.map(|peer_id| self.peers.get(peer_id).unwrap().shard_config)
|
||||
.collect();
|
||||
all_shards_available(shard_configs)
|
||||
}
|
||||
|
||||
pub fn transition(&mut self) {
|
||||
|
@ -11,12 +11,13 @@ use anyhow::{bail, Result};
|
||||
use file_location_cache::FileLocationCache;
|
||||
use libp2p::swarm::DialError;
|
||||
use log_entry_sync::LogSyncEvent;
|
||||
use network::types::AnnounceChunks;
|
||||
use network::types::{AnnounceChunks, FindFile};
|
||||
use network::PubsubMessage;
|
||||
use network::{
|
||||
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
|
||||
PeerRequestId, SyncId as RequestId,
|
||||
};
|
||||
use shared_types::{bytes_to_chunks, ChunkArrayWithProof, TxID};
|
||||
use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, TxID};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
@ -91,6 +92,9 @@ pub enum SyncRequest {
|
||||
FileSyncInfo {
|
||||
tx_seq: Option<u64>,
|
||||
},
|
||||
FindFile {
|
||||
tx_seq: u64,
|
||||
},
|
||||
TerminateFileSync {
|
||||
tx_seq: u64,
|
||||
is_reverted: bool,
|
||||
@ -102,6 +106,7 @@ pub enum SyncResponse {
|
||||
SyncStatus { status: Option<SyncState> },
|
||||
SyncFile { err: String },
|
||||
FileSyncInfo { result: HashMap<u64, FileSyncInfo> },
|
||||
FindFile { err: String },
|
||||
TerminateFileSync { count: usize },
|
||||
}
|
||||
|
||||
@ -345,6 +350,10 @@ impl SyncService {
|
||||
let count = self.on_terminate_file_sync(tx_seq, is_reverted);
|
||||
let _ = sender.send(SyncResponse::TerminateFileSync { count });
|
||||
}
|
||||
SyncRequest::FindFile { tx_seq } => {
|
||||
let result = self.on_find_file_request(tx_seq).await;
|
||||
let _ = sender.send(SyncResponse::FindFile { err: result });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -555,6 +564,30 @@ impl SyncService {
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_find_file_request(&mut self, tx_seq: u64) -> String {
|
||||
match self.on_find_file(tx_seq).await {
|
||||
Ok(()) => "".into(),
|
||||
Err(e) => e.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_find_file(&mut self, tx_seq: u64) -> Result<()> {
|
||||
// file already exists
|
||||
if self.store.check_tx_completed(tx_seq).await? {
|
||||
return Ok(());
|
||||
}
|
||||
// broadcast find file
|
||||
let tx = match self.store.get_tx_by_seq_number(tx_seq).await? {
|
||||
Some(tx) => tx,
|
||||
None => bail!("Transaction not found"),
|
||||
};
|
||||
self.ctx.publish(PubsubMessage::FindFile(FindFile {
|
||||
tx_id: tx.id(),
|
||||
timestamp: timestamp_now(),
|
||||
}));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_start_sync_file(
|
||||
&mut self,
|
||||
tx_seq: u64,
|
||||
|
@ -241,3 +241,24 @@
|
||||
|
||||
# Maximum threads to sync files in sequence.
|
||||
# max_sequential_workers = 8
|
||||
|
||||
#######################################################################
|
||||
### File Location Cache Options ###
|
||||
#######################################################################
|
||||
|
||||
# [file_location_cache]
|
||||
|
||||
# File location cache is a cache that maintains storage positions of files.
|
||||
# Storage location information is represented by the IP address of the storage node and the timestamp indicating when the node declared that it stores the corresponding file.
|
||||
# It has both a global capacity limit and a limit on the capacity for location information of each individual file.
|
||||
# When the cache is full, the storage position information with the oldest timestamp will be replaced.
|
||||
|
||||
# Global cache capacity.
|
||||
# max_entries_total = 4096
|
||||
|
||||
# Location information capacity for each file.
|
||||
# max_entries_per_file = 4
|
||||
|
||||
# Validity period of location information.
|
||||
# If the timestamp in a piece of storage location information is older than this duration relative to the current time, that information is removed from the cache.
|
||||
# entry_expiration_time_secs = 3600
|
Loading…
Reference in New Issue
Block a user