* v1.4.6

* adjust connection manager for bootstrappers, go back to autoscale for resource
This commit is contained in:
Cassandra Heart 2024-03-12 02:45:20 -05:00 committed by GitHub
parent 1fc27018b5
commit 644500bc42
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 332 additions and 726 deletions

View File

@ -887,11 +887,11 @@ func logoVersion(width int) string {
out += " ''---.. ...---'' ##\n" out += " ''---.. ...---'' ##\n"
out += " ''----------''\n" out += " ''----------''\n"
out += " \n" out += " \n"
out += " Quilibrium Node - v1.4.5 Sunset\n" out += " Quilibrium Node - v1.4.6 Sunset\n"
out += " \n" out += " \n"
out += " DB Console\n" out += " DB Console\n"
} else { } else {
out = "Quilibrium Node - v1.4.5 Sunset - DB Console\n" out = "Quilibrium Node - v1.4.6 Sunset - DB Console\n"
} }
return out return out
} }

View File

@ -77,7 +77,7 @@ func (n *Node) RunRepair() {
if err == nil && head != nil { if err == nil && head != nil {
for head != nil && head.FrameNumber != 0 { for head != nil && head.FrameNumber != 0 {
prev := head prev := head
head, err = n.clockStore.GetParentDataClockFrame( head, err = n.clockStore.GetStagedDataClockFrame(
intrinsicFilter, intrinsicFilter,
head.FrameNumber-1, head.FrameNumber-1,
head.ParentSelector, head.ParentSelector,
@ -99,11 +99,11 @@ func (n *Node) RunRepair() {
"repairing frame", "repairing frame",
zap.Uint64("frame_number", head.FrameNumber), zap.Uint64("frame_number", head.FrameNumber),
) )
head, err = n.clockStore.GetParentDataClockFrame( head, err = n.clockStore.GetStagedDataClockFrame(
intrinsicFilter, intrinsicFilter,
prev.FrameNumber-1, prev.FrameNumber-1,
prev.ParentSelector, prev.ParentSelector,
false, true,
) )
if err != nil { if err != nil {
panic(err) panic(err)
@ -114,7 +114,19 @@ func (n *Node) RunRepair() {
panic(err) panic(err)
} }
err = n.clockStore.PutDataClockFrame(head, proverTrie, txn, true) selector, err := head.GetSelector()
if err != nil {
panic(err)
}
err = n.clockStore.CommitDataClockFrame(
intrinsicFilter,
head.FrameNumber,
selector.FillBytes(make([]byte, 32)),
proverTrie,
txn,
true,
)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -145,6 +145,10 @@ func (e *CeremonyDataClockConsensusEngine) handleCeremonyPeerListAnnounce(
continue continue
} }
if !bytes.Equal(p.PeerId, peerID) {
continue
}
if p.PublicKey == nil || p.Signature == nil || p.Version == nil { if p.PublicKey == nil || p.Signature == nil || p.Version == nil {
continue continue
} }
@ -188,7 +192,7 @@ func (e *CeremonyDataClockConsensusEngine) handleCeremonyPeerListAnnounce(
"peer provided outdated version, penalizing app score", "peer provided outdated version, penalizing app score",
zap.Binary("peer_id", p.PeerId), zap.Binary("peer_id", p.PeerId),
) )
e.pubSub.SetPeerScore(p.PeerId, -100) e.pubSub.SetPeerScore(p.PeerId, -10000)
continue continue
} }
} }

View File

@ -306,6 +306,11 @@ func (e *CeremonyDataClockConsensusEngine) Start() <-chan error {
panic(err) panic(err)
} }
e.logger.Info(
"preparing peer announce",
zap.Uint64("frame_number", frame.FrameNumber),
)
timestamp := time.Now().UnixMilli() timestamp := time.Now().UnixMilli()
msg := binary.BigEndian.AppendUint64([]byte{}, frame.FrameNumber) msg := binary.BigEndian.AppendUint64([]byte{}, frame.FrameNumber)
msg = append(msg, consensus.GetVersion()...) msg = append(msg, consensus.GetVersion()...)

View File

@ -292,7 +292,7 @@ func (e *CeremonyDataClockConsensusEngine) sync(
parentSelector := make([]byte, 32) parentSelector := make([]byte, 32)
for _, selector := range preflight.Preflight.RangeParentSelectors { for _, selector := range preflight.Preflight.RangeParentSelectors {
match, err := e.clockStore.GetParentDataClockFrame( match, err := e.clockStore.GetStagedDataClockFrame(
e.filter, e.filter,
selector.FrameNumber, selector.FrameNumber,
selector.ParentSelector, selector.ParentSelector,
@ -309,10 +309,37 @@ func (e *CeremonyDataClockConsensusEngine) sync(
if match != nil && found == 0 { if match != nil && found == 0 {
found = match.FrameNumber found = match.FrameNumber
parentSelector = match.ParentSelector parentSelector = match.ParentSelector
break
} }
} }
if found != 0 { if found != 0 && !bytes.Equal(parentSelector, make([]byte, 32)) {
from = found check, err := e.clockStore.GetStagedDataClockFrame(
e.filter,
found,
parentSelector,
true,
)
if err != nil {
from = 1
} else {
e.logger.Info("checking interstitial continuity")
for check.FrameNumber > 1 {
check, err = e.clockStore.GetStagedDataClockFrame(
e.filter,
check.FrameNumber-1,
check.ParentSelector,
true,
)
if err != nil {
from = 1
e.logger.Info(
"could not confirm interstitial continuity, setting search to 1",
)
break
}
}
from = found
}
} }
err = s.Send(&protobufs.CeremonyCompressedSyncRequestMessage{ err = s.Send(&protobufs.CeremonyCompressedSyncRequestMessage{

View File

@ -300,28 +300,23 @@ func (e *CeremonyDataClockConsensusEngine) GetCompressedSyncFrames(
return errors.Wrap(err, "get compressed sync frames") return errors.Wrap(err, "get compressed sync frames")
} else { } else {
frames, err := e.clockStore.GetCandidateDataClockFrames(e.filter, from) e.logger.Debug(
if err != nil || len(frames) == 0 { "peer asked for undiscovered frame",
e.logger.Debug( zap.Uint64("frame_number", request.FromFrameNumber),
"peer asked for undiscovered frame", )
zap.Uint64("frame_number", request.FromFrameNumber),
)
if err := server.SendMsg( if err := server.SendMsg(
&protobufs.ClockFramesResponse{ &protobufs.ClockFramesResponse{
Filter: request.Filter, Filter: request.Filter,
FromFrameNumber: 0, FromFrameNumber: 0,
ToFrameNumber: 0, ToFrameNumber: 0,
ClockFrames: []*protobufs.ClockFrame{}, ClockFrames: []*protobufs.ClockFrame{},
}, },
); err != nil { ); err != nil {
return errors.Wrap(err, "get compressed sync frames") return errors.Wrap(err, "get compressed sync frames")
}
return nil
} }
parent = nil return nil
} }
} }
@ -333,7 +328,7 @@ func (e *CeremonyDataClockConsensusEngine) GetCompressedSyncFrames(
} }
for !bytes.Equal(frame.ParentSelector, parent) { for !bytes.Equal(frame.ParentSelector, parent) {
ours, err := e.clockStore.GetParentDataClockFrame( ours, err := e.clockStore.GetStagedDataClockFrame(
e.filter, e.filter,
frame.FrameNumber-1, frame.FrameNumber-1,
frame.ParentSelector, frame.ParentSelector,
@ -345,7 +340,7 @@ func (e *CeremonyDataClockConsensusEngine) GetCompressedSyncFrames(
break break
} }
theirs, err := e.clockStore.GetParentDataClockFrame( theirs, err := e.clockStore.GetStagedDataClockFrame(
e.filter, e.filter,
frame.FrameNumber-1, frame.FrameNumber-1,
parent, parent,
@ -365,7 +360,7 @@ func (e *CeremonyDataClockConsensusEngine) GetCompressedSyncFrames(
if request.RangeParentSelectors != nil { if request.RangeParentSelectors != nil {
for _, selector := range request.RangeParentSelectors { for _, selector := range request.RangeParentSelectors {
frame, err := e.clockStore.GetParentDataClockFrame( frame, err := e.clockStore.GetStagedDataClockFrame(
e.filter, e.filter,
selector.FrameNumber, selector.FrameNumber,
selector.ParentSelector, selector.ParentSelector,

View File

@ -59,5 +59,5 @@ func GetMinimumVersion() []byte {
} }
func GetVersion() []byte { func GetVersion() []byte {
return []byte{0x01, 0x04, 0x05} return []byte{0x01, 0x04, 0x06}
} }

View File

@ -183,7 +183,7 @@ func (e *MasterClockConsensusEngine) handleSelfTestReport(
) )
if report.Cores < 3 || memory < 16000000000 { if report.Cores < 3 || memory < 16000000000 {
e.logger.Info( e.logger.Debug(
"peer reported invalid configuration", "peer reported invalid configuration",
zap.String("peer_id", base58.Encode(peerID)), zap.String("peer_id", base58.Encode(peerID)),
zap.Uint32("difficulty", report.Difficulty), zap.Uint32("difficulty", report.Difficulty),
@ -208,7 +208,7 @@ func (e *MasterClockConsensusEngine) handleSelfTestReport(
cc, err := e.pubSub.GetDirectChannel(peerID, "validation") cc, err := e.pubSub.GetDirectChannel(peerID, "validation")
if err != nil { if err != nil {
e.logger.Error( e.logger.Debug(
"could not connect for validation", "could not connect for validation",
zap.String("peer_id", base58.Encode(peerID)), zap.String("peer_id", base58.Encode(peerID)),
zap.Uint32("difficulty", report.Difficulty), zap.Uint32("difficulty", report.Difficulty),
@ -245,7 +245,7 @@ func (e *MasterClockConsensusEngine) handleSelfTestReport(
cc.Close() cc.Close()
if !bytes.Equal(verification, validation.Validation) { if !bytes.Equal(verification, validation.Validation) {
e.logger.Error( e.logger.Debug(
"provided invalid verification", "provided invalid verification",
zap.String("peer_id", base58.Encode(peerID)), zap.String("peer_id", base58.Encode(peerID)),
zap.Uint32("difficulty", report.Difficulty), zap.Uint32("difficulty", report.Difficulty),
@ -266,7 +266,7 @@ func (e *MasterClockConsensusEngine) handleSelfTestReport(
} }
if end-start > 2000 { if end-start > 2000 {
e.logger.Error( e.logger.Debug(
"slow bandwidth, scoring out", "slow bandwidth, scoring out",
zap.String("peer_id", base58.Encode(peerID)), zap.String("peer_id", base58.Encode(peerID)),
zap.Uint32("difficulty", report.Difficulty), zap.Uint32("difficulty", report.Difficulty),

View File

@ -40,22 +40,24 @@ type MasterClockConsensusEngine struct {
frameProver crypto.FrameProver frameProver crypto.FrameProver
lastFrameReceivedAt time.Time lastFrameReceivedAt time.Time
frameChan chan *protobufs.ClockFrame frameChan chan *protobufs.ClockFrame
executionEngines map[string]execution.ExecutionEngine executionEngines map[string]execution.ExecutionEngine
filter []byte filter []byte
input []byte input []byte
syncingStatus SyncStatusType syncingStatus SyncStatusType
syncingTarget []byte syncingTarget []byte
engineMx sync.Mutex engineMx sync.Mutex
seenFramesMx sync.Mutex seenFramesMx sync.Mutex
historicFramesMx sync.Mutex historicFramesMx sync.Mutex
seenFrames []*protobufs.ClockFrame seenFrames []*protobufs.ClockFrame
historicFrames []*protobufs.ClockFrame historicFrames []*protobufs.ClockFrame
clockStore store.ClockStore clockStore store.ClockStore
masterTimeReel *qtime.MasterTimeReel masterTimeReel *qtime.MasterTimeReel
report *protobufs.SelfTestReport report *protobufs.SelfTestReport
peerMapMx sync.Mutex peerMapMx sync.Mutex
peerMap map[string]*protobufs.SelfTestReport peerMap map[string]*protobufs.SelfTestReport
currentReceivingSyncPeers int
currentReceivingSyncPeersMx sync.Mutex
} }
var _ consensus.ConsensusEngine = (*MasterClockConsensusEngine)(nil) var _ consensus.ConsensusEngine = (*MasterClockConsensusEngine)(nil)

View File

@ -12,6 +12,22 @@ func (e *MasterClockConsensusEngine) Sync(
request *protobufs.SyncRequest, request *protobufs.SyncRequest,
server protobufs.ValidationService_SyncServer, server protobufs.ValidationService_SyncServer,
) error { ) error {
e.currentReceivingSyncPeersMx.Lock()
if e.currentReceivingSyncPeers > 4 {
e.currentReceivingSyncPeersMx.Unlock()
e.logger.Debug("currently processing maximum sync requests, returning")
return nil
}
e.currentReceivingSyncPeers++
e.currentReceivingSyncPeersMx.Unlock()
defer func() {
e.currentReceivingSyncPeersMx.Lock()
e.currentReceivingSyncPeers--
e.currentReceivingSyncPeersMx.Unlock()
}()
from := request.FramesRequest.FromFrameNumber from := request.FramesRequest.FromFrameNumber
masterFrame, err := e.masterTimeReel.Head() masterFrame, err := e.masterTimeReel.Head()

View File

@ -127,6 +127,7 @@ func (d *DataTimeReel) Start() error {
} }
if frame == nil { if frame == nil {
d.head, d.proverTrie = d.createGenesisFrame() d.head, d.proverTrie = d.createGenesisFrame()
d.totalDistance = big.NewInt(0) d.totalDistance = big.NewInt(0)
d.headDistance = big.NewInt(0) d.headDistance = big.NewInt(0)
@ -239,13 +240,30 @@ func (d *DataTimeReel) createGenesisFrame() (
panic(err) panic(err)
} }
selector, err := frame.GetSelector()
if err != nil {
panic(err)
}
txn, err := d.clockStore.NewTransaction() txn, err := d.clockStore.NewTransaction()
if err != nil { if err != nil {
panic(err) panic(err)
} }
if err := d.clockStore.PutDataClockFrame( err = d.clockStore.StageDataClockFrame(
selector.FillBytes(make([]byte, 32)),
frame, frame,
txn,
)
if err != nil {
txn.Abort()
panic(err)
}
if err := d.clockStore.CommitDataClockFrame(
d.filter,
0,
selector.FillBytes(make([]byte, 32)),
trie, trie,
txn, txn,
false, false,
@ -294,7 +312,7 @@ func (d *DataTimeReel) runLoop() {
panic(err) panic(err)
} }
rawFrame, err := d.clockStore.GetParentDataClockFrame( rawFrame, err := d.clockStore.GetStagedDataClockFrame(
d.filter, d.filter,
frame.frameNumber, frame.frameNumber,
frame.selector.FillBytes(make([]byte, 32)), frame.selector.FillBytes(make([]byte, 32)),
@ -341,7 +359,7 @@ func (d *DataTimeReel) runLoop() {
continue continue
} }
rawFrame, err := d.clockStore.GetParentDataClockFrame( rawFrame, err := d.clockStore.GetStagedDataClockFrame(
d.filter, d.filter,
frame.frameNumber, frame.frameNumber,
frame.selector.FillBytes(make([]byte, 32)), frame.selector.FillBytes(make([]byte, 32)),
@ -460,7 +478,7 @@ func (d *DataTimeReel) storePending(
frame *protobufs.ClockFrame, frame *protobufs.ClockFrame,
) { ) {
// avoid db thrashing // avoid db thrashing
if existing, err := d.clockStore.GetParentDataClockFrame( if existing, err := d.clockStore.GetStagedDataClockFrame(
frame.Filter, frame.Filter,
frame.FrameNumber, frame.FrameNumber,
selector.FillBytes(make([]byte, 32)), selector.FillBytes(make([]byte, 32)),
@ -478,9 +496,7 @@ func (d *DataTimeReel) storePending(
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = d.clockStore.PutCandidateDataClockFrame( err = d.clockStore.StageDataClockFrame(
parent.FillBytes(make([]byte, 32)),
distance.FillBytes(make([]byte, 32)),
selector.FillBytes(make([]byte, 32)), selector.FillBytes(make([]byte, 32)),
frame, frame,
txn, txn,
@ -578,8 +594,15 @@ func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) {
zap.String("distance", distance.Text(16)), zap.String("distance", distance.Text(16)),
) )
if err := d.clockStore.PutDataClockFrame( selector, err := frame.GetSelector()
frame, if err != nil {
panic(err)
}
if err := d.clockStore.CommitDataClockFrame(
d.filter,
frame.FrameNumber,
selector.FillBytes(make([]byte, 32)),
d.proverTrie, d.proverTrie,
txn, txn,
false, false,
@ -607,7 +630,7 @@ func (d *DataTimeReel) getTotalDistance(frame *protobufs.ClockFrame) *big.Int {
} }
for index := frame; err == nil && for index := frame; err == nil &&
index.FrameNumber > 0; index, err = d.clockStore.GetParentDataClockFrame( index.FrameNumber > 0; index, err = d.clockStore.GetStagedDataClockFrame(
d.filter, d.filter,
index.FrameNumber-1, index.FrameNumber-1,
index.ParentSelector, index.ParentSelector,
@ -692,7 +715,7 @@ func (d *DataTimeReel) forkChoice(
rightReplaySelectors..., rightReplaySelectors...,
) )
rightIndex, err = d.clockStore.GetParentDataClockFrame( rightIndex, err = d.clockStore.GetStagedDataClockFrame(
d.filter, d.filter,
rightIndex.FrameNumber-1, rightIndex.FrameNumber-1,
rightIndex.ParentSelector, rightIndex.ParentSelector,
@ -737,7 +760,7 @@ func (d *DataTimeReel) forkChoice(
), ),
rightReplaySelectors..., rightReplaySelectors...,
) )
leftIndex, err = d.clockStore.GetParentDataClockFrame( leftIndex, err = d.clockStore.GetStagedDataClockFrame(
d.filter, d.filter,
leftIndex.FrameNumber-1, leftIndex.FrameNumber-1,
leftIndex.ParentSelector, leftIndex.ParentSelector,
@ -755,7 +778,7 @@ func (d *DataTimeReel) forkChoice(
panic(err) panic(err)
} }
rightIndex, err = d.clockStore.GetParentDataClockFrame( rightIndex, err = d.clockStore.GetStagedDataClockFrame(
d.filter, d.filter,
rightIndex.FrameNumber-1, rightIndex.FrameNumber-1,
rightIndex.ParentSelector, rightIndex.ParentSelector,
@ -810,23 +833,15 @@ func (d *DataTimeReel) forkChoice(
rightReplaySelectors = rightReplaySelectors =
rightReplaySelectors[1:] rightReplaySelectors[1:]
rightIndex, err = d.clockStore.GetParentDataClockFrame(
d.filter,
frameNumber,
next,
false,
)
if err != nil {
panic(err)
}
txn, err := d.clockStore.NewTransaction() txn, err := d.clockStore.NewTransaction()
if err != nil { if err != nil {
panic(err) panic(err)
} }
if err := d.clockStore.PutDataClockFrame( if err := d.clockStore.CommitDataClockFrame(
rightIndex, d.filter,
frameNumber,
next,
d.proverTrie, d.proverTrie,
txn, txn,
rightIndex.FrameNumber < d.head.FrameNumber, rightIndex.FrameNumber < d.head.FrameNumber,
@ -846,8 +861,10 @@ func (d *DataTimeReel) forkChoice(
panic(err) panic(err)
} }
if err := d.clockStore.PutDataClockFrame( if err := d.clockStore.CommitDataClockFrame(
frame, d.filter,
frame.FrameNumber,
selector.FillBytes(make([]byte, 32)),
d.proverTrie, d.proverTrie,
txn, txn,
false, false,

View File

@ -1386,7 +1386,7 @@ func (e *CeremonyExecutionEngine) VerifyExecution(
return errors.Wrap(err, "verify execution") return errors.Wrap(err, "verify execution")
} }
parent, err := e.clockStore.GetParentDataClockFrame( parent, err := e.clockStore.GetStagedDataClockFrame(
append( append(
p2p.GetBloomFilter(application.CEREMONY_ADDRESS, 256, 3), p2p.GetBloomFilter(application.CEREMONY_ADDRESS, 256, 3),
p2p.GetBloomFilterIndices( p2p.GetBloomFilterIndices(

View File

@ -523,5 +523,5 @@ func printLogo() {
func printVersion() { func printVersion() {
fmt.Println(" ") fmt.Println(" ")
fmt.Println(" Quilibrium Node - v1.4.5 Sunset") fmt.Println(" Quilibrium Node - v1.4.6 Sunset")
} }

View File

@ -22,7 +22,6 @@ import (
"github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/discovery/util" "github.com/libp2p/go-libp2p/p2p/discovery/util"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/mr-tron/base58" "github.com/mr-tron/base58"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -108,24 +107,12 @@ func NewBlossomSub(
} }
if isBootstrapPeer { if isBootstrapPeer {
limits := rcmgr.DefaultLimits mgr, err := connmgr.NewConnManager(512, 8192)
libp2p.SetDefaultServiceLimits(&limits)
limits.SystemBaseLimit.ConnsInbound = 2048
limits.SystemBaseLimit.StreamsInbound = 2048
rmgr, err := rcmgr.NewResourceManager(
rcmgr.NewFixedLimiter(limits.AutoScale()),
)
if err != nil {
panic(err)
}
mgr, err := connmgr.NewConnManager(512, 2048)
if err != nil { if err != nil {
panic(err) panic(err)
} }
opts = append(opts, opts = append(opts,
libp2p.ResourceManager(rmgr),
libp2p.ConnectionManager(mgr), libp2p.ConnectionManager(mgr),
) )
} }
@ -576,7 +563,7 @@ func discoverPeers(
continue continue
} }
logger.Info("found peer", zap.String("peer_id", peer.ID.Pretty())) logger.Debug("found peer", zap.String("peer_id", peer.ID.Pretty()))
err := h.Connect(ctx, peer) err := h.Connect(ctx, peer)
if err != nil { if err != nil {
logger.Debug( logger.Debug(
@ -585,7 +572,7 @@ func discoverPeers(
zap.Error(err), zap.Error(err),
) )
} else { } else {
logger.Info( logger.Debug(
"connected to peer", "connected to peer",
zap.String("peer_id", peer.ID.Pretty()), zap.String("peer_id", peer.ID.Pretty()),
) )

View File

@ -70,27 +70,6 @@ func (r *RPCServer) GetFrameInfo(
ClockFrame: frame, ClockFrame: frame,
}, nil }, nil
} else { } else {
frames, err := r.clockStore.GetCandidateDataClockFrames(
req.Filter,
req.FrameNumber,
)
if err != nil {
return nil, errors.Wrap(err, "get frame info")
}
for _, frame := range frames {
selector, err := frame.GetSelector()
if err != nil {
return nil, errors.Wrap(err, "get frame info")
}
if bytes.Equal(selector.Bytes(), req.Selector) {
return &protobufs.FrameInfoResponse{
ClockFrame: frame,
}, nil
}
}
return nil, errors.Wrap(errors.New("not found"), "get frame info") return nil, errors.Wrap(errors.New("not found"), "get frame info")
} }
} }
@ -151,44 +130,6 @@ func (r *RPCServer) GetFrames(
return nil, errors.Wrap(err, "get frames") return nil, errors.Wrap(err, "get frames")
} }
if req.IncludeCandidates {
from := req.FromFrameNumber
if len(frames) > 0 {
from = frames[len(frames)-1].FrameNumber + 1
}
for from < req.ToFrameNumber {
iter, err := r.clockStore.RangeCandidateDataClockFrames(
req.Filter,
[]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
from,
)
if err != nil {
return nil, errors.Wrap(err, "get frames")
}
for iter.First(); iter.Valid(); iter.Next() {
frame, err := iter.TruncatedValue()
if err != nil {
iter.Close()
return nil, errors.Wrap(err, "get frames")
}
frames = append(frames, frame)
}
if err := iter.Close(); err != nil {
return nil, errors.Wrap(err, "get frames")
}
from++
}
}
return &protobufs.FramesResponse{ return &protobufs.FramesResponse{
TruncatedClockFrames: frames, TruncatedClockFrames: frames,
}, nil }, nil
@ -289,17 +230,15 @@ func (r *RPCServer) GetTokenInfo(
} }
confirmedTotal := new(big.Int) confirmedTotal := new(big.Int)
unconfirmedTotal := new(big.Int)
ownedTotal := new(big.Int) ownedTotal := new(big.Int)
unconfirmedOwnedTotal := new(big.Int)
if confirmed.RewardTrie.Root == nil || if confirmed.RewardTrie.Root == nil ||
(confirmed.RewardTrie.Root.External == nil && (confirmed.RewardTrie.Root.External == nil &&
confirmed.RewardTrie.Root.Internal == nil) { confirmed.RewardTrie.Root.Internal == nil) {
return &protobufs.TokenInfoResponse{ return &protobufs.TokenInfoResponse{
ConfirmedTokenSupply: confirmedTotal.FillBytes(make([]byte, 32)), ConfirmedTokenSupply: confirmedTotal.FillBytes(make([]byte, 32)),
UnconfirmedTokenSupply: unconfirmedTotal.FillBytes(make([]byte, 32)), UnconfirmedTokenSupply: confirmedTotal.FillBytes(make([]byte, 32)),
OwnedTokens: ownedTotal.FillBytes(make([]byte, 32)), OwnedTokens: ownedTotal.FillBytes(make([]byte, 32)),
UnconfirmedOwnedTokens: unconfirmedOwnedTotal.FillBytes(make([]byte, 32)), UnconfirmedOwnedTokens: ownedTotal.FillBytes(make([]byte, 32)),
}, nil }, nil
} }
@ -358,78 +297,10 @@ func (r *RPCServer) GetTokenInfo(
limbs = nextLimbs limbs = nextLimbs
} }
candidateFrame, err := r.clockStore.GetHighestCandidateDataClockFrame(
append(
p2p.GetBloomFilter(application.CEREMONY_ADDRESS, 256, 3),
p2p.GetBloomFilterIndices(application.CEREMONY_ADDRESS, 65536, 24)...,
),
)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "get token info") return nil, errors.Wrap(err, "get token info")
} }
unconfirmed, err := application.MaterializeApplicationFromFrame(
candidateFrame,
)
if err != nil {
return nil, errors.Wrap(err, "get token info")
}
limbs = []*tries.RewardInternalNode{}
if unconfirmed.RewardTrie.Root.Internal != nil {
limbs = append(limbs, unconfirmed.RewardTrie.Root.Internal)
} else {
unconfirmedTotal = unconfirmedTotal.Add(
unconfirmedTotal,
new(big.Int).SetUint64(unconfirmed.RewardTrie.Root.External.Total),
)
if bytes.Equal(
unconfirmed.RewardTrie.Root.External.Key,
addrBytes,
) {
unconfirmedOwnedTotal = unconfirmedOwnedTotal.Add(
unconfirmedOwnedTotal,
new(big.Int).SetUint64(unconfirmed.RewardTrie.Root.External.Total),
)
}
}
for len(limbs) != 0 {
nextLimbs := []*tries.RewardInternalNode{}
for _, limb := range limbs {
for _, child := range limb.Child {
child := child
if child.Internal != nil {
nextLimbs = append(nextLimbs, child.Internal)
} else {
unconfirmedTotal = unconfirmedTotal.Add(
unconfirmedTotal,
new(big.Int).SetUint64(child.External.Total),
)
if bytes.Equal(
child.External.Key,
addrBytes,
) {
unconfirmedOwnedTotal = unconfirmedOwnedTotal.Add(
unconfirmedOwnedTotal,
new(big.Int).SetUint64(child.External.Total),
)
}
if bytes.Equal(
child.External.Key,
peerAddrBytes,
) {
unconfirmedOwnedTotal = unconfirmedOwnedTotal.Add(
unconfirmedOwnedTotal,
new(big.Int).SetUint64(child.External.Total),
)
}
}
}
}
limbs = nextLimbs
}
// 1 QUIL = 0x1DCD65000 units // 1 QUIL = 0x1DCD65000 units
conversionFactor, ok := new(big.Int).SetString("1DCD65000", 16) conversionFactor, ok := new(big.Int).SetString("1DCD65000", 16)
if !ok { if !ok {
@ -437,18 +308,13 @@ func (r *RPCServer) GetTokenInfo(
} }
confirmedTotal = confirmedTotal.Mul(confirmedTotal, conversionFactor) confirmedTotal = confirmedTotal.Mul(confirmedTotal, conversionFactor)
unconfirmedTotal = unconfirmedTotal.Mul(unconfirmedTotal, conversionFactor)
ownedTotal = ownedTotal.Mul(ownedTotal, conversionFactor) ownedTotal = ownedTotal.Mul(ownedTotal, conversionFactor)
unconfirmedOwnedTotal = unconfirmedOwnedTotal.Mul(
unconfirmedOwnedTotal,
conversionFactor,
)
return &protobufs.TokenInfoResponse{ return &protobufs.TokenInfoResponse{
ConfirmedTokenSupply: confirmedTotal.FillBytes(make([]byte, 32)), ConfirmedTokenSupply: confirmedTotal.FillBytes(make([]byte, 32)),
UnconfirmedTokenSupply: unconfirmedTotal.FillBytes(make([]byte, 32)), UnconfirmedTokenSupply: confirmedTotal.FillBytes(make([]byte, 32)),
OwnedTokens: ownedTotal.FillBytes(make([]byte, 32)), OwnedTokens: ownedTotal.FillBytes(make([]byte, 32)),
UnconfirmedOwnedTokens: unconfirmedOwnedTotal.FillBytes(make([]byte, 32)), UnconfirmedOwnedTokens: ownedTotal.FillBytes(make([]byte, 32)),
}, nil }, nil
} }

View File

@ -32,9 +32,6 @@ type ClockStore interface {
filter []byte, filter []byte,
proverTrie *tries.RollingFrecencyCritbitTrie, proverTrie *tries.RollingFrecencyCritbitTrie,
) (*protobufs.ClockFrame, error) ) (*protobufs.ClockFrame, error)
GetLatestCandidateDataClockFrame(
filter []byte,
) (*protobufs.ClockFrame, error)
GetEarliestDataClockFrame(filter []byte) (*protobufs.ClockFrame, error) GetEarliestDataClockFrame(filter []byte) (*protobufs.ClockFrame, error)
GetDataClockFrame( GetDataClockFrame(
filter []byte, filter []byte,
@ -46,45 +43,25 @@ type ClockStore interface {
startFrameNumber uint64, startFrameNumber uint64,
endFrameNumber uint64, endFrameNumber uint64,
) (*PebbleClockIterator, error) ) (*PebbleClockIterator, error)
PutDataClockFrame( CommitDataClockFrame(
frame *protobufs.ClockFrame, filter []byte,
frameNumber uint64,
selector []byte,
proverTrie *tries.RollingFrecencyCritbitTrie, proverTrie *tries.RollingFrecencyCritbitTrie,
txn Transaction, txn Transaction,
backfill bool, backfill bool,
) error ) error
PutCandidateDataClockFrame( StageDataClockFrame(
parentSelector []byte,
distance []byte,
selector []byte, selector []byte,
frame *protobufs.ClockFrame, frame *protobufs.ClockFrame,
txn Transaction, txn Transaction,
) error ) error
GetCandidateDataClockFrame( GetStagedDataClockFrame(
filter []byte,
frameNumber uint64,
parentSelector []byte,
distance []byte,
) (*protobufs.ClockFrame, error)
GetCandidateDataClockFrames(
filter []byte,
frameNumber uint64,
) ([]*protobufs.ClockFrame, error)
GetParentDataClockFrame(
filter []byte, filter []byte,
frameNumber uint64, frameNumber uint64,
parentSelector []byte, parentSelector []byte,
truncate bool, truncate bool,
) (*protobufs.ClockFrame, error) ) (*protobufs.ClockFrame, error)
RangeCandidateDataClockFrames(
filter []byte,
parent []byte,
frameNumber uint64,
) (*PebbleCandidateClockIterator, error)
GetLeadingCandidateDataClockFrame(
filter []byte,
parent []byte,
frameNumber uint64,
) (*protobufs.ClockFrame, error)
Deduplicate(filter []byte) error Deduplicate(filter []byte) error
GetCompressedDataClockFrames( GetCompressedDataClockFrames(
filter []byte, filter []byte,
@ -95,14 +72,6 @@ type ClockStore interface {
filter []byte, filter []byte,
frameNumber uint64, frameNumber uint64,
) error ) error
DeleteCandidateDataClockFrameRange(
filter []byte,
fromFrameNumber uint64,
toFrameNumber uint64,
) error
GetHighestCandidateDataClockFrame(
filter []byte,
) (*protobufs.ClockFrame, error)
ResetMasterClockFrames(filter []byte) error ResetMasterClockFrames(filter []byte) error
ResetDataClockFrames(filter []byte) error ResetDataClockFrames(filter []byte) error
Compact( Compact(
@ -127,14 +96,8 @@ type PebbleClockIterator struct {
db *PebbleClockStore db *PebbleClockStore
} }
type PebbleCandidateClockIterator struct {
i Iterator
db *PebbleClockStore
}
var _ TypedIterator[*protobufs.ClockFrame] = (*PebbleMasterClockIterator)(nil) var _ TypedIterator[*protobufs.ClockFrame] = (*PebbleMasterClockIterator)(nil)
var _ TypedIterator[*protobufs.ClockFrame] = (*PebbleClockIterator)(nil) var _ TypedIterator[*protobufs.ClockFrame] = (*PebbleClockIterator)(nil)
var _ TypedIterator[*protobufs.ClockFrame] = (*PebbleCandidateClockIterator)(nil)
func (p *PebbleMasterClockIterator) First() bool { func (p *PebbleMasterClockIterator) First() bool {
return p.i.First() return p.i.First()
@ -247,7 +210,7 @@ func (p *PebbleClockIterator) Value() (*protobufs.ClockFrame, error) {
) )
} }
if err := p.db.fillAggregateProofs(frame); err != nil { if err := p.db.fillAggregateProofs(frame, false); err != nil {
return nil, errors.Wrap( return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()), errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value", "get clock frame iterator value",
@ -261,66 +224,6 @@ func (p *PebbleClockIterator) Close() error {
return errors.Wrap(p.i.Close(), "closing clock frame iterator") return errors.Wrap(p.i.Close(), "closing clock frame iterator")
} }
func (p *PebbleCandidateClockIterator) First() bool {
return p.i.First()
}
func (p *PebbleCandidateClockIterator) Next() bool {
return p.i.Next()
}
func (p *PebbleCandidateClockIterator) Valid() bool {
return p.i.Valid()
}
func (p *PebbleCandidateClockIterator) TruncatedValue() (
*protobufs.ClockFrame,
error,
) {
if !p.i.Valid() {
return nil, ErrNotFound
}
value := p.i.Value()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get candidate clock frame iterator value",
)
}
return frame, nil
}
func (p *PebbleCandidateClockIterator) Value() (*protobufs.ClockFrame, error) {
if !p.i.Valid() {
return nil, ErrNotFound
}
value := p.i.Value()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get candidate clock frame iterator value",
)
}
if err := p.db.fillAggregateProofs(frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
)
}
return frame, nil
}
func (p *PebbleCandidateClockIterator) Close() error {
return errors.Wrap(p.i.Close(), "closing candidate clock frame iterator")
}
func NewPebbleClockStore(db KVDB, logger *zap.Logger) *PebbleClockStore { func NewPebbleClockStore(db KVDB, logger *zap.Logger) *PebbleClockStore {
return &PebbleClockStore{ return &PebbleClockStore{
db, db,
@ -419,6 +322,7 @@ func clockDataEarliestIndex(filter []byte) []byte {
return clockEarliestIndex(filter, CLOCK_DATA_FRAME_INDEX_EARLIEST) return clockEarliestIndex(filter, CLOCK_DATA_FRAME_INDEX_EARLIEST)
} }
// Produces an index key of size: len(filter) + 42
func clockParentIndexKey( func clockParentIndexKey(
filter []byte, filter []byte,
frameNumber uint64, frameNumber uint64,
@ -639,17 +543,38 @@ func (p *PebbleClockStore) GetDataClockFrame(
return nil, nil, errors.Wrap(err, "get data clock frame") return nil, nil, errors.Wrap(err, "get data clock frame")
} }
defer closer.Close()
frame := &protobufs.ClockFrame{} frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil { genesisFramePreIndex := false
return nil, nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()), // We do a bit of a cheap trick here while things are still stuck in the old
"get data clock frame", // ways: we use the size of the parent index key to determine if it's the new
) // format, or the old raw frame
if len(value) == (len(filter) + 42) {
frameValue, frameCloser, err := p.db.Get(value)
if err != nil {
return nil, nil, errors.Wrap(err, "get data clock frame")
}
if err := proto.Unmarshal(frameValue, frame); err != nil {
return nil, nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get data clock frame",
)
}
closer.Close()
defer frameCloser.Close()
} else {
genesisFramePreIndex = frameNumber == 0
if err := proto.Unmarshal(value, frame); err != nil {
return nil, nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get data clock frame",
)
}
defer closer.Close()
} }
if !truncate { if !truncate {
if err = p.fillAggregateProofs(frame); err != nil { if err = p.fillAggregateProofs(frame, genesisFramePreIndex); err != nil {
return nil, nil, errors.Wrap( return nil, nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()), errors.Wrap(err, ErrInvalidData.Error()),
"get data clock frame", "get data clock frame",
@ -677,8 +602,9 @@ func (p *PebbleClockStore) GetDataClockFrame(
func (p *PebbleClockStore) fillAggregateProofs( func (p *PebbleClockStore) fillAggregateProofs(
frame *protobufs.ClockFrame, frame *protobufs.ClockFrame,
genesisFramePreIndex bool,
) error { ) error {
if frame.FrameNumber == 0 { if frame.FrameNumber == 0 && genesisFramePreIndex {
return nil return nil
} }
@ -809,72 +735,21 @@ func (p *PebbleClockStore) GetLatestDataClockFrame(
return frame, nil return frame, nil
} }
func (p *PebbleClockStore) GetLatestCandidateDataClockFrame( // GetStagedDataClockFrame implements ClockStore.
filter []byte, func (p *PebbleClockStore) GetStagedDataClockFrame(
) (*protobufs.ClockFrame, error) {
idxValue, closer, err := p.db.Get(clockDataCandidateLatestIndex(filter))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get latest candidate data clock frame")
}
frameNumber := binary.BigEndian.Uint64(idxValue)
frames, err := p.GetCandidateDataClockFrames(filter, frameNumber)
if err != nil {
return nil, errors.Wrap(err, "get latest candidate data clock frame")
}
closer.Close()
if len(frames) == 0 {
return nil, ErrNotFound
}
return frames[0], nil
}
// GetLeadingCandidateDataClockFrame implements ClockStore.
func (p *PebbleClockStore) GetLeadingCandidateDataClockFrame(
filter []byte,
parent []byte,
frameNumber uint64,
) (*protobufs.ClockFrame, error) {
iter, err := p.RangeCandidateDataClockFrames(filter, parent, frameNumber)
if err != nil {
return nil, errors.Wrap(err, "get leading candidate data clock frame")
}
if !iter.First() {
return nil, ErrNotFound
}
defer iter.Close()
frame, err := iter.Value()
return frame, errors.Wrap(err, "get leading candidate data clock frame")
}
// GetParentDataClockFrame implements ClockStore.
func (p *PebbleClockStore) GetParentDataClockFrame(
filter []byte, filter []byte,
frameNumber uint64, frameNumber uint64,
parentSelector []byte, parentSelector []byte,
truncate bool, truncate bool,
) (*protobufs.ClockFrame, error) { ) (*protobufs.ClockFrame, error) {
check := false
data, closer, err := p.db.Get( data, closer, err := p.db.Get(
clockDataParentIndexKey(filter, frameNumber, parentSelector), clockDataParentIndexKey(filter, frameNumber, parentSelector),
) )
if err != nil && !errors.Is(err, pebble.ErrNotFound) { if err != nil {
return nil, errors.Wrap(err, "get parent data clock frame") if errors.Is(err, pebble.ErrNotFound) {
} else if err != nil && errors.Is(err, pebble.ErrNotFound) {
data, closer, err = p.db.Get(clockDataFrameKey(filter, frameNumber))
if err != nil {
return nil, errors.Wrap(ErrNotFound, "get parent data clock frame") return nil, errors.Wrap(ErrNotFound, "get parent data clock frame")
} }
check = true return nil, errors.Wrap(err, "get parent data clock frame")
} }
parent := &protobufs.ClockFrame{} parent := &protobufs.ClockFrame{}
@ -882,18 +757,8 @@ func (p *PebbleClockStore) GetParentDataClockFrame(
return nil, errors.Wrap(err, "get parent data clock frame") return nil, errors.Wrap(err, "get parent data clock frame")
} }
if check {
selector, err := parent.GetSelector()
if err != nil {
return nil, errors.Wrap(err, "get parent data clock frame")
}
if !bytes.Equal(selector.FillBytes(make([]byte, 32)), parentSelector) {
return nil, errors.Wrap(ErrNotFound, "get parent data clock frame")
}
}
if !truncate { if !truncate {
if err := p.fillAggregateProofs(parent); err != nil { if err := p.fillAggregateProofs(parent, false); err != nil {
return nil, errors.Wrap( return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()), errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value", "get clock frame iterator value",
@ -908,10 +773,8 @@ func (p *PebbleClockStore) GetParentDataClockFrame(
return parent, nil return parent, nil
} }
// PutCandidateDataClockFrame implements ClockStore. // StageDataClockFrame implements ClockStore.
func (p *PebbleClockStore) PutCandidateDataClockFrame( func (p *PebbleClockStore) StageDataClockFrame(
parentSelector []byte,
distance []byte,
selector []byte, selector []byte,
frame *protobufs.ClockFrame, frame *protobufs.ClockFrame,
txn Transaction, txn Transaction,
@ -919,7 +782,7 @@ func (p *PebbleClockStore) PutCandidateDataClockFrame(
if err := p.saveAggregateProofs(txn, frame); err != nil { if err := p.saveAggregateProofs(txn, frame); err != nil {
return errors.Wrap( return errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()), errors.Wrap(err, ErrInvalidData.Error()),
"put candidate data clock frame", "stage data clock frame",
) )
} }
@ -933,24 +796,12 @@ func (p *PebbleClockStore) PutCandidateDataClockFrame(
if err != nil { if err != nil {
return errors.Wrap( return errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()), errors.Wrap(err, ErrInvalidData.Error()),
"put candidate data clock frame", "stage data clock frame",
) )
} }
frame.AggregateProofs = temp frame.AggregateProofs = temp
if err = txn.Set(
clockDataCandidateFrameKey(
frame.Filter,
frame.FrameNumber,
frame.ParentSelector,
distance,
),
data,
); err != nil {
return errors.Wrap(err, "put candidate data clock frame")
}
if err = txn.Set( if err = txn.Set(
clockDataParentIndexKey( clockDataParentIndexKey(
frame.Filter, frame.Filter,
@ -959,118 +810,54 @@ func (p *PebbleClockStore) PutCandidateDataClockFrame(
), ),
data, data,
); err != nil { ); err != nil {
return errors.Wrap(err, "put candidate data clock frame") return errors.Wrap(err, "stage data clock frame")
}
numberBytes, closer, err := p.db.Get(clockDataCandidateLatestIndex(frame.Filter))
if err != nil && !errors.Is(err, pebble.ErrNotFound) {
return errors.Wrap(err, "put candidate data clock frame")
}
existingNumber := uint64(0)
if numberBytes != nil {
existingNumber = binary.BigEndian.Uint64(numberBytes)
closer.Close()
}
if frame.FrameNumber > existingNumber {
frameNumberBytes := make([]byte, 8)
binary.BigEndian.PutUint64(frameNumberBytes, frame.FrameNumber)
if err = txn.Set(
clockDataCandidateLatestIndex(frame.Filter),
frameNumberBytes,
); err != nil {
return errors.Wrap(err, "put candidate data clock frame")
}
} }
return nil return nil
} }
// PutDataClockFrame implements ClockStore. // CommitDataClockFrame implements ClockStore.
func (p *PebbleClockStore) PutDataClockFrame( func (p *PebbleClockStore) CommitDataClockFrame(
frame *protobufs.ClockFrame, filter []byte,
frameNumber uint64,
selector []byte,
proverTrie *tries.RollingFrecencyCritbitTrie, proverTrie *tries.RollingFrecencyCritbitTrie,
txn Transaction, txn Transaction,
backfill bool, backfill bool,
) error { ) error {
if frame.FrameNumber != 0 {
if err := p.saveAggregateProofs(txn, frame); err != nil {
return errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"put candidate data clock frame",
)
}
}
temp := append(
[]*protobufs.InclusionAggregateProof{},
frame.AggregateProofs...,
)
if frame.FrameNumber != 0 {
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
}
data, err := proto.Marshal(frame)
if err != nil {
return errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"put data clock frame",
)
}
if frame.FrameNumber != 0 {
frame.AggregateProofs = temp
}
frameNumberBytes := make([]byte, 8) frameNumberBytes := make([]byte, 8)
binary.BigEndian.PutUint64(frameNumberBytes, frame.FrameNumber) binary.BigEndian.PutUint64(frameNumberBytes, frameNumber)
if err = txn.Set( if err := txn.Set(
clockDataFrameKey(frame.Filter, frame.FrameNumber), clockDataFrameKey(filter, frameNumber),
data, clockDataParentIndexKey(filter, frameNumber, selector),
); err != nil { ); err != nil {
return errors.Wrap(err, "put data clock frame") return errors.Wrap(err, "commit data clock frame")
}
selector, err := frame.GetSelector()
if err != nil {
return errors.Wrap(err, "put data clock frame")
}
if err = txn.Set(
clockDataParentIndexKey(
frame.Filter,
frame.FrameNumber,
selector.FillBytes(make([]byte, 32)),
),
data,
); err != nil {
return errors.Wrap(err, "put data clock frame")
} }
proverData, err := proverTrie.Serialize() proverData, err := proverTrie.Serialize()
if err != nil { if err != nil {
return errors.Wrap(err, "put data clock frame") return errors.Wrap(err, "commit data clock frame")
} }
if err = txn.Set( if err = txn.Set(
clockProverTrieKey(frame.Filter, frame.FrameNumber), clockProverTrieKey(filter, frameNumber),
proverData, proverData,
); err != nil { ); err != nil {
return errors.Wrap(err, "put data clock frame") return errors.Wrap(err, "commit data clock frame")
} }
_, closer, err := p.db.Get(clockDataEarliestIndex(frame.Filter)) _, closer, err := p.db.Get(clockDataEarliestIndex(filter))
if err != nil { if err != nil {
if !errors.Is(err, pebble.ErrNotFound) { if !errors.Is(err, pebble.ErrNotFound) {
return errors.Wrap(err, "put data clock frame") return errors.Wrap(err, "commit data clock frame")
} }
if err = txn.Set( if err = txn.Set(
clockDataEarliestIndex(frame.Filter), clockDataEarliestIndex(filter),
frameNumberBytes, frameNumberBytes,
); err != nil { ); err != nil {
return errors.Wrap(err, "put data clock frame") return errors.Wrap(err, "commit data clock frame")
} }
} }
@ -1080,172 +867,16 @@ func (p *PebbleClockStore) PutDataClockFrame(
if !backfill { if !backfill {
if err = txn.Set( if err = txn.Set(
clockDataLatestIndex(frame.Filter), clockDataLatestIndex(filter),
frameNumberBytes, frameNumberBytes,
); err != nil { ); err != nil {
return errors.Wrap(err, "put data clock frame") return errors.Wrap(err, "commit data clock frame")
} }
} }
return nil return nil
} }
func (p *PebbleClockStore) GetCandidateDataClockFrame(
filter []byte,
frameNumber uint64,
parentSelector []byte,
distance []byte,
) (*protobufs.ClockFrame, error) {
value, closer, err := p.db.Get(clockDataCandidateFrameKey(
filter,
frameNumber,
parentSelector,
distance,
))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get candidate data clock frame")
}
defer closer.Close()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get candidate data clock frame",
)
}
if err = p.fillAggregateProofs(frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get candidate data clock frame",
)
}
return frame, nil
}
// GetCandidateDataClockFrames implements ClockStore.
// Distance is 32-byte aligned, so we just use a 0x00 * 32 -> 0xff * 32 range
func (p *PebbleClockStore) GetCandidateDataClockFrames(
filter []byte,
frameNumber uint64,
) ([]*protobufs.ClockFrame, error) {
iter, err := p.db.NewIter(
clockDataCandidateFrameKey(
filter,
frameNumber,
[]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
[]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
),
clockDataCandidateFrameKey(
filter,
frameNumber,
[]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
[]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
),
)
if err != nil {
return nil, errors.Wrap(err, "get candidate data clock frames")
}
frames := []*protobufs.ClockFrame{}
i := &PebbleCandidateClockIterator{i: iter, db: p}
for i.First(); i.Valid(); i.Next() {
value, err := i.Value()
if err != nil {
return nil, errors.Wrap(err, "get candidate data clock frames")
}
frames = append(frames, value)
}
if err := i.Close(); err != nil {
return nil, errors.Wrap(err, "get candidate data clock frames")
}
return frames, nil
}
// RangeCandidateDataClockFrames implements ClockStore.
// Distance is 32-byte aligned, so we just use a 0x00 * 32 -> 0xff * 32 range
func (p *PebbleClockStore) RangeCandidateDataClockFrames(
filter []byte,
parent []byte,
frameNumber uint64,
) (*PebbleCandidateClockIterator, error) {
fromParent := rightAlign(parent, 32)
toParent := rightAlign(parent, 32)
if bytes.Equal(parent, []byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
}) {
toParent = []byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
}
}
iter, err := p.db.NewIter(
clockDataCandidateFrameKey(
filter,
frameNumber,
fromParent,
[]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
),
clockDataCandidateFrameKey(
filter,
frameNumber,
toParent,
[]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
),
)
if err != nil {
return nil, errors.Wrap(err, "range candidate data clock frames")
}
return &PebbleCandidateClockIterator{i: iter, db: p}, nil
}
// RangeDataClockFrames implements ClockStore. // RangeDataClockFrames implements ClockStore.
func (p *PebbleClockStore) RangeDataClockFrames( func (p *PebbleClockStore) RangeDataClockFrames(
filter []byte, filter []byte,
@ -1469,6 +1100,33 @@ func (p *PebbleClockStore) GetCompressedDataClockFrames(
for iter.First(); iter.Valid(); iter.Next() { for iter.First(); iter.Valid(); iter.Next() {
value := iter.Value() value := iter.Value()
frame := &protobufs.ClockFrame{} frame := &protobufs.ClockFrame{}
genesisFramePreIndex := false
// We do a bit of a cheap trick here while things are still stuck in the old
// ways: we use the size of the parent index key to determine if it's the new
// format, or the old raw frame
if len(value) == (len(filter) + 42) {
frameValue, frameCloser, err := p.db.Get(value)
if err != nil {
return nil, errors.Wrap(err, "get compressed data clock frames")
}
if err := proto.Unmarshal(frameValue, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get compressed data clock frames",
)
}
frameCloser.Close()
} else {
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get compressed data clock frames",
)
}
genesisFramePreIndex = frame.FrameNumber == 0
}
if err := proto.Unmarshal(value, frame); err != nil { if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(err, "get compressed data clock frames") return nil, errors.Wrap(err, "get compressed data clock frames")
} }
@ -1477,7 +1135,7 @@ func (p *PebbleClockStore) GetCompressedDataClockFrames(
syncMessage.TruncatedClockFrames, syncMessage.TruncatedClockFrames,
frame, frame,
) )
if frame.FrameNumber == 0 { if frame.FrameNumber == 0 && genesisFramePreIndex {
continue continue
} }
for i := 0; i < len(frame.Input[516:])/74; i++ { for i := 0; i < len(frame.Input[516:])/74; i++ {
@ -1620,48 +1278,6 @@ func (p *PebbleClockStore) SetLatestDataClockFrameNumber(
return errors.Wrap(err, "set latest data clock frame number") return errors.Wrap(err, "set latest data clock frame number")
} }
func (p *PebbleClockStore) DeleteCandidateDataClockFrameRange(
filter []byte,
fromFrameNumber uint64,
toFrameNumber uint64,
) error {
err := p.db.DeleteRange(
clockDataCandidateFrameKey(
filter,
fromFrameNumber,
[]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
[]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
),
clockDataCandidateFrameKey(
filter,
toFrameNumber,
[]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
[]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
),
)
return errors.Wrap(err, "delete candidate data clock frame range")
}
func (p *PebbleClockStore) DeleteDataClockFrameRange( func (p *PebbleClockStore) DeleteDataClockFrameRange(
filter []byte, filter []byte,
fromFrameNumber uint64, fromFrameNumber uint64,
@ -1680,16 +1296,6 @@ func (p *PebbleClockStore) DeleteDataClockFrameRange(
return errors.Wrap(err, "delete data clock frame range") return errors.Wrap(err, "delete data clock frame range")
} }
func (p *PebbleClockStore) GetHighestCandidateDataClockFrame(
filter []byte,
) (*protobufs.ClockFrame, error) {
frame, err := p.GetLatestCandidateDataClockFrame(filter)
if err != nil {
return nil, errors.Wrap(err, "get highest candidate data clock frame")
}
return frame, nil
}
func (p *PebbleClockStore) ResetMasterClockFrames(filter []byte) error { func (p *PebbleClockStore) ResetMasterClockFrames(filter []byte) error {
if err := p.db.DeleteRange( if err := p.db.DeleteRange(
clockMasterFrameKey(filter, 0), clockMasterFrameKey(filter, 0),
@ -1741,7 +1347,60 @@ func (p *PebbleClockStore) Compact(
} }
} }
// If this node has been around since the early days, this is going to free
// up a lot of cruft.
if err := p.db.DeleteRange(
clockDataCandidateFrameKey(
make([]byte, 32),
0,
make([]byte, 32),
make([]byte, 32),
),
clockDataCandidateFrameKey(
bytes.Repeat([]byte{0xff}, 32),
1000000,
bytes.Repeat([]byte{0xff}, 32),
bytes.Repeat([]byte{0xff}, 32),
),
); err != nil {
return errors.Wrap(err, "compact")
}
if err := p.db.Compact(
clockDataCandidateFrameKey(
make([]byte, 32),
0,
make([]byte, 32),
make([]byte, 32),
),
clockDataCandidateFrameKey(
bytes.Repeat([]byte{0xff}, 32),
1000000,
bytes.Repeat([]byte{0xff}, 32),
bytes.Repeat([]byte{0xff}, 32),
),
true,
); err != nil {
return errors.Wrap(err, "compact")
}
if dataFilter != nil { if dataFilter != nil {
if err := p.db.DeleteRange(
clockDataCandidateFrameKey(
dataFilter,
0,
make([]byte, 32),
make([]byte, 32),
),
clockDataCandidateFrameKey(
dataFilter,
1000000,
bytes.Repeat([]byte{0xff}, 32),
bytes.Repeat([]byte{0xff}, 32),
),
); err != nil {
return errors.Wrap(err, "compact")
}
if err := p.db.Compact( if err := p.db.Compact(
clockDataFrameKey(dataFilter, 0), clockDataFrameKey(dataFilter, 0),
clockDataFrameKey(dataFilter, 1000000), clockDataFrameKey(dataFilter, 1000000),
@ -1767,6 +1426,22 @@ func (p *PebbleClockStore) Compact(
); err != nil { ); err != nil {
return errors.Wrap(err, "compact") return errors.Wrap(err, "compact")
} }
if err := p.db.Compact(
clockDataParentIndexKey(
dataFilter,
0,
make([]byte, 32),
),
clockDataParentIndexKey(
dataFilter,
1000000,
bytes.Repeat([]byte{0xff}, 32),
),
true,
); err != nil {
return errors.Wrap(err, "compact")
}
} }
return nil return nil