diff --git a/node/config/engine.go b/node/config/engine.go index 14d2f98..c31690a 100644 --- a/node/config/engine.go +++ b/node/config/engine.go @@ -16,6 +16,8 @@ type EngineConfig struct { // system, base listen port of 40000 will listen on 40000, 40001, 40002) DataWorkerBaseListenPort uint16 `yaml:"dataWorkerBaseListenPort"` DataWorkerMemoryLimit int64 `yaml:"dataWorkerMemoryLimit"` + // Alternative configuration path to manually specify data workers by multiaddr + DataWorkerMultiaddrs []string `yaml:"dataWorkerMultiaddrs"` // Values used only for testing – do not override these in production, your // node will get kicked out diff --git a/node/consensus/master/master_clock_consensus_engine.go b/node/consensus/master/master_clock_consensus_engine.go index 6f21464..c56d1e3 100644 --- a/node/consensus/master/master_clock_consensus_engine.go +++ b/node/consensus/master/master_clock_consensus_engine.go @@ -253,9 +253,19 @@ func (e *MasterClockConsensusEngine) Start() <-chan error { panic("invalid system configuration, minimum system configuration must be four cores") } - clients, err := e.createParallelDataClients(int(parallelism)) - if err != nil { - panic(err) + var clients []protobufs.DataIPCServiceClient + if len(e.engineConfig.DataWorkerMultiaddrs) != 0 { + clients, err = e.createParallelDataClientsFromList() + if err != nil { + panic(err) + } + } else { + clients, err = e.createParallelDataClientsFromBaseMultiaddr( + int(parallelism), + ) + if err != nil { + panic(err) + } } for { @@ -378,12 +388,60 @@ func (e *MasterClockConsensusEngine) Start() <-chan error { return errChan } -func (e *MasterClockConsensusEngine) createParallelDataClients( - paralellism int, +func (e *MasterClockConsensusEngine) createParallelDataClientsFromList() ( + []protobufs.DataIPCServiceClient, + error, +) { + parallelism := len(e.engineConfig.DataWorkerMultiaddrs) + + e.logger.Info( + "connecting to data worker processes", + zap.Int("parallelism", parallelism), + ) + + clients := make([]protobufs.DataIPCServiceClient, parallelism) + + for i := 0; i < parallelism; i++ { + ma, err := multiaddr.NewMultiaddr(e.engineConfig.DataWorkerMultiaddrs[i]) + if err != nil { + panic(err) + } + + _, addr, err := mn.DialArgs(ma) + if err != nil { + panic(err) + } + + conn, err := grpc.Dial( + addr, + grpc.WithTransportCredentials( + insecure.NewCredentials(), + ), + grpc.WithDefaultCallOptions( + grpc.MaxCallSendMsgSize(10*1024*1024), + grpc.MaxCallRecvMsgSize(10*1024*1024), + ), + ) + if err != nil { + panic(err) + } + + clients[i] = protobufs.NewDataIPCServiceClient(conn) + } + + e.logger.Info( + "connected to data worker processes", + zap.Int("parallelism", parallelism), + ) + return clients, nil +} + +func (e *MasterClockConsensusEngine) createParallelDataClientsFromBaseMultiaddr( + parallelism int, ) ([]protobufs.DataIPCServiceClient, error) { e.logger.Info( "connecting to data worker processes", - zap.Int("parallelism", paralellism), + zap.Int("parallelism", parallelism), ) if e.engineConfig.DataWorkerBaseListenMultiaddr == "" { @@ -394,9 +452,9 @@ func (e *MasterClockConsensusEngine) createParallelDataClients( e.engineConfig.DataWorkerBaseListenPort = 40000 } - clients := make([]protobufs.DataIPCServiceClient, paralellism) + clients := make([]protobufs.DataIPCServiceClient, parallelism) - for i := 0; i < paralellism; i++ { + for i := 0; i < parallelism; i++ { ma, err := multiaddr.NewMultiaddr( fmt.Sprintf( e.engineConfig.DataWorkerBaseListenMultiaddr, @@ -431,7 +489,7 @@ func (e *MasterClockConsensusEngine) createParallelDataClients( e.logger.Info( "connected to data worker processes", - zap.Int("parallelism", paralellism), + zap.Int("parallelism", parallelism), ) return clients, nil } diff --git a/node/main.go b/node/main.go index ecae0c3..e77edfe 100644 --- a/node/main.go +++ b/node/main.go @@ -289,9 +289,6 @@ func main() { return } - done := make(chan os.Signal, 1) - signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) - if !*dbConsole && *core == 0 { printLogo() printVersion() @@ -336,6 +333,8 @@ func main() { } if *dhtOnly { + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) dht, err := app.NewDHTNode(nodeConfig) if err != nil { panic(err) @@ -368,7 +367,7 @@ func main() { nodeConfig.Engine.DataWorkerBaseListenPort = 40000 } - if *parentProcess == 0 { + if *parentProcess == 0 && len(nodeConfig.Engine.DataWorkerMultiaddrs) == 0 { panic("parent process pid not specified") } @@ -381,6 +380,11 @@ func main() { nodeConfig.Engine.DataWorkerBaseListenMultiaddr, int(nodeConfig.Engine.DataWorkerBaseListenPort)+*core-1, ) + + if len(nodeConfig.Engine.DataWorkerMultiaddrs) != 0 { + rpcMultiaddr = nodeConfig.Engine.DataWorkerMultiaddrs[*core-1] + } + srv, err := rpc.NewDataWorkerIPCServer( rpcMultiaddr, l, @@ -400,12 +404,14 @@ func main() { } fmt.Println("Loading ceremony state and starting node...") - go spawnDataWorkers() + go spawnDataWorkers(nodeConfig) kzg.Init() report := RunSelfTestIfNeeded(*configDirectory, nodeConfig) + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) var node *app.Node if *debug { node, err = app.NewDebugNode(nodeConfig, report) @@ -450,7 +456,14 @@ func main() { var dataWorkers []*exec.Cmd -func spawnDataWorkers() { +func spawnDataWorkers(nodeConfig *config.Config) { + if len(nodeConfig.Engine.DataWorkerMultiaddrs) != 0 { + fmt.Println( + "Data workers configured by multiaddr, be sure these are running...", + ) + return + } + process, err := os.Executable() if err != nil { panic(err) @@ -520,6 +533,10 @@ func RunSelfTestIfNeeded( logger, _ := zap.NewProduction() cores := runtime.GOMAXPROCS(0) + if len(nodeConfig.Engine.DataWorkerMultiaddrs) != 0 { + cores = len(nodeConfig.Engine.DataWorkerMultiaddrs) + 1 + } + memory := memory.TotalMemory() d, err := os.Stat(filepath.Join(configDir, "store")) if d == nil { diff --git a/node/rpc/data_worker_ipc_server.go b/node/rpc/data_worker_ipc_server.go index 068979a..23b5e03 100644 --- a/node/rpc/data_worker_ipc_server.go +++ b/node/rpc/data_worker_ipc_server.go @@ -91,6 +91,10 @@ func (r *DataWorkerIPCServer) Start() error { go r.monitorParent() + r.logger.Info( + "data worker listening", + zap.String("address", r.listenAddrGRPC), + ) if err := s.Serve(mn.NetListener(lis)); err != nil { panic(err) } @@ -99,6 +103,14 @@ func (r *DataWorkerIPCServer) Start() error { } func (r *DataWorkerIPCServer) monitorParent() { + if r.parentProcessId == 0 { + r.logger.Info( + "no parent process id specified, running in detached worker mode", + zap.Uint32("core_id", r.coreId), + ) + return + } + for { time.Sleep(1 * time.Second) proc, err := os.FindProcess(r.parentProcessId)