feat(): add webhook timeout

This commit is contained in:
Nicolas Carlier 2018-01-09 09:18:16 +00:00
parent f50091131b
commit 7154828eca
4 changed files with 57 additions and 16 deletions

View File

@ -126,7 +126,22 @@ data: Script parameter: {"foo": "bar"}
data: done data: done
``` ```
### Notifications ### Webhook timeout configuration
By default a webhook as a timeout of 10 seconds.
This timeout is globally configurable by setting the environment variable:
`APP_HOOK_TIMEOUT` (in seconds).
You can override this global behavior per request by setting the HTTP header:
`X-Hook-Timeout` (in seconds).
*Example:*
```bash
$ curl -XPOST -H "X-Hook-Timeout: 5" http://localhost/echo?foo=bar
```
### Post hook notifications
The script's output is collected and stored into a log file (configured by the The script's output is collected and stored into a log file (configured by the
`APP_WORKING_DIR` environment variable). `APP_WORKING_DIR` environment variable).

View File

@ -3,17 +3,35 @@ package api
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log"
"net/http" "net/http"
"os"
"strconv"
"strings" "strings"
"github.com/ncarlier/webhookd/pkg/hook" "github.com/ncarlier/webhookd/pkg/hook"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/tools" "github.com/ncarlier/webhookd/pkg/tools"
"github.com/ncarlier/webhookd/pkg/worker" "github.com/ncarlier/webhookd/pkg/worker"
) )
// WebhookHandler is the main handler of the API. var (
func WebhookHandler(w http.ResponseWriter, r *http.Request) { defaultTimeout = atoiFallback(os.Getenv("APP_HOOK_TIMEOUT"), 10)
)
func atoiFallback(str string, fallback int) int {
value, err := strconv.Atoi(str)
if err != nil || value < 0 {
return fallback
}
return value
}
// Index is the main handler of the API.
func Index() http.Handler {
return http.HandlerFunc(webhookHandler)
}
func webhookHandler(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher) flusher, ok := w.(http.Flusher)
if !ok { if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
@ -29,20 +47,20 @@ func WebhookHandler(w http.ResponseWriter, r *http.Request) {
p := strings.TrimPrefix(r.URL.Path, "/") p := strings.TrimPrefix(r.URL.Path, "/")
script, err := hook.ResolveScript(p) script, err := hook.ResolveScript(p)
if err != nil { if err != nil {
log.Println(err.Error()) logger.Error.Println(err.Error())
http.Error(w, err.Error(), http.StatusNotFound) http.Error(w, err.Error(), http.StatusNotFound)
return return
} }
body, err := ioutil.ReadAll(r.Body) body, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
log.Printf("Error reading body: %v", err) logger.Error.Printf("Error reading body: %v", err)
http.Error(w, "can't read body", http.StatusBadRequest) http.Error(w, "can't read body", http.StatusBadRequest)
return return
} }
params := tools.QueryParamsToShellVars(r.URL.Query()) params := tools.QueryParamsToShellVars(r.URL.Query())
log.Printf("Calling hook script \"%s\" with params %s...\n", script, params) logger.Debug.Printf("Calling hook script \"%s\" with params %s...\n", script, params)
params = append(params, tools.HTTPHeadersToShellVars(r.Header)...) params = append(params, tools.HTTPHeadersToShellVars(r.Header)...)
// Create work // Create work
@ -52,6 +70,7 @@ func WebhookHandler(w http.ResponseWriter, r *http.Request) {
work.Payload = string(body) work.Payload = string(body)
work.Args = params work.Args = params
work.MessageChan = make(chan []byte) work.MessageChan = make(chan []byte)
work.Timeout = atoiFallback(r.Header.Get("X-Hook-Timeout"), defaultTimeout)
// Put work in queue // Put work in queue
worker.WorkQueue <- *work worker.WorkQueue <- *work
@ -61,7 +80,7 @@ func WebhookHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
log.Println("Work request queued:", script) logger.Debug.Println("Work request queued:", script)
fmt.Fprintf(w, "data: Hook work request \"%s\" queued...\n\n", work.Name) fmt.Fprintf(w, "data: Hook work request \"%s\" queued...\n\n", work.Name)
for { for {

View File

@ -4,12 +4,12 @@ import (
"bufio" "bufio"
"fmt" "fmt"
"io" "io"
"log"
"os" "os"
"os/exec" "os/exec"
"path" "path"
"time" "time"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/tools" "github.com/ncarlier/webhookd/pkg/tools"
) )
@ -32,7 +32,7 @@ func runScript(work *WorkRequest) (string, error) {
workingdir = os.TempDir() workingdir = os.TempDir()
} }
log.Println("Starting script:", work.Script, "...") logger.Info.Println("Executing script", work.Script, "...")
binary, err := exec.LookPath(work.Script) binary, err := exec.LookPath(work.Script)
if err != nil { if err != nil {
return "", err return "", err
@ -50,7 +50,7 @@ func runScript(work *WorkRequest) (string, error) {
return "", err return "", err
} }
defer logFile.Close() defer logFile.Close()
log.Println("Writing output to file: ", logFilename, "...") logger.Debug.Println("Writing output to file: ", logFilename, "...")
wLogFile := bufio.NewWriter(logFile) wLogFile := bufio.NewWriter(logFile)
@ -73,22 +73,28 @@ func runScript(work *WorkRequest) (string, error) {
work.MessageChan <- []byte(line) work.MessageChan <- []byte(line)
// writing to outfile // writing to outfile
if _, err := wLogFile.WriteString(line + "\n"); err != nil { if _, err := wLogFile.WriteString(line + "\n"); err != nil {
log.Println("Error while writing into the log file:", logFilename, err) logger.Error.Println("Error while writing into the log file:", logFilename, err)
} }
if err = wLogFile.Flush(); err != nil { if err = wLogFile.Flush(); err != nil {
log.Println("Error while flushing the log file:", logFilename, err) logger.Error.Println("Error while flushing the log file:", logFilename, err)
} }
} }
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
log.Println("Error scanning the script stdout: ", logFilename, err) logger.Error.Println("Error scanning the script stdout: ", logFilename, err)
} }
}(r) }(r)
timer := time.AfterFunc(time.Duration(work.Timeout)*time.Second, func() {
logger.Warning.Printf("Timeout reached (%ds). Killing script: %s\n", work.Timeout, work.Script)
cmd.Process.Kill()
})
err = cmd.Wait() err = cmd.Wait()
if err != nil { if err != nil {
log.Println("Starting script:", work.Script, "-> ERROR") timer.Stop()
logger.Info.Println("Script", work.Script, "executed with ERROR.")
return logFilename, err return logFilename, err
} }
log.Println("Starting script:", work.Script, "-> OK") timer.Stop()
logger.Info.Println("Script", work.Script, "executed wit SUCCESS")
return logFilename, nil return logFilename, nil
} }

View File

@ -7,4 +7,5 @@ type WorkRequest struct {
Payload string Payload string
Args []string Args []string
MessageChan chan []byte MessageChan chan []byte
Timeout int
} }