diff --git a/pkg/api/api.go b/pkg/api/api.go index 8eb2e68..398db89 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -34,7 +34,7 @@ func Index(timeout int, scrDir string) http.Handler { func webhookHandler(w http.ResponseWriter, r *http.Request) { flusher, ok := w.(http.Flusher) if !ok { - http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) + http.Error(w, "Streaming not supported!", http.StatusInternalServerError) return } @@ -60,9 +60,10 @@ func webhookHandler(w http.ResponseWriter, r *http.Request) { } params := tools.QueryParamsToShellVars(r.URL.Query()) - logger.Debug.Printf("Calling hook script \"%s\" with params %s...\n", script, params) params = append(params, tools.HTTPHeadersToShellVars(r.Header)...) + // logger.Debug.Printf("API REQUEST: \"%s\" with params %s...\n", p, params) + // Create work timeout := atoiFallback(r.Header.Get("X-Hook-Timeout"), defaultTimeout) work := worker.NewWorkRequest(p, script, string(body), params, timeout) @@ -75,9 +76,6 @@ func webhookHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") - logger.Debug.Println("Work request queued:", script) - // fmt.Fprintf(w, "data: Running \"%s\" ...\n\n", work.Name) - for { msg, open := <-work.MessageChan diff --git a/pkg/tools/script_resolver.go b/pkg/tools/script_resolver.go index d73a83a..0b1c3d7 100644 --- a/pkg/tools/script_resolver.go +++ b/pkg/tools/script_resolver.go @@ -5,14 +5,11 @@ import ( "fmt" "os" "path" - - "github.com/ncarlier/webhookd/pkg/logger" ) // ResolveScript is resolving the target script. func ResolveScript(dir, name string) (string, error) { script := path.Join(dir, fmt.Sprintf("%s.sh", name)) - logger.Debug.Println("Resolving script: ", script, "...") if _, err := os.Stat(script); os.IsNotExist(err) { return "", errors.New("Script not found: " + script) } diff --git a/pkg/worker/dispatcher.go b/pkg/worker/dispatcher.go index 206abbf..b7e23ed 100644 --- a/pkg/worker/dispatcher.go +++ b/pkg/worker/dispatcher.go @@ -23,11 +23,10 @@ func StartDispatcher(nworkers int) { for { select { case work := <-WorkQueue: - logger.Debug.Println("Received work request:", work.Name) go func() { worker := <-WorkerQueue - logger.Debug.Println("Dispatching work request:", work.Name) + logger.Debug.Printf("Dispatching work request: %s#%d", work.Name, work.ID) worker <- work }() } diff --git a/pkg/worker/script_runner.go b/pkg/worker/script_runner.go index 78cc08d..91c220a 100644 --- a/pkg/worker/script_runner.go +++ b/pkg/worker/script_runner.go @@ -33,7 +33,10 @@ func runScript(work *WorkRequest) (string, error) { workingdir = os.TempDir() } - logger.Info.Println("Executing script", work.Script, "...") + logger.Info.Printf("Work %s#%d started...\n", work.Name, work.ID) + logger.Debug.Printf("Work %s#%d script: %s\n", work.Name, work.ID, work.Script) + logger.Debug.Printf("Work %s#%d parameter: %v\n", work.Name, work.ID, work.Args) + binary, err := exec.LookPath(work.Script) if err != nil { return "", err @@ -47,13 +50,13 @@ func runScript(work *WorkRequest) (string, error) { cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} // Open the out file for writing - logFilename := path.Join(workingdir, fmt.Sprintf("%s_%s.txt", tools.ToSnakeCase(work.Name), time.Now().Format("20060102_1504"))) + logFilename := path.Join(workingdir, fmt.Sprintf("%s_%d_%s.txt", tools.ToSnakeCase(work.Name), work.ID, time.Now().Format("20060102_1504"))) logFile, err := os.Create(logFilename) if err != nil { return "", err } defer logFile.Close() - logger.Debug.Println("Writing output to file: ", logFilename, "...") + logger.Debug.Printf("Work %s#%d output to file: %s\n", work.Name, work.ID, logFilename) wLogFile := bufio.NewWriter(logFile) @@ -71,37 +74,36 @@ func runScript(work *WorkRequest) (string, error) { go func(reader io.Reader) { scanner := bufio.NewScanner(reader) for scanner.Scan() { - if work.Closed { - logger.Error.Println("Unable to write into the work channel. Work request closed.") - return - } - // writing to the work channel line := scanner.Text() - work.MessageChan <- []byte(line) + // writing to the work channel + if !work.Closed { + work.MessageChan <- []byte(line) + } else { + logger.Error.Printf("Work %s#%d is closed. Unable to write into the work channel: %s\n", work.Name, work.ID, line) + } // writing to outfile if _, err := wLogFile.WriteString(line + "\n"); err != nil { logger.Error.Println("Error while writing into the log file:", logFilename, err) } - if err = wLogFile.Flush(); err != nil { - logger.Error.Println("Error while flushing the log file:", logFilename, err) - } } if err := scanner.Err(); err != nil { - logger.Error.Println("Error scanning the script stdout: ", logFilename, err) + logger.Error.Printf("Work %s#%d unable to read script stdout: %v\n", work.Name, work.ID, err) + } + if err = wLogFile.Flush(); err != nil { + logger.Error.Println("Error while flushing the log file:", logFilename, err) } }(r) timer := time.AfterFunc(time.Duration(work.Timeout)*time.Second, func() { - logger.Warning.Printf("Timeout reached (%ds). Killing script: %s\n", work.Timeout, work.Script) + logger.Warning.Printf("Work %s#%d has timed out (%ds). Killing process #%d...\n", work.Name, work.ID, work.Timeout, cmd.Process.Pid) syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) }) err = cmd.Wait() + timer.Stop() if err != nil { - timer.Stop() - logger.Info.Println("Script", work.Script, "executed with ERROR.") + logger.Info.Printf("Work %s#%d done [ERROR]\n", work.Name, work.ID) return logFilename, err } - timer.Stop() - logger.Info.Println("Script", work.Script, "executed with SUCCESS") + logger.Info.Printf("Work %s#%d done [SUCCESS]\n", work.Name, work.ID) return logFilename, nil } diff --git a/pkg/worker/work_request.go b/pkg/worker/work_request.go index 2b13696..736adbd 100644 --- a/pkg/worker/work_request.go +++ b/pkg/worker/work_request.go @@ -1,7 +1,12 @@ package worker +import "sync/atomic" + +var workID uint64 + // WorkRequest is a request of work for a worker type WorkRequest struct { + ID uint64 Name string Script string Payload string @@ -14,6 +19,7 @@ type WorkRequest struct { // NewWorkRequest creats new work request func NewWorkRequest(name, script, payload string, args []string, timeout int) *WorkRequest { return &WorkRequest{ + ID: atomic.AddUint64(&workID, 1), Name: name, Script: script, Payload: payload, diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index b27e15b..6a6a38e 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -41,21 +41,21 @@ func (w Worker) Start() { select { case work := <-w.Work: // Receive a work request. - logger.Debug.Printf("Worker%d received work request: %s\n", w.ID, work.Name) + logger.Debug.Printf("Worker #%d received work request: %s#%d\n", w.ID, work.Name, work.ID) filename, err := runScript(&work) if err != nil { - subject := fmt.Sprintf("Webhook %s FAILED.", work.Name) + subject := fmt.Sprintf("Webhook %s#%d FAILED.", work.Name, work.ID) work.MessageChan <- []byte(fmt.Sprintf("error: %s", err.Error())) notify(subject, err.Error(), filename) } else { - subject := fmt.Sprintf("Webhook %s SUCCEEDED.", work.Name) + subject := fmt.Sprintf("Webhook %s#%d SUCCEEDED.", work.Name, workID) work.MessageChan <- []byte("done") notify(subject, "See attachment.", filename) } work.Closed = true close(work.MessageChan) case <-w.QuitChan: - logger.Debug.Printf("Stopping worker%d...\n", w.ID) + logger.Debug.Printf("Stopping worker #%d...\n", w.ID) return } } @@ -77,7 +77,7 @@ func notify(subject string, text string, outfilename string) { return } if notifier == nil { - logger.Error.Println("Notification provider not found. Notification skipped.") + logger.Debug.Println("Notification provider not found. Notification skipped.") return } diff --git a/tests/test.sh b/tests/test.sh index c0dba4e..2b3afce 100755 --- a/tests/test.sh +++ b/tests/test.sh @@ -1,15 +1,25 @@ #!/bin/sh -URL=http://localhost:8080 +URL=http://localhost:8081 echo "Test URL: $URL" -echo "Test bad URL" + +echo "Testing bad request..." curl -H "Content-Type: application/json" \ --data @test.json \ $URL/bad/action -echo "Test hook" +echo "Testing nominal case..." curl -H "Content-Type: application/json" \ -H "X-API-Key: test" \ --data @test.json \ $URL/test?firstname=obi-wan\&lastname=kenobi + +echo "Testing parallel request..." +curl -XPOST $URL/test & +curl -XPOST $URL/test & +curl -XPOST $URL/test & + +wait + +echo "Done"