Mirror of https://github.com/0glabs/0g-storage-node.git
Synced 2025-04-04 15:35:18 +00:00

Compare commits

No commits in common. "f21d691812f0a923e84840b6406e66907bf27bfa" and "40104de8918b5e0696d7cdc1de1eae30634bdc08" have entirely different histories.

f21d691812 ... 40104de891
@@ -96,8 +96,8 @@ pub use service::{load_private_key, Context, Libp2pEvent, Service, NETWORK_KEY_F
 /// Defines the current P2P protocol version.
 /// - v1: Broadcast FindFile & AnnounceFile messages in the whole network, which caused network too heavey.
 /// - v2: Publish NewFile to neighbors only and announce file via RPC message.
-pub const PROTOCOL_VERSION_V1: [u8; 3] = [0, 1, 1];
-pub const PROTOCOL_VERSION_V2: [u8; 3] = [0, 2, 1];
+pub const PROTOCOL_VERSION_V1: [u8; 3] = [0, 1, 0];
+pub const PROTOCOL_VERSION_V2: [u8; 3] = [0, 2, 0];
 
 /// Application level requests sent to the network.
 #[derive(Debug, Clone, Copy)]

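The hunk above only changes the third byte of both protocol-version constants from 1 to 0. For reference, below is a minimal sketch of how such [major, minor, patch] triples can be compared; the byte layout and the rule that only the first two components decide compatibility are illustrative assumptions, not taken from this repository.

// Sketch only. Assumes the three bytes mean [major, minor, patch] and that
// peers are compatible whenever major and minor match; neither assumption
// is taken from the 0g-storage-node sources.
const LOCAL_VERSION: [u8; 3] = [0, 2, 0];

fn is_compatible(local: [u8; 3], remote: [u8; 3]) -> bool {
    // A patch-level difference (e.g. [0, 2, 0] vs [0, 2, 1]) stays compatible.
    local[0] == remote[0] && local[1] == remote[1]
}

fn main() {
    assert!(is_compatible(LOCAL_VERSION, [0, 2, 1]));
    assert!(!is_compatible(LOCAL_VERSION, [0, 1, 1]));
}
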
@@ -700,7 +700,7 @@ impl LogManager {
         data_db_source: Arc<dyn ZgsKeyValueDB>,
         config: LogConfig,
     ) -> Result<Self> {
-        let tx_store = TransactionStore::new(flow_db_source.clone(), data_db_source.clone())?;
+        let tx_store = TransactionStore::new(data_db_source.clone())?;
         let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
         let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
         let flow_store = Arc::new(FlowStore::new(

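In this hunk LogManager switches to constructing TransactionStore from the data_db_source handle alone, while flow_db_source is still cloned on the following lines for the FlowDBStore instances. The matching TransactionStore changes follow, with a small illustrative sketch after them.
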
@@ -57,24 +57,19 @@ pub struct BlockHashAndSubmissionIndex {
 }
 
 pub struct TransactionStore {
-    flow_kvdb: Arc<dyn ZgsKeyValueDB>,
-    data_kvdb: Arc<dyn ZgsKeyValueDB>,
+    kvdb: Arc<dyn ZgsKeyValueDB>,
     /// This is always updated before writing the database to ensure no intermediate states.
     next_tx_seq: AtomicU64,
 }
 
 impl TransactionStore {
-    pub fn new(
-        flow_kvdb: Arc<dyn ZgsKeyValueDB>,
-        data_kvdb: Arc<dyn ZgsKeyValueDB>,
-    ) -> Result<Self> {
-        let next_tx_seq = flow_kvdb
+    pub fn new(kvdb: Arc<dyn ZgsKeyValueDB>) -> Result<Self> {
+        let next_tx_seq = kvdb
             .get(COL_TX, NEXT_TX_KEY.as_bytes())?
             .map(|a| decode_tx_seq(&a))
             .unwrap_or(Ok(0))?;
         Ok(Self {
-            flow_kvdb,
-            data_kvdb,
+            kvdb,
             next_tx_seq: AtomicU64::new(next_tx_seq),
         })
     }
@@ -91,7 +86,7 @@ impl TransactionStore {
             return Ok(old_tx_seq_list);
         }
 
-        let mut db_tx = self.flow_kvdb.transaction();
+        let mut db_tx = self.kvdb.transaction();
         if !tx.data.is_empty() {
             tx.size = tx.data.len() as u64;
             let mut padded_data = tx.data.clone();
@@ -118,7 +113,7 @@ impl TransactionStore {
             &new_tx_seq_list.as_ssz_bytes(),
         );
         self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
-        self.flow_kvdb.write(db_tx)?;
+        self.kvdb.write(db_tx)?;
         metrics::TX_STORE_PUT.update_since(start_time);
         Ok(old_tx_seq_list)
     }
@@ -128,7 +123,7 @@ impl TransactionStore {
         if seq >= self.next_tx_seq() {
             return Ok(None);
         }
-        let value = try_option!(self.flow_kvdb.get(COL_TX, &seq.to_be_bytes())?);
+        let value = try_option!(self.kvdb.get(COL_TX, &seq.to_be_bytes())?);
         let tx = Transaction::from_ssz_bytes(&value).map_err(Error::from)?;
         metrics::TX_BY_SEQ_NUMBER.update_since(start_time);
         Ok(Some(tx))
@@ -137,16 +132,15 @@ impl TransactionStore {
     pub fn remove_tx_after(&self, min_seq: u64) -> Result<Vec<Transaction>> {
         let mut removed_txs = Vec::new();
         let max_seq = self.next_tx_seq();
-        let mut flow_db_tx = self.flow_kvdb.transaction();
-        let mut data_db_tx = self.data_kvdb.transaction();
+        let mut db_tx = self.kvdb.transaction();
         let mut modified_merkle_root_map = HashMap::new();
         for seq in min_seq..max_seq {
             let Some(tx) = self.get_tx_by_seq_number(seq)? else {
                 error!(?seq, ?max_seq, "Transaction missing before the end");
                 break;
             };
-            flow_db_tx.delete(COL_TX, &seq.to_be_bytes());
-            data_db_tx.delete(COL_TX_COMPLETED, &seq.to_be_bytes());
+            db_tx.delete(COL_TX, &seq.to_be_bytes());
+            db_tx.delete(COL_TX_COMPLETED, &seq.to_be_bytes());
             // We only remove tx when the blockchain reorgs.
             // If a tx is reverted, all data after it will also be reverted, so we call remove
             // all indices after it.
@@ -161,25 +155,24 @@ impl TransactionStore {
         }
         for (merkle_root, tx_seq_list) in modified_merkle_root_map {
             if tx_seq_list.is_empty() {
-                flow_db_tx.delete(COL_TX_DATA_ROOT_INDEX, merkle_root.as_bytes());
+                db_tx.delete(COL_TX_DATA_ROOT_INDEX, merkle_root.as_bytes());
             } else {
-                flow_db_tx.put(
+                db_tx.put(
                     COL_TX_DATA_ROOT_INDEX,
                     merkle_root.as_bytes(),
                     &tx_seq_list.as_ssz_bytes(),
                 );
             }
         }
-        flow_db_tx.put(COL_TX, NEXT_TX_KEY.as_bytes(), &min_seq.to_be_bytes());
+        db_tx.put(COL_TX, NEXT_TX_KEY.as_bytes(), &min_seq.to_be_bytes());
         self.next_tx_seq.store(min_seq, Ordering::SeqCst);
-        self.data_kvdb.write(data_db_tx)?;
-        self.flow_kvdb.write(flow_db_tx)?;
+        self.kvdb.write(db_tx)?;
         Ok(removed_txs)
     }
 
     pub fn get_tx_seq_list_by_data_root(&self, data_root: &DataRoot) -> Result<Vec<u64>> {
         let value = match self
-            .flow_kvdb
+            .kvdb
             .get(COL_TX_DATA_ROOT_INDEX, data_root.as_bytes())?
         {
             Some(v) => v,
@@ -190,7 +183,7 @@ impl TransactionStore {
 
     #[instrument(skip(self))]
     pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
-        Ok(self.data_kvdb.put(
+        Ok(self.kvdb.put(
             COL_TX_COMPLETED,
             &tx_seq.to_be_bytes(),
             &[TxStatus::Finalized.into()],
@@ -199,7 +192,7 @@ impl TransactionStore {
 
     #[instrument(skip(self))]
     pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
-        Ok(self.data_kvdb.put(
+        Ok(self.kvdb.put(
             COL_TX_COMPLETED,
             &tx_seq.to_be_bytes(),
             &[TxStatus::Pruned.into()],
@@ -207,9 +200,7 @@ impl TransactionStore {
     }
 
     pub fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> {
-        let value = try_option!(self
-            .data_kvdb
-            .get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?);
+        let value = try_option!(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?);
         match value.first() {
             Some(v) => Ok(Some(TxStatus::try_from(*v)?)),
             None => Ok(None),
@@ -248,14 +239,14 @@ impl TransactionStore {
                 (progress.1, p).as_ssz_bytes(),
             ));
         }
-        Ok(self.flow_kvdb.puts(items)?)
+        Ok(self.kvdb.puts(items)?)
     }
 
     #[instrument(skip(self))]
     pub fn get_progress(&self) -> Result<Option<(u64, H256)>> {
         Ok(Some(
             <(u64, H256)>::from_ssz_bytes(&try_option!(self
-                .flow_kvdb
+                .kvdb
                 .get(COL_MISC, LOG_SYNC_PROGRESS_KEY.as_bytes())?))
             .map_err(Error::from)?,
         ))
@@ -263,7 +254,7 @@ impl TransactionStore {
 
     #[instrument(skip(self))]
     pub fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> {
-        Ok(self.flow_kvdb.put(
+        Ok(self.kvdb.put(
             COL_MISC,
             LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes(),
             &block_number.as_ssz_bytes(),
@@ -274,7 +265,7 @@ impl TransactionStore {
     pub fn get_log_latest_block_number(&self) -> Result<Option<u64>> {
         Ok(Some(
             <u64>::from_ssz_bytes(&try_option!(self
-                .flow_kvdb
+                .kvdb
                 .get(COL_MISC, LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes())?))
             .map_err(Error::from)?,
         ))
@@ -286,7 +277,7 @@ impl TransactionStore {
     ) -> Result<Option<(H256, Option<u64>)>> {
         Ok(Some(
             <(H256, Option<u64>)>::from_ssz_bytes(&try_option!(self
-                .flow_kvdb
+                .kvdb
                 .get(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?))
             .map_err(Error::from)?,
         ))
@@ -294,7 +285,7 @@ impl TransactionStore {
 
     pub fn get_block_hashes(&self) -> Result<Vec<(u64, BlockHashAndSubmissionIndex)>> {
         let mut block_numbers = vec![];
-        for r in self.flow_kvdb.iter(COL_BLOCK_PROGRESS) {
+        for r in self.kvdb.iter(COL_BLOCK_PROGRESS) {
             let (key, val) = r?;
             let block_number =
                 u64::from_be_bytes(key.as_ref().try_into().map_err(|e| anyhow!("{:?}", e))?);
@@ -314,7 +305,7 @@ impl TransactionStore {
 
     pub fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> {
         Ok(self
-            .flow_kvdb
+            .kvdb
             .delete(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?)
     }
 

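Taken together, the hunks above replace TransactionStore's two handles (flow_kvdb and data_kvdb) with a single kvdb, so remove_tx_after now stages its deletions from COL_TX and COL_TX_COMPLETED in one write batch instead of two. Below is a minimal standalone sketch of that batched-write pattern using the Parity kvdb and kvdb-memorydb crates; treating ZgsKeyValueDB as a wrapper over this API, and the column indices used here, are assumptions for illustration only.

// Sketch only. Assumed Cargo deps: kvdb = "0.13", kvdb-memorydb = "0.13".
use kvdb::KeyValueDB;

// Placeholder column indices standing in for COL_TX / COL_TX_COMPLETED.
const COL_TX: u32 = 0;
const COL_TX_COMPLETED: u32 = 1;

fn main() -> std::io::Result<()> {
    let db = kvdb_memorydb::create(2);

    // Seed one entry per column for transaction seq 7.
    let seq: u64 = 7;
    let mut init = db.transaction();
    init.put(COL_TX, &seq.to_be_bytes(), b"tx bytes");
    init.put(COL_TX_COMPLETED, &seq.to_be_bytes(), &[1]);
    db.write(init)?;

    // With one backing database, both deletions go into a single batch
    // and are applied together by one write call.
    let mut batch = db.transaction();
    batch.delete(COL_TX, &seq.to_be_bytes());
    batch.delete(COL_TX_COMPLETED, &seq.to_be_bytes());
    db.write(batch)?;

    assert!(db.get(COL_TX, &seq.to_be_bytes())?.is_none());
    assert!(db.get(COL_TX_COMPLETED, &seq.to_be_bytes())?.is_none());
    Ok(())
}
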
@@ -42,7 +42,7 @@ class ContractProxy:
 
         contract = self._get_contract(node_idx)
     
-        return getattr(contract.events, event_name).create_filter(from_block=0, to_block="latest").get_all_entries()
+        return getattr(contract.events, event_name).create_filter(fromBlock =0, toBlock="latest").get_all_entries()
 
     def transfer(self, value, node_idx = 0):
         tx_params = copy(TX_PARAMS)

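The only change in this hunk is the keyword spelling passed to create_filter: newer web3.py releases expect the snake_case from_block/to_block arguments, while older releases use the camelCase fromBlock/toBlock spelling, so which side of the hunk runs cleanly depends on the web3 version the test framework pins.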