ceremonyclient/node/consensus/master/execution_registration.go

125 lines
2.6 KiB
Go
Raw Permalink Normal View History

2023-09-03 23:47:09 +00:00
package master
import (
"github.com/pkg/errors"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/node/execution"
)
func (e *MasterClockConsensusEngine) RegisterExecutor(
exec execution.ExecutionEngine,
frame uint64,
) <-chan error {
logger := e.logger.With(zap.String("execution_engine_name", exec.GetName()))
logger.Info("registering execution engine")
errChan := make(chan error)
go func() {
2024-02-13 07:04:56 +00:00
masterFrame, err := e.masterTimeReel.Head()
if err != nil {
panic(err)
}
2023-09-03 23:47:09 +00:00
logger.Info(
"starting execution engine at frame",
2024-02-13 07:04:56 +00:00
zap.Uint64("current_frame", masterFrame.FrameNumber),
2023-09-03 23:47:09 +00:00
)
2024-02-13 07:04:56 +00:00
err = <-exec.Start()
2023-09-03 23:47:09 +00:00
if err != nil {
logger.Error("could not start execution engine", zap.Error(err))
errChan <- err
return
}
for {
2024-02-13 07:04:56 +00:00
masterFrame, err = e.masterTimeReel.Head()
if err != nil {
panic(err)
}
2023-09-03 23:47:09 +00:00
logger.Info(
"awaiting frame",
2024-02-13 07:04:56 +00:00
zap.Uint64("current_frame", masterFrame.FrameNumber),
2023-09-03 23:47:09 +00:00
zap.Uint64("target_frame", frame),
)
2024-02-13 07:04:56 +00:00
newFrame := masterFrame.FrameNumber
2023-09-03 23:47:09 +00:00
if newFrame >= frame {
logger.Info(
"injecting execution engine at frame",
zap.Uint64("current_frame", newFrame),
)
e.engineMx.Lock()
e.executionEngines[exec.GetName()] = exec
e.engineMx.Unlock()
errChan <- nil
break
}
}
}()
return errChan
}
func (e *MasterClockConsensusEngine) UnregisterExecutor(
name string,
frame uint64,
force bool,
) <-chan error {
logger := e.logger.With(zap.String("execution_engine_name", name))
logger.Info("unregistering execution engine")
errChan := make(chan error)
exec, ok := e.executionEngines[name]
if !ok {
logger.Error(
"could not unregister execution engine",
zap.Error(errors.New("execution engine not registered")),
)
errChan <- errors.New("execution engine not registered")
return errChan
}
go func() {
for {
2024-02-13 07:04:56 +00:00
masterFrame, err := e.masterTimeReel.Head()
if err != nil {
panic(err)
}
2023-09-03 23:47:09 +00:00
logger.Info(
"awaiting frame",
2024-02-13 07:04:56 +00:00
zap.Uint64("current_frame", masterFrame.FrameNumber),
2023-09-03 23:47:09 +00:00
zap.Uint64("target_frame", frame),
)
2024-02-13 07:04:56 +00:00
newFrame := masterFrame.FrameNumber
2023-09-03 23:47:09 +00:00
if newFrame >= frame {
logger.Info(
"removing execution engine at frame",
zap.Uint64("current_frame", newFrame),
)
e.engineMx.Lock()
delete(e.executionEngines, name)
e.engineMx.Unlock()
logger.Info(
"stopping execution engine at frame",
zap.Uint64("current_frame", newFrame),
)
err := <-exec.Stop(force)
if err != nil {
logger.Error("could not stop execution engine", zap.Error(err))
}
errChan <- err
break
}
}
}()
return errChan
}