[QUIL-63] – QOL: Data compaction for ceremony

This commit is contained in:
Cassandra Heart 2023-09-27 04:05:39 -05:00
parent 9d4ffab29b
commit af92be100d
No known key found for this signature in database
GPG Key ID: 6352152859385958
7 changed files with 755 additions and 9 deletions

View File

@ -6,18 +6,25 @@ import (
"os"
"strings"
"github.com/pkg/errors"
"golang.org/x/crypto/sha3"
"google.golang.org/protobuf/proto"
"source.quilibrium.com/quilibrium/monorepo/node/execution/ceremony/application"
"source.quilibrium.com/quilibrium/monorepo/node/store"
)
type DBConsole struct {
clockStore store.ClockStore
clockStore store.ClockStore
dataProofStore store.DataProofStore
}
func newDBConsole(
clockStore store.ClockStore,
dataProofStore store.DataProofStore,
) (*DBConsole, error) {
return &DBConsole{
clockStore,
dataProofStore,
}, nil
}
@ -36,7 +43,7 @@ func (c *DBConsole) Run() {
switch cmd {
case "quit":
return
case "show frames":
case "show master frames":
earliestFrame, err := c.clockStore.GetEarliestMasterClockFrame([]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
@ -102,6 +109,82 @@ func (c *DBConsole) Run() {
)
}
if err := iter.Close(); err != nil {
panic(err)
}
case "show ceremony frames":
earliestFrame, err := c.clockStore.GetEarliestDataClockFrame(
application.CEREMONY_ADDRESS,
)
if err != nil {
panic(errors.Wrap(err, "earliest"))
}
latestFrame, err := c.clockStore.GetLatestDataClockFrame(
application.CEREMONY_ADDRESS,
nil,
)
if err != nil {
panic(errors.Wrap(err, "latest"))
}
fmt.Printf(
"earliest: %d, latest: %d\n",
earliestFrame.FrameNumber,
latestFrame.FrameNumber,
)
fmt.Printf(
"Genesis Frame:\n\tVDF Proof: %x\n",
earliestFrame.Input[:516],
)
iter, err := c.clockStore.RangeDataClockFrames(
application.CEREMONY_ADDRESS,
earliestFrame.FrameNumber+1,
latestFrame.FrameNumber,
)
if err != nil {
panic(err)
}
for iter.First(); iter.Valid(); iter.Next() {
value, err := iter.Value()
if err != nil {
panic(err)
}
selector, err := value.GetSelector()
if err != nil {
panic(err)
}
fmt.Printf(
"Frame %d (Selector: %x):\n\tParent: %x\n\tVDF Proof: %x\n",
value.FrameNumber,
selector.Bytes(),
value.ParentSelector,
value.Input[:516],
)
for i := 0; i < len(value.Input[516:])/74; i++ {
commit := value.Input[516+(i*74) : 516+((i+1)*74)]
fmt.Printf(
"\tCommitment %+x\n",
commit,
)
fmt.Printf(
"\t\tType: %s\n",
value.AggregateProofs[i].InclusionCommitments[0].TypeUrl,
)
b, _ := proto.Marshal(value.AggregateProofs[i])
hash := sha3.Sum256(b)
fmt.Printf("\t\tAP Hash: %+x\n", hash)
}
fmt.Println()
}
if err := iter.Close(); err != nil {
panic(err)
}

View File

@ -40,8 +40,10 @@ var storeSet = wire.NewSet(
store.NewPebbleDB,
store.NewPebbleClockStore,
store.NewPebbleKeyStore,
store.NewPebbleDataProofStore,
wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)),
wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)),
wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)),
)
var pubSubSet = wire.NewSet(
@ -87,3 +89,7 @@ func NewNode(*config.Config) (*Node, error) {
func NewDBConsole(*config.Config) (*DBConsole, error) {
panic(wire.Build(loggerSet, storeSet, newDBConsole))
}
func NewClockStore(*config.Config) (store.ClockStore, error) {
panic(wire.Build(loggerSet, storeSet))
}

View File

@ -47,13 +47,22 @@ func NewDBConsole(configConfig *config.Config) (*DBConsole, error) {
db := store.NewPebbleDB(dbConfig)
zapLogger := logger()
pebbleClockStore := store.NewPebbleClockStore(db, zapLogger)
dbConsole, err := newDBConsole(pebbleClockStore)
pebbleDataProofStore := store.NewPebbleDataProofStore(db, zapLogger)
dbConsole, err := newDBConsole(pebbleClockStore, pebbleDataProofStore)
if err != nil {
return nil, err
}
return dbConsole, nil
}
func NewClockStore(configConfig *config.Config) (store.ClockStore, error) {
dbConfig := configConfig.DB
db := store.NewPebbleDB(dbConfig)
zapLogger := logger()
pebbleClockStore := store.NewPebbleClockStore(db, zapLogger)
return pebbleClockStore, nil
}
// wire.go:
func logger() *zap.Logger {
@ -71,7 +80,7 @@ var loggerSet = wire.NewSet(
var keyManagerSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Key"), keys.NewFileKeyManager, wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager)))
var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewPebbleDB, store.NewPebbleClockStore, store.NewPebbleKeyStore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)))
var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewPebbleDB, store.NewPebbleClockStore, store.NewPebbleKeyStore, store.NewPebbleDataProofStore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)))
var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), p2p.NewBlossomSub, wire.Bind(new(p2p.PubSub), new(*p2p.BlossomSub)))

View File

@ -16,6 +16,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/app"
"source.quilibrium.com/quilibrium/monorepo/node/config"
qcrypto "source.quilibrium.com/quilibrium/monorepo/node/crypto"
"source.quilibrium.com/quilibrium/monorepo/node/execution/ceremony/application"
)
var (
@ -62,6 +63,7 @@ func main() {
}
clearIfTestData(*configDirectory, nodeConfig)
migrate(*configDirectory, nodeConfig)
if *dbConsole {
console, err := app.NewDBConsole(nodeConfig)
@ -116,6 +118,36 @@ func clearIfTestData(configDir string, nodeConfig *config.Config) {
}
}
func migrate(configDir string, nodeConfig *config.Config) {
_, err := os.Stat(filepath.Join(configDir, "MIGRATIONS"))
if os.IsNotExist(err) {
fmt.Println("Deduplicating and compressing clock frame data...")
clock, err := app.NewClockStore(nodeConfig)
if err := clock.Deduplicate(application.CEREMONY_ADDRESS); err != nil {
panic(err)
}
migrationFile, err := os.OpenFile(
filepath.Join(configDir, "MIGRATIONS"),
os.O_CREATE|os.O_RDWR,
fs.FileMode(0700),
)
if err != nil {
panic(err)
}
_, err = migrationFile.Write([]byte{0x00, 0x00, 0x01})
if err != nil {
panic(err)
}
err = migrationFile.Close()
if err != nil {
panic(err)
}
}
}
func printPeerID(p2pConfig *config.P2PConfig) {
peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
if err != nil {

View File

@ -2,6 +2,7 @@ package store
import (
"encoding/binary"
"fmt"
"github.com/cockroachdb/pebble"
"github.com/iden3/go-iden3-crypto/poseidon"
@ -71,6 +72,7 @@ type ClockStore interface {
parent []byte,
frameNumber uint64,
) (*protobufs.ClockFrame, error)
Deduplicate(filter []byte) error
}
type PebbleClockStore struct {
@ -85,11 +87,13 @@ type PebbleMasterClockIterator struct {
}
type PebbleClockIterator struct {
i *pebble.Iterator
i *pebble.Iterator
db *PebbleClockStore
}
type PebbleCandidateClockIterator struct {
i *pebble.Iterator
i *pebble.Iterator
db *PebbleClockStore
}
var _ Iterator[*protobufs.ClockFrame] = (*PebbleMasterClockIterator)(nil)
@ -183,6 +187,13 @@ func (p *PebbleClockIterator) Value() (*protobufs.ClockFrame, error) {
)
}
if err := p.db.fillAggregateProofs(frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
)
}
return frame, nil
}
@ -216,6 +227,13 @@ func (p *PebbleCandidateClockIterator) Value() (*protobufs.ClockFrame, error) {
)
}
if err := p.db.fillAggregateProofs(frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
)
}
return frame, nil
}
@ -542,6 +560,13 @@ func (p *PebbleClockStore) GetDataClockFrame(
)
}
if err = p.fillAggregateProofs(frame); err != nil {
return nil, nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get data clock frame",
)
}
proverTrie := &tries.RollingFrecencyCritbitTrie{}
trieData, closer, err := p.db.Get(clockProverTrieKey(filter, frameNumber))
@ -558,6 +583,98 @@ func (p *PebbleClockStore) GetDataClockFrame(
return frame, proverTrie, nil
}
func (p *PebbleClockStore) fillAggregateProofs(
frame *protobufs.ClockFrame,
) error {
if frame.FrameNumber == 0 {
return nil
}
for i := 0; i < len(frame.Input[516:])/74; i++ {
commit := frame.Input[516+(i*74) : 516+((i+1)*74)]
ap, err := internalGetAggregateProof(
p.db,
frame.Filter,
commit,
frame.FrameNumber,
func(typeUrl string, data [][]byte) ([]byte, error) {
if typeUrl == protobufs.IntrinsicExecutionOutputType {
o := &protobufs.IntrinsicExecutionOutput{}
copiedLeft := make([]byte, len(data[0]))
copiedRight := make([]byte, len(data[1]))
copy(copiedLeft, data[0])
copy(copiedRight, data[1])
o.Address = copiedLeft[:32]
o.Output = copiedLeft[32:]
o.Proof = copiedRight
return proto.Marshal(o)
}
copied := make([]byte, len(data[0]))
copy(copied, data[0])
return copied, nil
},
)
if err != nil {
return err
}
frame.AggregateProofs = append(frame.AggregateProofs, ap)
}
return nil
}
func (p *PebbleClockStore) saveAggregateProofs(
txn Transaction,
frame *protobufs.ClockFrame,
) error {
shouldClose := false
if txn == nil {
var err error
txn, err = p.NewTransaction()
if err != nil {
return err
}
shouldClose = true
}
for i := 0; i < len(frame.Input[516:])/74; i++ {
commit := frame.Input[516+(i*74) : 516+((i+1)*74)]
err := internalPutAggregateProof(
p.db,
txn,
frame.AggregateProofs[i],
commit, func(typeUrl string, data []byte) ([][]byte, error) {
if typeUrl == protobufs.IntrinsicExecutionOutputType {
o := &protobufs.IntrinsicExecutionOutput{}
if err := proto.Unmarshal(data, o); err != nil {
return nil, err
}
leftBits := append([]byte{}, o.Address...)
leftBits = append(leftBits, o.Output...)
rightBits := o.Proof
return [][]byte{leftBits, rightBits}, nil
}
return [][]byte{data}, nil
})
if err != nil {
if err = txn.Abort(); err != nil {
return err
}
}
}
if shouldClose {
txn.Commit()
}
return nil
}
// GetEarliestDataClockFrame implements ClockStore.
func (p *PebbleClockStore) GetEarliestDataClockFrame(
filter []byte,
@ -656,6 +773,13 @@ func (p *PebbleClockStore) GetParentDataClockFrame(
return nil, errors.Wrap(err, "get parent data clock frame")
}
if err := p.fillAggregateProofs(parent); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
)
}
if closer != nil {
closer.Close()
}
@ -671,6 +795,19 @@ func (p *PebbleClockStore) PutCandidateDataClockFrame(
frame *protobufs.ClockFrame,
txn Transaction,
) error {
if err := p.saveAggregateProofs(nil, frame); err != nil {
return errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"put candidate data clock frame",
)
}
temp := append(
[]*protobufs.InclusionAggregateProof{},
frame.AggregateProofs...,
)
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
data, err := proto.Marshal(frame)
if err != nil {
return errors.Wrap(
@ -679,6 +816,8 @@ func (p *PebbleClockStore) PutCandidateDataClockFrame(
)
}
frame.AggregateProofs = temp
if err = txn.Set(
clockDataCandidateFrameKey(
frame.Filter,
@ -711,6 +850,22 @@ func (p *PebbleClockStore) PutDataClockFrame(
proverTrie *tries.RollingFrecencyCritbitTrie,
txn Transaction,
) error {
if frame.FrameNumber != 0 {
if err := p.saveAggregateProofs(nil, frame); err != nil {
return errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"put candidate data clock frame",
)
}
}
temp := append(
[]*protobufs.InclusionAggregateProof{},
frame.AggregateProofs...,
)
if frame.FrameNumber != 0 {
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
}
data, err := proto.Marshal(frame)
if err != nil {
return errors.Wrap(
@ -719,6 +874,9 @@ func (p *PebbleClockStore) PutDataClockFrame(
)
}
if frame.FrameNumber != 0 {
frame.AggregateProofs = temp
}
frameNumberBytes := make([]byte, 8)
binary.BigEndian.PutUint64(frameNumberBytes, frame.FrameNumber)
@ -811,7 +969,7 @@ func (p *PebbleClockStore) GetCandidateDataClockFrames(
})
frames := []*protobufs.ClockFrame{}
i := &PebbleCandidateClockIterator{i: iter}
i := &PebbleCandidateClockIterator{i: iter, db: p}
for i.First(); i.Valid(); i.Next() {
value, err := i.Value()
@ -861,7 +1019,7 @@ func (p *PebbleClockStore) RangeCandidateDataClockFrames(
),
})
return &PebbleCandidateClockIterator{i: iter}, nil
return &PebbleCandidateClockIterator{i: iter, db: p}, nil
}
// RangeDataClockFrames implements ClockStore.
@ -881,5 +1039,184 @@ func (p *PebbleClockStore) RangeDataClockFrames(
UpperBound: clockDataFrameKey(filter, endFrameNumber),
})
return &PebbleClockIterator{i: iter}, nil
return &PebbleClockIterator{i: iter, db: p}, nil
}
// Should only need be run once, before starting
func (p *PebbleClockStore) Deduplicate(filter []byte) error {
from := clockDataParentIndexKey(
filter,
1,
[]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
)
to := clockDataParentIndexKey(
filter,
20000,
[]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
)
iter := p.db.NewIter(&pebble.IterOptions{
LowerBound: from,
UpperBound: to,
})
i := 0
for iter.First(); iter.Valid(); iter.Next() {
value := iter.Value()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return err
}
if err := p.saveAggregateProofs(nil, frame); err != nil {
return err
}
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
newValue, err := proto.Marshal(frame)
if err != nil {
return err
}
err = p.db.Set(iter.Key(), newValue, &pebble.WriteOptions{Sync: true})
if err != nil {
return err
}
i++
if i%100 == 0 {
fmt.Println("Deduplicated 100 parent frames")
}
}
iter.Close()
if err := p.db.Compact(from, to, true); err != nil {
return err
}
from = clockDataFrameKey(filter, 1)
to = clockDataFrameKey(filter, 20000)
iter = p.db.NewIter(&pebble.IterOptions{
LowerBound: from,
UpperBound: to,
})
i = 0
for iter.First(); iter.Valid(); iter.Next() {
value := iter.Value()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return err
}
if err := p.saveAggregateProofs(nil, frame); err != nil {
return err
}
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
newValue, err := proto.Marshal(frame)
if err != nil {
return err
}
err = p.db.Set(iter.Key(), newValue, &pebble.WriteOptions{Sync: true})
if err != nil {
return err
}
i++
if i%100 == 0 {
fmt.Println("Deduplicated 100 data frames")
}
}
iter.Close()
if err := p.db.Compact(from, to, true); err != nil {
return err
}
from = clockDataCandidateFrameKey(
filter,
1,
[]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
[]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
)
to = clockDataCandidateFrameKey(
filter,
20000,
[]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
[]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
)
iter = p.db.NewIter(&pebble.IterOptions{
LowerBound: from,
UpperBound: to,
})
i = 0
for iter.First(); iter.Valid(); iter.Next() {
value := iter.Value()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return err
}
if err := p.saveAggregateProofs(nil, frame); err != nil {
return err
}
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
newValue, err := proto.Marshal(frame)
if err != nil {
return err
}
err = p.db.Set(iter.Key(), newValue, &pebble.WriteOptions{Sync: true})
if err != nil {
return err
}
i++
if i%100 == 0 {
fmt.Println("Deduplicated 100 candidate frames")
}
}
iter.Close()
if err := p.db.Compact(from, to, true); err != nil {
return err
}
p.db.Close()
return nil
}

274
node/store/data_proof.go Normal file
View File

@ -0,0 +1,274 @@
package store
import (
"encoding/binary"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/crypto/sha3"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
type DataProofStore interface {
NewTransaction() (Transaction, error)
GetAggregateProof(
filter []byte,
commitment []byte,
frameNumber uint64,
inclusionReassembler func(typeUrl string, data [][]byte) ([]byte, error),
) (
*protobufs.InclusionAggregateProof,
error,
)
PutAggregateProof(
txn Transaction,
aggregateProof *protobufs.InclusionAggregateProof,
commitment []byte,
inclusionSplitter func(typeUrl string, data []byte) ([][]byte, error),
) error
}
type PebbleDataProofStore struct {
db *pebble.DB
logger *zap.Logger
}
func NewPebbleDataProofStore(
db *pebble.DB,
logger *zap.Logger,
) *PebbleDataProofStore {
return &PebbleDataProofStore{
db,
logger,
}
}
const (
DATA_PROOF = 0x04
DATA_PROOF_METADATA = 0x00
DATA_PROOF_INCLUSION = 0x01
DATA_PROOF_SEGMENT = 0x02
)
func dataProofMetadataKey(filter []byte, commitment []byte) []byte {
key := []byte{DATA_PROOF, DATA_PROOF_METADATA}
key = append(key, commitment...)
key = append(key, filter...)
return key
}
func dataProofInclusionKey(
filter []byte,
commitment []byte,
seqNo uint64,
) []byte {
key := []byte{DATA_PROOF, DATA_PROOF_INCLUSION}
key = append(key, commitment...)
key = binary.BigEndian.AppendUint64(key, seqNo)
key = append(key, filter...)
return key
}
func dataProofSegmentKey(
filter []byte,
hash []byte,
) []byte {
key := []byte{DATA_PROOF, DATA_PROOF_SEGMENT}
key = append(key, hash...)
key = append(key, filter...)
return key
}
func (p *PebbleDataProofStore) NewTransaction() (Transaction, error) {
return &PebbleTransaction{
b: p.db.NewBatch(),
}, nil
}
func internalGetAggregateProof(
db *pebble.DB,
filter []byte,
commitment []byte,
frameNumber uint64,
inclusionReassembler func(typeUrl string, data [][]byte) ([]byte, error),
) (*protobufs.InclusionAggregateProof, error) {
value, closer, err := db.Get(dataProofMetadataKey(filter, commitment))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get aggregate proof")
}
defer closer.Close()
copied := make([]byte, len(value[8:]))
limit := binary.BigEndian.Uint64(value[0:8])
copy(copied, value[8:])
aggregate := &protobufs.InclusionAggregateProof{
Filter: filter,
FrameNumber: frameNumber,
InclusionCommitments: []*protobufs.InclusionCommitment{},
Proof: copied,
}
iter := db.NewIter(&pebble.IterOptions{
LowerBound: dataProofInclusionKey(filter, commitment, 0),
UpperBound: dataProofInclusionKey(filter, commitment, limit+1),
})
i := uint32(0)
for iter.First(); iter.Valid(); iter.Next() {
incCommit := iter.Value()
urlLength := binary.BigEndian.Uint16(incCommit[:2])
commitLength := binary.BigEndian.Uint16(incCommit[2:4])
url := make([]byte, urlLength)
copy(url, incCommit[4:urlLength+4])
commit := make([]byte, commitLength)
copy(commit, incCommit[urlLength+4:urlLength+4+commitLength])
remainder := int(urlLength + 4 + commitLength)
inclusionCommitment := &protobufs.InclusionCommitment{
Filter: filter,
FrameNumber: frameNumber,
Position: i,
TypeUrl: string(url),
Commitment: commit,
}
chunks := [][]byte{}
for j := 0; j < (len(incCommit)-remainder)/32; j++ {
start := remainder + (j * 32)
end := remainder + ((j + 1) * 32)
segValue, dataCloser, err := db.Get(
dataProofSegmentKey(filter, incCommit[start:end]),
)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
// If we've lost this key it means we're in a corrupted state
return nil, ErrInvalidData
}
return nil, errors.Wrap(err, "get aggregate proof")
}
segCopy := make([]byte, len(segValue))
copy(segCopy, segValue)
chunks = append(chunks, segCopy)
if err = dataCloser.Close(); err != nil {
return nil, errors.Wrap(err, "get aggregate proof")
}
}
inclusionCommitment.Data, err = inclusionReassembler(string(url), chunks)
if err != nil {
return nil, errors.Wrap(err, "get aggregate proof")
}
aggregate.InclusionCommitments = append(
aggregate.InclusionCommitments,
inclusionCommitment,
)
i++
}
if err = iter.Close(); err != nil {
return nil, errors.Wrap(err, "get aggregate proof")
}
return aggregate, nil
}
func (p *PebbleDataProofStore) GetAggregateProof(
filter []byte,
commitment []byte,
frameNumber uint64,
inclusionReassembler func(typeUrl string, data [][]byte) ([]byte, error),
) (*protobufs.InclusionAggregateProof, error) {
return internalGetAggregateProof(
p.db,
filter,
commitment,
frameNumber,
inclusionReassembler,
)
}
func internalPutAggregateProof(
db *pebble.DB,
txn Transaction,
aggregateProof *protobufs.InclusionAggregateProof,
commitment []byte,
inclusionSplitter func(typeUrl string, data []byte) ([][]byte, error),
) error {
buf := binary.BigEndian.AppendUint64(
nil,
uint64(len(aggregateProof.InclusionCommitments)),
)
buf = append(buf, aggregateProof.Proof...)
for i, inc := range aggregateProof.InclusionCommitments {
segments, err := inclusionSplitter(inc.TypeUrl, inc.Data)
if err != nil {
return errors.Wrap(err, "get aggregate proof")
}
urlLength := len(inc.TypeUrl)
commitLength := len(inc.Commitment)
encoded := binary.BigEndian.AppendUint16(nil, uint16(urlLength))
encoded = binary.BigEndian.AppendUint16(encoded, uint16(commitLength))
encoded = append(encoded, []byte(inc.TypeUrl)...)
encoded = append(encoded, inc.Commitment...)
for _, segment := range segments {
hash := sha3.Sum256(segment)
if err = txn.Set(
dataProofSegmentKey(aggregateProof.Filter, hash[:]),
segment,
); err != nil {
return errors.Wrap(err, "put aggregate proof")
}
encoded = append(encoded, hash[:]...)
}
if err = txn.Set(
dataProofInclusionKey(aggregateProof.Filter, commitment, uint64(i)),
encoded,
); err != nil {
return errors.Wrap(err, "put aggregate proof")
}
}
if err := txn.Set(
dataProofMetadataKey(aggregateProof.Filter, commitment),
buf,
); err != nil {
return errors.Wrap(err, "put aggregate proof")
}
return nil
}
func (p *PebbleDataProofStore) PutAggregateProof(
txn Transaction,
aggregateProof *protobufs.InclusionAggregateProof,
commitment []byte,
inclusionSplitter func(typeUrl string, data []byte) ([][]byte, error),
) error {
return internalPutAggregateProof(
p.db,
txn,
aggregateProof,
commitment,
inclusionSplitter,
)
}

View File

@ -18,6 +18,7 @@ type Transaction interface {
Set(key []byte, value []byte) error
Commit() error
Delete(key []byte) error
Abort() error
}
type PebbleTransaction struct {
@ -36,6 +37,10 @@ func (t *PebbleTransaction) Delete(key []byte) error {
return t.b.Delete(key, &pebble.WriteOptions{Sync: true})
}
func (t *PebbleTransaction) Abort() error {
return t.b.Close()
}
var _ Transaction = (*PebbleTransaction)(nil)
func rightAlign(data []byte, size int) []byte {