mirror of
https://github.com/ncarlier/webhookd.git
synced 2025-04-04 19:52:46 +00:00
Feature: add blocking request with response code (#99)
feat(hook): add blocking request with response code close #74
This commit is contained in:
parent
8e6cf467dd
commit
321ad7ef5d
78
README.md
78
README.md
|
@ -83,24 +83,37 @@ For example, you can execute a Node.js file if you give execution rights to the
|
|||
You can find sample scripts in the [example folder](./scripts/examples).
|
||||
In particular, examples of integration with Gitlab and Github.
|
||||
|
||||
### Webhook URL
|
||||
### Webhook call
|
||||
|
||||
The directory structure define the webhook URL.
|
||||
|
||||
You can omit the script extension. If you do, webhookd will search by default for a `.sh` file.
|
||||
You can change the default extension using the `WHD_HOOK_DEFAULT_EXT` environment variable or `-hook-default-ext` parameter.
|
||||
If the script exists, the output will be streamed to the HTTP response.
|
||||
If the script exists, the output will be send to the HTTP response.
|
||||
|
||||
The streaming technology depends on the HTTP request:
|
||||
Depending on the HTTP request, the HTTP response will be a HTTP `200` code with the script's output in real time (streaming), or the HTTP response will wait until the end of the script's execution and return the output (tuncated) of the script as well as an HTTP code relative to the script's output code.
|
||||
|
||||
- [Server-sent events][sse] is used when:
|
||||
- Using `GET` verb
|
||||
- Using `text/event-stream` in `Accept` request header
|
||||
- [Chunked Transfer Coding][chunked] is used otherwise.
|
||||
The streaming protocol depends on the HTTP request:
|
||||
|
||||
- [Server-sent events][sse] is used when `Accept` HTTP header is equal to `text/event-stream`.
|
||||
- [Chunked Transfer Coding][chunked] is used when `X-Hook-Mode` HTTP header is equal to `chunked`.
|
||||
It's the default mode.
|
||||
You can change the default mode using the `WHD_HOOK_DEFAULT_MODE` environment variable or `-hook-default-mode` parameter.
|
||||
|
||||
[sse]: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
|
||||
[chunked]: https://datatracker.ietf.org/doc/html/rfc2616#section-3.6.1
|
||||
|
||||
If no streaming protocol is needed, yous must set `X-Hook-Mode` HTTP header to `buffered`.
|
||||
The HTTP reponse will block until the script is over:
|
||||
|
||||
- Sends script output limited to the last 100 lines. You can modify this limit via the HTTP header `X-Hook-MaxBufferedLines`.
|
||||
- Convert the script exit code to HTTP code as follow:
|
||||
- 0: `200 OK`
|
||||
- Between 1 and 99: `500 Internal Server Error`
|
||||
- Between 100 and 255: Add 300 to get HTTP code between 400 and 555
|
||||
|
||||
> Remember: a process exit code is between 0 and 255. 0 means that the execution is successful.
|
||||
|
||||
*Example:*
|
||||
|
||||
The script: `./scripts/foo/bar.sh`
|
||||
|
@ -110,21 +123,11 @@ The script: `./scripts/foo/bar.sh`
|
|||
|
||||
echo "foo foo foo"
|
||||
echo "bar bar bar"
|
||||
|
||||
exit 118
|
||||
```
|
||||
|
||||
Output using `POST` or `GET` (`Chunked Transfer Coding`):
|
||||
|
||||
```bash
|
||||
$ curl -v -XPOST http://localhost:8080/foo/bar
|
||||
< HTTP/1.1 200 OK
|
||||
< Content-Type: text/plain; charset=utf-8
|
||||
< Transfer-Encoding: chunked
|
||||
< X-Hook-Id: 7
|
||||
foo foo foo
|
||||
bar bar bar
|
||||
```
|
||||
|
||||
Output using `GET` and `Accept` header (`Server-sent events`):
|
||||
Streamed output using `Server-sent events`:
|
||||
|
||||
```bash
|
||||
$ curl -v --header "Accept: text/event-stream" -XGET http://localhost:8080/foo/bar
|
||||
|
@ -132,11 +135,44 @@ $ curl -v --header "Accept: text/event-stream" -XGET http://localhost:8080/foo/b
|
|||
< Content-Type: text/event-stream
|
||||
< Transfer-Encoding: chunked
|
||||
< X-Hook-Id: 8
|
||||
|
||||
data: foo foo foo
|
||||
|
||||
data: bar bar bar
|
||||
|
||||
error: exit status 118
|
||||
```
|
||||
|
||||
Streamed output using `Chunked Transfer Coding`:
|
||||
|
||||
```bash
|
||||
$ curl -v -XPOST --header "X-Hook-Mode: chunked" http://localhost:8080/foo/bar
|
||||
< HTTP/1.1 200 OK
|
||||
< Content-Type: text/plain; charset=utf-8
|
||||
< Transfer-Encoding: chunked
|
||||
< X-Hook-Id: 7
|
||||
|
||||
foo foo foo
|
||||
bar bar bar
|
||||
error: exit status 118
|
||||
|
||||
```
|
||||
|
||||
Blocking HTTP request:
|
||||
|
||||
```bash
|
||||
$ curl -v -XPOST --header "X-Hook-Mode: buffered" http://localhost:8080/foo/bar
|
||||
< HTTP/1.1 418 I m a teapot
|
||||
< Content-Type: text/plain; charset=utf-8
|
||||
< X-Hook-Id: 9
|
||||
|
||||
foo foo foo
|
||||
bar bar bar
|
||||
error: exit status 118
|
||||
```
|
||||
|
||||
> Note that in this last example the HTTP response is equal to `exit code + 300` : `318 I'm a teapot`.
|
||||
|
||||
### Webhook parameters
|
||||
|
||||
You have several ways to provide parameters to your webhook script:
|
||||
|
@ -211,7 +247,7 @@ $ # Call webhook
|
|||
$ curl -v http://localhost:8080/echo?foo=bar
|
||||
...
|
||||
< HTTP/1.1 200 OK
|
||||
< Content-Type: text/event-stream
|
||||
< Content-Type: text/plain
|
||||
< X-Hook-Id: 2
|
||||
...
|
||||
$ # Retrieve logs afterwards
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
# Default extension for hook scripts, default is "sh"
|
||||
#WHD_HOOK_DEFAULT_EXT=sh
|
||||
# Default hook HTTP response mode (chunked or buffered), default is "chunked"
|
||||
#WHD_HOOK_DEFAULT_MODE=chunked
|
||||
# Maximum hook execution time in second, default is 10
|
||||
#WHD_HOOK_TIMEOUT=10
|
||||
# Scripts location, default is "scripts"
|
||||
|
|
122
pkg/api/index.go
122
pkg/api/index.go
|
@ -1,6 +1,8 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"container/ring"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
|
@ -20,11 +22,18 @@ import (
|
|||
var (
|
||||
defaultTimeout int
|
||||
defaultExt string
|
||||
defaultMode string
|
||||
scriptDir string
|
||||
outputDir string
|
||||
)
|
||||
|
||||
var supportedContentTypes = []string{"text/plain", "text/event-stream", "application/json", "text/*"}
|
||||
const (
|
||||
DefaultBufferLength = 100
|
||||
MaxBufferLength = 10000
|
||||
SSEContentType = "text/event-stream"
|
||||
)
|
||||
|
||||
var supportedContentTypes = []string{"text/plain", SSEContentType, "application/json", "text/*"}
|
||||
|
||||
func atoiFallback(str string, fallback int) int {
|
||||
if value, err := strconv.Atoi(str); err == nil && value > 0 {
|
||||
|
@ -39,6 +48,7 @@ func index(conf *config.Config) http.Handler {
|
|||
defaultExt = conf.Hook.DefaultExt
|
||||
scriptDir = conf.Hook.ScriptsDir
|
||||
outputDir = conf.Hook.LogDir
|
||||
defaultMode = conf.Hook.DefaultMode
|
||||
return http.HandlerFunc(webhookHandler)
|
||||
}
|
||||
|
||||
|
@ -53,9 +63,20 @@ func webhookHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func triggerWebhook(w http.ResponseWriter, r *http.Request) {
|
||||
// Manage content negotiation
|
||||
negociatedContentType := helper.NegotiateContentType(r, supportedContentTypes, "text/plain")
|
||||
|
||||
// Extract streaming method
|
||||
mode := r.Header.Get("X-Hook-Mode")
|
||||
if mode != "buffered" && mode != "chunked" {
|
||||
mode = defaultMode
|
||||
}
|
||||
if negociatedContentType == SSEContentType {
|
||||
mode = "sse"
|
||||
}
|
||||
|
||||
// Check that streaming is supported
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
if _, ok := w.(http.Flusher); !ok && mode != "buffered" {
|
||||
http.Error(w, "streaming not supported", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
@ -121,31 +142,98 @@ func triggerWebhook(w http.ResponseWriter, r *http.Request) {
|
|||
// Put work in queue
|
||||
worker.WorkQueue <- job
|
||||
|
||||
// Use content negotiation
|
||||
ct = helper.NegotiateContentType(r, supportedContentTypes, "text/plain")
|
||||
|
||||
// set respons headers
|
||||
w.Header().Set("Content-Type", ct+"; charset=utf-8")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("X-Content-Type-Options", "nosniff")
|
||||
w.Header().Set("X-Hook-ID", strconv.FormatUint(job.ID(), 10))
|
||||
// Write hook ouput to the response regarding the asked method
|
||||
if mode != "buffered" {
|
||||
// Write hook response as Server Sent Event stream
|
||||
writeStreamedResponse(w, negociatedContentType, job, mode)
|
||||
} else {
|
||||
maxBufferLength := atoiFallback(r.Header.Get("X-Hook-MaxBufferedLines"), DefaultBufferLength)
|
||||
if maxBufferLength > MaxBufferLength {
|
||||
maxBufferLength = MaxBufferLength
|
||||
}
|
||||
// Write hook response after hook execution
|
||||
writeStandardResponse(w, negociatedContentType, job, maxBufferLength)
|
||||
}
|
||||
}
|
||||
|
||||
func writeStreamedResponse(w http.ResponseWriter, negociatedContentType string, job *hook.Job, mode string) {
|
||||
writeHeaders(w, negociatedContentType, job.ID())
|
||||
for {
|
||||
msg, open := <-job.MessageChan
|
||||
if !open {
|
||||
break
|
||||
}
|
||||
if ct == "text/event-stream" {
|
||||
fmt.Fprintf(w, "data: %s\n\n", msg) // Send SSE response
|
||||
|
||||
if mode == "sse" {
|
||||
// Send SSE response
|
||||
prefix := "data: "
|
||||
if bytes.HasPrefix(msg, []byte("error:")) {
|
||||
prefix = ""
|
||||
}
|
||||
fmt.Fprintf(w, "%s%s\n\n", prefix, msg)
|
||||
} else {
|
||||
fmt.Fprintf(w, "%s\n", msg) // Send chunked response
|
||||
// Send chunked response
|
||||
fmt.Fprintf(w, "%s\n", msg)
|
||||
}
|
||||
|
||||
// Flush the data immediately instead of buffering it for later.
|
||||
flusher.Flush()
|
||||
if flusher, ok := w.(http.Flusher); ok {
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func writeStandardResponse(w http.ResponseWriter, negociatedContentType string, job *hook.Job, maxBufferLength int) {
|
||||
buffer := ring.New(maxBufferLength)
|
||||
overflow := false
|
||||
lines := 0
|
||||
|
||||
// Consume messages into a ring buffer
|
||||
for {
|
||||
msg, open := <-job.MessageChan
|
||||
if !open {
|
||||
break
|
||||
}
|
||||
buffer.Value = msg
|
||||
buffer = buffer.Next()
|
||||
lines++
|
||||
if lines > maxBufferLength {
|
||||
overflow = true
|
||||
}
|
||||
}
|
||||
|
||||
writeHeaders(w, negociatedContentType, job.ID())
|
||||
w.WriteHeader(getJobStatusCode(job))
|
||||
if overflow {
|
||||
w.Write([]byte("[output truncated]\n"))
|
||||
}
|
||||
// Write buffer to HTTP response
|
||||
buffer.Do(func(data interface{}) {
|
||||
if data != nil {
|
||||
fmt.Fprintf(w, "%s\n", data.([]byte))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func getJobStatusCode(job *hook.Job) int {
|
||||
switch {
|
||||
case job.ExitCode() == 0:
|
||||
return http.StatusOK
|
||||
case job.ExitCode() >= 100:
|
||||
return job.ExitCode() + 300
|
||||
default:
|
||||
return http.StatusInternalServerError
|
||||
}
|
||||
}
|
||||
|
||||
func writeHeaders(w http.ResponseWriter, contentType string, hookId uint64) {
|
||||
w.Header().Set("Content-Type", contentType+"; charset=utf-8")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("X-Content-Type-Options", "nosniff")
|
||||
w.Header().Set("X-Hook-ID", strconv.FormatUint(hookId, 10))
|
||||
}
|
||||
|
||||
func getWebhookLog(w http.ResponseWriter, r *http.Request) {
|
||||
// Get hook ID
|
||||
id := path.Base(r.URL.Path)
|
||||
|
@ -160,7 +248,7 @@ func getWebhookLog(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
// Retrieve log file
|
||||
logFile, err := hook.Logs(id, hookName, outputDir)
|
||||
logFile, err := hook.GetLogFile(id, hookName, outputDir)
|
||||
if err != nil {
|
||||
slog.Error(err.Error())
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
|
|
|
@ -20,11 +20,12 @@ type Config struct {
|
|||
|
||||
// HookConfig store Hook execution configuration
|
||||
type HookConfig struct {
|
||||
DefaultExt string `flag:"default-ext" desc:"Default extension for hook scripts" default:"sh"`
|
||||
Timeout int `flag:"timeout" desc:"Maximum hook execution time in second" default:"10"`
|
||||
ScriptsDir string `flag:"scripts" desc:"Scripts location" default:"scripts"`
|
||||
LogDir string `flag:"log-dir" desc:"Hook execution logs location" default:""`
|
||||
Workers int `flag:"workers" desc:"Number of workers to start" default:"2"`
|
||||
DefaultExt string `flag:"default-ext" desc:"Default extension for hook scripts" default:"sh"`
|
||||
DefaultMode string `flag:"default-mode" desc:"Hook default response mode (chuncked,buffered)" default:"chuncked"`
|
||||
Timeout int `flag:"timeout" desc:"Maximum hook execution time in second" default:"10"`
|
||||
ScriptsDir string `flag:"scripts" desc:"Scripts location" default:"scripts"`
|
||||
LogDir string `flag:"log-dir" desc:"Hook execution logs location" default:""`
|
||||
Workers int `flag:"workers" desc:"Number of workers to start" default:"2"`
|
||||
}
|
||||
|
||||
// LogConfig store logger configuration
|
||||
|
|
|
@ -36,6 +36,7 @@ type Job struct {
|
|||
status Status
|
||||
logFilename string
|
||||
err error
|
||||
exitCode int
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
|
@ -85,7 +86,11 @@ func (job *Job) Terminate(err error) error {
|
|||
job.mutex.Lock()
|
||||
defer job.mutex.Unlock()
|
||||
job.status = Success
|
||||
|
||||
if err != nil {
|
||||
if exiterr, ok := err.(*exec.ExitError); ok {
|
||||
job.exitCode = exiterr.ExitCode()
|
||||
}
|
||||
job.status = Error
|
||||
job.err = err
|
||||
slog.Error(
|
||||
|
@ -93,6 +98,7 @@ func (job *Job) Terminate(err error) error {
|
|||
"hook", job.Name(),
|
||||
"id", job.ID(),
|
||||
"status", "error",
|
||||
"exitCode", job.exitCode,
|
||||
"err", err,
|
||||
"took", time.Since(job.start).Milliseconds(),
|
||||
)
|
||||
|
@ -134,14 +140,25 @@ func (job *Job) StatusLabel() string {
|
|||
}
|
||||
}
|
||||
|
||||
// ExitCode of the underlying process job
|
||||
// Can be 0 if the process is not over
|
||||
func (job *Job) ExitCode() int {
|
||||
return job.exitCode
|
||||
}
|
||||
|
||||
// SendMessage send message to the message channel
|
||||
func (job *Job) SendMessage(message string) {
|
||||
job.MessageChan <- []byte(message)
|
||||
}
|
||||
|
||||
// OpenLogFile open job log file
|
||||
func (job *Job) OpenLogFile() (*os.File, error) {
|
||||
return os.Open(job.logFilename)
|
||||
}
|
||||
|
||||
// Logs returns job logs filtered with the prefix
|
||||
func (job *Job) Logs(prefixFilter string) string {
|
||||
file, err := os.Open(job.logFilename)
|
||||
file, err := job.OpenLogFile()
|
||||
if err != nil {
|
||||
return err.Error()
|
||||
}
|
||||
|
|
|
@ -9,8 +9,8 @@ import (
|
|||
"github.com/ncarlier/webhookd/pkg/helper"
|
||||
)
|
||||
|
||||
// Logs get hook log with its name and id
|
||||
func Logs(id, name, base string) (*os.File, error) {
|
||||
// GetLogFile get hook log with its name and id
|
||||
func GetLogFile(id, name, base string) (*os.File, error) {
|
||||
logPattern := path.Join(base, fmt.Sprintf("%s_%s_*.txt", helper.ToSnake(name), id))
|
||||
files, err := filepath.Glob(logPattern)
|
||||
if err != nil {
|
||||
|
|
|
@ -46,7 +46,7 @@ func TestHookJob(t *testing.T) {
|
|||
|
||||
// Test that we can retrieve log file afterward
|
||||
id := strconv.FormatUint(job.ID(), 10)
|
||||
logFile, err := hook.Logs(id, "test", os.TempDir())
|
||||
logFile, err := hook.GetLogFile(id, "test", os.TempDir())
|
||||
assert.Nil(t, err, "Log file should exists")
|
||||
defer logFile.Close()
|
||||
assert.NotNil(t, logFile, "Log file should be retrieve")
|
||||
|
@ -70,6 +70,7 @@ func TestWorkRunnerWithError(t *testing.T) {
|
|||
assert.NotNil(t, err, "")
|
||||
assert.Equal(t, job.Status(), hook.Error, "")
|
||||
assert.Equal(t, "exit status 1", err.Error(), "")
|
||||
assert.Equal(t, 1, job.ExitCode(), "")
|
||||
}
|
||||
|
||||
func TestWorkRunnerWithTimeout(t *testing.T) {
|
||||
|
|
|
@ -8,7 +8,7 @@ type Middleware func(inner http.Handler) http.Handler
|
|||
// Middlewares list
|
||||
type Middlewares []Middleware
|
||||
|
||||
// UseBefore insert a middleware at the begining of the middleware chain
|
||||
// UseBefore insert a middleware at the beginning of the middleware chain
|
||||
func (ms Middlewares) UseBefore(m Middleware) Middlewares {
|
||||
return append([]Middleware{m}, ms...)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user