Mirror of https://github.com/0glabs/0g-storage-node.git (synced 2025-04-04 15:35:18 +00:00)

Compare commits: b447b8f894...e3e6bf4201 (3 commits)

- e3e6bf4201
- 395aeabde7
- 3957f8b28d
```diff
@@ -13,6 +13,7 @@ use crate::types::{GossipEncoding, GossipKind, GossipTopic, SnappyTransform};
 use crate::{error, metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash};
 use futures::stream::StreamExt;
 use libp2p::gossipsub::error::PublishError;
+use libp2p::gossipsub::TopicScoreParams;
 use libp2p::{
     core::{
         connection::ConnectionId, identity::Keypair, multiaddr::Protocol as MProtocol, Multiaddr,
```
```diff
@@ -226,7 +227,30 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
         // trace!(behaviour_log, "Using peer score params"; "params" => ?params);
 
-        let params = libp2p::gossipsub::PeerScoreParams::default();
+        let mut params = libp2p::gossipsub::PeerScoreParams::default();
+        let get_hash = |kind: GossipKind| -> TopicHash {
+            let topic: Topic = GossipTopic::new(kind, GossipEncoding::default()).into();
+            topic.hash()
+        };
+        params
+            .topics
+            .insert(get_hash(GossipKind::FindFile), TopicScoreParams::default());
+        params.topics.insert(
+            get_hash(GossipKind::FindChunks),
+            TopicScoreParams::default(),
+        );
+        params.topics.insert(
+            get_hash(GossipKind::AnnounceFile),
+            TopicScoreParams::default(),
+        );
+        params.topics.insert(
+            get_hash(GossipKind::AnnounceShardConfig),
+            TopicScoreParams::default(),
+        );
+        params.topics.insert(
+            get_hash(GossipKind::AnnounceChunks),
+            TopicScoreParams::default(),
+        );
 
         // Set up a scoring update interval
         let update_gossipsub_scores = tokio::time::interval(params.decay_interval);
```
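Aside (not part of the diff): the hunk above registers each gossip topic with `TopicScoreParams::default()`. As an illustration of how individual topics could be weighted differently, here is a minimal sketch using only public libp2p-gossipsub types; the topic string, weights, and decay interval are placeholder values, not anything taken from this repository.

```rust
use std::time::Duration;

use libp2p::gossipsub::{IdentTopic, PeerScoreParams, TopicScoreParams};

/// Build peer-score params with a custom profile for one topic.
/// All names and numbers here are illustrative placeholders.
fn example_peer_score_params() -> PeerScoreParams {
    let mut params = PeerScoreParams::default();

    let mut topic_params = TopicScoreParams::default();
    // Let this topic contribute more to the overall peer score.
    topic_params.topic_weight = 0.5;
    // Reward peers that are first to deliver messages on the topic.
    topic_params.first_message_deliveries_weight = 1.0;
    topic_params.first_message_deliveries_decay = 0.5;
    topic_params.first_message_deliveries_cap = 10.0;

    // The diff derives the hash from `GossipTopic::new(kind, encoding)`;
    // a plain identity topic stands in for it here.
    params
        .topics
        .insert(IdentTopic::new("example_topic").hash(), topic_params);

    // Peers are re-scored once per decay interval; the diff feeds this
    // value into `tokio::time::interval(params.decay_interval)`.
    params.decay_interval = Duration::from_secs(10);
    params
}
```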
```diff
@@ -269,8 +269,7 @@ impl LogStoreWrite for LogManager {
 
         if let Some(old_tx_seq) = maybe_same_data_tx_seq {
             if self.check_tx_completed(old_tx_seq)? {
-                self.copy_tx_data(old_tx_seq, vec![tx.seq])?;
-                self.tx_store.finalize_tx(tx.seq)?;
+                self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
             }
         }
         Ok(())
```
```diff
@@ -293,7 +292,7 @@ impl LogStoreWrite for LogManager {
             .get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
         // Check if there are other same-root transaction not finalized.
         if same_root_seq_list.first() == Some(&tx_seq) {
-            self.copy_tx_data(tx_seq, same_root_seq_list[1..].to_vec())?;
+            self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
         }
         self.tx_store.finalize_tx(tx_seq)?;
         Ok(())
```
```diff
@@ -329,7 +328,7 @@ impl LogStoreWrite for LogManager {
                 .get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
             // Check if there are other same-root transaction not finalized.
             if same_root_seq_list.first() == Some(&tx_seq) {
-                self.copy_tx_data(tx_seq, same_root_seq_list[1..].to_vec())?;
+                self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
             }
             Ok(true)
         } else {
```
```diff
@@ -1059,8 +1058,9 @@ impl LogManager {
         Ok(())
     }
 
-    fn copy_tx_data(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
+    fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
         let mut merkle = self.merkle.write();
+        let shard_config = self.flow_store.get_shard_config();
         // We have all the data need for this tx, so just copy them.
         let old_tx = self
             .get_tx_by_seq_number(from_tx_seq)?
```
```diff
@@ -1074,6 +1074,12 @@ impl LogManager {
             let tx = self
                 .get_tx_by_seq_number(seq)?
                 .ok_or_else(|| anyhow!("to tx missing"))?;
+            // Data for `tx` is not available due to sharding.
+            if sector_to_segment(tx.start_entry_index) % shard_config.num_shard
+                != sector_to_segment(old_tx.start_entry_index) % shard_config.num_shard
+            {
+                continue;
+            }
             to_tx_offset_list.push((tx.seq, tx.start_entry_index - old_tx.start_entry_index));
         }
         if to_tx_offset_list.is_empty() {
```
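The new skip condition above only copies data between transactions whose start segments fall into the same shard. A small self-contained sketch of that check follows, under stated assumptions: `sector_to_segment` is approximated as division by a placeholder sectors-per-segment constant, and a minimal stand-in struct takes the place of the crate's shard config; neither is taken from this repository.

```rust
// Stand-ins for crate internals (assumptions, not the real definitions).
const SECTORS_PER_SEGMENT: u64 = 1024; // placeholder segment size

struct ShardConfigLike {
    num_shard: u64,
}

fn sector_to_segment(sector_index: u64) -> u64 {
    sector_index / SECTORS_PER_SEGMENT
}

/// Mirrors the condition in the hunk above: data copied from the old tx can
/// only serve the new tx if both start segments land in the same shard,
/// i.e. their segment indices agree modulo `num_shard`.
fn same_shard(old_start_sector: u64, new_start_sector: u64, config: &ShardConfigLike) -> bool {
    sector_to_segment(old_start_sector) % config.num_shard
        == sector_to_segment(new_start_sector) % config.num_shard
}

fn main() {
    let config = ShardConfigLike { num_shard: 4 };
    // Segments 0 and 8 land in shard 0; segment 9 lands in shard 1.
    assert!(same_shard(0, 8 * SECTORS_PER_SEGMENT, &config));
    assert!(!same_shard(0, 9 * SECTORS_PER_SEGMENT, &config));
}
```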
```diff
@@ -1085,7 +1091,7 @@ impl LogManager {
             old_tx.start_entry_index,
             old_tx.start_entry_index + old_tx.num_entries() as u64,
             PORA_CHUNK_SIZE,
-            self.flow_store.get_shard_config(),
+            shard_config,
         ) {
             let batch_data = self
                 .get_chunk_by_flow_index(batch_start, batch_end - batch_start)?
```