ceremonyclient/node/store/clock.go

1650 lines
40 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package store
import (
"bytes"
"encoding/binary"
"math/big"
"sort"
"github.com/cockroachdb/pebble"
"github.com/iden3/go-iden3-crypto/poseidon"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"source.quilibrium.com/quilibrium/monorepo/node/config"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/tries"
)
type ClockStore interface {
NewTransaction() (Transaction, error)
GetLatestMasterClockFrame(filter []byte) (*protobufs.ClockFrame, error)
GetEarliestMasterClockFrame(filter []byte) (*protobufs.ClockFrame, error)
GetMasterClockFrame(
filter []byte,
frameNumber uint64,
) (*protobufs.ClockFrame, error)
RangeMasterClockFrames(
filter []byte,
startFrameNumber uint64,
endFrameNumber uint64,
) (*PebbleMasterClockIterator, error)
PutMasterClockFrame(frame *protobufs.ClockFrame, txn Transaction) error
GetLatestDataClockFrame(
filter []byte,
proverTrie *tries.RollingFrecencyCritbitTrie,
) (*protobufs.ClockFrame, error)
GetEarliestDataClockFrame(filter []byte) (*protobufs.ClockFrame, error)
GetDataClockFrame(
filter []byte,
frameNumber uint64,
truncate bool,
) (*protobufs.ClockFrame, *tries.RollingFrecencyCritbitTrie, error)
RangeDataClockFrames(
filter []byte,
startFrameNumber uint64,
endFrameNumber uint64,
) (*PebbleClockIterator, error)
CommitDataClockFrame(
filter []byte,
frameNumber uint64,
selector []byte,
proverTrie *tries.RollingFrecencyCritbitTrie,
txn Transaction,
backfill bool,
) error
StageDataClockFrame(
selector []byte,
frame *protobufs.ClockFrame,
txn Transaction,
) error
GetStagedDataClockFrame(
filter []byte,
frameNumber uint64,
parentSelector []byte,
truncate bool,
) (*protobufs.ClockFrame, error)
GetCompressedDataClockFrames(
filter []byte,
fromFrameNumber uint64,
toFrameNumber uint64,
) (*protobufs.CeremonyCompressedSync, error)
SetLatestDataClockFrameNumber(
filter []byte,
frameNumber uint64,
) error
ResetMasterClockFrames(filter []byte) error
ResetDataClockFrames(filter []byte) error
Compact(
dataFilter []byte,
) error
GetTotalDistance(
filter []byte,
frameNumber uint64,
selector []byte,
) (*big.Int, error)
SetTotalDistance(
filter []byte,
frameNumber uint64,
selector []byte,
totalDistance *big.Int,
) error
}
type PebbleClockStore struct {
db KVDB
logger *zap.Logger
}
var _ ClockStore = (*PebbleClockStore)(nil)
type PebbleMasterClockIterator struct {
i Iterator
}
type PebbleClockIterator struct {
i Iterator
db *PebbleClockStore
}
var _ TypedIterator[*protobufs.ClockFrame] = (*PebbleMasterClockIterator)(nil)
var _ TypedIterator[*protobufs.ClockFrame] = (*PebbleClockIterator)(nil)
func (p *PebbleMasterClockIterator) First() bool {
return p.i.First()
}
func (p *PebbleMasterClockIterator) Next() bool {
return p.i.Next()
}
func (p *PebbleMasterClockIterator) Valid() bool {
return p.i.Valid()
}
func (p *PebbleMasterClockIterator) Value() (*protobufs.ClockFrame, error) {
if !p.i.Valid() {
return nil, ErrNotFound
}
key := p.i.Key()
value := p.i.Value()
frame := &protobufs.ClockFrame{}
frameNumber, filter, err := extractFrameNumberAndFilterFromMasterFrameKey(key)
if err != nil {
return nil, errors.Wrap(err, "get master clock frame iterator value")
}
frame.FrameNumber = frameNumber
frame.Filter = make([]byte, len(filter))
copy(frame.Filter, filter)
if len(value) < 521 {
return nil, errors.Wrap(
ErrInvalidData,
"get master clock frame iterator value",
)
}
copied := make([]byte, len(value))
copy(copied, value)
frame.Difficulty = binary.BigEndian.Uint32(copied[:4])
frame.Input = copied[4 : len(copied)-516]
frame.Output = copied[len(copied)-516:]
previousSelectorBytes := [516]byte{}
copy(previousSelectorBytes[:], frame.Input[:516])
parent, err := poseidon.HashBytes(previousSelectorBytes[:])
if err != nil {
return nil, errors.Wrap(err, "get master clock frame iterator value")
}
frame.ParentSelector = parent.FillBytes(make([]byte, 32))
return frame, nil
}
func (p *PebbleMasterClockIterator) Close() error {
return errors.Wrap(p.i.Close(), "closing master clock iterator")
}
func (p *PebbleClockIterator) First() bool {
return p.i.First()
}
func (p *PebbleClockIterator) Next() bool {
return p.i.Next()
}
func (p *PebbleClockIterator) Prev() bool {
return p.i.Prev()
}
func (p *PebbleClockIterator) Valid() bool {
return p.i.Valid()
}
func (p *PebbleClockIterator) TruncatedValue() (
*protobufs.ClockFrame,
error,
) {
if !p.i.Valid() {
return nil, ErrNotFound
}
value := p.i.Value()
frame := &protobufs.ClockFrame{}
if len(value) == (len(p.i.Key()) + 32) {
frameValue, frameCloser, err := p.db.db.Get(value)
if err != nil {
return nil, errors.Wrap(err, "get truncated clock frame iterator value")
}
if err := proto.Unmarshal(frameValue, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get truncated clock frame iterator value",
)
}
frameCloser.Close()
} else {
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get truncated clock frame iterator value",
)
}
}
return frame, nil
}
func (p *PebbleClockIterator) Value() (*protobufs.ClockFrame, error) {
if !p.i.Valid() {
return nil, ErrNotFound
}
value := p.i.Value()
frame := &protobufs.ClockFrame{}
genesisFramePreIndex := false
// We do a bit of a cheap trick here while things are still stuck in the old
// ways: we use the size of the parent index key to determine if it's the new
// format, or the old raw frame
if len(value) == (len(p.i.Key()) + 32) {
frameValue, frameCloser, err := p.db.db.Get(value)
if err != nil {
return nil, errors.Wrap(err, "get clock frame iterator value")
}
if err := proto.Unmarshal(frameValue, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
)
}
defer frameCloser.Close()
} else {
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
)
}
genesisFramePreIndex = frame.FrameNumber == 0
}
if err := p.db.fillAggregateProofs(frame, genesisFramePreIndex); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
)
}
return frame, nil
}
func (p *PebbleClockIterator) Close() error {
return errors.Wrap(p.i.Close(), "closing clock frame iterator")
}
func NewPebbleClockStore(db KVDB, logger *zap.Logger) *PebbleClockStore {
return &PebbleClockStore{
db,
logger,
}
}
const CLOCK_FRAME = 0x00
const CLOCK_MASTER_FRAME_DATA = 0x00
const CLOCK_DATA_FRAME_DATA = 0x01
const CLOCK_DATA_FRAME_CANDIDATE_DATA = 0x02
const CLOCK_DATA_FRAME_FRECENCY_DATA = 0x03
const CLOCK_DATA_FRAME_DISTANCE_DATA = 0x04
const CLOCK_COMPACTION_DATA = 0x05
const CLOCK_MASTER_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_MASTER_FRAME_DATA
const CLOCK_MASTER_FRAME_INDEX_LATEST = 0x20 | CLOCK_MASTER_FRAME_DATA
const CLOCK_MASTER_FRAME_INDEX_PARENT = 0x30 | CLOCK_MASTER_FRAME_DATA
const CLOCK_DATA_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_DATA_FRAME_DATA
const CLOCK_DATA_FRAME_INDEX_LATEST = 0x20 | CLOCK_DATA_FRAME_DATA
const CLOCK_DATA_FRAME_INDEX_PARENT = 0x30 | CLOCK_DATA_FRAME_DATA
const CLOCK_DATA_FRAME_CANDIDATE_INDEX_LATEST = 0x20 |
CLOCK_DATA_FRAME_CANDIDATE_DATA
//
// DB Keys
//
// Keys are structured as:
// <core type><sub type | index>[<non-index increment>]<segment>
// Increment necessarily must be full width elsewise the frame number would
// easily produce conflicts if filters are stepped by byte:
// 0x01 || 0xffff == 0x01ff || 0xff
//
// Master frames are serialized as output data only, Data frames are raw
// protobufs for fast disk-to-network output.
func clockFrameKey(filter []byte, frameNumber uint64, frameType byte) []byte {
key := []byte{CLOCK_FRAME, frameType}
key = binary.BigEndian.AppendUint64(key, frameNumber)
key = append(key, filter...)
return key
}
func clockMasterFrameKey(filter []byte, frameNumber uint64) []byte {
return clockFrameKey(filter, frameNumber, CLOCK_MASTER_FRAME_DATA)
}
func extractFrameNumberAndFilterFromMasterFrameKey(
key []byte,
) (uint64, []byte, error) {
if len(key) < 11 {
return 0, nil, errors.Wrap(
ErrInvalidData,
"extract frame number and filter from master frame key",
)
}
copied := make([]byte, len(key))
copy(copied, key)
return binary.BigEndian.Uint64(copied[2:10]), copied[10:], nil
}
func clockDataFrameKey(
filter []byte,
frameNumber uint64,
) []byte {
return clockFrameKey(filter, frameNumber, CLOCK_DATA_FRAME_DATA)
}
func clockLatestIndex(filter []byte, frameType byte) []byte {
key := []byte{CLOCK_FRAME, frameType}
key = append(key, filter...)
return key
}
func clockMasterLatestIndex(filter []byte) []byte {
return clockLatestIndex(filter, CLOCK_MASTER_FRAME_INDEX_LATEST)
}
func clockDataLatestIndex(filter []byte) []byte {
return clockLatestIndex(filter, CLOCK_DATA_FRAME_INDEX_LATEST)
}
func clockDataCandidateLatestIndex(filter []byte) []byte {
return clockLatestIndex(filter, CLOCK_DATA_FRAME_CANDIDATE_INDEX_LATEST)
}
func clockEarliestIndex(filter []byte, frameType byte) []byte {
key := []byte{CLOCK_FRAME, frameType}
key = append(key, filter...)
return key
}
func clockMasterEarliestIndex(filter []byte) []byte {
return clockEarliestIndex(filter, CLOCK_MASTER_FRAME_INDEX_EARLIEST)
}
func clockDataEarliestIndex(filter []byte) []byte {
return clockEarliestIndex(filter, CLOCK_DATA_FRAME_INDEX_EARLIEST)
}
// Produces an index key of size: len(filter) + 42
func clockParentIndexKey(
filter []byte,
frameNumber uint64,
selector []byte,
frameType byte,
) []byte {
key := []byte{CLOCK_FRAME, frameType}
key = binary.BigEndian.AppendUint64(key, frameNumber)
key = append(key, filter...)
key = append(key, rightAlign(selector, 32)...)
return key
}
func clockDataParentIndexKey(
filter []byte,
frameNumber uint64,
selector []byte,
) []byte {
return clockParentIndexKey(
filter,
frameNumber,
rightAlign(selector, 32),
CLOCK_DATA_FRAME_INDEX_PARENT,
)
}
func clockDataCandidateFrameKey(
filter []byte,
frameNumber uint64,
parent []byte,
distance []byte,
) []byte {
key := []byte{CLOCK_FRAME, CLOCK_DATA_FRAME_CANDIDATE_DATA}
key = binary.BigEndian.AppendUint64(key, frameNumber)
key = append(key, filter...)
key = append(key, rightAlign(parent, 32)...)
key = append(key, rightAlign(distance, 32)...)
return key
}
func clockProverTrieKey(filter []byte, frameNumber uint64) []byte {
key := []byte{CLOCK_FRAME, CLOCK_DATA_FRAME_FRECENCY_DATA}
key = binary.BigEndian.AppendUint64(key, frameNumber)
key = append(key, filter...)
return key
}
func clockDataTotalDistanceKey(
filter []byte,
frameNumber uint64,
selector []byte,
) []byte {
key := []byte{CLOCK_FRAME, CLOCK_DATA_FRAME_DISTANCE_DATA}
key = binary.BigEndian.AppendUint64(key, frameNumber)
key = append(key, filter...)
key = append(key, rightAlign(selector, 32)...)
return key
}
func (p *PebbleClockStore) NewTransaction() (Transaction, error) {
return p.db.NewBatch(), nil
}
// GetEarliestMasterClockFrame implements ClockStore.
func (p *PebbleClockStore) GetEarliestMasterClockFrame(
filter []byte,
) (*protobufs.ClockFrame, error) {
idxValue, closer, err := p.db.Get(clockMasterEarliestIndex(filter))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get earliest master clock frame")
}
defer closer.Close()
frameNumber := binary.BigEndian.Uint64(idxValue)
frame, err := p.GetMasterClockFrame(filter, frameNumber)
if err != nil {
return nil, errors.Wrap(err, "get earliest master clock frame")
}
return frame, nil
}
// GetLatestMasterClockFrame implements ClockStore.
func (p *PebbleClockStore) GetLatestMasterClockFrame(
filter []byte,
) (*protobufs.ClockFrame, error) {
idxValue, closer, err := p.db.Get(clockMasterLatestIndex(filter))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get latest master clock frame")
}
defer closer.Close()
frameNumber := binary.BigEndian.Uint64(idxValue)
frame, err := p.GetMasterClockFrame(filter, frameNumber)
if err != nil {
return nil, errors.Wrap(err, "get latest master clock frame")
}
return frame, nil
}
// GetMasterClockFrame implements ClockStore.
func (p *PebbleClockStore) GetMasterClockFrame(
filter []byte,
frameNumber uint64,
) (*protobufs.ClockFrame, error) {
value, closer, err := p.db.Get(clockMasterFrameKey(filter, frameNumber))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get master clock frame")
}
copied := make([]byte, len(value))
copy(copied[:], value[:])
defer closer.Close()
frame := &protobufs.ClockFrame{}
frame.FrameNumber = frameNumber
frame.Filter = filter
frame.Difficulty = binary.BigEndian.Uint32(copied[:4])
frame.Input = copied[4 : len(copied)-516]
frame.Output = copied[len(copied)-516:]
previousSelectorBytes := [516]byte{}
copy(previousSelectorBytes[:], frame.Input[:516])
parent, err := poseidon.HashBytes(previousSelectorBytes[:])
if err != nil {
return nil, errors.Wrap(err, "get master clock frame")
}
frame.ParentSelector = parent.FillBytes(make([]byte, 32))
return frame, nil
}
// RangeMasterClockFrames implements ClockStore.
func (p *PebbleClockStore) RangeMasterClockFrames(
filter []byte,
startFrameNumber uint64,
endFrameNumber uint64,
) (*PebbleMasterClockIterator, error) {
if startFrameNumber > endFrameNumber {
temp := endFrameNumber
endFrameNumber = startFrameNumber
startFrameNumber = temp
}
iter, err := p.db.NewIter(
clockMasterFrameKey(filter, startFrameNumber),
clockMasterFrameKey(filter, endFrameNumber),
)
if err != nil {
return nil, errors.Wrap(err, "range master clock frames")
}
return &PebbleMasterClockIterator{i: iter}, nil
}
// PutMasterClockFrame implements ClockStore.
func (p *PebbleClockStore) PutMasterClockFrame(
frame *protobufs.ClockFrame,
txn Transaction,
) error {
data := binary.BigEndian.AppendUint32([]byte{}, frame.Difficulty)
data = append(data, frame.Input...)
data = append(data, frame.Output...)
frameNumberBytes := make([]byte, 8)
binary.BigEndian.PutUint64(frameNumberBytes, frame.FrameNumber)
if err := txn.Set(
clockMasterFrameKey(frame.Filter, frame.FrameNumber),
data,
); err != nil {
return errors.Wrap(err, "put master clock frame")
}
_, closer, err := p.db.Get(clockMasterEarliestIndex(frame.Filter))
if err != nil {
if !errors.Is(err, pebble.ErrNotFound) {
return errors.Wrap(err, "put master clock frame")
}
if err = txn.Set(
clockMasterEarliestIndex(frame.Filter),
frameNumberBytes,
); err != nil {
return errors.Wrap(err, "put master clock frame")
}
}
if err == nil && closer != nil {
closer.Close()
}
if err = txn.Set(
clockMasterLatestIndex(frame.Filter),
frameNumberBytes,
); err != nil {
return errors.Wrap(err, "put master clock frame")
}
return nil
}
// GetDataClockFrame implements ClockStore.
func (p *PebbleClockStore) GetDataClockFrame(
filter []byte,
frameNumber uint64,
truncate bool,
) (*protobufs.ClockFrame, *tries.RollingFrecencyCritbitTrie, error) {
value, closer, err := p.db.Get(clockDataFrameKey(filter, frameNumber))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, nil, ErrNotFound
}
return nil, nil, errors.Wrap(err, "get data clock frame")
}
frame := &protobufs.ClockFrame{}
genesisFramePreIndex := false
// We do a bit of a cheap trick here while things are still stuck in the old
// ways: we use the size of the parent index key to determine if it's the new
// format, or the old raw frame
if len(value) == (len(filter) + 42) {
frameValue, frameCloser, err := p.db.Get(value)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, nil, ErrNotFound
}
return nil, nil, errors.Wrap(err, "get data clock frame")
}
if err := proto.Unmarshal(frameValue, frame); err != nil {
return nil, nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get data clock frame",
)
}
closer.Close()
defer frameCloser.Close()
} else {
genesisFramePreIndex = frameNumber == 0
if err := proto.Unmarshal(value, frame); err != nil {
return nil, nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get data clock frame",
)
}
defer closer.Close()
}
if !truncate {
if err = p.fillAggregateProofs(frame, genesisFramePreIndex); err != nil {
return nil, nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get data clock frame",
)
}
proverTrie := &tries.RollingFrecencyCritbitTrie{}
trieData, closer, err := p.db.Get(clockProverTrieKey(filter, frameNumber))
if err != nil {
return nil, nil, errors.Wrap(err, "get latest data clock frame")
}
defer closer.Close()
if err := proverTrie.Deserialize(trieData); err != nil {
return nil, nil, errors.Wrap(err, "get latest data clock frame")
}
return frame, proverTrie, nil
}
return frame, nil, nil
}
func (p *PebbleClockStore) fillAggregateProofs(
frame *protobufs.ClockFrame,
genesisFramePreIndex bool,
) error {
if frame.FrameNumber == 0 && genesisFramePreIndex {
return nil
}
for i := 0; i < len(frame.Input[516:])/74; i++ {
commit := frame.Input[516+(i*74) : 516+((i+1)*74)]
ap, err := internalGetAggregateProof(
p.db,
frame.Filter,
commit,
frame.FrameNumber,
)
if err != nil {
return err
}
frame.AggregateProofs = append(frame.AggregateProofs, ap)
}
return nil
}
func (p *PebbleClockStore) saveAggregateProofs(
txn Transaction,
frame *protobufs.ClockFrame,
) error {
shouldClose := false
if txn == nil {
var err error
txn, err = p.NewTransaction()
if err != nil {
return err
}
shouldClose = true
}
for i := 0; i < len(frame.Input[516:])/74; i++ {
commit := frame.Input[516+(i*74) : 516+((i+1)*74)]
err := internalPutAggregateProof(
p.db,
txn,
frame.AggregateProofs[i],
commit,
)
if err != nil {
if err = txn.Abort(); err != nil {
return err
}
}
}
if shouldClose {
txn.Commit()
}
return nil
}
// GetEarliestDataClockFrame implements ClockStore.
func (p *PebbleClockStore) GetEarliestDataClockFrame(
filter []byte,
) (*protobufs.ClockFrame, error) {
idxValue, closer, err := p.db.Get(clockDataEarliestIndex(filter))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get earliest data clock frame")
}
defer closer.Close()
frameNumber := binary.BigEndian.Uint64(idxValue)
frame, _, err := p.GetDataClockFrame(filter, frameNumber, false)
if err != nil {
return nil, errors.Wrap(err, "get earliest data clock frame")
}
return frame, nil
}
// GetLatestDataClockFrame implements ClockStore.
func (p *PebbleClockStore) GetLatestDataClockFrame(
filter []byte,
proverTrie *tries.RollingFrecencyCritbitTrie,
) (*protobufs.ClockFrame, error) {
idxValue, closer, err := p.db.Get(clockDataLatestIndex(filter))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get latest data clock frame")
}
frameNumber := binary.BigEndian.Uint64(idxValue)
frame, _, err := p.GetDataClockFrame(filter, frameNumber, false)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get latest data clock frame")
}
closer.Close()
if proverTrie != nil {
trieData, closer, err := p.db.Get(clockProverTrieKey(filter, frameNumber))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get latest data clock frame")
}
defer closer.Close()
if err := proverTrie.Deserialize(trieData); err != nil {
return nil, errors.Wrap(err, "get latest data clock frame")
}
}
return frame, nil
}
// GetStagedDataClockFrame implements ClockStore.
func (p *PebbleClockStore) GetStagedDataClockFrame(
filter []byte,
frameNumber uint64,
parentSelector []byte,
truncate bool,
) (*protobufs.ClockFrame, error) {
data, closer, err := p.db.Get(
clockDataParentIndexKey(filter, frameNumber, parentSelector),
)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, errors.Wrap(ErrNotFound, "get parent data clock frame")
}
return nil, errors.Wrap(err, "get parent data clock frame")
}
parent := &protobufs.ClockFrame{}
if err := proto.Unmarshal(data, parent); err != nil {
return nil, errors.Wrap(err, "get parent data clock frame")
}
if !truncate {
if err := p.fillAggregateProofs(parent, false); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
)
}
}
if closer != nil {
closer.Close()
}
return parent, nil
}
// StageDataClockFrame implements ClockStore.
func (p *PebbleClockStore) StageDataClockFrame(
selector []byte,
frame *protobufs.ClockFrame,
txn Transaction,
) error {
if err := p.saveAggregateProofs(txn, frame); err != nil {
return errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"stage data clock frame",
)
}
temp := append(
[]*protobufs.InclusionAggregateProof{},
frame.AggregateProofs...,
)
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
data, err := proto.Marshal(frame)
if err != nil {
return errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"stage data clock frame",
)
}
frame.AggregateProofs = temp
if err = txn.Set(
clockDataParentIndexKey(
frame.Filter,
frame.FrameNumber,
selector,
),
data,
); err != nil {
return errors.Wrap(err, "stage data clock frame")
}
return nil
}
// CommitDataClockFrame implements ClockStore.
func (p *PebbleClockStore) CommitDataClockFrame(
filter []byte,
frameNumber uint64,
selector []byte,
proverTrie *tries.RollingFrecencyCritbitTrie,
txn Transaction,
backfill bool,
) error {
frameNumberBytes := make([]byte, 8)
binary.BigEndian.PutUint64(frameNumberBytes, frameNumber)
if err := txn.Set(
clockDataFrameKey(filter, frameNumber),
clockDataParentIndexKey(filter, frameNumber, selector),
); err != nil {
return errors.Wrap(err, "commit data clock frame")
}
proverData, err := proverTrie.Serialize()
if err != nil {
return errors.Wrap(err, "commit data clock frame")
}
if err = txn.Set(
clockProverTrieKey(filter, frameNumber),
proverData,
); err != nil {
return errors.Wrap(err, "commit data clock frame")
}
_, closer, err := p.db.Get(clockDataEarliestIndex(filter))
if err != nil {
if !errors.Is(err, pebble.ErrNotFound) {
return errors.Wrap(err, "commit data clock frame")
}
if err = txn.Set(
clockDataEarliestIndex(filter),
frameNumberBytes,
); err != nil {
return errors.Wrap(err, "commit data clock frame")
}
}
if err == nil && closer != nil {
closer.Close()
}
if !backfill {
if err = txn.Set(
clockDataLatestIndex(filter),
frameNumberBytes,
); err != nil {
return errors.Wrap(err, "commit data clock frame")
}
}
return nil
}
// RangeDataClockFrames implements ClockStore.
func (p *PebbleClockStore) RangeDataClockFrames(
filter []byte,
startFrameNumber uint64,
endFrameNumber uint64,
) (*PebbleClockIterator, error) {
if startFrameNumber > endFrameNumber {
temp := endFrameNumber
endFrameNumber = startFrameNumber
startFrameNumber = temp
}
iter, err := p.db.NewIter(
clockDataFrameKey(filter, startFrameNumber),
clockDataFrameKey(filter, endFrameNumber),
)
if err != nil {
return nil, errors.Wrap(err, "get data clock frames")
}
return &PebbleClockIterator{i: iter, db: p}, nil
}
func (p *PebbleClockStore) GetCompressedDataClockFrames(
filter []byte,
fromFrameNumber uint64,
toFrameNumber uint64,
) (*protobufs.CeremonyCompressedSync, error) {
from := clockDataFrameKey(filter, fromFrameNumber)
to := clockDataFrameKey(filter, toFrameNumber+1)
iter, err := p.db.NewIter(from, to)
if err != nil {
return nil, errors.Wrap(err, "get compressed data clock frames")
}
syncMessage := &protobufs.CeremonyCompressedSync{}
proofs := map[string]*protobufs.InclusionProofsMap{}
commits := map[string]*protobufs.InclusionCommitmentsMap{}
segments := map[string]*protobufs.InclusionSegmentsMap{}
for iter.First(); iter.Valid(); iter.Next() {
value := iter.Value()
frame := &protobufs.ClockFrame{}
genesisFramePreIndex := false
// We do a bit of a cheap trick here while things are still stuck in the old
// ways: we use the size of the parent index key to determine if it's the
// new format, or the old raw frame
if len(value) == (len(filter) + 42) {
frameValue, frameCloser, err := p.db.Get(value)
if err != nil {
return nil, errors.Wrap(err, "get compressed data clock frames")
}
if err := proto.Unmarshal(frameValue, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get compressed data clock frames",
)
}
frameCloser.Close()
} else {
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get compressed data clock frames",
)
}
genesisFramePreIndex = frame.FrameNumber == 0
}
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(err, "get compressed data clock frames")
}
syncMessage.TruncatedClockFrames = append(
syncMessage.TruncatedClockFrames,
frame,
)
if frame.FrameNumber == 0 && genesisFramePreIndex {
continue
}
for i := 0; i < len(frame.Input[516:])/74; i++ {
aggregateCommit := frame.Input[516+(i*74) : 516+((i+1)*74)]
if _, ok := proofs[string(aggregateCommit)]; !ok {
proofs[string(aggregateCommit)] = &protobufs.InclusionProofsMap{
FrameCommit: aggregateCommit,
}
}
}
}
if err := iter.Close(); err != nil {
return nil, errors.Wrap(err, "get compressed data clock frames")
}
for k, v := range proofs {
k := k
v := v
value, closer, err := p.db.Get(dataProofMetadataKey(filter, []byte(k)))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get compressed data clock frames")
}
copied := make([]byte, len(value[8:]))
limit := binary.BigEndian.Uint64(value[0:8])
copy(copied, value[8:])
v.Proof = copied
if err = closer.Close(); err != nil {
return nil, errors.Wrap(err, "get compressed data clock frames")
}
iter, err := p.db.NewIter(
dataProofInclusionKey(filter, []byte(k), 0),
dataProofInclusionKey(filter, []byte(k), limit+1),
)
if err != nil {
return nil, errors.Wrap(err, "get compressed data clock frames")
}
for iter.First(); iter.Valid(); iter.Next() {
incCommit := iter.Value()
urlLength := binary.BigEndian.Uint16(incCommit[:2])
commitLength := binary.BigEndian.Uint16(incCommit[2:4])
url := make([]byte, urlLength)
copy(url, incCommit[4:urlLength+4])
commit := make([]byte, commitLength)
copy(commit, incCommit[urlLength+4:urlLength+4+commitLength])
remainder := int(urlLength + 4 + commitLength)
if _, ok := commits[string(commit)]; !ok {
commits[string(commit)] = &protobufs.InclusionCommitmentsMap{
Commitment: commit,
TypeUrl: string(url),
}
for j := 0; j < (len(incCommit)-remainder)/32; j++ {
start := remainder + (j * 32)
end := remainder + ((j + 1) * 32)
hash := make([]byte, len(incCommit[start:end]))
copy(hash, incCommit[start:end])
commits[string(commit)].SegmentHashes = append(
commits[string(commit)].SegmentHashes,
hash,
)
if _, ok := segments[string(hash)]; !ok {
segValue, dataCloser, err := p.db.Get(
dataProofSegmentKey(filter, incCommit[start:end]),
)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
// If we've lost this key it means we're in a corrupted state
return nil, ErrInvalidData
}
return nil, errors.Wrap(err, "get compressed data clock frames")
}
segCopy := make([]byte, len(segValue))
copy(segCopy, segValue)
segments[string(hash)] = &protobufs.InclusionSegmentsMap{
Hash: hash,
Data: segCopy,
}
syncMessage.Segments = append(
syncMessage.Segments,
segments[string(hash)],
)
if err = dataCloser.Close(); err != nil {
return nil, errors.Wrap(err, "get compressed data clock frames")
}
}
}
}
v.Commitments = append(v.Commitments, commits[string(commit)])
}
syncMessage.Proofs = append(
syncMessage.Proofs,
v,
)
if len(syncMessage.TruncatedClockFrames) > 0 {
frames := syncMessage.TruncatedClockFrames
syncMessage.FromFrameNumber = frames[0].FrameNumber
syncMessage.ToFrameNumber = frames[len(frames)-1].FrameNumber
}
if err = iter.Close(); err != nil {
return nil, errors.Wrap(err, "get aggregate proof")
}
}
return syncMessage, nil
}
func (p *PebbleClockStore) SetLatestDataClockFrameNumber(
filter []byte,
frameNumber uint64,
) error {
err := p.db.Set(
clockDataLatestIndex(filter),
binary.BigEndian.AppendUint64(nil, frameNumber),
)
return errors.Wrap(err, "set latest data clock frame number")
}
func (p *PebbleClockStore) DeleteDataClockFrameRange(
filter []byte,
fromFrameNumber uint64,
toFrameNumber uint64,
) error {
err := p.db.DeleteRange(
clockDataFrameKey(
filter,
fromFrameNumber,
),
clockDataFrameKey(
filter,
toFrameNumber,
),
)
return errors.Wrap(err, "delete data clock frame range")
}
func (p *PebbleClockStore) ResetMasterClockFrames(filter []byte) error {
if err := p.db.DeleteRange(
clockMasterFrameKey(filter, 0),
clockMasterFrameKey(filter, 200000),
); err != nil {
return errors.Wrap(err, "reset master clock frames")
}
if err := p.db.Delete(clockMasterEarliestIndex(filter)); err != nil {
return errors.Wrap(err, "reset master clock frames")
}
if err := p.db.Delete(clockMasterLatestIndex(filter)); err != nil {
return errors.Wrap(err, "reset master clock frames")
}
return nil
}
func (p *PebbleClockStore) ResetDataClockFrames(filter []byte) error {
if err := p.db.DeleteRange(
clockDataFrameKey(filter, 0),
clockDataFrameKey(filter, 200000),
); err != nil {
return errors.Wrap(err, "reset data clock frames")
}
if err := p.db.Delete(clockDataEarliestIndex(filter)); err != nil {
return errors.Wrap(err, "reset data clock frames")
}
if err := p.db.Delete(clockDataLatestIndex(filter)); err != nil {
return errors.Wrap(err, "reset data clock frames")
}
return nil
}
func (p *PebbleClockStore) Compact(
dataFilter []byte,
) error {
version, closer, err := p.db.Get([]byte{CLOCK_COMPACTION_DATA})
cleared := true
if err != nil {
cleared = false
} else {
if bytes.Compare(version, config.GetVersion()) < 0 {
cleared = false
}
closer.Close()
}
if !cleared {
// If this node has been around since the early days, this is going to free
// up a lot of cruft.
if err := p.db.DeleteRange(
clockDataCandidateFrameKey(
make([]byte, 32),
0,
make([]byte, 32),
make([]byte, 32),
),
clockDataCandidateFrameKey(
bytes.Repeat([]byte{0xff}, 32),
1000000,
bytes.Repeat([]byte{0xff}, 32),
bytes.Repeat([]byte{0xff}, 32),
),
); err != nil {
return errors.Wrap(err, "compact")
}
if err := p.db.Compact(
clockDataCandidateFrameKey(
make([]byte, 32),
0,
make([]byte, 32),
make([]byte, 32),
),
clockDataCandidateFrameKey(
bytes.Repeat([]byte{0xff}, 32),
1000000,
bytes.Repeat([]byte{0xff}, 32),
bytes.Repeat([]byte{0xff}, 32),
),
true,
); err != nil {
return errors.Wrap(err, "compact")
}
if err := p.db.DeleteRange(
clockDataParentIndexKey(
make([]byte, 32),
0,
make([]byte, 32),
),
clockDataParentIndexKey(
bytes.Repeat([]byte{0xff}, 32),
0,
bytes.Repeat([]byte{0xff}, 32),
),
); err != nil {
return errors.Wrap(err, "compact")
}
}
if dataFilter != nil && !cleared {
parents := [][]byte{}
proofs := map[string]struct{}{}
commits := map[string]struct{}{}
data := map[string]struct{}{}
idxValue, closer, err := p.db.Get(clockDataLatestIndex(dataFilter))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return ErrNotFound
}
return errors.Wrap(err, "compact")
}
last := binary.BigEndian.Uint64(idxValue)
closer.Close()
for frameNumber := uint64(1); frameNumber <= last; frameNumber++ {
value, closer, err := p.db.Get(clockDataFrameKey(dataFilter, frameNumber))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return ErrNotFound
}
return errors.Wrap(err, "compact")
}
frame := &protobufs.ClockFrame{}
if len(value) == (len(dataFilter) + 42) {
frameValue, frameCloser, err := p.db.Get(value)
if err != nil {
return errors.Wrap(err, "compact")
}
if err := proto.Unmarshal(frameValue, frame); err != nil {
return errors.Wrap(err, "compact")
}
selector, err := frame.GetSelector()
if err != nil {
panic(err)
}
parents = append(parents,
clockDataParentIndexKey(dataFilter, frameNumber, selector.FillBytes(
make([]byte, 32),
)),
)
closer.Close()
frameCloser.Close()
} else {
if err := proto.Unmarshal(value, frame); err != nil {
return errors.Wrap(err, "compact")
}
selector, err := frame.GetSelector()
if err != nil {
panic(err)
}
err = p.db.Set(
clockDataParentIndexKey(dataFilter, frameNumber, selector.FillBytes(
make([]byte, 32),
)),
value,
)
if err != nil {
return errors.Wrap(err, "compact")
}
err = p.db.Set(
clockDataFrameKey(dataFilter, frameNumber),
clockDataParentIndexKey(dataFilter, frameNumber, selector.FillBytes(
make([]byte, 32),
)),
)
parents = append(parents,
clockDataParentIndexKey(dataFilter, frameNumber, selector.FillBytes(
make([]byte, 32),
)),
)
closer.Close()
}
for i := 0; i < len(frame.Input[516:])/74; i++ {
p.logger.Info(
"preparing indexes for frame compaction",
zap.Uint64("frame_number", frameNumber),
zap.Uint64("max_frame_number", last),
)
commit := frame.Input[516+(i*74) : 516+((i+1)*74)]
frameProofs, frameCommits, frameData, err :=
internalListAggregateProofKeys(
p.db,
dataFilter,
commit,
frameNumber,
)
if err != nil {
return errors.Wrap(err, "compact")
}
for _, proof := range frameProofs {
proof := proof
proofs[string(proof)] = struct{}{}
}
for _, comm := range frameCommits {
comm := comm
commits[string(comm)] = struct{}{}
}
for _, d := range frameData {
d := d
data[string(d)] = struct{}{}
}
}
}
p.logger.Info("sorting indexes for bulk clear")
sortedProofKeys := [][]byte{}
for k := range proofs {
k := k
sortedProofKeys = append(sortedProofKeys, []byte(k))
}
proofs = nil
sort.Slice(sortedProofKeys, func(i, j int) bool {
return bytes.Compare(sortedProofKeys[i], sortedProofKeys[j]) < 0
})
sortedCommitKeys := [][]byte{}
for k := range commits {
k := k
sortedCommitKeys = append(sortedCommitKeys, []byte(k))
}
commits = nil
sort.Slice(sortedCommitKeys, func(i, j int) bool {
return bytes.Compare(sortedCommitKeys[i], sortedCommitKeys[j]) < 0
})
sortedDataKeys := [][]byte{}
for k := range data {
k := k
sortedDataKeys = append(sortedDataKeys, []byte(k))
}
data = nil
sort.Slice(sortedDataKeys, func(i, j int) bool {
return bytes.Compare(sortedDataKeys[i], sortedDataKeys[j]) < 0
})
for i := uint64(0); i < uint64(len(parents)); i++ {
p.logger.Info(
"clearing orphaned frames for frame number",
zap.Uint64("frame_number", i+1),
zap.Int("max_frame_number", len(parents)),
)
pre := clockDataParentIndexKey(
dataFilter,
i+1,
bytes.Repeat([]byte{0x00}, 32),
)
err := p.db.DeleteRange(
pre,
parents[i],
)
if err != nil {
return errors.Wrap(err, "compact")
}
start := new(big.Int).SetBytes(parents[i])
start.Add(start, big.NewInt(1))
startBytes := start.FillBytes(make([]byte, len(parents[i])))
post := clockDataParentIndexKey(
dataFilter,
i+1,
bytes.Repeat([]byte{0xff}, 32),
)
err = p.db.DeleteRange(
startBytes,
post,
)
if err != nil {
return errors.Wrap(err, "compact")
}
}
for i := -1; i < len(sortedProofKeys); i++ {
p.logger.Info(
"clearing orphaned proof metadata",
zap.Int("proof_range_index", i+1),
zap.Int("max_proof_range_index", len(sortedProofKeys)),
)
var start, end []byte
if i == -1 {
start = dataProofMetadataKey(
dataFilter,
bytes.Repeat([]byte{0x00}, 74),
)
} else {
startBI := new(big.Int).SetBytes(sortedProofKeys[i])
startBI.Add(startBI, big.NewInt(1))
start = startBI.FillBytes(make([]byte, len(sortedProofKeys[i])))
}
if i == len(sortedProofKeys)-1 {
end = dataProofMetadataKey(
dataFilter,
bytes.Repeat([]byte{0xff}, 74),
)
} else {
end = sortedProofKeys[i+1]
}
err := p.db.DeleteRange(
start,
end,
)
if err != nil {
return errors.Wrap(err, "compact")
}
}
for i := -1; i < len(sortedCommitKeys); i++ {
p.logger.Info(
"clearing orphaned commits",
zap.Int("commit_range_index", i+1),
zap.Int("max_commit_range_index", len(sortedProofKeys)),
)
var start, end []byte
if i == -1 {
start = dataProofInclusionKey(
dataFilter,
bytes.Repeat([]byte{0x00}, 74),
0,
)
} else {
start = make([]byte, len(sortedCommitKeys[i]))
copy(start[:], sortedCommitKeys[i][:])
start[74] = 0xff
start[75] = 0xff
start[76] = 0xff
start[77] = 0xff
start[78] = 0xff
start[79] = 0xff
start[80] = 0xff
start[81] = 0xff
}
if i == len(sortedCommitKeys)-1 {
end = dataProofInclusionKey(
dataFilter,
bytes.Repeat([]byte{0xff}, 74),
0xffffffffffffffff,
)
} else {
end = sortedCommitKeys[i+1]
}
err := p.db.DeleteRange(
start,
end,
)
if err != nil {
return errors.Wrap(err, "compact")
}
}
for i := -1; i < len(sortedDataKeys); i++ {
p.logger.Info(
"clearing orphaned data",
zap.Int("data_range_index", i+1),
zap.Int("max_data_range_index", len(sortedProofKeys)),
)
var start, end []byte
if i == -1 {
start = dataProofSegmentKey(
dataFilter,
bytes.Repeat([]byte{0x00}, 32),
)
} else {
startBI := new(big.Int).SetBytes(sortedDataKeys[i])
startBI.Add(startBI, big.NewInt(1))
start = startBI.FillBytes(make([]byte, len(sortedDataKeys[i])))
}
if i == len(sortedDataKeys)-1 {
end = dataProofSegmentKey(
dataFilter,
bytes.Repeat([]byte{0xff}, 32),
)
} else {
end = sortedDataKeys[i+1]
}
err := p.db.DeleteRange(
start,
end,
)
if err != nil {
return errors.Wrap(err, "compact")
}
}
if err := p.db.DeleteRange(
clockDataCandidateFrameKey(
dataFilter,
0,
make([]byte, 32),
make([]byte, 32),
),
clockDataCandidateFrameKey(
dataFilter,
1000000,
bytes.Repeat([]byte{0xff}, 32),
bytes.Repeat([]byte{0xff}, 32),
),
); err != nil {
return errors.Wrap(err, "compact")
}
err = p.db.Set([]byte{CLOCK_COMPACTION_DATA}, config.GetVersion())
if err != nil {
return errors.Wrap(err, "compact")
}
}
return nil
}
func (p *PebbleClockStore) GetTotalDistance(
filter []byte,
frameNumber uint64,
selector []byte,
) (*big.Int, error) {
value, closer, err := p.db.Get(
clockDataTotalDistanceKey(filter, frameNumber, selector),
)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get total distance")
}
defer closer.Close()
dist := new(big.Int).SetBytes(value)
return dist, nil
}
func (p *PebbleClockStore) SetTotalDistance(
filter []byte,
frameNumber uint64,
selector []byte,
totalDistance *big.Int,
) error {
err := p.db.Set(
clockDataTotalDistanceKey(filter, frameNumber, selector),
totalDistance.Bytes(),
)
return errors.Wrap(err, "set total distance")
}