mirror of
https://source.quilibrium.com/quilibrium/ceremonyclient.git
synced 2025-01-12 16:55:18 +00:00
305 lines
9.4 KiB
Go
305 lines
9.4 KiB
Go
|
// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
|
||
|
// of this source code is governed by a BSD-style license that can be found in
|
||
|
// the LICENSE file.
|
||
|
|
||
|
package objstorageprovider
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"encoding/binary"
|
||
|
"io"
|
||
|
|
||
|
"github.com/cockroachdb/errors"
|
||
|
"github.com/cockroachdb/pebble/internal/base"
|
||
|
"github.com/cockroachdb/pebble/objstorage"
|
||
|
"github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
|
||
|
"github.com/cockroachdb/pebble/objstorage/remote"
|
||
|
)
|
||
|
|
||
|
// Tags used by the serialized form of a remote object backing. The encoding
// is a sequence of entries, each starting with a uvarint tag followed by
// tag-specific data.
const (
	// tagCreatorID is followed by the creator ID, encoded as a uvarint.
	tagCreatorID = 1

	// tagCreatorFileNum is followed by the creator file number, encoded as a
	// uvarint.
	tagCreatorFileNum = 2

	// tagCleanupMethod is followed by the cleanup method, encoded as a uvarint.
	tagCleanupMethod = 3

	// tagRefCheckID carries the ref marker that must be checked when this
	// object is attached to another provider. It holds the creator ID and
	// FileNum of the provider that encoded the backing, so that the "target"
	// provider can verify that the "source" provider kept its reference on the
	// object alive.
	tagRefCheckID = 4

	// tagLocator carries the remote.Locator. It is followed by the length of
	// the locator string and then the string itself. When the tag is absent the
	// locator is "".
	tagLocator = 5

	// tagCustomObjectName carries a custom object name (if present). It is
	// followed by the length of the name and then the name itself.
	tagCustomObjectName = 6

	// Rules for tags added in the future:
	//
	//   - A tag without the tagNotSafeToIgnoreMask bit must be followed by the
	//     length of its data, so that code which does not know the tag can
	//     skip over it.
	//
	//   - A tag with the tagNotSafeToIgnoreMask bit set results in an error
	//     when it is encountered by earlier code that does not know the tag.
	tagNotSafeToIgnoreMask = 1 << 6
)
|
||
|
|
||
|
func (p *provider) encodeRemoteObjectBacking(
|
||
|
meta *objstorage.ObjectMetadata,
|
||
|
) (objstorage.RemoteObjectBacking, error) {
|
||
|
if !meta.IsRemote() {
|
||
|
return nil, errors.AssertionFailedf("object %s not on remote storage", meta.DiskFileNum)
|
||
|
}
|
||
|
|
||
|
buf := make([]byte, 0, binary.MaxVarintLen64*4)
|
||
|
buf = binary.AppendUvarint(buf, tagCreatorID)
|
||
|
buf = binary.AppendUvarint(buf, uint64(meta.Remote.CreatorID))
|
||
|
// TODO(radu): encode file type as well?
|
||
|
buf = binary.AppendUvarint(buf, tagCreatorFileNum)
|
||
|
buf = binary.AppendUvarint(buf, uint64(meta.Remote.CreatorFileNum.FileNum()))
|
||
|
buf = binary.AppendUvarint(buf, tagCleanupMethod)
|
||
|
buf = binary.AppendUvarint(buf, uint64(meta.Remote.CleanupMethod))
|
||
|
if meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
|
||
|
buf = binary.AppendUvarint(buf, tagRefCheckID)
|
||
|
buf = binary.AppendUvarint(buf, uint64(p.remote.shared.creatorID))
|
||
|
buf = binary.AppendUvarint(buf, uint64(meta.DiskFileNum.FileNum()))
|
||
|
}
|
||
|
if meta.Remote.Locator != "" {
|
||
|
buf = binary.AppendUvarint(buf, tagLocator)
|
||
|
buf = encodeString(buf, string(meta.Remote.Locator))
|
||
|
}
|
||
|
if meta.Remote.CustomObjectName != "" {
|
||
|
buf = binary.AppendUvarint(buf, tagCustomObjectName)
|
||
|
buf = encodeString(buf, meta.Remote.CustomObjectName)
|
||
|
}
|
||
|
return buf, nil
|
||
|
}
|
||
|
|
||
|
type remoteObjectBackingHandle struct {
|
||
|
backing objstorage.RemoteObjectBacking
|
||
|
fileNum base.DiskFileNum
|
||
|
p *provider
|
||
|
}
|
||
|
|
||
|
func (s *remoteObjectBackingHandle) Get() (objstorage.RemoteObjectBacking, error) {
|
||
|
if s.backing == nil {
|
||
|
return nil, errors.Errorf("RemoteObjectBackingHandle.Get() called after Close()")
|
||
|
}
|
||
|
return s.backing, nil
|
||
|
}
|
||
|
|
||
|
func (s *remoteObjectBackingHandle) Close() {
|
||
|
if s.backing != nil {
|
||
|
s.backing = nil
|
||
|
s.p.unprotectObject(s.fileNum)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Compile-time check that *remoteObjectBackingHandle implements
// objstorage.RemoteObjectBackingHandle.
var _ objstorage.RemoteObjectBackingHandle = (*remoteObjectBackingHandle)(nil)
|
||
|
|
||
|
// RemoteObjectBacking is part of the objstorage.Provider interface.
|
||
|
func (p *provider) RemoteObjectBacking(
|
||
|
meta *objstorage.ObjectMetadata,
|
||
|
) (objstorage.RemoteObjectBackingHandle, error) {
|
||
|
backing, err := p.encodeRemoteObjectBacking(meta)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
p.protectObject(meta.DiskFileNum)
|
||
|
return &remoteObjectBackingHandle{
|
||
|
backing: backing,
|
||
|
fileNum: meta.DiskFileNum,
|
||
|
p: p,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// CreateExternalObjectBacking is part of the objstorage.Provider interface.
|
||
|
func (p *provider) CreateExternalObjectBacking(
|
||
|
locator remote.Locator, objName string,
|
||
|
) (objstorage.RemoteObjectBacking, error) {
|
||
|
var meta objstorage.ObjectMetadata
|
||
|
meta.Remote.Locator = locator
|
||
|
meta.Remote.CustomObjectName = objName
|
||
|
meta.Remote.CleanupMethod = objstorage.SharedNoCleanup
|
||
|
return p.encodeRemoteObjectBacking(&meta)
|
||
|
}
|
||
|
|
||
|
type decodedBacking struct {
|
||
|
meta objstorage.ObjectMetadata
|
||
|
// refToCheck is set only when meta.Remote.CleanupMethod is RefTracking
|
||
|
refToCheck struct {
|
||
|
creatorID objstorage.CreatorID
|
||
|
fileNum base.DiskFileNum
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// decodeRemoteObjectBacking decodes the remote object metadata.
|
||
|
//
|
||
|
// Note that the meta.Remote.Storage field is not set.
|
||
|
func decodeRemoteObjectBacking(
|
||
|
fileType base.FileType, fileNum base.DiskFileNum, buf objstorage.RemoteObjectBacking,
|
||
|
) (decodedBacking, error) {
|
||
|
var creatorID, creatorFileNum, cleanupMethod, refCheckCreatorID, refCheckFileNum uint64
|
||
|
var locator, customObjName string
|
||
|
br := bytes.NewReader(buf)
|
||
|
for {
|
||
|
tag, err := binary.ReadUvarint(br)
|
||
|
if err == io.EOF {
|
||
|
break
|
||
|
}
|
||
|
if err != nil {
|
||
|
return decodedBacking{}, err
|
||
|
}
|
||
|
switch tag {
|
||
|
case tagCreatorID:
|
||
|
creatorID, err = binary.ReadUvarint(br)
|
||
|
|
||
|
case tagCreatorFileNum:
|
||
|
creatorFileNum, err = binary.ReadUvarint(br)
|
||
|
|
||
|
case tagCleanupMethod:
|
||
|
cleanupMethod, err = binary.ReadUvarint(br)
|
||
|
|
||
|
case tagRefCheckID:
|
||
|
refCheckCreatorID, err = binary.ReadUvarint(br)
|
||
|
if err == nil {
|
||
|
refCheckFileNum, err = binary.ReadUvarint(br)
|
||
|
}
|
||
|
|
||
|
case tagLocator:
|
||
|
locator, err = decodeString(br)
|
||
|
|
||
|
case tagCustomObjectName:
|
||
|
customObjName, err = decodeString(br)
|
||
|
|
||
|
default:
|
||
|
// Ignore unknown tags, unless they're not safe to ignore.
|
||
|
if tag&tagNotSafeToIgnoreMask != 0 {
|
||
|
return decodedBacking{}, errors.Newf("unknown tag %d", tag)
|
||
|
}
|
||
|
var dataLen uint64
|
||
|
dataLen, err = binary.ReadUvarint(br)
|
||
|
if err == nil {
|
||
|
_, err = br.Seek(int64(dataLen), io.SeekCurrent)
|
||
|
}
|
||
|
}
|
||
|
if err != nil {
|
||
|
return decodedBacking{}, err
|
||
|
}
|
||
|
}
|
||
|
if customObjName == "" {
|
||
|
if creatorID == 0 {
|
||
|
return decodedBacking{}, errors.Newf("remote object backing missing creator ID")
|
||
|
}
|
||
|
if creatorFileNum == 0 {
|
||
|
return decodedBacking{}, errors.Newf("remote object backing missing creator file num")
|
||
|
}
|
||
|
}
|
||
|
var res decodedBacking
|
||
|
res.meta.DiskFileNum = fileNum
|
||
|
res.meta.FileType = fileType
|
||
|
res.meta.Remote.CreatorID = objstorage.CreatorID(creatorID)
|
||
|
res.meta.Remote.CreatorFileNum = base.FileNum(creatorFileNum).DiskFileNum()
|
||
|
res.meta.Remote.CleanupMethod = objstorage.SharedCleanupMethod(cleanupMethod)
|
||
|
if res.meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
|
||
|
if refCheckCreatorID == 0 || refCheckFileNum == 0 {
|
||
|
return decodedBacking{}, errors.Newf("remote object backing missing ref to check")
|
||
|
}
|
||
|
res.refToCheck.creatorID = objstorage.CreatorID(refCheckCreatorID)
|
||
|
res.refToCheck.fileNum = base.FileNum(refCheckFileNum).DiskFileNum()
|
||
|
}
|
||
|
res.meta.Remote.Locator = remote.Locator(locator)
|
||
|
res.meta.Remote.CustomObjectName = customObjName
|
||
|
return res, nil
|
||
|
}
|
||
|
|
||
|
// encodeString appends s to buf as a uvarint byte length followed by the raw
// bytes of s, and returns the extended slice. decodeString reads this format.
func encodeString(buf []byte, s string) []byte {
	buf = binary.AppendUvarint(buf, uint64(len(s)))
	// append accepts a string operand directly; no []byte conversion is needed.
	return append(buf, s...)
}
|
||
|
|
||
|
// decodeString reads a string written by encodeString: a uvarint byte length
// followed by that many bytes.
//
// It returns "" with a nil error for a zero length. Any error from reading the
// length or the payload is returned unchanged (for example io.EOF when the
// input ends before the announced number of bytes was read).
func decodeString(br io.ByteReader) (string, error) {
	length, err := binary.ReadUvarint(br)
	if err != nil || length == 0 {
		return "", err
	}

	// The length is taken from the input and may be corrupt, so it must not be
	// used directly as an allocation size (a huge value would panic or exhaust
	// memory). Preallocate at most maxPrealloc bytes and let the buffer grow
	// only as bytes are actually read.
	const maxPrealloc = 4 << 10
	capHint := length
	if capHint > maxPrealloc {
		capHint = maxPrealloc
	}
	buf := make([]byte, 0, int(capHint))
	for uint64(len(buf)) < length {
		b, err := br.ReadByte()
		if err != nil {
			return "", err
		}
		buf = append(buf, b)
	}
	return string(buf), nil
}
|
||
|
|
||
|
// AttachRemoteObjects is part of the objstorage.Provider interface.
|
||
|
func (p *provider) AttachRemoteObjects(
|
||
|
objs []objstorage.RemoteObjectToAttach,
|
||
|
) ([]objstorage.ObjectMetadata, error) {
|
||
|
decoded := make([]decodedBacking, len(objs))
|
||
|
for i, o := range objs {
|
||
|
var err error
|
||
|
decoded[i], err = decodeRemoteObjectBacking(o.FileType, o.FileNum, o.Backing)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
decoded[i].meta.Remote.Storage, err = p.ensureStorage(decoded[i].meta.Remote.Locator)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Create the reference marker objects.
|
||
|
// TODO(radu): parallelize this.
|
||
|
for _, d := range decoded {
|
||
|
if d.meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
|
||
|
continue
|
||
|
}
|
||
|
if err := p.sharedCreateRef(d.meta); err != nil {
|
||
|
// TODO(radu): clean up references previously created in this loop.
|
||
|
return nil, err
|
||
|
}
|
||
|
// Check the "origin"'s reference.
|
||
|
refName := sharedObjectRefName(d.meta, d.refToCheck.creatorID, d.refToCheck.fileNum)
|
||
|
if _, err := d.meta.Remote.Storage.Size(refName); err != nil {
|
||
|
_ = p.sharedUnref(d.meta)
|
||
|
// TODO(radu): clean up references previously created in this loop.
|
||
|
if d.meta.Remote.Storage.IsNotExistError(err) {
|
||
|
return nil, errors.Errorf("origin marker object %q does not exist;"+
|
||
|
" object probably removed from the provider which created the backing", refName)
|
||
|
}
|
||
|
return nil, errors.Wrapf(err, "checking origin's marker object %s", refName)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func() {
|
||
|
p.mu.Lock()
|
||
|
defer p.mu.Unlock()
|
||
|
for _, d := range decoded {
|
||
|
p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
|
||
|
FileNum: d.meta.DiskFileNum,
|
||
|
FileType: d.meta.FileType,
|
||
|
CreatorID: d.meta.Remote.CreatorID,
|
||
|
CreatorFileNum: d.meta.Remote.CreatorFileNum,
|
||
|
CleanupMethod: d.meta.Remote.CleanupMethod,
|
||
|
Locator: d.meta.Remote.Locator,
|
||
|
CustomObjectName: d.meta.Remote.CustomObjectName,
|
||
|
})
|
||
|
}
|
||
|
}()
|
||
|
if err := p.sharedSync(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
metas := make([]objstorage.ObjectMetadata, len(decoded))
|
||
|
for i, d := range decoded {
|
||
|
metas[i] = d.meta
|
||
|
}
|
||
|
|
||
|
p.mu.Lock()
|
||
|
defer p.mu.Unlock()
|
||
|
for _, meta := range metas {
|
||
|
p.mu.knownObjects[meta.DiskFileNum] = meta
|
||
|
}
|
||
|
return metas, nil
|
||
|
}
|