feat(worker): add worker status lifecycle

This commit is contained in:
Nicolas Carlier 2018-12-21 11:09:07 +00:00
parent 2ca5d671b9
commit adead6d3b3
4 changed files with 37 additions and 21 deletions

View File

@ -3,10 +3,26 @@ package worker
import ( import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/ncarlier/webhookd/pkg/logger"
) )
var workID uint64 var workID uint64
// WorkStatus is the status of a worload
type WorkStatus int
const (
// Idle means that the work is not yet started
Idle WorkStatus = iota
// Running means that the work is running
Running
// Success means that the work over
Success
// Error means that the work is over but in error
Error
)
// WorkRequest is a request of work for a worker // WorkRequest is a request of work for a worker
type WorkRequest struct { type WorkRequest struct {
ID uint64 ID uint64
@ -16,7 +32,7 @@ type WorkRequest struct {
Args []string Args []string
MessageChan chan []byte MessageChan chan []byte
Timeout int Timeout int
Terminated bool Status WorkStatus
mutex sync.Mutex mutex sync.Mutex
} }
@ -30,22 +46,27 @@ func NewWorkRequest(name, script, payload string, args []string, timeout int) *W
Args: args, Args: args,
Timeout: timeout, Timeout: timeout,
MessageChan: make(chan []byte), MessageChan: make(chan []byte),
Terminated: false, Status: Idle,
} }
} }
// Terminate set work request as terminated // Terminate set work request as terminated
func (wr *WorkRequest) Terminate() { func (wr *WorkRequest) Terminate(err error) error {
wr.mutex.Lock() wr.mutex.Lock()
defer wr.mutex.Unlock() defer wr.mutex.Unlock()
if !wr.Terminated { if err != nil {
wr.Terminated = true wr.Status = Error
logger.Info.Printf("Work %s#%d done [ERROR]\n", wr.Name, wr.ID)
return err
} }
wr.Status = Success
logger.Info.Printf("Work %s#%d done [SUCCESS]\n", wr.Name, wr.ID)
return nil
} }
// IsTerminated ask if the work request is terminated // IsTerminated ask if the work request is terminated
func (wr *WorkRequest) IsTerminated() bool { func (wr *WorkRequest) IsTerminated() bool {
wr.mutex.Lock() wr.mutex.Lock()
defer wr.mutex.Unlock() defer wr.mutex.Unlock()
return wr.Terminated return wr.Status == Success || wr.Status == Error
} }

View File

@ -23,13 +23,14 @@ func (c *ChanWriter) Write(p []byte) (int, error) {
} }
func run(work *WorkRequest) error { func run(work *WorkRequest) error {
work.Status = Running
logger.Info.Printf("Work %s#%d started...\n", work.Name, work.ID) logger.Info.Printf("Work %s#%d started...\n", work.Name, work.ID)
logger.Debug.Printf("Work %s#%d script: %s\n", work.Name, work.ID, work.Script) logger.Debug.Printf("Work %s#%d script: %s\n", work.Name, work.ID, work.Script)
logger.Debug.Printf("Work %s#%d parameter: %v\n", work.Name, work.ID, work.Args) logger.Debug.Printf("Work %s#%d parameter: %v\n", work.Name, work.ID, work.Args)
binary, err := exec.LookPath(work.Script) binary, err := exec.LookPath(work.Script)
if err != nil { if err != nil {
return err return work.Terminate(err)
} }
// Exec script with args... // Exec script with args...
@ -42,7 +43,7 @@ func run(work *WorkRequest) error {
// Open the log file for writing // Open the log file for writing
logFile, err := createLogFile(work) logFile, err := createLogFile(work)
if err != nil { if err != nil {
return err return work.Terminate(err)
} }
defer logFile.Close() defer logFile.Close()
logger.Debug.Printf("Work %s#%d output to file: %s\n", work.Name, work.ID, logFile.Name()) logger.Debug.Printf("Work %s#%d output to file: %s\n", work.Name, work.ID, logFile.Name())
@ -53,18 +54,18 @@ func run(work *WorkRequest) error {
// Combine cmd stdout and stderr // Combine cmd stdout and stderr
outReader, err := cmd.StdoutPipe() outReader, err := cmd.StdoutPipe()
if err != nil { if err != nil {
return err return work.Terminate(err)
} }
errReader, err := cmd.StderrPipe() errReader, err := cmd.StderrPipe()
if err != nil { if err != nil {
return err return work.Terminate(err)
} }
cmdReader := io.MultiReader(outReader, errReader) cmdReader := io.MultiReader(outReader, errReader)
// Start the script... // Start the script...
err = cmd.Start() err = cmd.Start()
if err != nil { if err != nil {
return err return work.Terminate(err)
} }
// Create wait group to wait for command output completion // Create wait group to wait for command output completion
@ -111,12 +112,5 @@ func run(work *WorkRequest) error {
timer.Stop() timer.Stop()
// Mark work as terminated // Mark work as terminated
work.Terminate() return work.Terminate(err)
if err != nil {
logger.Info.Printf("Work %s#%d done [ERROR]\n", work.Name, work.ID)
return err
}
logger.Info.Printf("Work %s#%d done [SUCCESS]\n", work.Name, work.ID)
return nil
} }

View File

@ -33,6 +33,7 @@ func TestWorkRunner(t *testing.T) {
printWorkMessages(work) printWorkMessages(work)
err := run(work) err := run(work)
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, work.Status, Success, "")
// Test that log file is ok // Test that log file is ok
id := strconv.FormatUint(work.ID, 10) id := strconv.FormatUint(work.ID, 10)
@ -50,6 +51,7 @@ func TestWorkRunnerWithError(t *testing.T) {
printWorkMessages(work) printWorkMessages(work)
err := run(work) err := run(work)
assert.NotNil(t, err, "") assert.NotNil(t, err, "")
assert.Equal(t, work.Status, Error, "")
assert.Equal(t, "exit status 1", err.Error(), "") assert.Equal(t, "exit status 1", err.Error(), "")
} }
@ -61,5 +63,6 @@ func TestWorkRunnerWithTimeout(t *testing.T) {
printWorkMessages(work) printWorkMessages(work)
err := run(work) err := run(work)
assert.NotNil(t, err, "") assert.NotNil(t, err, "")
assert.Equal(t, work.Status, Error, "")
assert.Equal(t, "signal: killed", err.Error(), "") assert.Equal(t, "signal: killed", err.Error(), "")
} }

View File

@ -44,11 +44,9 @@ func (w Worker) Start() {
logger.Debug.Printf("Worker #%d received work request: %s#%d\n", w.ID, work.Name, work.ID) logger.Debug.Printf("Worker #%d received work request: %s#%d\n", w.ID, work.Name, work.ID)
err := run(&work) err := run(&work)
if err != nil { if err != nil {
// subject := fmt.Sprintf("Webhook %s#%d FAILED.", work.Name, work.ID)
work.MessageChan <- []byte(fmt.Sprintf("error: %s", err.Error())) work.MessageChan <- []byte(fmt.Sprintf("error: %s", err.Error()))
// notify(subject, err.Error(), filename) // notify(subject, err.Error(), filename)
} else { } else {
// subject := fmt.Sprintf("Webhook %s#%d SUCCEEDED.", work.Name, workID)
work.MessageChan <- []byte("done") work.MessageChan <- []byte("done")
// notify(subject, "See attachment.", filename) // notify(subject, "See attachment.", filename)
} }