mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2025-11-03 08:07:27 +00:00
copy data (#379)
* copy from middle to previous and afterward tx seqs instead of only from the first seq * fix action * fix issue * unnecessary loop * add commont * prevent attack on uploading multiple files to increase db read load * fill all middle seqs
This commit is contained in:
parent
3ba369e9e5
commit
1e18b454de
@ -390,7 +390,7 @@ impl<AppReqId: ReqId> Behaviour<AppReqId> {
|
|||||||
.gossipsub
|
.gossipsub
|
||||||
.publish(topic.clone().into(), message_data.clone())
|
.publish(topic.clone().into(), message_data.clone())
|
||||||
{
|
{
|
||||||
warn!(error = ?e, topic = ?topic.kind(), "Failed to publish message");
|
trace!(error = ?e, topic = ?topic.kind(), "Failed to publish message");
|
||||||
|
|
||||||
// add to metrics
|
// add to metrics
|
||||||
if let Some(v) = metrics::get_int_gauge(
|
if let Some(v) = metrics::get_int_gauge(
|
||||||
|
|||||||
@ -291,6 +291,7 @@ impl LogStoreWrite for LogManager {
|
|||||||
|
|
||||||
if let Some(old_tx_seq) = maybe_same_data_tx_seq {
|
if let Some(old_tx_seq) = maybe_same_data_tx_seq {
|
||||||
if self.check_tx_completed(old_tx_seq)? {
|
if self.check_tx_completed(old_tx_seq)? {
|
||||||
|
// copy and finalize once, then stop
|
||||||
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
|
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -315,8 +316,18 @@ impl LogStoreWrite for LogManager {
|
|||||||
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
||||||
// Check if there are other same-root transaction not finalized.
|
// Check if there are other same-root transaction not finalized.
|
||||||
if same_root_seq_list.first() == Some(&tx_seq) {
|
if same_root_seq_list.first() == Some(&tx_seq) {
|
||||||
|
// If this is the first tx with this data root, copy and finalize all same-root txs.
|
||||||
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
|
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
|
||||||
|
} else {
|
||||||
|
// If this is not the first tx with this data root, and the first one is not finalized.
|
||||||
|
let maybe_first_seq = same_root_seq_list.first().cloned();
|
||||||
|
if let Some(first_seq) = maybe_first_seq {
|
||||||
|
if !self.check_tx_completed(first_seq)? {
|
||||||
|
self.copy_tx_and_finalize(tx_seq, same_root_seq_list)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.tx_store.finalize_tx(tx_seq)?;
|
self.tx_store.finalize_tx(tx_seq)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
@ -346,14 +357,25 @@ impl LogStoreWrite for LogManager {
|
|||||||
// TODO: Should we double check the tx merkle root?
|
// TODO: Should we double check the tx merkle root?
|
||||||
let tx_end_index = tx.start_entry_index + bytes_to_entries(tx.size);
|
let tx_end_index = tx.start_entry_index + bytes_to_entries(tx.size);
|
||||||
if self.check_data_completed(tx.start_entry_index, tx_end_index)? {
|
if self.check_data_completed(tx.start_entry_index, tx_end_index)? {
|
||||||
self.tx_store.finalize_tx(tx_seq)?;
|
|
||||||
let same_root_seq_list = self
|
let same_root_seq_list = self
|
||||||
.tx_store
|
.tx_store
|
||||||
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
||||||
// Check if there are other same-root transaction not finalized.
|
// Check if there are other same-root transaction not finalized.
|
||||||
|
|
||||||
if same_root_seq_list.first() == Some(&tx_seq) {
|
if same_root_seq_list.first() == Some(&tx_seq) {
|
||||||
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
|
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
|
||||||
|
} else {
|
||||||
|
// If this is not the first tx with this data root, copy and finalize the first one.
|
||||||
|
let maybe_first_seq = same_root_seq_list.first().cloned();
|
||||||
|
if let Some(first_seq) = maybe_first_seq {
|
||||||
|
if !self.check_tx_completed(first_seq)? {
|
||||||
|
self.copy_tx_and_finalize(tx_seq, same_root_seq_list)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.tx_store.finalize_tx(tx_seq)?;
|
||||||
|
|
||||||
metrics::FINALIZE_TX_WITH_HASH.update_since(start_time);
|
metrics::FINALIZE_TX_WITH_HASH.update_since(start_time);
|
||||||
Ok(true)
|
Ok(true)
|
||||||
} else {
|
} else {
|
||||||
@ -1173,8 +1195,8 @@ impl LogManager {
|
|||||||
let mut to_tx_offset_list = Vec::with_capacity(to_tx_seq_list.len());
|
let mut to_tx_offset_list = Vec::with_capacity(to_tx_seq_list.len());
|
||||||
|
|
||||||
for seq in to_tx_seq_list {
|
for seq in to_tx_seq_list {
|
||||||
// No need to copy data for completed tx.
|
// No need to copy data for completed tx and itself
|
||||||
if self.check_tx_completed(seq)? {
|
if self.check_tx_completed(seq)? || from_tx_seq == seq {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let tx = self
|
let tx = self
|
||||||
|
|||||||
@ -11,7 +11,7 @@ class PortMin:
|
|||||||
n = 11000
|
n = 11000
|
||||||
|
|
||||||
|
|
||||||
MAX_NODES = 100
|
MAX_NODES = 50
|
||||||
MAX_BLOCKCHAIN_NODES = 50
|
MAX_BLOCKCHAIN_NODES = 50
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user