From adead6d3b379c05a2ee83aa69b6f825a0b2b77bd Mon Sep 17 00:00:00 2001 From: Nicolas Carlier Date: Fri, 21 Dec 2018 11:09:07 +0000 Subject: [PATCH] feat(worker): add worker status lifecycle --- pkg/worker/work_request.go | 33 +++++++++++++++++++++++++++------ pkg/worker/work_runner.go | 20 +++++++------------- pkg/worker/work_runner_test.go | 3 +++ pkg/worker/worker.go | 2 -- 4 files changed, 37 insertions(+), 21 deletions(-) diff --git a/pkg/worker/work_request.go b/pkg/worker/work_request.go index 274c2df..ff4e30e 100644 --- a/pkg/worker/work_request.go +++ b/pkg/worker/work_request.go @@ -3,10 +3,26 @@ package worker import ( "sync" "sync/atomic" + + "github.com/ncarlier/webhookd/pkg/logger" ) var workID uint64 +// WorkStatus is the status of a worload +type WorkStatus int + +const ( + // Idle means that the work is not yet started + Idle WorkStatus = iota + // Running means that the work is running + Running + // Success means that the work over + Success + // Error means that the work is over but in error + Error +) + // WorkRequest is a request of work for a worker type WorkRequest struct { ID uint64 @@ -16,7 +32,7 @@ type WorkRequest struct { Args []string MessageChan chan []byte Timeout int - Terminated bool + Status WorkStatus mutex sync.Mutex } @@ -30,22 +46,27 @@ func NewWorkRequest(name, script, payload string, args []string, timeout int) *W Args: args, Timeout: timeout, MessageChan: make(chan []byte), - Terminated: false, + Status: Idle, } } // Terminate set work request as terminated -func (wr *WorkRequest) Terminate() { +func (wr *WorkRequest) Terminate(err error) error { wr.mutex.Lock() defer wr.mutex.Unlock() - if !wr.Terminated { - wr.Terminated = true + if err != nil { + wr.Status = Error + logger.Info.Printf("Work %s#%d done [ERROR]\n", wr.Name, wr.ID) + return err } + wr.Status = Success + logger.Info.Printf("Work %s#%d done [SUCCESS]\n", wr.Name, wr.ID) + return nil } // IsTerminated ask if the work request is terminated func (wr *WorkRequest) IsTerminated() bool { wr.mutex.Lock() defer wr.mutex.Unlock() - return wr.Terminated + return wr.Status == Success || wr.Status == Error } diff --git a/pkg/worker/work_runner.go b/pkg/worker/work_runner.go index 56bd83d..afa038e 100644 --- a/pkg/worker/work_runner.go +++ b/pkg/worker/work_runner.go @@ -23,13 +23,14 @@ func (c *ChanWriter) Write(p []byte) (int, error) { } func run(work *WorkRequest) error { + work.Status = Running logger.Info.Printf("Work %s#%d started...\n", work.Name, work.ID) logger.Debug.Printf("Work %s#%d script: %s\n", work.Name, work.ID, work.Script) logger.Debug.Printf("Work %s#%d parameter: %v\n", work.Name, work.ID, work.Args) binary, err := exec.LookPath(work.Script) if err != nil { - return err + return work.Terminate(err) } // Exec script with args... @@ -42,7 +43,7 @@ func run(work *WorkRequest) error { // Open the log file for writing logFile, err := createLogFile(work) if err != nil { - return err + return work.Terminate(err) } defer logFile.Close() logger.Debug.Printf("Work %s#%d output to file: %s\n", work.Name, work.ID, logFile.Name()) @@ -53,18 +54,18 @@ func run(work *WorkRequest) error { // Combine cmd stdout and stderr outReader, err := cmd.StdoutPipe() if err != nil { - return err + return work.Terminate(err) } errReader, err := cmd.StderrPipe() if err != nil { - return err + return work.Terminate(err) } cmdReader := io.MultiReader(outReader, errReader) // Start the script... err = cmd.Start() if err != nil { - return err + return work.Terminate(err) } // Create wait group to wait for command output completion @@ -111,12 +112,5 @@ func run(work *WorkRequest) error { timer.Stop() // Mark work as terminated - work.Terminate() - - if err != nil { - logger.Info.Printf("Work %s#%d done [ERROR]\n", work.Name, work.ID) - return err - } - logger.Info.Printf("Work %s#%d done [SUCCESS]\n", work.Name, work.ID) - return nil + return work.Terminate(err) } diff --git a/pkg/worker/work_runner_test.go b/pkg/worker/work_runner_test.go index accb848..8a94acb 100644 --- a/pkg/worker/work_runner_test.go +++ b/pkg/worker/work_runner_test.go @@ -33,6 +33,7 @@ func TestWorkRunner(t *testing.T) { printWorkMessages(work) err := run(work) assert.Nil(t, err, "") + assert.Equal(t, work.Status, Success, "") // Test that log file is ok id := strconv.FormatUint(work.ID, 10) @@ -50,6 +51,7 @@ func TestWorkRunnerWithError(t *testing.T) { printWorkMessages(work) err := run(work) assert.NotNil(t, err, "") + assert.Equal(t, work.Status, Error, "") assert.Equal(t, "exit status 1", err.Error(), "") } @@ -61,5 +63,6 @@ func TestWorkRunnerWithTimeout(t *testing.T) { printWorkMessages(work) err := run(work) assert.NotNil(t, err, "") + assert.Equal(t, work.Status, Error, "") assert.Equal(t, "signal: killed", err.Error(), "") } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 8745f3d..c926fc7 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -44,11 +44,9 @@ func (w Worker) Start() { logger.Debug.Printf("Worker #%d received work request: %s#%d\n", w.ID, work.Name, work.ID) err := run(&work) if err != nil { - // subject := fmt.Sprintf("Webhook %s#%d FAILED.", work.Name, work.ID) work.MessageChan <- []byte(fmt.Sprintf("error: %s", err.Error())) // notify(subject, err.Error(), filename) } else { - // subject := fmt.Sprintf("Webhook %s#%d SUCCEEDED.", work.Name, workID) work.MessageChan <- []byte("done") // notify(subject, "See attachment.", filename) }