mirror of
https://github.com/ncarlier/webhookd.git
synced 2025-04-07 19:29:19 +00:00
fix(runner): fix concurrency and log file creation
This commit is contained in:
parent
9c1d59c56d
commit
c5e393eb92
|
@ -7,6 +7,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path"
|
"path"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -59,10 +60,18 @@ func run(work *WorkRequest) (string, error) {
|
||||||
logger.Debug.Printf("Work %s#%d output to file: %s\n", work.Name, work.ID, logFilename)
|
logger.Debug.Printf("Work %s#%d output to file: %s\n", work.Name, work.ID, logFilename)
|
||||||
|
|
||||||
wLogFile := bufio.NewWriter(logFile)
|
wLogFile := bufio.NewWriter(logFile)
|
||||||
|
defer wLogFile.Flush()
|
||||||
|
|
||||||
r, w := io.Pipe()
|
// Combine cmd stdout and stderr
|
||||||
cmd.Stdout = w
|
outReader, err := cmd.StdoutPipe()
|
||||||
cmd.Stderr = w
|
if err != nil {
|
||||||
|
return logFilename, err
|
||||||
|
}
|
||||||
|
errReader, err := cmd.StderrPipe()
|
||||||
|
if err != nil {
|
||||||
|
return logFilename, err
|
||||||
|
}
|
||||||
|
cmdReader := io.MultiReader(outReader, errReader)
|
||||||
|
|
||||||
// Start the script...
|
// Start the script...
|
||||||
err = cmd.Start()
|
err = cmd.Start()
|
||||||
|
@ -70,7 +79,11 @@ func run(work *WorkRequest) (string, error) {
|
||||||
return logFilename, err
|
return logFilename, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write script output to log file and the work message channel.
|
// Create wait group to wait for command output completion
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
// Write script output to log file and the work message channel
|
||||||
go func(reader io.Reader) {
|
go func(reader io.Reader) {
|
||||||
scanner := bufio.NewScanner(reader)
|
scanner := bufio.NewScanner(reader)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
|
@ -91,18 +104,27 @@ func run(work *WorkRequest) (string, error) {
|
||||||
if err := scanner.Err(); err != nil {
|
if err := scanner.Err(); err != nil {
|
||||||
logger.Error.Printf("Work %s#%d unable to read script stdout: %v\n", work.Name, work.ID, err)
|
logger.Error.Printf("Work %s#%d unable to read script stdout: %v\n", work.Name, work.ID, err)
|
||||||
}
|
}
|
||||||
if err = wLogFile.Flush(); err != nil {
|
wg.Done()
|
||||||
logger.Error.Println("Error while flushing the log file:", logFilename, err)
|
}(cmdReader)
|
||||||
}
|
|
||||||
}(r)
|
|
||||||
|
|
||||||
|
// Start timeout timer
|
||||||
timer := time.AfterFunc(time.Duration(work.Timeout)*time.Second, func() {
|
timer := time.AfterFunc(time.Duration(work.Timeout)*time.Second, func() {
|
||||||
logger.Warning.Printf("Work %s#%d has timed out (%ds). Killing process #%d...\n", work.Name, work.ID, work.Timeout, cmd.Process.Pid)
|
logger.Warning.Printf("Work %s#%d has timed out (%ds). Killing process #%d...\n", work.Name, work.ID, work.Timeout, cmd.Process.Pid)
|
||||||
syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
|
syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Wait for command output completion
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Wait for command completion
|
||||||
err = cmd.Wait()
|
err = cmd.Wait()
|
||||||
|
|
||||||
|
// Stop timeout timer
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
|
|
||||||
|
// Mark work as terminated
|
||||||
work.Terminate()
|
work.Terminate()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Info.Printf("Work %s#%d done [ERROR]\n", work.Name, work.ID)
|
logger.Info.Printf("Work %s#%d done [ERROR]\n", work.Name, work.ID)
|
||||||
return logFilename, err
|
return logFilename, err
|
||||||
|
|
Loading…
Reference in New Issue
Block a user