ceremonyclient/node/consensus/master/master_clock_consensus_engine.go

609 lines
16 KiB
Go
Raw Normal View History

2023-09-03 23:47:09 +00:00
package master
import (
"bytes"
2024-03-01 07:12:31 +00:00
"context"
"crypto/rand"
2024-05-25 05:07:57 +00:00
"encoding/binary"
2023-09-03 23:47:09 +00:00
"encoding/hex"
"io"
2024-03-04 03:20:24 +00:00
"math/big"
2023-09-03 23:47:09 +00:00
"sync"
"time"
"github.com/mr-tron/base58"
2023-09-03 23:47:09 +00:00
"github.com/pkg/errors"
"go.uber.org/zap"
2024-03-01 07:12:31 +00:00
"google.golang.org/grpc"
2023-09-03 23:47:09 +00:00
"source.quilibrium.com/quilibrium/monorepo/node/config"
"source.quilibrium.com/quilibrium/monorepo/node/consensus"
2024-02-13 07:04:56 +00:00
qtime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time"
"source.quilibrium.com/quilibrium/monorepo/node/crypto"
2023-09-03 23:47:09 +00:00
"source.quilibrium.com/quilibrium/monorepo/node/execution"
"source.quilibrium.com/quilibrium/monorepo/node/keys"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
2023-09-09 23:45:47 +00:00
"source.quilibrium.com/quilibrium/monorepo/node/store"
2023-09-03 23:47:09 +00:00
)
// SyncStatusType enumerates the synchronization phases tracked in
// MasterClockConsensusEngine.syncingStatus.
type SyncStatusType int

// The constants are explicitly typed so that they carry SyncStatusType rather
// than defaulting to plain int when stored in an interface or inferred by :=.
const (
	// SyncStatusNotSyncing is the zero value and the status a newly
	// constructed engine starts with.
	SyncStatusNotSyncing SyncStatusType = iota
	// SyncStatusAwaitingResponse follows SyncStatusNotSyncing in the
	// enumeration (value 1).
	SyncStatusAwaitingResponse
	// SyncStatusSynchronizing is the last status in the enumeration (value 2).
	SyncStatusSynchronizing
)
type MasterClockConsensusEngine struct {
2024-03-01 07:12:31 +00:00
*protobufs.UnimplementedValidationServiceServer
2023-09-03 23:47:09 +00:00
difficulty uint32
logger *zap.Logger
state consensus.EngineState
pubSub p2p.PubSub
keyManager keys.KeyManager
2024-02-13 07:04:56 +00:00
frameProver crypto.FrameProver
2023-09-03 23:47:09 +00:00
lastFrameReceivedAt time.Time
frameChan chan *protobufs.ClockFrame
executionEngines map[string]execution.ExecutionEngine
filter []byte
input []byte
syncingStatus SyncStatusType
syncingTarget []byte
engineMx sync.Mutex
seenFramesMx sync.Mutex
historicFramesMx sync.Mutex
seenFrames []*protobufs.ClockFrame
historicFrames []*protobufs.ClockFrame
clockStore store.ClockStore
masterTimeReel *qtime.MasterTimeReel
2024-03-21 07:14:45 +00:00
peerInfoManager p2p.PeerInfoManager
report *protobufs.SelfTestReport
frameValidationCh chan *protobufs.ClockFrame
bandwidthTestCh chan []byte
currentReceivingSyncPeers int
currentReceivingSyncPeersMx sync.Mutex
2023-09-03 23:47:09 +00:00
}
var _ consensus.ConsensusEngine = (*MasterClockConsensusEngine)(nil)
func NewMasterClockConsensusEngine(
engineConfig *config.EngineConfig,
logger *zap.Logger,
2023-09-09 23:45:47 +00:00
clockStore store.ClockStore,
2023-09-03 23:47:09 +00:00
keyManager keys.KeyManager,
pubSub p2p.PubSub,
2024-02-13 07:04:56 +00:00
frameProver crypto.FrameProver,
masterTimeReel *qtime.MasterTimeReel,
2024-03-21 07:14:45 +00:00
peerInfoManager p2p.PeerInfoManager,
2024-03-01 07:12:31 +00:00
report *protobufs.SelfTestReport,
2023-09-03 23:47:09 +00:00
) *MasterClockConsensusEngine {
if logger == nil {
panic(errors.New("logger is nil"))
}
if engineConfig == nil {
panic(errors.New("engine config is nil"))
}
if keyManager == nil {
panic(errors.New("key manager is nil"))
}
if pubSub == nil {
panic(errors.New("pubsub is nil"))
}
2024-02-13 07:04:56 +00:00
if frameProver == nil {
panic(errors.New("frame prover is nil"))
}
if masterTimeReel == nil {
panic(errors.New("master time reel is nil"))
}
2023-09-03 23:47:09 +00:00
seed, err := hex.DecodeString(engineConfig.GenesisSeed)
if err != nil {
panic(errors.New("genesis seed is nil"))
}
e := &MasterClockConsensusEngine{
difficulty: 10000,
logger: logger,
state: consensus.EngineStateStopped,
keyManager: keyManager,
pubSub: pubSub,
executionEngines: map[string]execution.ExecutionEngine{},
2024-01-03 07:31:42 +00:00
frameChan: make(chan *protobufs.ClockFrame),
2023-09-03 23:47:09 +00:00
input: seed,
lastFrameReceivedAt: time.Time{},
syncingStatus: SyncStatusNotSyncing,
2023-09-09 23:45:47 +00:00
clockStore: clockStore,
2024-02-13 07:04:56 +00:00
frameProver: frameProver,
masterTimeReel: masterTimeReel,
2024-03-21 07:14:45 +00:00
peerInfoManager: peerInfoManager,
2024-03-01 07:12:31 +00:00
report: report,
frameValidationCh: make(chan *protobufs.ClockFrame),
bandwidthTestCh: make(chan []byte),
2024-01-03 07:31:42 +00:00
}
2024-03-21 07:14:45 +00:00
e.addPeerManifestReport(e.pubSub.GetPeerID(), report)
2024-03-01 07:12:31 +00:00
2023-09-03 23:47:09 +00:00
if e.filter, err = hex.DecodeString(engineConfig.Filter); err != nil {
panic(errors.Wrap(err, "could not parse filter value"))
}
logger.Info("constructing consensus engine")
return e
}
func (e *MasterClockConsensusEngine) Start() <-chan error {
2024-01-03 07:31:42 +00:00
e.logger.Info("starting master consensus engine")
2023-09-03 23:47:09 +00:00
e.state = consensus.EngineStateStarting
errChan := make(chan error)
2024-03-21 07:14:45 +00:00
e.peerInfoManager.Start()
2023-09-03 23:47:09 +00:00
e.state = consensus.EngineStateLoading
e.logger.Info("syncing last seen state")
2024-02-13 07:04:56 +00:00
err := e.masterTimeReel.Start()
if err != nil {
panic(err)
}
2023-09-09 23:45:47 +00:00
2024-02-13 07:04:56 +00:00
frame, err := e.masterTimeReel.Head()
if err != nil {
2023-09-09 23:45:47 +00:00
panic(err)
}
2024-02-13 07:04:56 +00:00
e.buildHistoricFrameCache(frame)
2023-09-03 23:47:09 +00:00
go func() {
for {
select {
case newFrame := <-e.frameValidationCh:
head, err := e.masterTimeReel.Head()
if err != nil {
panic(err)
}
if head.FrameNumber > newFrame.FrameNumber || newFrame.FrameNumber-head.FrameNumber > 128 {
e.logger.Debug(
"frame out of range, ignoring",
zap.Uint64("number", newFrame.FrameNumber),
)
continue
}
if err := e.frameProver.VerifyMasterClockFrame(newFrame); err != nil {
e.logger.Error("could not verify clock frame", zap.Error(err))
continue
}
2024-03-17 21:14:37 +00:00
e.masterTimeReel.Insert(newFrame, false)
case peerId := <-e.bandwidthTestCh:
e.performBandwidthTest(peerId)
}
}
}()
2023-09-03 23:47:09 +00:00
e.logger.Info("subscribing to pubsub messages")
e.pubSub.Subscribe(e.filter, e.handleMessage, true)
e.state = consensus.EngineStateCollecting
2024-03-01 07:12:31 +00:00
go func() {
server := grpc.NewServer(
grpc.MaxSendMsgSize(600*1024*1024),
grpc.MaxRecvMsgSize(600*1024*1024),
)
protobufs.RegisterValidationServiceServer(server, e)
if err := e.pubSub.StartDirectChannelListener(
e.pubSub.GetPeerID(),
"validation",
server,
); err != nil {
panic(err)
}
}()
2023-09-10 23:29:17 +00:00
go func() {
for {
e.logger.Info(
"peers in store",
zap.Int("peer_store_count", e.pubSub.GetPeerstoreCount()),
zap.Int("network_peer_count", e.pubSub.GetNetworkPeersCount()),
)
time.Sleep(10 * time.Second)
}
}()
2024-03-01 07:12:31 +00:00
go func() {
// Let it sit until we at least have a few more peers inbound
time.Sleep(30 * time.Second)
2024-03-01 07:12:31 +00:00
for {
2024-03-08 05:05:04 +00:00
head, err := e.masterTimeReel.Head()
if err != nil {
panic(err)
}
e.report.MasterHeadFrame = head.FrameNumber
2024-05-25 05:07:57 +00:00
parallelism := e.report.Cores - 1
skew := (e.report.DifficultyMetric * 12) / 10
challenge := binary.BigEndian.AppendUint64(
[]byte{},
e.report.MasterHeadFrame,
)
challenge = append(challenge, e.pubSub.GetPeerID()...)
ts, proofs, err := e.frameProver.CalculateChallengeProof(
challenge,
parallelism,
skew,
)
if err != nil {
panic(err)
}
proof := binary.BigEndian.AppendUint64([]byte{}, uint64(ts))
for i := 0; i < len(proofs); i++ {
proof = append(proof, proofs[i]...)
}
e.report.Proof = proof
e.logger.Info(
"broadcasting self-test info",
zap.Uint64("current_frame", e.report.MasterHeadFrame),
)
2024-03-01 07:12:31 +00:00
if err := e.publishMessage(e.filter, e.report); err != nil {
e.logger.Debug("error publishing message", zap.Error(err))
}
}
}()
2023-09-03 23:47:09 +00:00
go func() {
2024-02-13 07:04:56 +00:00
newFrameCh := e.masterTimeReel.NewFrameCh()
2023-09-03 23:47:09 +00:00
for e.state < consensus.EngineStateStopping {
var err error
2024-02-13 07:04:56 +00:00
select {
case frame := <-newFrameCh:
currentFrame := frame
latestFrame := frame
if latestFrame, err = e.collect(currentFrame); err != nil {
2023-09-03 23:47:09 +00:00
e.logger.Error("could not collect", zap.Error(err))
2024-01-03 07:31:42 +00:00
latestFrame = currentFrame
2024-02-13 07:04:56 +00:00
continue
2023-09-03 23:47:09 +00:00
}
if latestFrame, err = e.prove(latestFrame); err != nil {
e.logger.Error("could not prove", zap.Error(err))
2024-01-03 07:31:42 +00:00
latestFrame = currentFrame
2023-09-03 23:47:09 +00:00
}
if err = e.publishProof(latestFrame); err != nil {
e.logger.Error("could not publish", zap.Error(err))
}
2024-02-13 07:04:56 +00:00
case <-time.After(20 * time.Second):
frame, err := e.masterTimeReel.Head()
if err != nil {
panic(err)
}
if frame, err = e.prove(frame); err != nil {
e.logger.Error("could not prove", zap.Error(err))
continue
}
if err = e.publishProof(frame); err != nil {
e.logger.Error("could not publish", zap.Error(err))
}
2023-09-03 23:47:09 +00:00
}
}
}()
go func() {
errChan <- nil
}()
return errChan
}
2024-03-01 07:12:31 +00:00
// PerformValidation implements the ValidationService RPC by echoing the
// received message back to the caller unchanged. It never returns an error.
func (e *MasterClockConsensusEngine) PerformValidation(
	ctx context.Context,
	msg *protobufs.ValidationMessage,
) (*protobufs.ValidationMessage, error) {
	// The request doubles as the response.
	echo := msg
	return echo, nil
}
2023-09-03 23:47:09 +00:00
func (e *MasterClockConsensusEngine) Stop(force bool) <-chan error {
e.logger.Info("stopping consensus engine")
e.state = consensus.EngineStateStopping
errChan := make(chan error)
wg := sync.WaitGroup{}
wg.Add(len(e.executionEngines))
for name := range e.executionEngines {
name := name
go func(name string) {
2024-02-13 07:04:56 +00:00
frame, err := e.masterTimeReel.Head()
if err != nil {
errChan <- err
return
}
err = <-e.UnregisterExecutor(name, frame.FrameNumber, force)
2023-09-03 23:47:09 +00:00
if err != nil {
errChan <- err
}
wg.Done()
}(name)
}
e.logger.Info("waiting for execution engines to stop")
wg.Wait()
e.logger.Info("execution engines stopped")
2024-02-13 07:04:56 +00:00
e.masterTimeReel.Stop()
2024-03-21 07:14:45 +00:00
e.peerInfoManager.Stop()
2023-09-03 23:47:09 +00:00
e.state = consensus.EngineStateStopped
go func() {
errChan <- nil
}()
return errChan
}
func (e *MasterClockConsensusEngine) performBandwidthTest(peerID []byte) {
result := e.pubSub.GetMultiaddrOfPeer(peerID)
if result == "" {
return
}
cc, err := e.pubSub.GetDirectChannel(peerID, "validation")
if err != nil {
2024-03-24 08:11:00 +00:00
e.logger.Debug(
"could not connect to peer for validation",
zap.String("peer_id", base58.Encode(peerID)),
)
// tag: dusk nuke this peer for now
e.pubSub.SetPeerScore(peerID, -1000)
return
}
client := protobufs.NewValidationServiceClient(cc)
verification := make([]byte, 1048576)
rand.Read(verification)
start := time.Now().UnixMilli()
validation, err := client.PerformValidation(
context.Background(),
&protobufs.ValidationMessage{
Validation: verification,
},
)
end := time.Now().UnixMilli()
if err != nil && err != io.EOF {
cc.Close()
2024-03-24 08:11:00 +00:00
e.logger.Debug(
"peer returned error",
zap.String("peer_id", base58.Encode(peerID)),
zap.Error(err),
)
// tag: dusk nuke this peer for now
e.pubSub.SetPeerScore(peerID, -1000)
return
}
cc.Close()
if !bytes.Equal(verification, validation.Validation) {
2024-03-24 08:11:00 +00:00
e.logger.Debug(
"peer provided invalid verification",
zap.String("peer_id", base58.Encode(peerID)),
)
// tag: dusk nuke this peer for now
e.pubSub.SetPeerScore(peerID, -1000)
return
}
if end-start > 2000 {
2024-03-24 08:11:00 +00:00
e.logger.Debug(
"peer has slow bandwidth, scoring out",
zap.String("peer_id", base58.Encode(peerID)),
)
// tag: dusk nuke this peer for now
e.pubSub.SetPeerScore(peerID, -1000)
return
}
2024-03-21 07:14:45 +00:00
duration := end - start
bandwidth := uint64(1048576*1000) / uint64(duration)
manifest := e.peerInfoManager.GetPeerInfo(peerID)
if manifest == nil {
return
}
peerManifest := &p2p.PeerManifest{
PeerId: peerID,
Difficulty: manifest.Difficulty,
DifficultyMetric: manifest.DifficultyMetric,
Commit_16Metric: manifest.Commit_16Metric,
Commit_128Metric: manifest.Commit_128Metric,
Commit_1024Metric: manifest.Commit_1024Metric,
Commit_65536Metric: manifest.Commit_65536Metric,
Proof_16Metric: manifest.Proof_16Metric,
Proof_128Metric: manifest.Proof_128Metric,
Proof_1024Metric: manifest.Proof_1024Metric,
Proof_65536Metric: manifest.Proof_65536Metric,
Cores: manifest.Cores,
Memory: manifest.Memory,
Storage: manifest.Storage,
Capabilities: []p2p.Capability{},
MasterHeadFrame: manifest.MasterHeadFrame,
Bandwidth: bandwidth,
}
for _, capability := range manifest.Capabilities {
metadata := make([]byte, len(capability.AdditionalMetadata))
copy(metadata[:], capability.AdditionalMetadata[:])
peerManifest.Capabilities = append(
peerManifest.Capabilities,
p2p.Capability{
ProtocolIdentifier: capability.ProtocolIdentifier,
AdditionalMetadata: metadata,
},
)
}
e.peerInfoManager.AddPeerInfo(manifest)
}
2024-03-04 03:20:24 +00:00
func (
e *MasterClockConsensusEngine,
) GetPeerManifests() *protobufs.PeerManifestsResponse {
response := &protobufs.PeerManifestsResponse{
PeerManifests: []*protobufs.PeerManifest{},
}
2024-03-21 07:14:45 +00:00
peerMap := e.peerInfoManager.GetPeerMap()
for peerId, peerManifest := range peerMap {
2024-03-04 03:20:24 +00:00
peerId := peerId
peerManifest := peerManifest
manifest := &protobufs.PeerManifest{
PeerId: []byte(peerId),
Difficulty: peerManifest.Difficulty,
DifficultyMetric: peerManifest.DifficultyMetric,
Commit_16Metric: peerManifest.Commit_16Metric,
Commit_128Metric: peerManifest.Commit_128Metric,
Commit_1024Metric: peerManifest.Commit_1024Metric,
Commit_65536Metric: peerManifest.Commit_65536Metric,
Proof_16Metric: peerManifest.Proof_16Metric,
Proof_128Metric: peerManifest.Proof_128Metric,
Proof_1024Metric: peerManifest.Proof_1024Metric,
Proof_65536Metric: peerManifest.Proof_65536Metric,
Cores: peerManifest.Cores,
Memory: new(big.Int).SetBytes(peerManifest.Memory).Bytes(),
Storage: new(big.Int).SetBytes(peerManifest.Storage).Bytes(),
2024-03-08 05:05:04 +00:00
MasterHeadFrame: peerManifest.MasterHeadFrame,
2024-05-25 05:07:57 +00:00
LastSeen: peerManifest.LastSeen,
2024-03-04 03:20:24 +00:00
}
for _, capability := range peerManifest.Capabilities {
metadata := make([]byte, len(capability.AdditionalMetadata))
copy(metadata[:], capability.AdditionalMetadata[:])
manifest.Capabilities = append(
manifest.Capabilities,
&protobufs.Capability{
ProtocolIdentifier: capability.ProtocolIdentifier,
AdditionalMetadata: metadata,
},
)
}
response.PeerManifests = append(
response.PeerManifests,
manifest,
)
}
return response
}
2023-09-03 23:47:09 +00:00
// GetDifficulty reports the difficulty value the engine was constructed with.
func (e *MasterClockConsensusEngine) GetDifficulty() uint32 {
	difficulty := e.difficulty
	return difficulty
}
2024-01-03 07:31:42 +00:00
func (e *MasterClockConsensusEngine) GetFrame() *protobufs.ClockFrame {
2024-02-13 07:04:56 +00:00
frame, err := e.masterTimeReel.Head()
if err != nil {
panic(err)
}
return frame
2023-09-03 23:47:09 +00:00
}
// GetState reports the engine's current lifecycle state as last set by Start
// or Stop.
func (e *MasterClockConsensusEngine) GetState() consensus.EngineState {
	current := e.state
	return current
}
2024-01-03 07:31:42 +00:00
func (
e *MasterClockConsensusEngine,
) GetFrameChannel() <-chan *protobufs.ClockFrame {
2023-09-03 23:47:09 +00:00
return e.frameChan
}
2024-01-03 07:31:42 +00:00
// buildHistoricFrameCache replaces e.historicFrames with the master clock
// frames read from the clock store for the range starting at most 255 frames
// before latestFrame and ending at its predecessor, followed by latestFrame
// itself. For frame number 0 the cache holds only latestFrame.
//
// Any error from the clock store iterator panics.
func (e *MasterClockConsensusEngine) buildHistoricFrameCache(
	latestFrame *protobufs.ClockFrame,
) {
	e.historicFrames = []*protobufs.ClockFrame{}

	// Frame zero has no predecessors to load.
	if latestFrame.FrameNumber == 0 {
		e.historicFrames = append(e.historicFrames, latestFrame)
		return
	}

	// Lower bound of the range, floored at frame 0.
	var lowest uint64
	if latestFrame.FrameNumber > 255 {
		lowest = latestFrame.FrameNumber - 255
	}

	frames, err := e.clockStore.RangeMasterClockFrames(
		e.filter,
		lowest,
		latestFrame.FrameNumber-1,
	)
	if err != nil {
		panic(err)
	}

	for frames.First(); frames.Valid(); frames.Next() {
		stored, err := frames.Value()
		if err != nil {
			panic(err)
		}

		e.historicFrames = append(e.historicFrames, stored)
	}

	if err = frames.Close(); err != nil {
		panic(err)
	}

	e.historicFrames = append(e.historicFrames, latestFrame)
}
2024-03-21 07:14:45 +00:00
func (e *MasterClockConsensusEngine) addPeerManifestReport(
peerId []byte,
report *protobufs.SelfTestReport,
) {
manifest := &p2p.PeerManifest{
PeerId: peerId,
Difficulty: report.Difficulty,
DifficultyMetric: report.DifficultyMetric,
Commit_16Metric: report.Commit_16Metric,
Commit_128Metric: report.Commit_128Metric,
Commit_1024Metric: report.Commit_1024Metric,
Commit_65536Metric: report.Commit_65536Metric,
Proof_16Metric: report.Proof_16Metric,
Proof_128Metric: report.Proof_128Metric,
Proof_1024Metric: report.Proof_1024Metric,
Proof_65536Metric: report.Proof_65536Metric,
Cores: report.Cores,
Memory: report.Memory,
Storage: report.Storage,
Capabilities: []p2p.Capability{},
MasterHeadFrame: report.MasterHeadFrame,
2024-05-25 05:07:57 +00:00
LastSeen: time.Now().UnixMilli(),
2024-03-21 07:14:45 +00:00
}
for _, capability := range manifest.Capabilities {
metadata := make([]byte, len(capability.AdditionalMetadata))
copy(metadata[:], capability.AdditionalMetadata[:])
manifest.Capabilities = append(
manifest.Capabilities,
p2p.Capability{
ProtocolIdentifier: capability.ProtocolIdentifier,
AdditionalMetadata: metadata,
},
)
}
e.peerInfoManager.AddPeerInfo(manifest)
}