mirror of
https://source.quilibrium.com/quilibrium/ceremonyclient.git
synced 2025-01-23 14:15:18 +00:00
Merge pull request #20 from QuilibriumNetwork/QUIL-63-data-compaction
[QUIL-63] – QOL: Data compaction for ceremony
This commit is contained in:
commit
0274332cdc
@ -6,18 +6,25 @@ import (
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/crypto/sha3"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"source.quilibrium.com/quilibrium/monorepo/node/execution/ceremony/application"
|
||||
"source.quilibrium.com/quilibrium/monorepo/node/store"
|
||||
)
|
||||
|
||||
type DBConsole struct {
|
||||
clockStore store.ClockStore
|
||||
clockStore store.ClockStore
|
||||
dataProofStore store.DataProofStore
|
||||
}
|
||||
|
||||
func newDBConsole(
|
||||
clockStore store.ClockStore,
|
||||
dataProofStore store.DataProofStore,
|
||||
) (*DBConsole, error) {
|
||||
return &DBConsole{
|
||||
clockStore,
|
||||
dataProofStore,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -36,7 +43,7 @@ func (c *DBConsole) Run() {
|
||||
switch cmd {
|
||||
case "quit":
|
||||
return
|
||||
case "show frames":
|
||||
case "show master frames":
|
||||
earliestFrame, err := c.clockStore.GetEarliestMasterClockFrame([]byte{
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
@ -102,6 +109,82 @@ func (c *DBConsole) Run() {
|
||||
)
|
||||
}
|
||||
|
||||
if err := iter.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
case "show ceremony frames":
|
||||
earliestFrame, err := c.clockStore.GetEarliestDataClockFrame(
|
||||
application.CEREMONY_ADDRESS,
|
||||
)
|
||||
if err != nil {
|
||||
panic(errors.Wrap(err, "earliest"))
|
||||
}
|
||||
|
||||
latestFrame, err := c.clockStore.GetLatestDataClockFrame(
|
||||
application.CEREMONY_ADDRESS,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
panic(errors.Wrap(err, "latest"))
|
||||
}
|
||||
|
||||
fmt.Printf(
|
||||
"earliest: %d, latest: %d\n",
|
||||
earliestFrame.FrameNumber,
|
||||
latestFrame.FrameNumber,
|
||||
)
|
||||
|
||||
fmt.Printf(
|
||||
"Genesis Frame:\n\tVDF Proof: %x\n",
|
||||
earliestFrame.Input[:516],
|
||||
)
|
||||
|
||||
iter, err := c.clockStore.RangeDataClockFrames(
|
||||
application.CEREMONY_ADDRESS,
|
||||
earliestFrame.FrameNumber+1,
|
||||
latestFrame.FrameNumber,
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for iter.First(); iter.Valid(); iter.Next() {
|
||||
value, err := iter.Value()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
selector, err := value.GetSelector()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
fmt.Printf(
|
||||
"Frame %d (Selector: %x):\n\tParent: %x\n\tVDF Proof: %x\n",
|
||||
value.FrameNumber,
|
||||
selector.Bytes(),
|
||||
value.ParentSelector,
|
||||
value.Input[:516],
|
||||
)
|
||||
|
||||
for i := 0; i < len(value.Input[516:])/74; i++ {
|
||||
commit := value.Input[516+(i*74) : 516+((i+1)*74)]
|
||||
fmt.Printf(
|
||||
"\tCommitment %+x\n",
|
||||
commit,
|
||||
)
|
||||
fmt.Printf(
|
||||
"\t\tType: %s\n",
|
||||
value.AggregateProofs[i].InclusionCommitments[0].TypeUrl,
|
||||
)
|
||||
b, _ := proto.Marshal(value.AggregateProofs[i])
|
||||
hash := sha3.Sum256(b)
|
||||
fmt.Printf("\t\tAP Hash: %+x\n", hash)
|
||||
}
|
||||
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
if err := iter.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -40,8 +40,10 @@ var storeSet = wire.NewSet(
|
||||
store.NewPebbleDB,
|
||||
store.NewPebbleClockStore,
|
||||
store.NewPebbleKeyStore,
|
||||
store.NewPebbleDataProofStore,
|
||||
wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)),
|
||||
wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)),
|
||||
wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)),
|
||||
)
|
||||
|
||||
var pubSubSet = wire.NewSet(
|
||||
@ -87,3 +89,7 @@ func NewNode(*config.Config) (*Node, error) {
|
||||
func NewDBConsole(*config.Config) (*DBConsole, error) {
|
||||
panic(wire.Build(loggerSet, storeSet, newDBConsole))
|
||||
}
|
||||
|
||||
func NewClockStore(*config.Config) (store.ClockStore, error) {
|
||||
panic(wire.Build(loggerSet, storeSet))
|
||||
}
|
||||
|
@ -47,13 +47,22 @@ func NewDBConsole(configConfig *config.Config) (*DBConsole, error) {
|
||||
db := store.NewPebbleDB(dbConfig)
|
||||
zapLogger := logger()
|
||||
pebbleClockStore := store.NewPebbleClockStore(db, zapLogger)
|
||||
dbConsole, err := newDBConsole(pebbleClockStore)
|
||||
pebbleDataProofStore := store.NewPebbleDataProofStore(db, zapLogger)
|
||||
dbConsole, err := newDBConsole(pebbleClockStore, pebbleDataProofStore)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dbConsole, nil
|
||||
}
|
||||
|
||||
func NewClockStore(configConfig *config.Config) (store.ClockStore, error) {
|
||||
dbConfig := configConfig.DB
|
||||
db := store.NewPebbleDB(dbConfig)
|
||||
zapLogger := logger()
|
||||
pebbleClockStore := store.NewPebbleClockStore(db, zapLogger)
|
||||
return pebbleClockStore, nil
|
||||
}
|
||||
|
||||
// wire.go:
|
||||
|
||||
func logger() *zap.Logger {
|
||||
@ -71,7 +80,7 @@ var loggerSet = wire.NewSet(
|
||||
|
||||
var keyManagerSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Key"), keys.NewFileKeyManager, wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager)))
|
||||
|
||||
var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewPebbleDB, store.NewPebbleClockStore, store.NewPebbleKeyStore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)))
|
||||
var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewPebbleDB, store.NewPebbleClockStore, store.NewPebbleKeyStore, store.NewPebbleDataProofStore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)))
|
||||
|
||||
var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), p2p.NewBlossomSub, wire.Bind(new(p2p.PubSub), new(*p2p.BlossomSub)))
|
||||
|
||||
|
32
node/main.go
32
node/main.go
@ -16,6 +16,7 @@ import (
|
||||
"source.quilibrium.com/quilibrium/monorepo/node/app"
|
||||
"source.quilibrium.com/quilibrium/monorepo/node/config"
|
||||
qcrypto "source.quilibrium.com/quilibrium/monorepo/node/crypto"
|
||||
"source.quilibrium.com/quilibrium/monorepo/node/execution/ceremony/application"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -62,6 +63,7 @@ func main() {
|
||||
}
|
||||
|
||||
clearIfTestData(*configDirectory, nodeConfig)
|
||||
migrate(*configDirectory, nodeConfig)
|
||||
|
||||
if *dbConsole {
|
||||
console, err := app.NewDBConsole(nodeConfig)
|
||||
@ -116,6 +118,36 @@ func clearIfTestData(configDir string, nodeConfig *config.Config) {
|
||||
}
|
||||
}
|
||||
|
||||
func migrate(configDir string, nodeConfig *config.Config) {
|
||||
_, err := os.Stat(filepath.Join(configDir, "MIGRATIONS"))
|
||||
if os.IsNotExist(err) {
|
||||
fmt.Println("Deduplicating and compressing clock frame data...")
|
||||
clock, err := app.NewClockStore(nodeConfig)
|
||||
if err := clock.Deduplicate(application.CEREMONY_ADDRESS); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
migrationFile, err := os.OpenFile(
|
||||
filepath.Join(configDir, "MIGRATIONS"),
|
||||
os.O_CREATE|os.O_RDWR,
|
||||
fs.FileMode(0700),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
_, err = migrationFile.Write([]byte{0x00, 0x00, 0x01})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = migrationFile.Close()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func printPeerID(p2pConfig *config.P2PConfig) {
|
||||
peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
|
||||
if err != nil {
|
||||
|
@ -2,6 +2,7 @@ package store
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"github.com/cockroachdb/pebble"
|
||||
"github.com/iden3/go-iden3-crypto/poseidon"
|
||||
@ -71,6 +72,7 @@ type ClockStore interface {
|
||||
parent []byte,
|
||||
frameNumber uint64,
|
||||
) (*protobufs.ClockFrame, error)
|
||||
Deduplicate(filter []byte) error
|
||||
}
|
||||
|
||||
type PebbleClockStore struct {
|
||||
@ -85,11 +87,13 @@ type PebbleMasterClockIterator struct {
|
||||
}
|
||||
|
||||
type PebbleClockIterator struct {
|
||||
i *pebble.Iterator
|
||||
i *pebble.Iterator
|
||||
db *PebbleClockStore
|
||||
}
|
||||
|
||||
type PebbleCandidateClockIterator struct {
|
||||
i *pebble.Iterator
|
||||
i *pebble.Iterator
|
||||
db *PebbleClockStore
|
||||
}
|
||||
|
||||
var _ Iterator[*protobufs.ClockFrame] = (*PebbleMasterClockIterator)(nil)
|
||||
@ -183,6 +187,13 @@ func (p *PebbleClockIterator) Value() (*protobufs.ClockFrame, error) {
|
||||
)
|
||||
}
|
||||
|
||||
if err := p.db.fillAggregateProofs(frame); err != nil {
|
||||
return nil, errors.Wrap(
|
||||
errors.Wrap(err, ErrInvalidData.Error()),
|
||||
"get clock frame iterator value",
|
||||
)
|
||||
}
|
||||
|
||||
return frame, nil
|
||||
}
|
||||
|
||||
@ -216,6 +227,13 @@ func (p *PebbleCandidateClockIterator) Value() (*protobufs.ClockFrame, error) {
|
||||
)
|
||||
}
|
||||
|
||||
if err := p.db.fillAggregateProofs(frame); err != nil {
|
||||
return nil, errors.Wrap(
|
||||
errors.Wrap(err, ErrInvalidData.Error()),
|
||||
"get clock frame iterator value",
|
||||
)
|
||||
}
|
||||
|
||||
return frame, nil
|
||||
}
|
||||
|
||||
@ -542,6 +560,13 @@ func (p *PebbleClockStore) GetDataClockFrame(
|
||||
)
|
||||
}
|
||||
|
||||
if err = p.fillAggregateProofs(frame); err != nil {
|
||||
return nil, nil, errors.Wrap(
|
||||
errors.Wrap(err, ErrInvalidData.Error()),
|
||||
"get data clock frame",
|
||||
)
|
||||
}
|
||||
|
||||
proverTrie := &tries.RollingFrecencyCritbitTrie{}
|
||||
|
||||
trieData, closer, err := p.db.Get(clockProverTrieKey(filter, frameNumber))
|
||||
@ -558,6 +583,98 @@ func (p *PebbleClockStore) GetDataClockFrame(
|
||||
return frame, proverTrie, nil
|
||||
}
|
||||
|
||||
func (p *PebbleClockStore) fillAggregateProofs(
|
||||
frame *protobufs.ClockFrame,
|
||||
) error {
|
||||
if frame.FrameNumber == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := 0; i < len(frame.Input[516:])/74; i++ {
|
||||
commit := frame.Input[516+(i*74) : 516+((i+1)*74)]
|
||||
ap, err := internalGetAggregateProof(
|
||||
p.db,
|
||||
frame.Filter,
|
||||
commit,
|
||||
frame.FrameNumber,
|
||||
func(typeUrl string, data [][]byte) ([]byte, error) {
|
||||
if typeUrl == protobufs.IntrinsicExecutionOutputType {
|
||||
o := &protobufs.IntrinsicExecutionOutput{}
|
||||
copiedLeft := make([]byte, len(data[0]))
|
||||
copiedRight := make([]byte, len(data[1]))
|
||||
copy(copiedLeft, data[0])
|
||||
copy(copiedRight, data[1])
|
||||
|
||||
o.Address = copiedLeft[:32]
|
||||
o.Output = copiedLeft[32:]
|
||||
o.Proof = copiedRight
|
||||
return proto.Marshal(o)
|
||||
}
|
||||
|
||||
copied := make([]byte, len(data[0]))
|
||||
copy(copied, data[0])
|
||||
return copied, nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
frame.AggregateProofs = append(frame.AggregateProofs, ap)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PebbleClockStore) saveAggregateProofs(
|
||||
txn Transaction,
|
||||
frame *protobufs.ClockFrame,
|
||||
) error {
|
||||
shouldClose := false
|
||||
if txn == nil {
|
||||
var err error
|
||||
txn, err = p.NewTransaction()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
shouldClose = true
|
||||
}
|
||||
|
||||
for i := 0; i < len(frame.Input[516:])/74; i++ {
|
||||
commit := frame.Input[516+(i*74) : 516+((i+1)*74)]
|
||||
err := internalPutAggregateProof(
|
||||
p.db,
|
||||
txn,
|
||||
frame.AggregateProofs[i],
|
||||
commit, func(typeUrl string, data []byte) ([][]byte, error) {
|
||||
if typeUrl == protobufs.IntrinsicExecutionOutputType {
|
||||
o := &protobufs.IntrinsicExecutionOutput{}
|
||||
if err := proto.Unmarshal(data, o); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
leftBits := append([]byte{}, o.Address...)
|
||||
leftBits = append(leftBits, o.Output...)
|
||||
rightBits := o.Proof
|
||||
return [][]byte{leftBits, rightBits}, nil
|
||||
}
|
||||
|
||||
return [][]byte{data}, nil
|
||||
})
|
||||
if err != nil {
|
||||
if err = txn.Abort(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if shouldClose {
|
||||
txn.Commit()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetEarliestDataClockFrame implements ClockStore.
|
||||
func (p *PebbleClockStore) GetEarliestDataClockFrame(
|
||||
filter []byte,
|
||||
@ -656,6 +773,13 @@ func (p *PebbleClockStore) GetParentDataClockFrame(
|
||||
return nil, errors.Wrap(err, "get parent data clock frame")
|
||||
}
|
||||
|
||||
if err := p.fillAggregateProofs(parent); err != nil {
|
||||
return nil, errors.Wrap(
|
||||
errors.Wrap(err, ErrInvalidData.Error()),
|
||||
"get clock frame iterator value",
|
||||
)
|
||||
}
|
||||
|
||||
if closer != nil {
|
||||
closer.Close()
|
||||
}
|
||||
@ -671,6 +795,19 @@ func (p *PebbleClockStore) PutCandidateDataClockFrame(
|
||||
frame *protobufs.ClockFrame,
|
||||
txn Transaction,
|
||||
) error {
|
||||
if err := p.saveAggregateProofs(nil, frame); err != nil {
|
||||
return errors.Wrap(
|
||||
errors.Wrap(err, ErrInvalidData.Error()),
|
||||
"put candidate data clock frame",
|
||||
)
|
||||
}
|
||||
|
||||
temp := append(
|
||||
[]*protobufs.InclusionAggregateProof{},
|
||||
frame.AggregateProofs...,
|
||||
)
|
||||
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
|
||||
|
||||
data, err := proto.Marshal(frame)
|
||||
if err != nil {
|
||||
return errors.Wrap(
|
||||
@ -679,6 +816,8 @@ func (p *PebbleClockStore) PutCandidateDataClockFrame(
|
||||
)
|
||||
}
|
||||
|
||||
frame.AggregateProofs = temp
|
||||
|
||||
if err = txn.Set(
|
||||
clockDataCandidateFrameKey(
|
||||
frame.Filter,
|
||||
@ -711,6 +850,22 @@ func (p *PebbleClockStore) PutDataClockFrame(
|
||||
proverTrie *tries.RollingFrecencyCritbitTrie,
|
||||
txn Transaction,
|
||||
) error {
|
||||
if frame.FrameNumber != 0 {
|
||||
if err := p.saveAggregateProofs(nil, frame); err != nil {
|
||||
return errors.Wrap(
|
||||
errors.Wrap(err, ErrInvalidData.Error()),
|
||||
"put candidate data clock frame",
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
temp := append(
|
||||
[]*protobufs.InclusionAggregateProof{},
|
||||
frame.AggregateProofs...,
|
||||
)
|
||||
if frame.FrameNumber != 0 {
|
||||
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
|
||||
}
|
||||
data, err := proto.Marshal(frame)
|
||||
if err != nil {
|
||||
return errors.Wrap(
|
||||
@ -719,6 +874,9 @@ func (p *PebbleClockStore) PutDataClockFrame(
|
||||
)
|
||||
}
|
||||
|
||||
if frame.FrameNumber != 0 {
|
||||
frame.AggregateProofs = temp
|
||||
}
|
||||
frameNumberBytes := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(frameNumberBytes, frame.FrameNumber)
|
||||
|
||||
@ -811,7 +969,7 @@ func (p *PebbleClockStore) GetCandidateDataClockFrames(
|
||||
})
|
||||
|
||||
frames := []*protobufs.ClockFrame{}
|
||||
i := &PebbleCandidateClockIterator{i: iter}
|
||||
i := &PebbleCandidateClockIterator{i: iter, db: p}
|
||||
|
||||
for i.First(); i.Valid(); i.Next() {
|
||||
value, err := i.Value()
|
||||
@ -861,7 +1019,7 @@ func (p *PebbleClockStore) RangeCandidateDataClockFrames(
|
||||
),
|
||||
})
|
||||
|
||||
return &PebbleCandidateClockIterator{i: iter}, nil
|
||||
return &PebbleCandidateClockIterator{i: iter, db: p}, nil
|
||||
}
|
||||
|
||||
// RangeDataClockFrames implements ClockStore.
|
||||
@ -881,5 +1039,184 @@ func (p *PebbleClockStore) RangeDataClockFrames(
|
||||
UpperBound: clockDataFrameKey(filter, endFrameNumber),
|
||||
})
|
||||
|
||||
return &PebbleClockIterator{i: iter}, nil
|
||||
return &PebbleClockIterator{i: iter, db: p}, nil
|
||||
}
|
||||
|
||||
// Should only need be run once, before starting
|
||||
func (p *PebbleClockStore) Deduplicate(filter []byte) error {
|
||||
from := clockDataParentIndexKey(
|
||||
filter,
|
||||
1,
|
||||
[]byte{
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
},
|
||||
)
|
||||
to := clockDataParentIndexKey(
|
||||
filter,
|
||||
20000,
|
||||
[]byte{
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
},
|
||||
)
|
||||
|
||||
iter := p.db.NewIter(&pebble.IterOptions{
|
||||
LowerBound: from,
|
||||
UpperBound: to,
|
||||
})
|
||||
|
||||
i := 0
|
||||
for iter.First(); iter.Valid(); iter.Next() {
|
||||
value := iter.Value()
|
||||
frame := &protobufs.ClockFrame{}
|
||||
if err := proto.Unmarshal(value, frame); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := p.saveAggregateProofs(nil, frame); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
|
||||
newValue, err := proto.Marshal(frame)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = p.db.Set(iter.Key(), newValue, &pebble.WriteOptions{Sync: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
i++
|
||||
if i%100 == 0 {
|
||||
fmt.Println("Deduplicated 100 parent frames")
|
||||
}
|
||||
}
|
||||
|
||||
iter.Close()
|
||||
if err := p.db.Compact(from, to, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
from = clockDataFrameKey(filter, 1)
|
||||
to = clockDataFrameKey(filter, 20000)
|
||||
|
||||
iter = p.db.NewIter(&pebble.IterOptions{
|
||||
LowerBound: from,
|
||||
UpperBound: to,
|
||||
})
|
||||
|
||||
i = 0
|
||||
for iter.First(); iter.Valid(); iter.Next() {
|
||||
value := iter.Value()
|
||||
frame := &protobufs.ClockFrame{}
|
||||
if err := proto.Unmarshal(value, frame); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := p.saveAggregateProofs(nil, frame); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
|
||||
newValue, err := proto.Marshal(frame)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = p.db.Set(iter.Key(), newValue, &pebble.WriteOptions{Sync: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
i++
|
||||
if i%100 == 0 {
|
||||
fmt.Println("Deduplicated 100 data frames")
|
||||
}
|
||||
}
|
||||
|
||||
iter.Close()
|
||||
if err := p.db.Compact(from, to, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
from = clockDataCandidateFrameKey(
|
||||
filter,
|
||||
1,
|
||||
[]byte{
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
},
|
||||
[]byte{
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
},
|
||||
)
|
||||
to = clockDataCandidateFrameKey(
|
||||
filter,
|
||||
20000,
|
||||
[]byte{
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
},
|
||||
[]byte{
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
},
|
||||
)
|
||||
|
||||
iter = p.db.NewIter(&pebble.IterOptions{
|
||||
LowerBound: from,
|
||||
UpperBound: to,
|
||||
})
|
||||
|
||||
i = 0
|
||||
for iter.First(); iter.Valid(); iter.Next() {
|
||||
value := iter.Value()
|
||||
frame := &protobufs.ClockFrame{}
|
||||
if err := proto.Unmarshal(value, frame); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := p.saveAggregateProofs(nil, frame); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
|
||||
newValue, err := proto.Marshal(frame)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = p.db.Set(iter.Key(), newValue, &pebble.WriteOptions{Sync: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
i++
|
||||
if i%100 == 0 {
|
||||
fmt.Println("Deduplicated 100 candidate frames")
|
||||
}
|
||||
}
|
||||
|
||||
iter.Close()
|
||||
if err := p.db.Compact(from, to, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.db.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
274
node/store/data_proof.go
Normal file
274
node/store/data_proof.go
Normal file
@ -0,0 +1,274 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
|
||||
"github.com/cockroachdb/pebble"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/crypto/sha3"
|
||||
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
|
||||
)
|
||||
|
||||
type DataProofStore interface {
|
||||
NewTransaction() (Transaction, error)
|
||||
GetAggregateProof(
|
||||
filter []byte,
|
||||
commitment []byte,
|
||||
frameNumber uint64,
|
||||
inclusionReassembler func(typeUrl string, data [][]byte) ([]byte, error),
|
||||
) (
|
||||
*protobufs.InclusionAggregateProof,
|
||||
error,
|
||||
)
|
||||
PutAggregateProof(
|
||||
txn Transaction,
|
||||
aggregateProof *protobufs.InclusionAggregateProof,
|
||||
commitment []byte,
|
||||
inclusionSplitter func(typeUrl string, data []byte) ([][]byte, error),
|
||||
) error
|
||||
}
|
||||
|
||||
type PebbleDataProofStore struct {
|
||||
db *pebble.DB
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewPebbleDataProofStore(
|
||||
db *pebble.DB,
|
||||
logger *zap.Logger,
|
||||
) *PebbleDataProofStore {
|
||||
return &PebbleDataProofStore{
|
||||
db,
|
||||
logger,
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
DATA_PROOF = 0x04
|
||||
DATA_PROOF_METADATA = 0x00
|
||||
DATA_PROOF_INCLUSION = 0x01
|
||||
DATA_PROOF_SEGMENT = 0x02
|
||||
)
|
||||
|
||||
func dataProofMetadataKey(filter []byte, commitment []byte) []byte {
|
||||
key := []byte{DATA_PROOF, DATA_PROOF_METADATA}
|
||||
key = append(key, commitment...)
|
||||
key = append(key, filter...)
|
||||
return key
|
||||
}
|
||||
|
||||
func dataProofInclusionKey(
|
||||
filter []byte,
|
||||
commitment []byte,
|
||||
seqNo uint64,
|
||||
) []byte {
|
||||
key := []byte{DATA_PROOF, DATA_PROOF_INCLUSION}
|
||||
key = append(key, commitment...)
|
||||
key = binary.BigEndian.AppendUint64(key, seqNo)
|
||||
key = append(key, filter...)
|
||||
return key
|
||||
}
|
||||
|
||||
func dataProofSegmentKey(
|
||||
filter []byte,
|
||||
hash []byte,
|
||||
) []byte {
|
||||
key := []byte{DATA_PROOF, DATA_PROOF_SEGMENT}
|
||||
key = append(key, hash...)
|
||||
key = append(key, filter...)
|
||||
return key
|
||||
}
|
||||
|
||||
func (p *PebbleDataProofStore) NewTransaction() (Transaction, error) {
|
||||
return &PebbleTransaction{
|
||||
b: p.db.NewBatch(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func internalGetAggregateProof(
|
||||
db *pebble.DB,
|
||||
filter []byte,
|
||||
commitment []byte,
|
||||
frameNumber uint64,
|
||||
inclusionReassembler func(typeUrl string, data [][]byte) ([]byte, error),
|
||||
) (*protobufs.InclusionAggregateProof, error) {
|
||||
value, closer, err := db.Get(dataProofMetadataKey(filter, commitment))
|
||||
if err != nil {
|
||||
if errors.Is(err, pebble.ErrNotFound) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "get aggregate proof")
|
||||
}
|
||||
|
||||
defer closer.Close()
|
||||
copied := make([]byte, len(value[8:]))
|
||||
limit := binary.BigEndian.Uint64(value[0:8])
|
||||
copy(copied, value[8:])
|
||||
|
||||
aggregate := &protobufs.InclusionAggregateProof{
|
||||
Filter: filter,
|
||||
FrameNumber: frameNumber,
|
||||
InclusionCommitments: []*protobufs.InclusionCommitment{},
|
||||
Proof: copied,
|
||||
}
|
||||
|
||||
iter := db.NewIter(&pebble.IterOptions{
|
||||
LowerBound: dataProofInclusionKey(filter, commitment, 0),
|
||||
UpperBound: dataProofInclusionKey(filter, commitment, limit+1),
|
||||
})
|
||||
|
||||
i := uint32(0)
|
||||
|
||||
for iter.First(); iter.Valid(); iter.Next() {
|
||||
incCommit := iter.Value()
|
||||
|
||||
urlLength := binary.BigEndian.Uint16(incCommit[:2])
|
||||
commitLength := binary.BigEndian.Uint16(incCommit[2:4])
|
||||
|
||||
url := make([]byte, urlLength)
|
||||
copy(url, incCommit[4:urlLength+4])
|
||||
|
||||
commit := make([]byte, commitLength)
|
||||
copy(commit, incCommit[urlLength+4:urlLength+4+commitLength])
|
||||
|
||||
remainder := int(urlLength + 4 + commitLength)
|
||||
|
||||
inclusionCommitment := &protobufs.InclusionCommitment{
|
||||
Filter: filter,
|
||||
FrameNumber: frameNumber,
|
||||
Position: i,
|
||||
TypeUrl: string(url),
|
||||
Commitment: commit,
|
||||
}
|
||||
|
||||
chunks := [][]byte{}
|
||||
for j := 0; j < (len(incCommit)-remainder)/32; j++ {
|
||||
start := remainder + (j * 32)
|
||||
end := remainder + ((j + 1) * 32)
|
||||
segValue, dataCloser, err := db.Get(
|
||||
dataProofSegmentKey(filter, incCommit[start:end]),
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, pebble.ErrNotFound) {
|
||||
// If we've lost this key it means we're in a corrupted state
|
||||
return nil, ErrInvalidData
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "get aggregate proof")
|
||||
}
|
||||
|
||||
segCopy := make([]byte, len(segValue))
|
||||
copy(segCopy, segValue)
|
||||
chunks = append(chunks, segCopy)
|
||||
|
||||
if err = dataCloser.Close(); err != nil {
|
||||
return nil, errors.Wrap(err, "get aggregate proof")
|
||||
}
|
||||
}
|
||||
|
||||
inclusionCommitment.Data, err = inclusionReassembler(string(url), chunks)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "get aggregate proof")
|
||||
}
|
||||
|
||||
aggregate.InclusionCommitments = append(
|
||||
aggregate.InclusionCommitments,
|
||||
inclusionCommitment,
|
||||
)
|
||||
i++
|
||||
}
|
||||
|
||||
if err = iter.Close(); err != nil {
|
||||
return nil, errors.Wrap(err, "get aggregate proof")
|
||||
}
|
||||
|
||||
return aggregate, nil
|
||||
}
|
||||
|
||||
func (p *PebbleDataProofStore) GetAggregateProof(
|
||||
filter []byte,
|
||||
commitment []byte,
|
||||
frameNumber uint64,
|
||||
inclusionReassembler func(typeUrl string, data [][]byte) ([]byte, error),
|
||||
) (*protobufs.InclusionAggregateProof, error) {
|
||||
return internalGetAggregateProof(
|
||||
p.db,
|
||||
filter,
|
||||
commitment,
|
||||
frameNumber,
|
||||
inclusionReassembler,
|
||||
)
|
||||
}
|
||||
|
||||
func internalPutAggregateProof(
|
||||
db *pebble.DB,
|
||||
txn Transaction,
|
||||
aggregateProof *protobufs.InclusionAggregateProof,
|
||||
commitment []byte,
|
||||
inclusionSplitter func(typeUrl string, data []byte) ([][]byte, error),
|
||||
) error {
|
||||
buf := binary.BigEndian.AppendUint64(
|
||||
nil,
|
||||
uint64(len(aggregateProof.InclusionCommitments)),
|
||||
)
|
||||
buf = append(buf, aggregateProof.Proof...)
|
||||
|
||||
for i, inc := range aggregateProof.InclusionCommitments {
|
||||
segments, err := inclusionSplitter(inc.TypeUrl, inc.Data)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "get aggregate proof")
|
||||
}
|
||||
|
||||
urlLength := len(inc.TypeUrl)
|
||||
commitLength := len(inc.Commitment)
|
||||
encoded := binary.BigEndian.AppendUint16(nil, uint16(urlLength))
|
||||
encoded = binary.BigEndian.AppendUint16(encoded, uint16(commitLength))
|
||||
|
||||
encoded = append(encoded, []byte(inc.TypeUrl)...)
|
||||
encoded = append(encoded, inc.Commitment...)
|
||||
|
||||
for _, segment := range segments {
|
||||
hash := sha3.Sum256(segment)
|
||||
if err = txn.Set(
|
||||
dataProofSegmentKey(aggregateProof.Filter, hash[:]),
|
||||
segment,
|
||||
); err != nil {
|
||||
return errors.Wrap(err, "put aggregate proof")
|
||||
}
|
||||
encoded = append(encoded, hash[:]...)
|
||||
}
|
||||
|
||||
if err = txn.Set(
|
||||
dataProofInclusionKey(aggregateProof.Filter, commitment, uint64(i)),
|
||||
encoded,
|
||||
); err != nil {
|
||||
return errors.Wrap(err, "put aggregate proof")
|
||||
}
|
||||
}
|
||||
|
||||
if err := txn.Set(
|
||||
dataProofMetadataKey(aggregateProof.Filter, commitment),
|
||||
buf,
|
||||
); err != nil {
|
||||
return errors.Wrap(err, "put aggregate proof")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PebbleDataProofStore) PutAggregateProof(
|
||||
txn Transaction,
|
||||
aggregateProof *protobufs.InclusionAggregateProof,
|
||||
commitment []byte,
|
||||
inclusionSplitter func(typeUrl string, data []byte) ([][]byte, error),
|
||||
) error {
|
||||
return internalPutAggregateProof(
|
||||
p.db,
|
||||
txn,
|
||||
aggregateProof,
|
||||
commitment,
|
||||
inclusionSplitter,
|
||||
)
|
||||
}
|
@ -18,6 +18,7 @@ type Transaction interface {
|
||||
Set(key []byte, value []byte) error
|
||||
Commit() error
|
||||
Delete(key []byte) error
|
||||
Abort() error
|
||||
}
|
||||
|
||||
type PebbleTransaction struct {
|
||||
@ -36,6 +37,10 @@ func (t *PebbleTransaction) Delete(key []byte) error {
|
||||
return t.b.Delete(key, &pebble.WriteOptions{Sync: true})
|
||||
}
|
||||
|
||||
func (t *PebbleTransaction) Abort() error {
|
||||
return t.b.Close()
|
||||
}
|
||||
|
||||
var _ Transaction = (*PebbleTransaction)(nil)
|
||||
|
||||
func rightAlign(data []byte, size int) []byte {
|
||||
|
Loading…
Reference in New Issue
Block a user