fix(runner): fix concurent access to the work request channel

This commit is contained in:
Nicolas Carlier 2018-07-27 08:05:05 +00:00 committed by Nicolas Carlier
parent c3d9201e6d
commit e8d1c6e581
4 changed files with 29 additions and 7 deletions

View File

@ -76,14 +76,16 @@ func runScript(work *WorkRequest) (string, error) {
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text() line := scanner.Text()
// writing to the work channel // writing to the work channel
if !work.Closed { if !work.IsTerminated() {
work.MessageChan <- []byte(line) work.MessageChan <- []byte(line)
} else { } else {
logger.Error.Printf("Work %s#%d is closed. Unable to write into the work channel: %s\n", work.Name, work.ID, line) logger.Error.Printf("Work %s#%d is over. Unable to write more data into the channel: %s\n", work.Name, work.ID, line)
break
} }
// writing to outfile // writing to outfile
if _, err := wLogFile.WriteString(line + "\n"); err != nil { if _, err := wLogFile.WriteString(line + "\n"); err != nil {
logger.Error.Println("Error while writing into the log file:", logFilename, err) logger.Error.Println("Error while writing into the log file:", logFilename, err)
break
} }
} }
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
@ -100,6 +102,7 @@ func runScript(work *WorkRequest) (string, error) {
}) })
err = cmd.Wait() err = cmd.Wait()
timer.Stop() timer.Stop()
work.Terminate()
if err != nil { if err != nil {
logger.Info.Printf("Work %s#%d done [ERROR]\n", work.Name, work.ID) logger.Info.Printf("Work %s#%d done [ERROR]\n", work.Name, work.ID)
return logFilename, err return logFilename, err

View File

@ -1,6 +1,9 @@
package worker package worker
import "sync/atomic" import (
"sync"
"sync/atomic"
)
var workID uint64 var workID uint64
@ -13,7 +16,8 @@ type WorkRequest struct {
Args []string Args []string
MessageChan chan []byte MessageChan chan []byte
Timeout int Timeout int
Closed bool Terminated bool
mutex sync.Mutex
} }
// NewWorkRequest creats new work request // NewWorkRequest creats new work request
@ -26,6 +30,22 @@ func NewWorkRequest(name, script, payload string, args []string, timeout int) *W
Args: args, Args: args,
Timeout: timeout, Timeout: timeout,
MessageChan: make(chan []byte), MessageChan: make(chan []byte),
Closed: false, Terminated: false,
} }
} }
// Terminate set work request as terminated
func (wr *WorkRequest) Terminate() {
wr.mutex.Lock()
defer wr.mutex.Unlock()
if !wr.Terminated {
wr.Terminated = true
}
}
// IsTerminated ask if the work request is terminated
func (wr *WorkRequest) IsTerminated() bool {
wr.mutex.Lock()
defer wr.mutex.Unlock()
return wr.Terminated
}

View File

@ -52,7 +52,6 @@ func (w Worker) Start() {
work.MessageChan <- []byte("done") work.MessageChan <- []byte("done")
notify(subject, "See attachment.", filename) notify(subject, "See attachment.", filename)
} }
work.Closed = true
close(work.MessageChan) close(work.MessageChan)
case <-w.QuitChan: case <-w.QuitChan:
logger.Debug.Printf("Stopping worker #%d...\n", w.ID) logger.Debug.Printf("Stopping worker #%d...\n", w.ID)

View File

@ -1,6 +1,6 @@
#!/bin/sh #!/bin/sh
URL=http://localhost:8081 URL=http://localhost:8080
echo "Test URL: $URL" echo "Test URL: $URL"