diff --git a/pkg/worker/script_runner.go b/pkg/worker/script_runner.go index 91c220a..a11105e 100644 --- a/pkg/worker/script_runner.go +++ b/pkg/worker/script_runner.go @@ -76,14 +76,16 @@ func runScript(work *WorkRequest) (string, error) { for scanner.Scan() { line := scanner.Text() // writing to the work channel - if !work.Closed { + if !work.IsTerminated() { work.MessageChan <- []byte(line) } else { - logger.Error.Printf("Work %s#%d is closed. Unable to write into the work channel: %s\n", work.Name, work.ID, line) + logger.Error.Printf("Work %s#%d is over. Unable to write more data into the channel: %s\n", work.Name, work.ID, line) + break } // writing to outfile if _, err := wLogFile.WriteString(line + "\n"); err != nil { logger.Error.Println("Error while writing into the log file:", logFilename, err) + break } } if err := scanner.Err(); err != nil { @@ -100,6 +102,7 @@ func runScript(work *WorkRequest) (string, error) { }) err = cmd.Wait() timer.Stop() + work.Terminate() if err != nil { logger.Info.Printf("Work %s#%d done [ERROR]\n", work.Name, work.ID) return logFilename, err diff --git a/pkg/worker/work_request.go b/pkg/worker/work_request.go index 736adbd..274c2df 100644 --- a/pkg/worker/work_request.go +++ b/pkg/worker/work_request.go @@ -1,6 +1,9 @@ package worker -import "sync/atomic" +import ( + "sync" + "sync/atomic" +) var workID uint64 @@ -13,7 +16,8 @@ type WorkRequest struct { Args []string MessageChan chan []byte Timeout int - Closed bool + Terminated bool + mutex sync.Mutex } // NewWorkRequest creats new work request @@ -26,6 +30,22 @@ func NewWorkRequest(name, script, payload string, args []string, timeout int) *W Args: args, Timeout: timeout, MessageChan: make(chan []byte), - Closed: false, + Terminated: false, } } + +// Terminate set work request as terminated +func (wr *WorkRequest) Terminate() { + wr.mutex.Lock() + defer wr.mutex.Unlock() + if !wr.Terminated { + wr.Terminated = true + } +} + +// IsTerminated ask if the work request is terminated +func (wr *WorkRequest) IsTerminated() bool { + wr.mutex.Lock() + defer wr.mutex.Unlock() + return wr.Terminated +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 6a6a38e..b4f3b0a 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -52,7 +52,6 @@ func (w Worker) Start() { work.MessageChan <- []byte("done") notify(subject, "See attachment.", filename) } - work.Closed = true close(work.MessageChan) case <-w.QuitChan: logger.Debug.Printf("Stopping worker #%d...\n", w.ID) diff --git a/tests/test.sh b/tests/test.sh index 2b3afce..2e72bf0 100755 --- a/tests/test.sh +++ b/tests/test.sh @@ -1,6 +1,6 @@ #!/bin/sh -URL=http://localhost:8081 +URL=http://localhost:8080 echo "Test URL: $URL"