mirror of
https://source.quilibrium.com/quilibrium/ceremonyclient.git
synced 2025-01-27 16:15:18 +00:00
Merge branch 'feat-data-worker-direct-config' into 'v1.4.19'
feat: support detached configuration mode for data workers See merge request quilibrium/ceremonyclient!7
This commit is contained in:
commit
6b0dd69e93
@ -16,6 +16,8 @@ type EngineConfig struct {
|
|||||||
// system, base listen port of 40000 will listen on 40000, 40001, 40002)
|
// system, base listen port of 40000 will listen on 40000, 40001, 40002)
|
||||||
DataWorkerBaseListenPort uint16 `yaml:"dataWorkerBaseListenPort"`
|
DataWorkerBaseListenPort uint16 `yaml:"dataWorkerBaseListenPort"`
|
||||||
DataWorkerMemoryLimit int64 `yaml:"dataWorkerMemoryLimit"`
|
DataWorkerMemoryLimit int64 `yaml:"dataWorkerMemoryLimit"`
|
||||||
|
// Alternative configuration path to manually specify data workers by multiaddr
|
||||||
|
DataWorkerMultiaddrs []string `yaml:"dataWorkerMultiaddrs"`
|
||||||
|
|
||||||
// Values used only for testing – do not override these in production, your
|
// Values used only for testing – do not override these in production, your
|
||||||
// node will get kicked out
|
// node will get kicked out
|
||||||
|
@ -253,9 +253,19 @@ func (e *MasterClockConsensusEngine) Start() <-chan error {
|
|||||||
panic("invalid system configuration, minimum system configuration must be four cores")
|
panic("invalid system configuration, minimum system configuration must be four cores")
|
||||||
}
|
}
|
||||||
|
|
||||||
clients, err := e.createParallelDataClients(int(parallelism))
|
var clients []protobufs.DataIPCServiceClient
|
||||||
if err != nil {
|
if len(e.engineConfig.DataWorkerMultiaddrs) != 0 {
|
||||||
panic(err)
|
clients, err = e.createParallelDataClientsFromList()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
clients, err = e.createParallelDataClientsFromBaseMultiaddr(
|
||||||
|
int(parallelism),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@ -378,12 +388,60 @@ func (e *MasterClockConsensusEngine) Start() <-chan error {
|
|||||||
return errChan
|
return errChan
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *MasterClockConsensusEngine) createParallelDataClients(
|
func (e *MasterClockConsensusEngine) createParallelDataClientsFromList() (
|
||||||
paralellism int,
|
[]protobufs.DataIPCServiceClient,
|
||||||
|
error,
|
||||||
|
) {
|
||||||
|
parallelism := len(e.engineConfig.DataWorkerMultiaddrs)
|
||||||
|
|
||||||
|
e.logger.Info(
|
||||||
|
"connecting to data worker processes",
|
||||||
|
zap.Int("parallelism", parallelism),
|
||||||
|
)
|
||||||
|
|
||||||
|
clients := make([]protobufs.DataIPCServiceClient, parallelism)
|
||||||
|
|
||||||
|
for i := 0; i < parallelism; i++ {
|
||||||
|
ma, err := multiaddr.NewMultiaddr(e.engineConfig.DataWorkerMultiaddrs[i])
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, addr, err := mn.DialArgs(ma)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := grpc.Dial(
|
||||||
|
addr,
|
||||||
|
grpc.WithTransportCredentials(
|
||||||
|
insecure.NewCredentials(),
|
||||||
|
),
|
||||||
|
grpc.WithDefaultCallOptions(
|
||||||
|
grpc.MaxCallSendMsgSize(10*1024*1024),
|
||||||
|
grpc.MaxCallRecvMsgSize(10*1024*1024),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
clients[i] = protobufs.NewDataIPCServiceClient(conn)
|
||||||
|
}
|
||||||
|
|
||||||
|
e.logger.Info(
|
||||||
|
"connected to data worker processes",
|
||||||
|
zap.Int("parallelism", parallelism),
|
||||||
|
)
|
||||||
|
return clients, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *MasterClockConsensusEngine) createParallelDataClientsFromBaseMultiaddr(
|
||||||
|
parallelism int,
|
||||||
) ([]protobufs.DataIPCServiceClient, error) {
|
) ([]protobufs.DataIPCServiceClient, error) {
|
||||||
e.logger.Info(
|
e.logger.Info(
|
||||||
"connecting to data worker processes",
|
"connecting to data worker processes",
|
||||||
zap.Int("parallelism", paralellism),
|
zap.Int("parallelism", parallelism),
|
||||||
)
|
)
|
||||||
|
|
||||||
if e.engineConfig.DataWorkerBaseListenMultiaddr == "" {
|
if e.engineConfig.DataWorkerBaseListenMultiaddr == "" {
|
||||||
@ -394,9 +452,9 @@ func (e *MasterClockConsensusEngine) createParallelDataClients(
|
|||||||
e.engineConfig.DataWorkerBaseListenPort = 40000
|
e.engineConfig.DataWorkerBaseListenPort = 40000
|
||||||
}
|
}
|
||||||
|
|
||||||
clients := make([]protobufs.DataIPCServiceClient, paralellism)
|
clients := make([]protobufs.DataIPCServiceClient, parallelism)
|
||||||
|
|
||||||
for i := 0; i < paralellism; i++ {
|
for i := 0; i < parallelism; i++ {
|
||||||
ma, err := multiaddr.NewMultiaddr(
|
ma, err := multiaddr.NewMultiaddr(
|
||||||
fmt.Sprintf(
|
fmt.Sprintf(
|
||||||
e.engineConfig.DataWorkerBaseListenMultiaddr,
|
e.engineConfig.DataWorkerBaseListenMultiaddr,
|
||||||
@ -431,7 +489,7 @@ func (e *MasterClockConsensusEngine) createParallelDataClients(
|
|||||||
|
|
||||||
e.logger.Info(
|
e.logger.Info(
|
||||||
"connected to data worker processes",
|
"connected to data worker processes",
|
||||||
zap.Int("parallelism", paralellism),
|
zap.Int("parallelism", parallelism),
|
||||||
)
|
)
|
||||||
return clients, nil
|
return clients, nil
|
||||||
}
|
}
|
||||||
|
29
node/main.go
29
node/main.go
@ -289,9 +289,6 @@ func main() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
done := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
|
|
||||||
if !*dbConsole && *core == 0 {
|
if !*dbConsole && *core == 0 {
|
||||||
printLogo()
|
printLogo()
|
||||||
printVersion()
|
printVersion()
|
||||||
@ -336,6 +333,8 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if *dhtOnly {
|
if *dhtOnly {
|
||||||
|
done := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
|
||||||
dht, err := app.NewDHTNode(nodeConfig)
|
dht, err := app.NewDHTNode(nodeConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -368,7 +367,7 @@ func main() {
|
|||||||
nodeConfig.Engine.DataWorkerBaseListenPort = 40000
|
nodeConfig.Engine.DataWorkerBaseListenPort = 40000
|
||||||
}
|
}
|
||||||
|
|
||||||
if *parentProcess == 0 {
|
if *parentProcess == 0 && len(nodeConfig.Engine.DataWorkerMultiaddrs) == 0 {
|
||||||
panic("parent process pid not specified")
|
panic("parent process pid not specified")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -381,6 +380,11 @@ func main() {
|
|||||||
nodeConfig.Engine.DataWorkerBaseListenMultiaddr,
|
nodeConfig.Engine.DataWorkerBaseListenMultiaddr,
|
||||||
int(nodeConfig.Engine.DataWorkerBaseListenPort)+*core-1,
|
int(nodeConfig.Engine.DataWorkerBaseListenPort)+*core-1,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if len(nodeConfig.Engine.DataWorkerMultiaddrs) != 0 {
|
||||||
|
rpcMultiaddr = nodeConfig.Engine.DataWorkerMultiaddrs[*core-1]
|
||||||
|
}
|
||||||
|
|
||||||
srv, err := rpc.NewDataWorkerIPCServer(
|
srv, err := rpc.NewDataWorkerIPCServer(
|
||||||
rpcMultiaddr,
|
rpcMultiaddr,
|
||||||
l,
|
l,
|
||||||
@ -400,12 +404,14 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Loading ceremony state and starting node...")
|
fmt.Println("Loading ceremony state and starting node...")
|
||||||
go spawnDataWorkers()
|
go spawnDataWorkers(nodeConfig)
|
||||||
|
|
||||||
kzg.Init()
|
kzg.Init()
|
||||||
|
|
||||||
report := RunSelfTestIfNeeded(*configDirectory, nodeConfig)
|
report := RunSelfTestIfNeeded(*configDirectory, nodeConfig)
|
||||||
|
|
||||||
|
done := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
|
||||||
var node *app.Node
|
var node *app.Node
|
||||||
if *debug {
|
if *debug {
|
||||||
node, err = app.NewDebugNode(nodeConfig, report)
|
node, err = app.NewDebugNode(nodeConfig, report)
|
||||||
@ -450,7 +456,14 @@ func main() {
|
|||||||
|
|
||||||
var dataWorkers []*exec.Cmd
|
var dataWorkers []*exec.Cmd
|
||||||
|
|
||||||
func spawnDataWorkers() {
|
func spawnDataWorkers(nodeConfig *config.Config) {
|
||||||
|
if len(nodeConfig.Engine.DataWorkerMultiaddrs) != 0 {
|
||||||
|
fmt.Println(
|
||||||
|
"Data workers configured by multiaddr, be sure these are running...",
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
process, err := os.Executable()
|
process, err := os.Executable()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -520,6 +533,10 @@ func RunSelfTestIfNeeded(
|
|||||||
logger, _ := zap.NewProduction()
|
logger, _ := zap.NewProduction()
|
||||||
|
|
||||||
cores := runtime.GOMAXPROCS(0)
|
cores := runtime.GOMAXPROCS(0)
|
||||||
|
if len(nodeConfig.Engine.DataWorkerMultiaddrs) != 0 {
|
||||||
|
cores = len(nodeConfig.Engine.DataWorkerMultiaddrs) + 1
|
||||||
|
}
|
||||||
|
|
||||||
memory := memory.TotalMemory()
|
memory := memory.TotalMemory()
|
||||||
d, err := os.Stat(filepath.Join(configDir, "store"))
|
d, err := os.Stat(filepath.Join(configDir, "store"))
|
||||||
if d == nil {
|
if d == nil {
|
||||||
|
@ -91,6 +91,10 @@ func (r *DataWorkerIPCServer) Start() error {
|
|||||||
|
|
||||||
go r.monitorParent()
|
go r.monitorParent()
|
||||||
|
|
||||||
|
r.logger.Info(
|
||||||
|
"data worker listening",
|
||||||
|
zap.String("address", r.listenAddrGRPC),
|
||||||
|
)
|
||||||
if err := s.Serve(mn.NetListener(lis)); err != nil {
|
if err := s.Serve(mn.NetListener(lis)); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
@ -99,6 +103,14 @@ func (r *DataWorkerIPCServer) Start() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *DataWorkerIPCServer) monitorParent() {
|
func (r *DataWorkerIPCServer) monitorParent() {
|
||||||
|
if r.parentProcessId == 0 {
|
||||||
|
r.logger.Info(
|
||||||
|
"no parent process id specified, running in detached worker mode",
|
||||||
|
zap.Uint32("core_id", r.coreId),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
proc, err := os.FindProcess(r.parentProcessId)
|
proc, err := os.FindProcess(r.parentProcessId)
|
||||||
|
Loading…
Reference in New Issue
Block a user