diff --git a/pkg/worker/work_runner.go b/pkg/worker/work_runner.go index af5e21c..59b4598 100644 --- a/pkg/worker/work_runner.go +++ b/pkg/worker/work_runner.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "path" + "sync" "syscall" "time" @@ -59,10 +60,18 @@ func run(work *WorkRequest) (string, error) { logger.Debug.Printf("Work %s#%d output to file: %s\n", work.Name, work.ID, logFilename) wLogFile := bufio.NewWriter(logFile) + defer wLogFile.Flush() - r, w := io.Pipe() - cmd.Stdout = w - cmd.Stderr = w + // Combine cmd stdout and stderr + outReader, err := cmd.StdoutPipe() + if err != nil { + return logFilename, err + } + errReader, err := cmd.StderrPipe() + if err != nil { + return logFilename, err + } + cmdReader := io.MultiReader(outReader, errReader) // Start the script... err = cmd.Start() @@ -70,7 +79,11 @@ func run(work *WorkRequest) (string, error) { return logFilename, err } - // Write script output to log file and the work message channel. + // Create wait group to wait for command output completion + var wg sync.WaitGroup + wg.Add(1) + + // Write script output to log file and the work message channel go func(reader io.Reader) { scanner := bufio.NewScanner(reader) for scanner.Scan() { @@ -91,18 +104,27 @@ func run(work *WorkRequest) (string, error) { if err := scanner.Err(); err != nil { logger.Error.Printf("Work %s#%d unable to read script stdout: %v\n", work.Name, work.ID, err) } - if err = wLogFile.Flush(); err != nil { - logger.Error.Println("Error while flushing the log file:", logFilename, err) - } - }(r) + wg.Done() + }(cmdReader) + // Start timeout timer timer := time.AfterFunc(time.Duration(work.Timeout)*time.Second, func() { logger.Warning.Printf("Work %s#%d has timed out (%ds). Killing process #%d...\n", work.Name, work.ID, work.Timeout, cmd.Process.Pid) syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) }) + + // Wait for command output completion + wg.Wait() + + // Wait for command completion err = cmd.Wait() + + // Stop timeout timer timer.Stop() + + // Mark work as terminated work.Terminate() + if err != nil { logger.Info.Printf("Work %s#%d done [ERROR]\n", work.Name, work.ID) return logFilename, err