ceremonyclient/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go

673 lines
18 KiB
Go
Raw Permalink Normal View History

2023-09-25 02:43:35 +00:00
package ceremony
import (
2024-02-13 07:04:56 +00:00
"bytes"
2024-02-14 07:11:12 +00:00
"context"
2023-09-25 02:43:35 +00:00
"crypto"
2024-02-14 07:11:12 +00:00
"crypto/tls"
2023-11-01 03:45:20 +00:00
"encoding/binary"
2023-09-25 02:43:35 +00:00
"sync"
"time"
2024-02-14 07:11:12 +00:00
"github.com/multiformats/go-multiaddr"
mn "github.com/multiformats/go-multiaddr/net"
2023-09-25 02:43:35 +00:00
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
2024-02-14 07:11:12 +00:00
"google.golang.org/grpc/credentials"
2023-09-25 02:43:35 +00:00
"google.golang.org/protobuf/types/known/anypb"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
2023-09-25 02:43:35 +00:00
"source.quilibrium.com/quilibrium/monorepo/nekryptology/pkg/core/curves"
"source.quilibrium.com/quilibrium/monorepo/node/config"
"source.quilibrium.com/quilibrium/monorepo/node/consensus"
2024-02-13 07:04:56 +00:00
qtime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time"
2023-09-25 02:43:35 +00:00
qcrypto "source.quilibrium.com/quilibrium/monorepo/node/crypto"
"source.quilibrium.com/quilibrium/monorepo/node/execution"
"source.quilibrium.com/quilibrium/monorepo/node/keys"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/node/tries"
)
2023-10-27 02:41:38 +00:00
// Lifetimes for cached peer announcements.
const (
	// PEER_INFO_TTL is 3,600,000, i.e. one hour if read as milliseconds.
	// NOTE(review): presumably milliseconds like the constant below — confirm
	// against the code that consumes it.
	PEER_INFO_TTL = 60 * 60 * 1000

	// UNCOOPERATIVE_PEER_INFO_TTL is five minutes in milliseconds; it is
	// compared against differences of time.Now().UnixMilli() values when
	// uncooperative peer entries are expired.
	UNCOOPERATIVE_PEER_INFO_TTL = 5 * 60 * 1000
)
2023-09-25 02:43:35 +00:00
type (
	// InclusionMap is an alias for a map from a pairing point to the
	// inclusion commitment associated with it.
	InclusionMap = map[curves.PairingPoint]*protobufs.InclusionCommitment

	// PolynomialMap is an alias for a map from a pairing point to a slice of
	// pairing scalars.
	// NOTE(review): presumably the scalars are polynomial coefficients —
	// confirm against the code that populates it.
	PolynomialMap = map[curves.PairingPoint][]curves.PairingScalar
)
// SyncStatusType enumerates the states of the engine's frame
// synchronization with a peer.
type SyncStatusType int

// Synchronization states. The constants are deliberately left untyped (as in
// the original declaration) so that existing uses as plain integers keep
// compiling; they are assignable to SyncStatusType.
const (
	// SyncStatusNotSyncing is the initial state set by the constructor.
	SyncStatusNotSyncing = iota
	SyncStatusAwaitingResponse
	SyncStatusSynchronizing
	SyncStatusFailed
)
2023-09-29 07:55:09 +00:00
// peerInfo is the engine's in-memory record of a peer announcement. The
// engine stores one for itself on every announce round and exposes all
// records through GetPeerInfo.
//
// timestamp holds a Unix time in milliseconds (the engine fills it from
// time.Now().UnixMilli()). totalDistance is filled by the engine with a
// 256-byte buffer produced by FillBytes on the data time reel's total
// distance.
// NOTE(review): lastSeen and direct are not written by the code visible in
// this file section; confirm their units/meaning against the message
// handlers.
type peerInfo struct {
	peerId        []byte
	multiaddr     string
	maxFrame      uint64
	timestamp     int64
	lastSeen      int64
	version       []byte
	signature     []byte
	publicKey     []byte
	direct        bool
	totalDistance []byte
}
// ChannelServer is a shorthand alias for
// protobufs.CeremonyService_GetPublicChannelServer.
type ChannelServer = protobufs.CeremonyService_GetPublicChannelServer
2023-09-25 02:43:35 +00:00
type CeremonyDataClockConsensusEngine struct {
protobufs.UnimplementedCeremonyServiceServer
2023-09-25 02:43:35 +00:00
difficulty uint32
logger *zap.Logger
state consensus.EngineState
clockStore store.ClockStore
keyStore store.KeyStore
pubSub p2p.PubSub
keyManager keys.KeyManager
2024-02-13 07:04:56 +00:00
masterTimeReel *qtime.MasterTimeReel
dataTimeReel *qtime.DataTimeReel
2024-03-21 07:14:45 +00:00
peerInfoManager p2p.PeerInfoManager
2023-09-25 02:43:35 +00:00
provingKey crypto.Signer
provingKeyBytes []byte
provingKeyType keys.KeyType
provingKeyAddress []byte
lastFrameReceivedAt time.Time
latestFrameReceived uint64
frameProverTrie *tries.RollingFrecencyCritbitTrie
dependencyMap map[string]*anypb.Any
pendingCommits chan *anypb.Any
pendingCommitWorkers int64
2024-02-13 07:04:56 +00:00
inclusionProver qcrypto.InclusionProver
frameProver qcrypto.FrameProver
2023-09-25 02:43:35 +00:00
stagedLobbyStateTransitions *protobufs.CeremonyLobbyStateTransition
minimumPeersRequired int
2024-02-14 07:11:12 +00:00
statsClient protobufs.NodeStatsClient
2024-02-24 08:35:13 +00:00
currentReceivingSyncPeersMx sync.Mutex
2024-02-20 20:01:10 +00:00
currentReceivingSyncPeers int
2023-09-25 02:43:35 +00:00
frameChan chan *protobufs.ClockFrame
executionEngines map[string]execution.ExecutionEngine
filter []byte
input []byte
parentSelector []byte
syncingStatus SyncStatusType
syncingTarget []byte
2023-11-01 03:45:20 +00:00
previousHead *protobufs.ClockFrame
2023-09-25 02:43:35 +00:00
engineMx sync.Mutex
dependencyMapMx sync.Mutex
stagedLobbyStateTransitionsMx sync.Mutex
peerMapMx sync.RWMutex
2023-09-29 07:55:09 +00:00
peerAnnounceMapMx sync.Mutex
2023-09-25 02:43:35 +00:00
lastKeyBundleAnnouncementFrame uint64
2023-09-29 07:55:09 +00:00
peerMap map[string]*peerInfo
uncooperativePeersMap map[string]*peerInfo
messageProcessorCh chan *pb.Message
2023-09-25 02:43:35 +00:00
}
// Compile-time check that *CeremonyDataClockConsensusEngine implements
// consensus.DataConsensusEngine.
var _ consensus.DataConsensusEngine = (*CeremonyDataClockConsensusEngine)(nil)
// Creates a new data clock for ceremony execution  this is a hybrid clock,
// normally data clocks are bloom sharded and have node-specific proofs along
// with the public VDF proofs, but in this case it is a proof from the execution
// across all participating nodes.
func NewCeremonyDataClockConsensusEngine(
engineConfig *config.EngineConfig,
logger *zap.Logger,
keyManager keys.KeyManager,
clockStore store.ClockStore,
keyStore store.KeyStore,
pubSub p2p.PubSub,
2024-02-13 07:04:56 +00:00
frameProver qcrypto.FrameProver,
inclusionProver qcrypto.InclusionProver,
masterTimeReel *qtime.MasterTimeReel,
dataTimeReel *qtime.DataTimeReel,
2024-03-21 07:14:45 +00:00
peerInfoManager p2p.PeerInfoManager,
2024-01-03 07:31:42 +00:00
filter []byte,
seed []byte,
2023-09-25 02:43:35 +00:00
) *CeremonyDataClockConsensusEngine {
if logger == nil {
panic(errors.New("logger is nil"))
}
if engineConfig == nil {
panic(errors.New("engine config is nil"))
}
if keyManager == nil {
panic(errors.New("key manager is nil"))
}
if clockStore == nil {
panic(errors.New("clock store is nil"))
}
if keyStore == nil {
panic(errors.New("key store is nil"))
}
if pubSub == nil {
panic(errors.New("pubsub is nil"))
}
2024-02-13 07:04:56 +00:00
if frameProver == nil {
panic(errors.New("frame prover is nil"))
}
if inclusionProver == nil {
panic(errors.New("inclusion prover is nil"))
}
if masterTimeReel == nil {
panic(errors.New("master time reel is nil"))
}
if dataTimeReel == nil {
panic(errors.New("data time reel is nil"))
}
2024-03-21 07:14:45 +00:00
if peerInfoManager == nil {
panic(errors.New("peer info manager is nil"))
}
minimumPeersRequired := engineConfig.MinimumPeersRequired
if minimumPeersRequired == 0 {
minimumPeersRequired = 3
}
2024-01-03 07:31:42 +00:00
difficulty := engineConfig.Difficulty
if difficulty == 0 {
difficulty = 10000
}
2024-02-14 07:11:12 +00:00
var statsClient protobufs.NodeStatsClient
if engineConfig.StatsMultiaddr != "" {
ma, err := multiaddr.NewMultiaddr(engineConfig.StatsMultiaddr)
if err != nil {
panic(err)
}
_, addr, err := mn.DialArgs(ma)
if err != nil {
panic(err)
}
cc, err := grpc.Dial(
addr,
grpc.WithTransportCredentials(
credentials.NewTLS(&tls.Config{InsecureSkipVerify: false}),
),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(600*1024*1024),
grpc.MaxCallRecvMsgSize(600*1024*1024),
),
)
if err != nil {
panic(err)
}
statsClient = protobufs.NewNodeStatsClient(cc)
}
2023-09-25 02:43:35 +00:00
e := &CeremonyDataClockConsensusEngine{
2024-01-03 07:31:42 +00:00
difficulty: difficulty,
2023-09-25 02:43:35 +00:00
logger: logger,
state: consensus.EngineStateStopped,
clockStore: clockStore,
keyStore: keyStore,
keyManager: keyManager,
pubSub: pubSub,
frameChan: make(chan *protobufs.ClockFrame),
executionEngines: map[string]execution.ExecutionEngine{},
dependencyMap: make(map[string]*anypb.Any),
parentSelector: []byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
2024-02-20 20:01:10 +00:00
currentReceivingSyncPeers: 0,
lastFrameReceivedAt: time.Time{},
frameProverTrie: &tries.RollingFrecencyCritbitTrie{},
inclusionProver: inclusionProver,
syncingStatus: SyncStatusNotSyncing,
peerMap: map[string]*peerInfo{},
uncooperativePeersMap: map[string]*peerInfo{},
minimumPeersRequired: minimumPeersRequired,
frameProver: frameProver,
masterTimeReel: masterTimeReel,
dataTimeReel: dataTimeReel,
2024-03-21 07:14:45 +00:00
peerInfoManager: peerInfoManager,
2024-02-20 20:01:10 +00:00
statsClient: statsClient,
messageProcessorCh: make(chan *pb.Message),
2023-09-25 02:43:35 +00:00
}
logger.Info("constructing consensus engine")
signer, keyType, bytes, address := e.GetProvingKey(
engineConfig,
)
2024-01-03 07:31:42 +00:00
e.filter = filter
e.input = seed
2023-09-25 02:43:35 +00:00
e.provingKey = signer
e.provingKeyType = keyType
e.provingKeyBytes = bytes
e.provingKeyAddress = address
return e
}
2024-01-03 07:31:42 +00:00
func (e *CeremonyDataClockConsensusEngine) Start() <-chan error {
2023-09-25 02:43:35 +00:00
e.logger.Info("starting ceremony consensus engine")
e.state = consensus.EngineStateStarting
errChan := make(chan error)
e.state = consensus.EngineStateLoading
e.logger.Info("loading last seen state")
2024-02-13 07:04:56 +00:00
err := e.dataTimeReel.Start()
if err != nil {
2024-01-13 06:21:16 +00:00
panic(err)
}
2024-02-13 07:04:56 +00:00
e.frameProverTrie = e.dataTimeReel.GetFrameProverTrie()
2024-01-03 07:31:42 +00:00
err = e.createCommunicationKeys()
if err != nil {
panic(err)
2023-09-25 02:43:35 +00:00
}
go e.runMessageHandler()
2023-09-25 02:43:35 +00:00
e.logger.Info("subscribing to pubsub messages")
e.pubSub.Subscribe(e.filter, e.handleMessage, true)
go func() {
server := grpc.NewServer(
grpc.MaxSendMsgSize(600*1024*1024),
grpc.MaxRecvMsgSize(600*1024*1024),
)
protobufs.RegisterCeremonyServiceServer(server, e)
if err := e.pubSub.StartDirectChannelListener(
e.pubSub.GetPeerID(),
2024-03-01 07:12:31 +00:00
"",
server,
); err != nil {
panic(err)
}
}()
2023-09-25 02:43:35 +00:00
e.state = consensus.EngineStateCollecting
2023-09-29 07:55:09 +00:00
go func() {
2024-01-10 06:58:38 +00:00
thresholdBeforeConfirming := 4
2023-09-29 07:55:09 +00:00
for {
list := &protobufs.CeremonyPeerListAnnounce{
PeerList: []*protobufs.CeremonyPeer{},
}
2024-02-13 07:04:56 +00:00
frame, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
}
e.latestFrameReceived = frame.FrameNumber
e.logger.Info(
"preparing peer announce",
zap.Uint64("frame_number", frame.FrameNumber),
)
2023-11-01 03:45:20 +00:00
timestamp := time.Now().UnixMilli()
2024-02-13 07:04:56 +00:00
msg := binary.BigEndian.AppendUint64([]byte{}, frame.FrameNumber)
msg = append(msg, config.GetVersion()...)
2023-11-01 03:45:20 +00:00
msg = binary.BigEndian.AppendUint64(msg, uint64(timestamp))
sig, err := e.pubSub.SignMessage(msg)
if err != nil {
panic(err)
}
2023-09-29 07:55:09 +00:00
e.peerMapMx.Lock()
e.peerMap[string(e.pubSub.GetPeerID())] = &peerInfo{
peerId: e.pubSub.GetPeerID(),
multiaddr: "",
2024-02-13 07:04:56 +00:00
maxFrame: frame.FrameNumber,
version: config.GetVersion(),
2023-11-01 03:45:20 +00:00
signature: sig,
publicKey: e.pubSub.GetPublicKey(),
timestamp: timestamp,
2024-02-24 08:35:13 +00:00
totalDistance: e.dataTimeReel.GetTotalDistance().FillBytes(
make([]byte, 256),
),
2023-09-29 07:55:09 +00:00
}
deletes := []*peerInfo{}
list.PeerList = append(list.PeerList, &protobufs.CeremonyPeer{
PeerId: e.pubSub.GetPeerID(),
Multiaddr: "",
MaxFrame: frame.FrameNumber,
Version: config.GetVersion(),
Signature: sig,
PublicKey: e.pubSub.GetPublicKey(),
Timestamp: timestamp,
TotalDistance: e.dataTimeReel.GetTotalDistance().FillBytes(
make([]byte, 256),
),
})
for _, v := range e.uncooperativePeersMap {
if v == nil {
continue
}
2024-01-10 06:58:38 +00:00
if v.timestamp <= time.Now().UnixMilli()-UNCOOPERATIVE_PEER_INFO_TTL ||
thresholdBeforeConfirming > 0 {
deletes = append(deletes, v)
}
}
for _, v := range deletes {
delete(e.uncooperativePeersMap, string(v.peerId))
2023-09-29 07:55:09 +00:00
}
e.peerMapMx.Unlock()
2024-02-14 07:11:12 +00:00
if e.statsClient != nil {
_, err := e.statsClient.PutPeerInfo(
context.Background(),
&protobufs.PutPeerInfoRequest{
PeerInfo: []*protobufs.PeerInfo{
{
PeerId: e.pubSub.GetPeerID(),
Multiaddrs: []string{""},
MaxFrame: frame.FrameNumber,
Version: config.GetVersion(),
Signature: sig,
PublicKey: e.pubSub.GetPublicKey(),
Timestamp: timestamp,
TotalDistance: e.dataTimeReel.GetTotalDistance().FillBytes(
make([]byte, 256),
),
},
},
UncooperativePeerInfo: []*protobufs.PeerInfo{},
2024-02-14 07:11:12 +00:00
},
)
if err != nil {
e.logger.Error("could not emit stats", zap.Error(err))
}
}
2024-02-24 08:35:13 +00:00
e.logger.Info(
"broadcasting peer info",
zap.Uint64("frame_number", frame.FrameNumber),
)
2023-09-29 07:55:09 +00:00
if err := e.publishMessage(e.filter, list); err != nil {
e.logger.Debug("error publishing message", zap.Error(err))
}
2024-01-10 06:58:38 +00:00
if thresholdBeforeConfirming > 0 {
thresholdBeforeConfirming--
}
time.Sleep(120 * time.Second)
2023-09-29 07:55:09 +00:00
}
}()
go e.runLoop()
2024-02-13 07:04:56 +00:00
go func() {
errChan <- nil
}()
return errChan
}
func (e *CeremonyDataClockConsensusEngine) runLoop() {
dataFrameCh := e.dataTimeReel.NewFrameCh()
e.logger.Info("waiting for peer list mappings")
// We need to re-tune this so that libp2p's peerstore activation threshold
// considers DHT peers to be correct:
time.Sleep(30 * time.Second)
for e.state < consensus.EngineStateStopping {
peerCount := e.pubSub.GetNetworkPeersCount()
if peerCount < e.minimumPeersRequired {
e.logger.Info(
"waiting for minimum peers",
zap.Int("peer_count", peerCount),
)
time.Sleep(1 * time.Second)
} else {
latestFrame, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
}
select {
case dataFrame := <-dataFrameCh:
if latestFrame, err = e.collect(dataFrame); err != nil {
e.logger.Error("could not collect", zap.Error(err))
}
2024-02-23 03:23:26 +00:00
dataFrame, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
}
if latestFrame != nil &&
dataFrame.FrameNumber > latestFrame.FrameNumber {
latestFrame = dataFrame
}
if e.latestFrameReceived < latestFrame.FrameNumber {
e.latestFrameReceived = latestFrame.FrameNumber
go func() {
select {
case e.frameChan <- latestFrame:
default:
}
}()
}
2024-02-13 07:04:56 +00:00
var nextFrame *protobufs.ClockFrame
if nextFrame, err = e.prove(latestFrame); err != nil {
e.logger.Error("could not prove", zap.Error(err))
e.state = consensus.EngineStateCollecting
continue
}
2024-02-13 07:04:56 +00:00
if bytes.Equal(
e.frameProverTrie.FindNearest(e.provingKeyAddress).External.Key,
e.provingKeyAddress,
) {
2024-03-17 21:14:37 +00:00
e.dataTimeReel.Insert(nextFrame, false)
2024-02-16 21:46:54 +00:00
2024-02-13 07:04:56 +00:00
if err = e.publishProof(nextFrame); err != nil {
e.logger.Error("could not publish", zap.Error(err))
e.state = consensus.EngineStateCollecting
}
2024-02-13 07:04:56 +00:00
}
case <-time.After(20 * time.Second):
dataFrame, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
}
if latestFrame, err = e.collect(dataFrame); err != nil {
e.logger.Error("could not collect", zap.Error(err))
continue
}
2024-02-23 03:23:26 +00:00
if latestFrame == nil ||
latestFrame.FrameNumber < dataFrame.FrameNumber {
latestFrame, err = e.dataTimeReel.Head()
if err != nil {
panic(err)
}
}
if e.latestFrameReceived < latestFrame.FrameNumber {
e.latestFrameReceived = latestFrame.FrameNumber
go func() {
select {
case e.frameChan <- latestFrame:
default:
}
}()
}
2024-02-13 07:04:56 +00:00
var nextFrame *protobufs.ClockFrame
if nextFrame, err = e.prove(latestFrame); err != nil {
e.logger.Error("could not prove", zap.Error(err))
e.state = consensus.EngineStateCollecting
continue
}
2024-02-19 00:28:29 +00:00
if bytes.Equal(
e.frameProverTrie.FindNearest(e.provingKeyAddress).External.Key,
e.provingKeyAddress,
) {
2024-03-17 21:14:37 +00:00
e.dataTimeReel.Insert(nextFrame, false)
2024-02-16 21:46:54 +00:00
2024-02-13 07:04:56 +00:00
if err = e.publishProof(nextFrame); err != nil {
e.logger.Error("could not publish", zap.Error(err))
e.state = consensus.EngineStateCollecting
}
2023-09-25 02:43:35 +00:00
}
}
}
2024-02-13 07:04:56 +00:00
}
2023-09-25 02:43:35 +00:00
}
func (e *CeremonyDataClockConsensusEngine) Stop(force bool) <-chan error {
e.logger.Info("stopping ceremony consensus engine")
e.state = consensus.EngineStateStopping
errChan := make(chan error)
wg := sync.WaitGroup{}
wg.Add(len(e.executionEngines))
for name := range e.executionEngines {
name := name
go func(name string) {
2024-02-13 07:04:56 +00:00
frame, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
}
err = <-e.UnregisterExecutor(name, frame.FrameNumber, force)
2023-09-25 02:43:35 +00:00
if err != nil {
errChan <- err
}
wg.Done()
}(name)
}
e.logger.Info("waiting for execution engines to stop")
wg.Wait()
e.logger.Info("execution engines stopped")
2024-03-24 08:11:00 +00:00
e.dataTimeReel.Stop()
2023-09-25 02:43:35 +00:00
e.state = consensus.EngineStateStopped
e.engineMx.Lock()
defer e.engineMx.Unlock()
go func() {
errChan <- nil
}()
return errChan
}
// GetDifficulty returns the engine's difficulty field.
func (e *CeremonyDataClockConsensusEngine) GetDifficulty() uint32 {
return e.difficulty
}
2024-01-03 07:31:42 +00:00
func (e *CeremonyDataClockConsensusEngine) GetFrame() *protobufs.ClockFrame {
2024-02-13 07:04:56 +00:00
frame, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
}
return frame
2023-09-25 02:43:35 +00:00
}
// GetState returns the engine's current state. The field is read without
// taking a lock.
func (e *CeremonyDataClockConsensusEngine) GetState() consensus.EngineState {
return e.state
}
// GetFrameChannel returns the receive side of the engine's frame channel.
// The channel is created unbuffered by the constructor, and the run loop
// sends to it without blocking, so a frame is dropped when no receiver is
// waiting.
func (
e *CeremonyDataClockConsensusEngine,
) GetFrameChannel() <-chan *protobufs.ClockFrame {
return e.frameChan
}
func (
e *CeremonyDataClockConsensusEngine,
) GetPeerInfo() *protobufs.PeerInfoResponse {
resp := &protobufs.PeerInfoResponse{}
e.peerMapMx.RLock()
for _, v := range e.peerMap {
resp.PeerInfo = append(resp.PeerInfo, &protobufs.PeerInfo{
2024-02-24 08:35:13 +00:00
PeerId: v.peerId,
Multiaddrs: []string{v.multiaddr},
MaxFrame: v.maxFrame,
Timestamp: v.timestamp,
Version: v.version,
Signature: v.signature,
PublicKey: v.publicKey,
TotalDistance: v.totalDistance,
})
}
for _, v := range e.uncooperativePeersMap {
resp.UncooperativePeerInfo = append(
resp.UncooperativePeerInfo,
&protobufs.PeerInfo{
2024-02-24 08:35:13 +00:00
PeerId: v.peerId,
Multiaddrs: []string{v.multiaddr},
MaxFrame: v.maxFrame,
Timestamp: v.timestamp,
Version: v.version,
Signature: v.signature,
PublicKey: v.publicKey,
TotalDistance: v.totalDistance,
},
)
}
e.peerMapMx.RUnlock()
return resp
}
// createCommunicationKeys makes sure the agreement keys "q-ratchet-idk" and
// "q-ratchet-spk" exist in the key manager, in that order. A key that is
// reported as not found is created as an X448 key; any other lookup error,
// or a creation error, is returned wrapped and stops processing.
func (e *CeremonyDataClockConsensusEngine) createCommunicationKeys() error {
	for _, keyName := range []string{"q-ratchet-idk", "q-ratchet-spk"} {
		_, err := e.keyManager.GetAgreementKey(keyName)
		if err == nil {
			// Already present, nothing to do for this key.
			continue
		}

		if !errors.Is(err, keys.KeyNotFoundErr) {
			return errors.Wrap(err, "announce key bundle")
		}

		_, err = e.keyManager.CreateAgreementKey(keyName, keys.KeyTypeX448)
		if err != nil {
			return errors.Wrap(err, "announce key bundle")
		}
	}

	return nil
}