From 5afabc401b3d12b31e3dd843c1a323698a71dccc Mon Sep 17 00:00:00 2001 From: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com> Date: Sat, 23 Mar 2024 15:26:57 -0500 Subject: [PATCH] V1.4.12 (#142) * v1.4.12 * small logging adjust --- node/app/node.go | 1 - node/config/version.go | 2 +- .../ceremony_data_clock_consensus_engine.go | 19 +- node/consensus/ceremony/consensus_frames.go | 26 +- node/consensus/ceremony/message_handler.go | 6 + node/consensus/ceremony/peer_messaging.go | 20 +- node/consensus/master/broadcast_messaging.go | 14 + .../master/master_clock_consensus_engine.go | 7 +- .../ceremony/ceremony_execution_engine.go | 394 +++++++-------- node/main.go | 14 + node/store/clock.go | 458 ++++++++++++++---- node/store/data_proof.go | 70 +++ node/store/inmem.go | 4 + node/store/kvdb.go | 1 + node/store/pebble.go | 25 + 15 files changed, 726 insertions(+), 335 deletions(-) diff --git a/node/app/node.go b/node/app/node.go index 61c1496..cc15135 100644 --- a/node/app/node.go +++ b/node/app/node.go @@ -49,7 +49,6 @@ func newNode( logger.Info("running compaction") if err := clockStore.Compact( - bytes.Repeat([]byte{0xff}, 32), intrinsicFilter, ); err != nil { panic(err) diff --git a/node/config/version.go b/node/config/version.go index dbbbcff..95ac354 100644 --- a/node/config/version.go +++ b/node/config/version.go @@ -14,7 +14,7 @@ func GetMinimumVersion() []byte { } func GetVersion() []byte { - return []byte{0x01, 0x04, 0x0B} + return []byte{0x01, 0x04, 0x0C} } func GetVersionString() string { diff --git a/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go b/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go index ead4442..2e332b0 100644 --- a/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go +++ b/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go @@ -320,6 +320,7 @@ func (e *CeremonyDataClockConsensusEngine) Start() <-chan error { panic(err) } + e.latestFrameReceived = frame.FrameNumber e.logger.Info( "preparing peer announce", zap.Uint64("frame_number", frame.FrameNumber), @@ -463,9 +464,12 @@ func (e *CeremonyDataClockConsensusEngine) runLoop() { latestFrame = dataFrame } - go func() { - e.frameChan <- latestFrame - }() + if e.latestFrameReceived < latestFrame.FrameNumber { + e.latestFrameReceived = latestFrame.FrameNumber + go func() { + e.frameChan <- latestFrame + }() + } var nextFrame *protobufs.ClockFrame if nextFrame, err = e.prove(latestFrame); err != nil { @@ -505,9 +509,12 @@ func (e *CeremonyDataClockConsensusEngine) runLoop() { } } - go func() { - e.frameChan <- latestFrame - }() + if e.latestFrameReceived < latestFrame.FrameNumber { + e.latestFrameReceived = latestFrame.FrameNumber + go func() { + e.frameChan <- latestFrame + }() + } var nextFrame *protobufs.ClockFrame if nextFrame, err = e.prove(latestFrame); err != nil { diff --git a/node/consensus/ceremony/consensus_frames.go b/node/consensus/ceremony/consensus_frames.go index cd0c76e..2b279d6 100644 --- a/node/consensus/ceremony/consensus_frames.go +++ b/node/consensus/ceremony/consensus_frames.go @@ -513,21 +513,23 @@ func (e *CeremonyDataClockConsensusEngine) collect( latest := currentFramePublished - // With the increase of network size, constrain down to top thirty - for i := 0; i < 30; i++ { + for { peerId, maxFrame, err := e.GetMostAheadPeer() - if err != nil { - e.logger.Warn("no peers available, skipping sync") - break - } else if peerId == nil { - e.logger.Info("currently up to date, skipping sync") - break - } else if maxFrame-2 > latest.FrameNumber { + if maxFrame > latest.FrameNumber { e.syncingStatus = SyncStatusSynchronizing - latest, err = e.sync(latest, maxFrame, peerId) - if err == nil { - break + if err != nil { + e.logger.Info("no peers available for sync, waiting") + time.Sleep(5 * time.Second) + } else if maxFrame-2 > latest.FrameNumber { + latest, err = e.sync(latest, maxFrame, peerId) + if err != nil { + time.Sleep(30 * time.Second) + } else { + break + } } + } else { + break } } diff --git a/node/consensus/ceremony/message_handler.go b/node/consensus/ceremony/message_handler.go index d46a834..dc27bab 100644 --- a/node/consensus/ceremony/message_handler.go +++ b/node/consensus/ceremony/message_handler.go @@ -331,6 +331,12 @@ func (e *CeremonyDataClockConsensusEngine) handleClockFrameData( zap.Uint64("frame_number", frame.FrameNumber), ) + if e.latestFrameReceived < frame.FrameNumber { + e.latestFrameReceived = frame.FrameNumber + go func() { + e.frameChan <- frame + }() + } e.dataTimeReel.Insert(frame, isSync) return nil } diff --git a/node/consensus/ceremony/peer_messaging.go b/node/consensus/ceremony/peer_messaging.go index 8b2f3cf..85e4b19 100644 --- a/node/consensus/ceremony/peer_messaging.go +++ b/node/consensus/ceremony/peer_messaging.go @@ -40,7 +40,7 @@ func (e *CeremonyDataClockConsensusEngine) NegotiateCompressedSyncFrames( ) error { e.currentReceivingSyncPeersMx.Lock() if e.currentReceivingSyncPeers > int( - memory.TotalMemory()/uint64(2147483648)-4, + memory.TotalMemory()/uint64(4294967296)-4, ) { e.currentReceivingSyncPeersMx.Unlock() @@ -117,7 +117,7 @@ func (e *CeremonyDataClockConsensusEngine) NegotiateCompressedSyncFrames( // account for skew if dist > 30000 { - e.logger.Warn( + e.logger.Debug( "peer provided challenge with too great of a distance", zap.Int64("distance", dist), ) @@ -128,7 +128,7 @@ func (e *CeremonyDataClockConsensusEngine) NegotiateCompressedSyncFrames( authentication.Authentication.Response.PublicKey.KeyValue, ) if err != nil { - e.logger.Warn( + e.logger.Debug( "peer provided invalid pubkey", zap.Binary( "public_key", @@ -139,7 +139,7 @@ func (e *CeremonyDataClockConsensusEngine) NegotiateCompressedSyncFrames( } if !(peer.ID(authentication.Authentication.PeerId)).MatchesPublicKey(key) { - e.logger.Warn( + e.logger.Debug( "peer id does not match pubkey", zap.Binary("peer_id", authentication.Authentication.PeerId), zap.Binary( @@ -155,7 +155,7 @@ func (e *CeremonyDataClockConsensusEngine) NegotiateCompressedSyncFrames( authentication.Authentication.Response.Signature, ) if err != nil || !b { - e.logger.Warn( + e.logger.Debug( "peer provided invalid signature", zap.Binary("peer_id", authentication.Authentication.PeerId), zap.Binary( @@ -174,7 +174,7 @@ func (e *CeremonyDataClockConsensusEngine) NegotiateCompressedSyncFrames( authentication.Authentication.PeerId, ) if manifest == nil || manifest.Bandwidth <= 1048576 { - e.logger.Warn( + e.logger.Debug( "peer manifest was null or bandwidth was low", zap.Binary("peer_id", authentication.Authentication.PeerId), ) @@ -243,7 +243,7 @@ func (e *CeremonyDataClockConsensusEngine) NegotiateCompressedSyncFrames( ParentSelector: selector.FillBytes(make([]byte, 32)), }, ) - rangeSubtract := uint64(16) + rangeSubtract := uint64(4) for { parentNumber := to - uint64(rangeSubtract) @@ -323,9 +323,9 @@ func (e *CeremonyDataClockConsensusEngine) NegotiateCompressedSyncFrames( } for { - if to == 0 || to-from > 16 { - if max > from+15 { - to = from + 16 + if to == 0 || to-from > 4 { + if max > from+3 { + to = from + 4 } else { to = max + 1 } diff --git a/node/consensus/master/broadcast_messaging.go b/node/consensus/master/broadcast_messaging.go index be2fe90..53bde8d 100644 --- a/node/consensus/master/broadcast_messaging.go +++ b/node/consensus/master/broadcast_messaging.go @@ -127,6 +127,20 @@ func (e *MasterClockConsensusEngine) handleSelfTestReport( info := e.peerInfoManager.GetPeerInfo(peerID) if info != nil { info.MasterHeadFrame = report.MasterHeadFrame + if info.Bandwidth <= 1048576 { + go func() { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute) + defer cancel() + ch := e.pubSub.GetMultiaddrOfPeerStream(ctx, peerID) + select { + case <-ch: + go func() { + e.bandwidthTestCh <- peerID + }() + case <-ctx.Done(): + } + }() + } return nil } diff --git a/node/consensus/master/master_clock_consensus_engine.go b/node/consensus/master/master_clock_consensus_engine.go index 1d49306..7450c55 100644 --- a/node/consensus/master/master_clock_consensus_engine.go +++ b/node/consensus/master/master_clock_consensus_engine.go @@ -326,9 +326,10 @@ func (e *MasterClockConsensusEngine) performBandwidthTest(peerID []byte) { cc, err := e.pubSub.GetDirectChannel(peerID, "validation") if err != nil { e.logger.Info( - "could not connect for validation", + "could not connect to peer for validation", zap.String("peer_id", base58.Encode(peerID)), ) + // tag: dusk – nuke this peer for now e.pubSub.SetPeerScore(peerID, -1000) return @@ -360,7 +361,7 @@ func (e *MasterClockConsensusEngine) performBandwidthTest(peerID []byte) { if !bytes.Equal(verification, validation.Validation) { e.logger.Info( - "provided invalid verification", + "peer provided invalid verification", zap.String("peer_id", base58.Encode(peerID)), ) // tag: dusk – nuke this peer for now @@ -370,7 +371,7 @@ func (e *MasterClockConsensusEngine) performBandwidthTest(peerID []byte) { if end-start > 2000 { e.logger.Info( - "slow bandwidth, scoring out", + "peer has slow bandwidth, scoring out", zap.String("peer_id", base58.Encode(peerID)), ) // tag: dusk – nuke this peer for now diff --git a/node/execution/intrinsics/ceremony/ceremony_execution_engine.go b/node/execution/intrinsics/ceremony/ceremony_execution_engine.go index e656d49..1fe04b1 100644 --- a/node/execution/intrinsics/ceremony/ceremony_execution_engine.go +++ b/node/execution/intrinsics/ceremony/ceremony_execution_engine.go @@ -571,243 +571,245 @@ func (e *CeremonyExecutionEngine) ProcessMessage( func (e *CeremonyExecutionEngine) RunWorker() { frameChan := e.clock.GetFrameChannel() for { - frame := <-frameChan - e.activeClockFrame = frame - e.logger.Info( - "evaluating next frame", - zap.Uint64( - "frame_number", - frame.FrameNumber, - ), - ) - app, err := application.MaterializeApplicationFromFrame(frame) - if err != nil { - e.logger.Error( - "error while materializing application from frame", - zap.Error(err), - ) - panic(err) - } - - _, _, reward := app.RewardTrie.Get(e.provingKeyAddress) - _, _, retro := app.RewardTrie.Get(e.peerIdHash) - e.logger.Info( - "current application state", - zap.Uint64("my_balance", reward+retro), - zap.String("lobby_state", app.LobbyState.String()), - ) - - switch app.LobbyState { - case application.CEREMONY_APPLICATION_STATE_OPEN: - e.alreadyPublishedShare = false - e.alreadyPublishedTranscript = false - alreadyJoined := false - for _, join := range app.LobbyJoins { - if bytes.Equal( - join.PublicKeySignatureEd448.PublicKey.KeyValue, - e.proverPublicKey, - ) { - alreadyJoined = true - break - } - } - + select { + case frame := <-frameChan: + e.activeClockFrame = frame e.logger.Info( - "lobby open for joins", - zap.Int("joined_participants", len(app.LobbyJoins)), - zap.Int( - "preferred_participants", - len(app.NextRoundPreferredParticipants), + "evaluating next frame", + zap.Uint64( + "frame_number", + frame.FrameNumber, ), - zap.Bool("in_lobby", alreadyJoined), - zap.Uint64("state_count", app.StateCount), ) - - if !alreadyJoined { - e.logger.Info( - "joining lobby", - zap.Binary("proving_key", e.proverPublicKey), + app, err := application.MaterializeApplicationFromFrame(frame) + if err != nil { + e.logger.Error( + "error while materializing application from frame", + zap.Error(err), ) - if err := e.announceJoin(frame); err != nil { - e.logger.Error( - "failed to announce join", - zap.Error(err), - ) - } - - e.logger.Info("preparing contribution") - // Calculate this now after announcing, this gives 10 frames of buffer - e.ensureSecrets(app) - } - case application.CEREMONY_APPLICATION_STATE_IN_PROGRESS: - inRound := false - for _, p := range app.ActiveParticipants { - if bytes.Equal( - p.PublicKeySignatureEd448.PublicKey.KeyValue, - e.proverPublicKey, - ) { - inRound = true - break - } - } - - if len(e.activeSecrets) == 0 && inRound { - // If we ended up in the scenario where we do not have any secrets - // available but we're in the round, we should politely leave. - e.publishDroppedParticipant(e.proverPublicKey) - continue + panic(err) } + _, _, reward := app.RewardTrie.Get(e.provingKeyAddress) + _, _, retro := app.RewardTrie.Get(e.peerIdHash) e.logger.Info( - "round in progress", - zap.Any("participants", app.ActiveParticipants), - zap.Any( - "current_seen_attestations", - len(app.LatestSeenProverAttestations), - ), - zap.Any( - "current_dropped_attestations", - len(app.DroppedParticipantAttestations), - ), - zap.Any( - "preferred_participants_for_next_round", - len(app.NextRoundPreferredParticipants), - ), - zap.Bool("in_round", inRound), - zap.Uint64("current_sub_round", app.RoundCount), - zap.Uint64("stale_state_count", app.StateCount), + "current application state", + zap.Uint64("my_balance", reward+retro), + zap.String("lobby_state", app.LobbyState.String()), ) - shouldConnect := false - position := 0 - if len(e.peerChannels) == 0 && app.RoundCount == 1 && - len(app.ActiveParticipants) > 1 { - for i, p := range app.ActiveParticipants { + switch app.LobbyState { + case application.CEREMONY_APPLICATION_STATE_OPEN: + e.alreadyPublishedShare = false + e.alreadyPublishedTranscript = false + alreadyJoined := false + for _, join := range app.LobbyJoins { + if bytes.Equal( + join.PublicKeySignatureEd448.PublicKey.KeyValue, + e.proverPublicKey, + ) { + alreadyJoined = true + break + } + } + + e.logger.Info( + "lobby open for joins", + zap.Int("joined_participants", len(app.LobbyJoins)), + zap.Int( + "preferred_participants", + len(app.NextRoundPreferredParticipants), + ), + zap.Bool("in_lobby", alreadyJoined), + zap.Uint64("state_count", app.StateCount), + ) + + if !alreadyJoined { + e.logger.Info( + "joining lobby", + zap.Binary("proving_key", e.proverPublicKey), + ) + if err := e.announceJoin(frame); err != nil { + e.logger.Error( + "failed to announce join", + zap.Error(err), + ) + } + + e.logger.Info("preparing contribution") + // Calculate this now after announcing, this gives 10 frames of buffer + e.ensureSecrets(app) + } + case application.CEREMONY_APPLICATION_STATE_IN_PROGRESS: + inRound := false + for _, p := range app.ActiveParticipants { if bytes.Equal( p.PublicKeySignatureEd448.PublicKey.KeyValue, e.proverPublicKey, ) { - shouldConnect = true - position = i + inRound = true break } } - } - if shouldConnect { - e.logger.Info( - "connecting to peers", - zap.Any("participants", app.ActiveParticipants), - ) - err := e.connectToActivePeers(app, position) - if err != nil { - e.logger.Error("error while connecting to peers", zap.Error(err)) + if len(e.activeSecrets) == 0 && inRound { + // If we ended up in the scenario where we do not have any secrets + // available but we're in the round, we should politely leave. e.publishDroppedParticipant(e.proverPublicKey) continue } - } - if len(e.peerChannels) != 0 { - done := false - rounds := app.TranscriptRoundAdvanceCommits - if len(rounds) != 0 { - for _, c := range rounds[app.RoundCount-1].Commits { + e.logger.Info( + "round in progress", + zap.Any("participants", app.ActiveParticipants), + zap.Any( + "current_seen_attestations", + len(app.LatestSeenProverAttestations), + ), + zap.Any( + "current_dropped_attestations", + len(app.DroppedParticipantAttestations), + ), + zap.Any( + "preferred_participants_for_next_round", + len(app.NextRoundPreferredParticipants), + ), + zap.Bool("in_round", inRound), + zap.Uint64("current_sub_round", app.RoundCount), + zap.Uint64("stale_state_count", app.StateCount), + ) + + shouldConnect := false + position := 0 + if len(e.peerChannels) == 0 && app.RoundCount == 1 && + len(app.ActiveParticipants) > 1 { + for i, p := range app.ActiveParticipants { if bytes.Equal( - c.ProverSignature.PublicKey.KeyValue, + p.PublicKeySignatureEd448.PublicKey.KeyValue, e.proverPublicKey, ) { - done = true + shouldConnect = true + position = i + break } } } - if !done { + if shouldConnect { e.logger.Info( - "participating in round", + "connecting to peers", zap.Any("participants", app.ActiveParticipants), - zap.Uint64("current_round", app.RoundCount), ) - err := e.participateRound(app) + err := e.connectToActivePeers(app, position) if err != nil { - e.logger.Error("error while participating in round", zap.Error(err)) + e.logger.Error("error while connecting to peers", zap.Error(err)) e.publishDroppedParticipant(e.proverPublicKey) + continue } } - } else if len(app.ActiveParticipants) == 1 && - bytes.Equal( - app.ActiveParticipants[0].PublicKeySignatureEd448.PublicKey.KeyValue, - e.proverPublicKey, - ) { - if err = e.commitRound(e.activeSecrets); err != nil { - e.logger.Error("error while participating in round", zap.Error(err)) - } - } - case application.CEREMONY_APPLICATION_STATE_FINALIZING: - e.logger.Info( - "round contribution finalizing", - zap.Any("participants", len(app.ActiveParticipants)), - zap.Any( - "current_seen_attestations", - len(app.LatestSeenProverAttestations), - ), - zap.Any( - "current_dropped_attestations", - len(app.DroppedParticipantAttestations), - ), - zap.Any( - "preferred_participants_for_next_round", - len(app.NextRoundPreferredParticipants), - ), - zap.Int("finalized_shares", len(app.TranscriptShares)), - ) - for _, s := range app.TranscriptShares { - if bytes.Equal( - s.ProverSignature.PublicKey.KeyValue, - e.proverPublicKey, - ) { - e.alreadyPublishedShare = true - } - } + if len(e.peerChannels) != 0 { + done := false + rounds := app.TranscriptRoundAdvanceCommits + if len(rounds) != 0 { + for _, c := range rounds[app.RoundCount-1].Commits { + if bytes.Equal( + c.ProverSignature.PublicKey.KeyValue, + e.proverPublicKey, + ) { + done = true + } + } + } - shouldPublish := false - for _, p := range app.ActiveParticipants { - if bytes.Equal( - p.PublicKeySignatureEd448.PublicKey.KeyValue, - e.proverPublicKey, - ) { - shouldPublish = true - break + if !done { + e.logger.Info( + "participating in round", + zap.Any("participants", app.ActiveParticipants), + zap.Uint64("current_round", app.RoundCount), + ) + err := e.participateRound(app) + if err != nil { + e.logger.Error("error while participating in round", zap.Error(err)) + e.publishDroppedParticipant(e.proverPublicKey) + } + } + } else if len(app.ActiveParticipants) == 1 && + bytes.Equal( + app.ActiveParticipants[0].PublicKeySignatureEd448.PublicKey.KeyValue, + e.proverPublicKey, + ) { + if err = e.commitRound(e.activeSecrets); err != nil { + e.logger.Error("error while participating in round", zap.Error(err)) + } } - } + case application.CEREMONY_APPLICATION_STATE_FINALIZING: + e.logger.Info( + "round contribution finalizing", + zap.Any("participants", len(app.ActiveParticipants)), + zap.Any( + "current_seen_attestations", + len(app.LatestSeenProverAttestations), + ), + zap.Any( + "current_dropped_attestations", + len(app.DroppedParticipantAttestations), + ), + zap.Any( + "preferred_participants_for_next_round", + len(app.NextRoundPreferredParticipants), + ), + zap.Int("finalized_shares", len(app.TranscriptShares)), + ) - if !e.alreadyPublishedShare && shouldPublish { - if len(e.activeSecrets) == 0 { - e.publishDroppedParticipant(e.proverPublicKey) - continue + for _, s := range app.TranscriptShares { + if bytes.Equal( + s.ProverSignature.PublicKey.KeyValue, + e.proverPublicKey, + ) { + e.alreadyPublishedShare = true + } } - err := e.publishTranscriptShare(app) - if err != nil { - e.logger.Error( - "error while publishing transcript share", - zap.Error(err), - ) - } - } - case application.CEREMONY_APPLICATION_STATE_VALIDATING: - e.logger.Info("round contribution validating") - e.alreadyPublishedShare = false - for _, c := range e.peerChannels { - c.Close() - } - e.peerChannels = map[string]*p2p.PublicP2PChannel{} - if app.UpdatedTranscript != nil && !e.alreadyPublishedTranscript { - if err := e.publishTranscript(app); err != nil { - e.logger.Error( - "error while publishing transcript", - zap.Error(err), - ) + shouldPublish := false + for _, p := range app.ActiveParticipants { + if bytes.Equal( + p.PublicKeySignatureEd448.PublicKey.KeyValue, + e.proverPublicKey, + ) { + shouldPublish = true + break + } + } + + if !e.alreadyPublishedShare && shouldPublish { + if len(e.activeSecrets) == 0 { + e.publishDroppedParticipant(e.proverPublicKey) + continue + } + err := e.publishTranscriptShare(app) + if err != nil { + e.logger.Error( + "error while publishing transcript share", + zap.Error(err), + ) + } + } + case application.CEREMONY_APPLICATION_STATE_VALIDATING: + e.logger.Info("round contribution validating") + e.alreadyPublishedShare = false + for _, c := range e.peerChannels { + c.Close() + } + + e.peerChannels = map[string]*p2p.PublicP2PChannel{} + if app.UpdatedTranscript != nil && !e.alreadyPublishedTranscript { + if err := e.publishTranscript(app); err != nil { + e.logger.Error( + "error while publishing transcript", + zap.Error(err), + ) + } } } } diff --git a/node/main.go b/node/main.go index ef1309e..ec9b1b4 100644 --- a/node/main.go +++ b/node/main.go @@ -60,6 +60,11 @@ var ( false, "print the peer id to stdout from the config and exit", ) + cpuprofile = flag.String( + "cpuprofile", + "", + "write cpu profile to file", + ) memprofile = flag.String( "memprofile", "", @@ -84,6 +89,15 @@ func main() { }() } + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + log.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + if *balance { config, err := config.LoadConfig(*configDirectory, "") if err != nil { diff --git a/node/store/clock.go b/node/store/clock.go index 1e4d0a5..0fc6804 100644 --- a/node/store/clock.go +++ b/node/store/clock.go @@ -4,12 +4,14 @@ import ( "bytes" "encoding/binary" "math/big" + "sort" "github.com/cockroachdb/pebble" "github.com/iden3/go-iden3-crypto/poseidon" "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/protobuf/proto" + "source.quilibrium.com/quilibrium/monorepo/node/config" "source.quilibrium.com/quilibrium/monorepo/node/protobufs" "source.quilibrium.com/quilibrium/monorepo/node/tries" ) @@ -74,7 +76,6 @@ type ClockStore interface { ResetMasterClockFrames(filter []byte) error ResetDataClockFrames(filter []byte) error Compact( - masterFilter []byte, dataFilter []byte, ) error GetTotalDistance( @@ -281,6 +282,7 @@ const CLOCK_DATA_FRAME_DATA = 0x01 const CLOCK_DATA_FRAME_CANDIDATE_DATA = 0x02 const CLOCK_DATA_FRAME_FRECENCY_DATA = 0x03 const CLOCK_DATA_FRAME_DISTANCE_DATA = 0x04 +const CLOCK_COMPACTION_DATA = 0x05 const CLOCK_MASTER_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_MASTER_FRAME_DATA const CLOCK_MASTER_FRAME_INDEX_LATEST = 0x20 | CLOCK_MASTER_FRAME_DATA const CLOCK_MASTER_FRAME_INDEX_PARENT = 0x30 | CLOCK_MASTER_FRAME_DATA @@ -1200,85 +1202,362 @@ func (p *PebbleClockStore) ResetDataClockFrames(filter []byte) error { } func (p *PebbleClockStore) Compact( - masterFilter []byte, dataFilter []byte, ) error { - if masterFilter != nil { - if err := p.db.Compact( - clockMasterFrameKey(masterFilter, 0), - clockMasterFrameKey(masterFilter, 1000000), - true, - ); err != nil { - return errors.Wrap(err, "compact") + version, closer, err := p.db.Get([]byte{CLOCK_COMPACTION_DATA}) + cleared := true + if err != nil { + cleared = false + } else { + if bytes.Compare(version, config.GetVersion()) < 0 { + cleared = false } + closer.Close() } - // If this node has been around since the early days, this is going to free - // up a lot of cruft. - if err := p.db.DeleteRange( - clockDataCandidateFrameKey( - make([]byte, 32), - 0, - make([]byte, 32), - make([]byte, 32), - ), - clockDataCandidateFrameKey( - bytes.Repeat([]byte{0xff}, 32), - 1000000, - bytes.Repeat([]byte{0xff}, 32), - bytes.Repeat([]byte{0xff}, 32), - ), - ); err != nil { - return errors.Wrap(err, "compact") - } - if err := p.db.Compact( - clockDataCandidateFrameKey( - make([]byte, 32), - 0, - make([]byte, 32), - make([]byte, 32), - ), - clockDataCandidateFrameKey( - bytes.Repeat([]byte{0xff}, 32), - 1000000, - bytes.Repeat([]byte{0xff}, 32), - bytes.Repeat([]byte{0xff}, 32), - ), - true, - ); err != nil { - return errors.Wrap(err, "compact") - } - - if dataFilter != nil { - if err := p.db.Compact( - dataProofMetadataKey( - dataFilter, - make([]byte, 74), - ), - dataProofMetadataKey( - dataFilter, - bytes.Repeat([]byte{0xff}, 74), - ), - true, - ); err != nil { - return errors.Wrap(err, "compact") - } - - if err := p.db.Compact( - dataProofInclusionKey( - dataFilter, - make([]byte, 74), + if !cleared { + // If this node has been around since the early days, this is going to free + // up a lot of cruft. + if err := p.db.DeleteRange( + clockDataCandidateFrameKey( + make([]byte, 32), 0, + make([]byte, 32), + make([]byte, 32), ), - dataProofInclusionKey( - dataFilter, - bytes.Repeat([]byte{0xff}, 74), - 20000, + clockDataCandidateFrameKey( + bytes.Repeat([]byte{0xff}, 32), + 1000000, + bytes.Repeat([]byte{0xff}, 32), + bytes.Repeat([]byte{0xff}, 32), + ), + ); err != nil { + return errors.Wrap(err, "compact") + } + if err := p.db.Compact( + clockDataCandidateFrameKey( + make([]byte, 32), + 0, + make([]byte, 32), + make([]byte, 32), + ), + clockDataCandidateFrameKey( + bytes.Repeat([]byte{0xff}, 32), + 1000000, + bytes.Repeat([]byte{0xff}, 32), + bytes.Repeat([]byte{0xff}, 32), ), true, ); err != nil { return errors.Wrap(err, "compact") } + } + + if dataFilter != nil && !cleared { + parents := [][]byte{} + proofs := map[string]struct{}{} + commits := map[string]struct{}{} + data := map[string]struct{}{} + + idxValue, closer, err := p.db.Get(clockDataLatestIndex(dataFilter)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return errors.Wrap(err, "compact") + } + + return errors.Wrap(err, "compact") + } + + last := binary.BigEndian.Uint64(idxValue) + + closer.Close() + + for frameNumber := uint64(1); frameNumber <= last; frameNumber++ { + value, closer, err := p.db.Get(clockDataFrameKey(dataFilter, frameNumber)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return errors.Wrap(err, "compact") + } + + return errors.Wrap(err, "compact") + } + + frame := &protobufs.ClockFrame{} + + if len(value) == (len(dataFilter) + 42) { + frameValue, frameCloser, err := p.db.Get(value) + if err != nil { + return errors.Wrap(err, "compact") + } + if err := proto.Unmarshal(frameValue, frame); err != nil { + return errors.Wrap(err, "compact") + } + + selector, err := frame.GetSelector() + if err != nil { + panic(err) + } + + parents = append(parents, + clockDataParentIndexKey(dataFilter, frameNumber, selector.FillBytes( + make([]byte, 32), + )), + ) + + closer.Close() + frameCloser.Close() + } else { + if err := proto.Unmarshal(value, frame); err != nil { + return errors.Wrap(err, "compact") + } + + selector, err := frame.GetSelector() + if err != nil { + panic(err) + } + + err = p.db.Set( + clockDataParentIndexKey(dataFilter, frameNumber, selector.FillBytes( + make([]byte, 32), + )), + value, + ) + if err != nil { + return errors.Wrap(err, "compact") + } + + err = p.db.Set( + clockDataFrameKey(dataFilter, frameNumber), + clockDataParentIndexKey(dataFilter, frameNumber, selector.FillBytes( + make([]byte, 32), + )), + ) + + parents = append(parents, + clockDataParentIndexKey(dataFilter, frameNumber, selector.FillBytes( + make([]byte, 32), + )), + ) + + closer.Close() + } + + for i := 0; i < len(frame.Input[516:])/74; i++ { + p.logger.Info( + "preparing indexes for frame compaction", + zap.Uint64("frame_number", frameNumber), + zap.Uint64("max_frame_number", last), + ) + commit := frame.Input[516+(i*74) : 516+((i+1)*74)] + frameProofs, frameCommits, frameData, err := + internalListAggregateProofKeys( + p.db, + dataFilter, + commit, + frameNumber, + ) + if err != nil { + return errors.Wrap(err, "compact") + } + for _, proof := range frameProofs { + proof := proof + proofs[string(proof)] = struct{}{} + } + for _, comm := range frameCommits { + comm := comm + commits[string(comm)] = struct{}{} + } + for _, d := range frameData { + d := d + data[string(d)] = struct{}{} + } + } + } + + p.logger.Info("sorting indexes for bulk clear") + + sortedProofKeys := [][]byte{} + for k := range proofs { + k := k + sortedProofKeys = append(sortedProofKeys, []byte(k)) + } + proofs = nil + sort.Slice(sortedProofKeys, func(i, j int) bool { + return bytes.Compare(sortedProofKeys[i], sortedProofKeys[j]) < 0 + }) + + sortedCommitKeys := [][]byte{} + for k := range commits { + k := k + sortedCommitKeys = append(sortedCommitKeys, []byte(k)) + } + commits = nil + sort.Slice(sortedCommitKeys, func(i, j int) bool { + return bytes.Compare(sortedCommitKeys[i], sortedCommitKeys[j]) < 0 + }) + + sortedDataKeys := [][]byte{} + for k := range data { + k := k + sortedDataKeys = append(sortedDataKeys, []byte(k)) + } + data = nil + sort.Slice(sortedDataKeys, func(i, j int) bool { + return bytes.Compare(sortedDataKeys[i], sortedDataKeys[j]) < 0 + }) + + for i := uint64(0); i < uint64(len(parents)); i++ { + p.logger.Info( + "clearing orphaned frames for frame number", + zap.Uint64("frame_number", i+1), + zap.Int("max_frame_number", len(parents)), + ) + pre := clockDataParentIndexKey( + dataFilter, + i+1, + bytes.Repeat([]byte{0x00}, 32), + ) + + err := p.db.DeleteRange( + pre, + parents[i], + ) + if err != nil { + return errors.Wrap(err, "compact") + } + start := new(big.Int).SetBytes(parents[i]) + start.Add(start, big.NewInt(1)) + startBytes := start.FillBytes(make([]byte, len(parents[i]))) + post := clockDataParentIndexKey( + dataFilter, + i+1, + bytes.Repeat([]byte{0xff}, 32), + ) + + err = p.db.DeleteRange( + startBytes, + post, + ) + if err != nil { + return errors.Wrap(err, "compact") + } + } + + for i := -1; i < len(sortedProofKeys); i++ { + p.logger.Info( + "clearing orphaned proof metadata", + zap.Int("proof_range_index", i+1), + zap.Int("max_proof_range_index", len(sortedProofKeys)), + ) + var start, end []byte + if i == -1 { + start = dataProofMetadataKey( + dataFilter, + bytes.Repeat([]byte{0x00}, 74), + ) + } else { + startBI := new(big.Int).SetBytes(sortedProofKeys[i]) + startBI.Add(startBI, big.NewInt(1)) + start = startBI.FillBytes(make([]byte, len(sortedProofKeys[i]))) + } + + if i == len(sortedProofKeys)-1 { + end = dataProofMetadataKey( + dataFilter, + bytes.Repeat([]byte{0xff}, 74), + ) + } else { + end = sortedProofKeys[i+1] + } + err := p.db.DeleteRange( + start, + end, + ) + if err != nil { + return errors.Wrap(err, "compact") + } + } + + for i := -1; i < len(sortedCommitKeys); i++ { + p.logger.Info( + "clearing orphaned commits", + zap.Int("commit_range_index", i+1), + zap.Int("max_commit_range_index", len(sortedProofKeys)), + ) + var start, end []byte + if i == -1 { + start = dataProofInclusionKey( + dataFilter, + bytes.Repeat([]byte{0x00}, 74), + 0, + ) + } else { + start = make([]byte, len(sortedCommitKeys[i])) + copy(start[:], sortedCommitKeys[i][:]) + start[74] = 0xff + start[75] = 0xff + start[76] = 0xff + start[77] = 0xff + start[78] = 0xff + start[79] = 0xff + start[80] = 0xff + start[81] = 0xff + } + + if i == len(sortedCommitKeys)-1 { + end = dataProofInclusionKey( + dataFilter, + bytes.Repeat([]byte{0xff}, 74), + 0xffffffffffffffff, + ) + } else { + end = sortedCommitKeys[i+1] + } + + err := p.db.DeleteRange( + start, + end, + ) + if err != nil { + return errors.Wrap(err, "compact") + } + } + + for i := -1; i < len(sortedDataKeys); i++ { + p.logger.Info( + "clearing orphaned data", + zap.Int("data_range_index", i+1), + zap.Int("max_data_range_index", len(sortedProofKeys)), + ) + var start, end []byte + if i == -1 { + start = dataProofSegmentKey( + dataFilter, + bytes.Repeat([]byte{0x00}, 32), + ) + } else { + startBI := new(big.Int).SetBytes(sortedDataKeys[i]) + startBI.Add(startBI, big.NewInt(1)) + start = startBI.FillBytes(make([]byte, len(sortedDataKeys[i]))) + } + + if i == len(sortedDataKeys)-1 { + end = dataProofSegmentKey( + dataFilter, + bytes.Repeat([]byte{0xff}, 32), + ) + } else { + end = sortedDataKeys[i+1] + } + + err := p.db.DeleteRange( + start, + end, + ) + if err != nil { + return errors.Wrap(err, "compact") + } + } if err := p.db.DeleteRange( clockDataCandidateFrameKey( @@ -1297,47 +1576,14 @@ func (p *PebbleClockStore) Compact( return errors.Wrap(err, "compact") } - if err := p.db.Compact( - clockDataFrameKey(dataFilter, 0), - clockDataFrameKey(dataFilter, 1000000), - true, - ); err != nil { + err = p.db.Set([]byte{CLOCK_COMPACTION_DATA}, config.GetVersion()) + if err != nil { return errors.Wrap(err, "compact") } + } - if err := p.db.Compact( - clockDataCandidateFrameKey( - dataFilter, - 0, - make([]byte, 32), - make([]byte, 32), - ), - clockDataCandidateFrameKey( - dataFilter, - 1000000, - bytes.Repeat([]byte{0xff}, 32), - bytes.Repeat([]byte{0xff}, 32), - ), - true, - ); err != nil { - return errors.Wrap(err, "compact") - } - - if err := p.db.Compact( - clockDataParentIndexKey( - dataFilter, - 0, - make([]byte, 32), - ), - clockDataParentIndexKey( - dataFilter, - 1000000, - bytes.Repeat([]byte{0xff}, 32), - ), - true, - ); err != nil { - return errors.Wrap(err, "compact") - } + if err := p.db.CompactAll(); err != nil { + return errors.Wrap(err, "compact") } return nil diff --git a/node/store/data_proof.go b/node/store/data_proof.go index 5d855c8..bce779b 100644 --- a/node/store/data_proof.go +++ b/node/store/data_proof.go @@ -2,6 +2,7 @@ package store import ( "encoding/binary" + "fmt" "github.com/cockroachdb/pebble" "github.com/pkg/errors" @@ -203,6 +204,75 @@ func internalGetAggregateProof( return aggregate, nil } +func internalListAggregateProofKeys( + db KVDB, + filter []byte, + commitment []byte, + frameNumber uint64, +) ([][]byte, [][]byte, [][]byte, error) { + proofs := [][]byte{dataProofMetadataKey(filter, commitment)} + commits := [][]byte{} + data := [][]byte{} + + value, closer, err := db.Get(dataProofMetadataKey(filter, commitment)) + if err != nil { + fmt.Println("proof lookup failed") + + if errors.Is(err, pebble.ErrNotFound) { + return nil, nil, nil, ErrNotFound + } + + return nil, nil, nil, errors.Wrap(err, "list aggregate proof") + } + + defer closer.Close() + copied := make([]byte, len(value[8:])) + limit := binary.BigEndian.Uint64(value[0:8]) + copy(copied, value[8:]) + + iter, err := db.NewIter( + dataProofInclusionKey(filter, commitment, 0), + dataProofInclusionKey(filter, commitment, limit+1), + ) + if err != nil { + fmt.Println("inclusion lookup failed") + + return nil, nil, nil, errors.Wrap(err, "list aggregate proof") + } + + i := uint32(0) + commits = append(commits, dataProofInclusionKey(filter, commitment, 0)) + for iter.First(); iter.Valid(); iter.Next() { + incCommit := iter.Value() + + urlLength := binary.BigEndian.Uint16(incCommit[:2]) + commitLength := binary.BigEndian.Uint16(incCommit[2:4]) + + url := make([]byte, urlLength) + copy(url, incCommit[4:urlLength+4]) + + commit := make([]byte, commitLength) + copy(commit, incCommit[urlLength+4:urlLength+4+commitLength]) + + remainder := int(urlLength + 4 + commitLength) + + for j := 0; j < (len(incCommit)-remainder)/32; j++ { + start := remainder + (j * 32) + end := remainder + ((j + 1) * 32) + + data = append(data, dataProofSegmentKey(filter, incCommit[start:end])) + } + + i++ + } + + if err = iter.Close(); err != nil { + return nil, nil, nil, errors.Wrap(err, "list aggregate proof") + } + + return proofs, commits, data, nil +} + func (p *PebbleDataProofStore) GetAggregateProof( filter []byte, commitment []byte, diff --git a/node/store/inmem.go b/node/store/inmem.go index 8f27b74..7680f48 100644 --- a/node/store/inmem.go +++ b/node/store/inmem.go @@ -344,4 +344,8 @@ func (d *InMemKVDB) DeleteRange(start, end []byte) error { return nil } +func (d *InMemKVDB) CompactAll() error { + return nil +} + var _ KVDB = (*InMemKVDB)(nil) diff --git a/node/store/kvdb.go b/node/store/kvdb.go index 01ebadf..4143d3a 100644 --- a/node/store/kvdb.go +++ b/node/store/kvdb.go @@ -11,6 +11,7 @@ type KVDB interface { NewBatch() Transaction NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) Compact(start, end []byte, parallelize bool) error + CompactAll() error Close() error DeleteRange(start, end []byte) error } diff --git a/node/store/pebble.go b/node/store/pebble.go index 91b39fd..a9072bd 100644 --- a/node/store/pebble.go +++ b/node/store/pebble.go @@ -4,6 +4,7 @@ import ( "io" "github.com/cockroachdb/pebble" + "github.com/pkg/errors" "source.quilibrium.com/quilibrium/monorepo/node/config" ) @@ -60,6 +61,30 @@ func (p *PebbleDB) DeleteRange(start, end []byte) error { return p.db.DeleteRange(start, end, &pebble.WriteOptions{Sync: true}) } +func (p *PebbleDB) CompactAll() error { + iter, err := p.db.NewIter(nil) + if err != nil { + return errors.Wrap(err, "compact all") + } + + var first, last []byte + if iter.First() { + first = append(first, iter.Key()...) + } + if iter.Last() { + last = append(last, iter.Key()...) + } + if err := iter.Close(); err != nil { + return errors.Wrap(err, "compact all") + } + + if err := p.Compact(first, last, true); err != nil { + return errors.Wrap(err, "compact all") + } + + return nil +} + var _ KVDB = (*PebbleDB)(nil) type Transaction interface {