mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-11-29 20:37:26 +00:00
Add debug RPC API endpoints for inspecting node data
This commit is contained in:
parent
bc21520ec5
commit
de620eee09
@ -62,6 +62,9 @@ pub trait Rpc {
|
||||
#[method(name = "getDataByNodeIndex")]
|
||||
async fn get_data_by_node_index(&self, node_index: u64) -> RpcResult<Option<Vec<Vec<u8>>>>;
|
||||
|
||||
#[method(name = "getMetaDataByNodeIndex")]
|
||||
async fn get_meta_data_by_node_index(&self, node_index: u64) -> RpcResult<Option<String>>;
|
||||
|
||||
#[method(name = "checkFileFinalized")]
|
||||
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>>;
|
||||
|
||||
@ -90,7 +93,4 @@ pub trait Rpc {
|
||||
|
||||
#[method(name = "getFlowContext")]
|
||||
async fn get_flow_context(&self) -> RpcResult<(H256, u64)>;
|
||||
|
||||
#[method(name = "getChunkByIndex")]
|
||||
async fn get_chunk_by_node_index(&self, node_index: u64) -> RpcResult<Option<Vec<u8>>>;
|
||||
}
|
||||
|
||||
@ -168,6 +168,20 @@ impl RpcServer for RpcServerImpl {
|
||||
}))
|
||||
}
|
||||
|
||||
async fn get_meta_data_by_node_index(&self, node_index: u64) -> RpcResult<Option<String>> {
|
||||
debug!(%node_index, "zgs_getMetaDataByNodeIndex");
|
||||
|
||||
// Get the EntryBatch for the given segment/chunk index
|
||||
let entry_batch = self
|
||||
.ctx
|
||||
.log_store
|
||||
.get_data_by_node_index(node_index)
|
||||
.await?;
|
||||
|
||||
let res = format!("{:?}", entry_batch);
|
||||
Ok(Some(res))
|
||||
}
|
||||
|
||||
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>> {
|
||||
debug!(?tx_seq_or_root, "zgs_checkFileFinalized");
|
||||
|
||||
@ -256,16 +270,6 @@ impl RpcServer for RpcServerImpl {
|
||||
async fn get_flow_context(&self) -> RpcResult<(H256, u64)> {
|
||||
Ok(self.ctx.log_store.get_context().await?)
|
||||
}
|
||||
|
||||
async fn get_chunk_by_node_index(&self, node_index: u64) -> RpcResult<Option<Vec<u8>>> {
|
||||
debug!(%node_index, "zgs_getChunkByNodeIndex");
|
||||
let chunk = self
|
||||
.ctx
|
||||
.log_store
|
||||
.get_chunk_by_flow_index(node_index, 1)
|
||||
.await?;
|
||||
Ok(chunk.map(|c| c.data))
|
||||
}
|
||||
}
|
||||
|
||||
impl RpcServerImpl {
|
||||
|
||||
@ -17,10 +17,11 @@ use append_merkle::{
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use kvdb::DBTransaction;
|
||||
use parking_lot::RwLock;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use shared_types::{ChunkArray, DataRoot, FlowProof};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
@ -419,11 +420,33 @@ pub struct PadPair {
|
||||
|
||||
pub struct FlowDBStore {
|
||||
kvdb: Arc<dyn ZgsKeyValueDB>,
|
||||
// Per-batch mutexes to prevent race conditions on individual batches
|
||||
batch_mutexes: RwLock<HashMap<u64, Arc<Mutex<()>>>>,
|
||||
}
|
||||
|
||||
impl FlowDBStore {
|
||||
pub fn new(kvdb: Arc<dyn ZgsKeyValueDB>) -> Self {
|
||||
Self { kvdb }
|
||||
Self {
|
||||
kvdb,
|
||||
batch_mutexes: RwLock::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_batch_mutex(&self, batch_index: u64) -> Arc<Mutex<()>> {
|
||||
let read_guard = self.batch_mutexes.read();
|
||||
if let Some(mutex) = read_guard.get(&batch_index) {
|
||||
return mutex.clone();
|
||||
}
|
||||
drop(read_guard);
|
||||
|
||||
let mut write_guard = self.batch_mutexes.write();
|
||||
// Double-check pattern
|
||||
if let Some(mutex) = write_guard.get(&batch_index) {
|
||||
return mutex.clone();
|
||||
}
|
||||
let mutex = Arc::new(Mutex::new(()));
|
||||
write_guard.insert(batch_index, mutex.clone());
|
||||
mutex
|
||||
}
|
||||
|
||||
fn put_entry_batch_list(
|
||||
@ -432,6 +455,18 @@ impl FlowDBStore {
|
||||
) -> Result<Vec<(u64, DataRoot)>> {
|
||||
let start_time = Instant::now();
|
||||
let mut completed_batches = Vec::new();
|
||||
|
||||
// Collect all mutexes and locks first to avoid deadlocks
|
||||
let batch_mutexes: Vec<_> = batch_list
|
||||
.iter()
|
||||
.map(|(batch_index, _)| (*batch_index, self.get_batch_mutex(*batch_index)))
|
||||
.collect();
|
||||
|
||||
let _locks: Vec<_> = batch_mutexes
|
||||
.iter()
|
||||
.map(|(_, mutex)| mutex.lock())
|
||||
.collect();
|
||||
|
||||
let mut tx = self.kvdb.transaction();
|
||||
|
||||
for (batch_index, batch) in batch_list {
|
||||
@ -446,11 +481,23 @@ impl FlowDBStore {
|
||||
}
|
||||
}
|
||||
self.kvdb.write(tx)?;
|
||||
// Locks are dropped here
|
||||
metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time);
|
||||
Ok(completed_batches)
|
||||
}
|
||||
|
||||
fn put_entry_raw(&self, batch_list: Vec<(u64, EntryBatch)>) -> Result<()> {
|
||||
// Collect all mutexes and locks first to avoid deadlocks
|
||||
let batch_mutexes: Vec<_> = batch_list
|
||||
.iter()
|
||||
.map(|(batch_index, _)| (*batch_index, self.get_batch_mutex(*batch_index)))
|
||||
.collect();
|
||||
|
||||
let _locks: Vec<_> = batch_mutexes
|
||||
.iter()
|
||||
.map(|(_, mutex)| mutex.lock())
|
||||
.collect();
|
||||
|
||||
let mut tx = self.kvdb.transaction();
|
||||
for (batch_index, batch) in batch_list {
|
||||
tx.put(
|
||||
@ -460,6 +507,7 @@ impl FlowDBStore {
|
||||
);
|
||||
}
|
||||
self.kvdb.write(tx)?;
|
||||
// Locks are dropped here
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user