refactor(): internals rescafolding

This commit is contained in:
Nicolas Carlier 2022-05-26 09:05:49 +02:00
parent bc1de4e5da
commit 5e65febceb
27 changed files with 524 additions and 395 deletions

View File

@ -158,7 +158,7 @@ The script:
```bash ```bash
#!/bin/bash #!/bin/bash
echo "Hook information: name=$hook_name, id=$hook_id" echo "Hook information: name=$hook_name, id=$hook_id, method=$hook_method"
echo "Query parameter: foo=$foo" echo "Query parameter: foo=$foo"
echo "Header parameter: user-agent=$user_agent" echo "Header parameter: user-agent=$user_agent"
echo "Script parameters: $1" echo "Script parameters: $1"
@ -168,7 +168,7 @@ The result:
```bash ```bash
$ curl --data @test.json http://localhost:8080/echo?foo=bar $ curl --data @test.json http://localhost:8080/echo?foo=bar
Hook information: name=echo, id=1 Hook information: name=echo, id=1, method=POST
Query parameter: foo=bar Query parameter: foo=bar
Header parameter: user-agent=curl/7.52.1 Header parameter: user-agent=curl/7.52.1
Script parameter: {"message": "this is a test"} Script parameter: {"message": "this is a test"}

View File

@ -12,8 +12,8 @@ import (
"strings" "strings"
"github.com/ncarlier/webhookd/pkg/config" "github.com/ncarlier/webhookd/pkg/config"
"github.com/ncarlier/webhookd/pkg/hook"
"github.com/ncarlier/webhookd/pkg/logger" "github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
"github.com/ncarlier/webhookd/pkg/worker" "github.com/ncarlier/webhookd/pkg/worker"
) )
@ -56,13 +56,13 @@ func triggerWebhook(w http.ResponseWriter, r *http.Request) {
return return
} }
// Get script location // Get hook location
p := strings.TrimPrefix(r.URL.Path, "/") hookName := strings.TrimPrefix(r.URL.Path, "/")
if p == "" { if hookName == "" {
infoHandler(w, r) infoHandler(w, r)
return return
} }
script, err := worker.ResolveScript(scriptDir, p) _, err := hook.ResolveScript(scriptDir, hookName)
if err != nil { if err != nil {
logger.Error.Println(err.Error()) logger.Error.Println(err.Error())
http.Error(w, "hook not found", http.StatusNotFound) http.Error(w, "hook not found", http.StatusNotFound)
@ -97,10 +97,23 @@ func triggerWebhook(w http.ResponseWriter, r *http.Request) {
// Create work // Create work
timeout := atoiFallback(r.Header.Get("X-Hook-Timeout"), defaultTimeout) timeout := atoiFallback(r.Header.Get("X-Hook-Timeout"), defaultTimeout)
work := model.NewWorkRequest(p, script, string(body), outputDir, params, timeout) job, err := hook.NewHookJob(&hook.Request{
Name: hookName,
Method: r.Method,
Payload: string(body),
Args: params,
Timeout: timeout,
BaseDir: scriptDir,
OutputDir: outputDir,
})
if err != nil {
logger.Error.Printf("error creating hook job: %v", err)
http.Error(w, "unable to create hook job", http.StatusInternalServerError)
return
}
// Put work in queue // Put work in queue
worker.WorkQueue <- *work worker.WorkQueue <- job
// Use content negotiation to enable Server-Sent Events // Use content negotiation to enable Server-Sent Events
useSSE := r.Method == "GET" && r.Header.Get("Accept") == "text/event-stream" useSSE := r.Method == "GET" && r.Header.Get("Accept") == "text/event-stream"
@ -113,21 +126,18 @@ func triggerWebhook(w http.ResponseWriter, r *http.Request) {
} }
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Hook-ID", strconv.FormatUint(work.ID, 10)) w.Header().Set("X-Hook-ID", strconv.FormatUint(job.ID(), 10))
for { for {
msg, open := <-work.MessageChan msg, open := <-job.MessageChan
if !open { if !open {
break break
} }
if useSSE { if useSSE {
fmt.Fprintf(w, "data: %s\n\n", msg) // Send SSE response fmt.Fprintf(w, "data: %s\n\n", msg) // Send SSE response
} else { } else {
fmt.Fprintf(w, "%s\n", msg) // Send chunked response fmt.Fprintf(w, "%s\n", msg) // Send chunked response
} }
// Flush the data immediately instead of buffering it for later. // Flush the data immediately instead of buffering it for later.
flusher.Flush() flusher.Flush()
} }
@ -138,8 +148,8 @@ func getWebhookLog(w http.ResponseWriter, r *http.Request) {
id := path.Base(r.URL.Path) id := path.Base(r.URL.Path)
// Get script location // Get script location
name := path.Dir(strings.TrimPrefix(r.URL.Path, "/")) hookName := path.Dir(strings.TrimPrefix(r.URL.Path, "/"))
_, err := worker.ResolveScript(scriptDir, name) _, err := hook.ResolveScript(scriptDir, hookName)
if err != nil { if err != nil {
logger.Error.Println(err.Error()) logger.Error.Println(err.Error())
http.Error(w, err.Error(), http.StatusNotFound) http.Error(w, err.Error(), http.StatusNotFound)
@ -147,7 +157,7 @@ func getWebhookLog(w http.ResponseWriter, r *http.Request) {
} }
// Retrieve log file // Retrieve log file
logFile, err := worker.RetrieveLogFile(id, name, outputDir) logFile, err := hook.Logs(id, hookName, outputDir)
if err != nil { if err != nil {
logger.Error.Println(err.Error()) logger.Error.Println(err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)

View File

@ -47,7 +47,7 @@ func routes(conf *config.Config) Routes {
route( route(
"/", "/",
index, index,
middlewares.UseBefore(middleware.Methods("GET", "PATCH", "POST"))..., middlewares...,
), ),
route( route(
staticPath, staticPath,

View File

@ -1,4 +1,4 @@
package worker package hook
import ( import (
"errors" "errors"

256
pkg/hook/job.go Normal file
View File

@ -0,0 +1,256 @@
package hook
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"os/exec"
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/strcase"
)
var hookID uint64
// Job a hook job
type Job struct {
id uint64
name string
script string
method string
payload string
args []string
MessageChan chan []byte
timeout int
status Status
logFilename string
err error
mutex sync.Mutex
}
// NewHookJob creates new hook job
func NewHookJob(request *Request) (*Job, error) {
script, err := ResolveScript(request.BaseDir, request.Name)
if err != nil {
return nil, err
}
job := &Job{
id: atomic.AddUint64(&hookID, 1),
name: request.Name,
script: script,
method: request.Method,
payload: request.Payload,
args: request.Args,
timeout: request.Timeout,
MessageChan: make(chan []byte),
status: Idle,
}
job.logFilename = path.Join(request.OutputDir, fmt.Sprintf("%s_%d_%s.txt", strcase.ToSnake(job.name), job.id, time.Now().Format("20060102_1504")))
return job, nil
}
func (job *Job) ID() uint64 {
return job.id
}
func (job *Job) Name() string {
return job.name
}
func (job *Job) Err() error {
return job.err
}
// Meta return job meta
func (job *Job) Meta() []string {
return []string{
"hook_id=" + strconv.FormatUint(job.id, 10),
"hook_name=" + job.name,
"hook_method=" + job.method,
}
}
// Terminate set job as terminated
func (job *Job) Terminate(err error) error {
job.mutex.Lock()
defer job.mutex.Unlock()
if err != nil {
job.status = Error
job.err = err
logger.Info.Printf("hook %s#%d done [ERROR]\n", job.Name(), job.ID())
return err
}
job.status = Success
logger.Info.Printf("hook %s#%d done [SUCCESS]\n", job.Name(), job.ID())
return nil
}
// IsTerminated ask if the job is terminated
func (job *Job) IsTerminated() bool {
job.mutex.Lock()
defer job.mutex.Unlock()
return job.status == Success || job.status == Error
}
// Status get job status
func (job *Job) Status() Status {
return job.status
}
// StatusLabel return job status as string
func (job *Job) StatusLabel() string {
switch job.status {
case Error:
return "error"
case Success:
return "success"
case Running:
return "running"
default:
return "idle"
}
}
// SendMessage send message to the message channel
func (job *Job) SendMessage(message string) {
job.MessageChan <- []byte(message)
}
// Logs returns job logs filtered with the prefix
func (job *Job) Logs(prefixFilter string) string {
file, err := os.Open(job.logFilename)
if err != nil {
return err.Error()
}
defer file.Close()
var result bytes.Buffer
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, prefixFilter) {
line = strings.TrimPrefix(line, prefixFilter)
line = strings.TrimLeft(line, " ")
result.WriteString(line + "\n")
}
}
if err := scanner.Err(); err != nil {
return err.Error()
}
return result.String()
}
// Close job message chan
func (job *Job) Close() {
close(job.MessageChan)
}
// Run hook job
func (job *Job) Run() error {
if job.status != Idle {
return fmt.Errorf("unable to run job: status=%s", job.StatusLabel())
}
job.status = Running
logger.Info.Printf("hook %s#%d started...\n", job.name, job.id)
logger.Debug.Printf("hook %s#%d script: %s\n", job.name, job.id, job.script)
logger.Debug.Printf("hook %s#%d parameter: %v\n", job.name, job.id, job.args)
binary, err := exec.LookPath(job.script)
if err != nil {
return job.Terminate(err)
}
// Exec script with parameter...
cmd := exec.Command(binary, job.payload)
// with env variables and hook arguments...
cmd.Env = append(os.Environ(), job.args...)
// and hook meta...
cmd.Env = append(cmd.Env, job.Meta()...)
// using a process group...
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
// Open the log file for writing
logFile, err := os.Create(job.logFilename)
if err != nil {
return job.Terminate(err)
}
defer logFile.Close()
logger.Debug.Printf("hook %s#%d output file: %s\n", job.name, job.id, logFile.Name())
wLogFile := bufio.NewWriter(logFile)
defer wLogFile.Flush()
// Combine cmd stdout and stderr
outReader, err := cmd.StdoutPipe()
if err != nil {
return job.Terminate(err)
}
errReader, err := cmd.StderrPipe()
if err != nil {
return job.Terminate(err)
}
cmdReader := io.MultiReader(outReader, errReader)
// Start the script...
err = cmd.Start()
if err != nil {
return job.Terminate(err)
}
// Create wait group to wait for command output completion
var wg sync.WaitGroup
wg.Add(1)
// Write script output to log file and the work message channel
go func(reader io.Reader) {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
// writing to the work channel
if !job.IsTerminated() {
job.MessageChan <- []byte(line)
} else {
logger.Error.Printf("hook %s#%d is over ; unable to write more data into the channel: %s\n", job.name, job.id, line)
break
}
// write to stdout if configured
logger.Output.Println(line)
// writing to outfile
if _, err := wLogFile.WriteString(line + "\n"); err != nil {
logger.Error.Println("error while writing into the log file:", logFile.Name(), err)
break
}
}
if err := scanner.Err(); err != nil {
logger.Error.Printf("hook %s#%d is unable to read script stdout: %v\n", job.name, job.id, err)
}
wg.Done()
}(cmdReader)
// Start timeout timer
timer := time.AfterFunc(time.Duration(job.timeout)*time.Second, func() {
logger.Warning.Printf("hook %s#%d has timed out (%ds): killing process #%d ...\n", job.name, job.id, job.timeout, cmd.Process.Pid)
syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
})
// Wait for command output completion
wg.Wait()
// Wait for command completion
err = cmd.Wait()
// Stop timeout timer
timer.Stop()
// Mark work as terminated
return job.Terminate(err)
}

View File

@ -1,4 +1,4 @@
package worker package hook
import ( import (
"fmt" "fmt"
@ -9,8 +9,8 @@ import (
"github.com/ncarlier/webhookd/pkg/strcase" "github.com/ncarlier/webhookd/pkg/strcase"
) )
// RetrieveLogFile retrieve work log with its name and id // Logs get hook log with its name and id
func RetrieveLogFile(id, name, base string) (*os.File, error) { func Logs(id, name, base string) (*os.File, error) {
logPattern := path.Join(base, fmt.Sprintf("%s_%s_*.txt", strcase.ToSnake(name), id)) logPattern := path.Join(base, fmt.Sprintf("%s_%s_*.txt", strcase.ToSnake(name), id))
files, err := filepath.Glob(logPattern) files, err := filepath.Glob(logPattern)
if err != nil { if err != nil {

View File

@ -4,29 +4,29 @@ import (
"testing" "testing"
"github.com/ncarlier/webhookd/pkg/assert" "github.com/ncarlier/webhookd/pkg/assert"
"github.com/ncarlier/webhookd/pkg/worker" "github.com/ncarlier/webhookd/pkg/hook"
) )
func TestResolveScript(t *testing.T) { func TestResolveScript(t *testing.T) {
script, err := worker.ResolveScript("../../../scripts", "../scripts/echo") script, err := hook.ResolveScript("../../../scripts", "../scripts/echo")
assert.Nil(t, err, "") assert.Nil(t, err, "")
assert.Equal(t, "../../../scripts/echo.sh", script, "") assert.Equal(t, "../../../scripts/echo.sh", script, "")
} }
func TestNotResolveScript(t *testing.T) { func TestNotResolveScript(t *testing.T) {
_, err := worker.ResolveScript("../../scripts", "foo") _, err := hook.ResolveScript("../../scripts", "foo")
assert.NotNil(t, err, "") assert.NotNil(t, err, "")
assert.Equal(t, "Script not found: ../../scripts/foo.sh", err.Error(), "") assert.Equal(t, "Script not found: ../../scripts/foo.sh", err.Error(), "")
} }
func TestResolveBadScript(t *testing.T) { func TestResolveBadScript(t *testing.T) {
_, err := worker.ResolveScript("../../scripts", "../tests/test_simple") _, err := hook.ResolveScript("../../scripts", "../tests/test_simple")
assert.NotNil(t, err, "") assert.NotNil(t, err, "")
assert.Equal(t, "Invalid script path: ../tests/test_simple.sh", err.Error(), "") assert.Equal(t, "Invalid script path: ../tests/test_simple.sh", err.Error(), "")
} }
func TestResolveScriptWithExtension(t *testing.T) { func TestResolveScriptWithExtension(t *testing.T) {
_, err := worker.ResolveScript("../../scripts", "node.js") _, err := hook.ResolveScript("../../scripts", "node.js")
assert.NotNil(t, err, "") assert.NotNil(t, err, "")
assert.Equal(t, "Script not found: ../../scripts/node.js", err.Error(), "") assert.Equal(t, "Script not found: ../../scripts/node.js", err.Error(), "")
} }

96
pkg/hook/test/job_test.go Normal file
View File

@ -0,0 +1,96 @@
package test
import (
"os"
"strconv"
"testing"
"github.com/ncarlier/webhookd/pkg/assert"
"github.com/ncarlier/webhookd/pkg/hook"
"github.com/ncarlier/webhookd/pkg/logger"
)
func printJobMessages(job *hook.Job) {
go func() {
for {
msg, open := <-job.MessageChan
if !open {
break
}
logger.Info.Println(string(msg))
}
}()
}
func TestHookJob(t *testing.T) {
logger.Init("debug", "out")
req := &hook.Request{
Name: "test_simple",
Method: "GET",
Payload: "{\"foo\": \"bar\"}",
Args: []string{
"name=foo",
"user_agent=test",
},
Timeout: 5,
BaseDir: "../test",
OutputDir: os.TempDir(),
}
job, err := hook.NewHookJob(req)
assert.Nil(t, err, "")
assert.NotNil(t, job, "")
printJobMessages(job)
err = job.Run()
assert.Nil(t, err, "")
assert.Equal(t, job.Status(), hook.Success, "")
assert.Equal(t, job.Logs("notify:"), "OK\n", "")
// Test that we can retrieve log file afterward
id := strconv.FormatUint(job.ID(), 10)
logFile, err := hook.Logs(id, "test", os.TempDir())
assert.Nil(t, err, "Log file should exists")
defer logFile.Close()
assert.NotNil(t, logFile, "Log file should be retrieve")
}
func TestWorkRunnerWithError(t *testing.T) {
logger.Init("debug")
req := &hook.Request{
Name: "test_error",
Method: "POST",
Payload: "",
Args: []string{},
Timeout: 5,
BaseDir: "../test",
OutputDir: os.TempDir(),
}
job, err := hook.NewHookJob(req)
assert.Nil(t, err, "")
assert.NotNil(t, job, "")
printJobMessages(job)
err = job.Run()
assert.NotNil(t, err, "")
assert.Equal(t, job.Status(), hook.Error, "")
assert.Equal(t, "exit status 1", err.Error(), "")
}
func TestWorkRunnerWithTimeout(t *testing.T) {
logger.Init("debug")
req := &hook.Request{
Name: "test_timeout",
Method: "POST",
Payload: "",
Args: []string{},
Timeout: 1,
BaseDir: "../test",
OutputDir: os.TempDir(),
}
job, err := hook.NewHookJob(req)
assert.Nil(t, err, "")
assert.NotNil(t, job, "")
printJobMessages(job)
err = job.Run()
assert.NotNil(t, err, "")
assert.Equal(t, job.Status(), hook.Error, "")
assert.Equal(t, "signal: killed", err.Error(), "")
}

View File

@ -6,6 +6,9 @@ echo "Testing parameters..."
[ -z "$name" ] && echo "Name variable undefined" && exit 1 [ -z "$name" ] && echo "Name variable undefined" && exit 1
[ -z "$user_agent" ] && echo "User-Agent variable undefined" && exit 1 [ -z "$user_agent" ] && echo "User-Agent variable undefined" && exit 1
[ "$user_agent" != "test" ] && echo "Invalid User-Agent variable: $user_agent" && exit 1 [ "$user_agent" != "test" ] && echo "Invalid User-Agent variable: $user_agent" && exit 1
[ -z "$hook_id" ] && echo "Hook ID variable undefined" && exit 1
[ "$hook_name" != "test_simple" ] && echo "Invalid hook name variable: $hook_name" && exit 1
[ "$hook_method" != "GET" ] && echo "Invalid hook method variable: $hook_method" && exit 1
echo "Testing payload..." echo "Testing payload..."
[ -z "$1" ] && echo "Payload undefined" && exit 1 [ -z "$1" ] && echo "Payload undefined" && exit 1

26
pkg/hook/types.go Normal file
View File

@ -0,0 +1,26 @@
package hook
// Status is the status of a hook
type Status int
const (
// Idle means that the hook is not yet started
Idle Status = iota
// Running means that the hook is running
Running
// Success means that the hook over
Success
// Error means that the hook is over but in error
Error
)
// Request is a hook request
type Request struct {
Name string
Method string
Payload string
Args []string
Timeout int
BaseDir string
OutputDir string
}

View File

@ -8,7 +8,7 @@ import (
func Cors(inner http.Handler) http.Handler { func Cors(inner http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "PATCH, POST, GET, OPTIONS") w.Header().Set("Access-Control-Allow-Methods", "*")
w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, Authorization") w.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Content-Length, Accept-Encoding, Authorization")
if r.Method != "OPTIONS" { if r.Method != "OPTIONS" {

View File

@ -1,132 +0,0 @@
package model
import (
"bufio"
"bytes"
"fmt"
"os"
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/strcase"
)
var workID uint64
// WorkStatus is the status of a workload
type WorkStatus int
const (
// Idle means that the work is not yet started
Idle WorkStatus = iota
// Running means that the work is running
Running
// Success means that the work over
Success
// Error means that the work is over but in error
Error
)
// WorkRequest is a request of work for a worker
type WorkRequest struct {
ID uint64
Name string
Script string
Payload string
Args []string
MessageChan chan []byte
Timeout int
Status WorkStatus
LogFilename string
Err error
mutex sync.Mutex
}
// NewWorkRequest creates new work request
func NewWorkRequest(name, script, payload, output string, args []string, timeout int) *WorkRequest {
w := &WorkRequest{
ID: atomic.AddUint64(&workID, 1),
Name: name,
Script: script,
Payload: payload,
Args: args,
Timeout: timeout,
MessageChan: make(chan []byte),
Status: Idle,
}
w.LogFilename = path.Join(output, fmt.Sprintf("%s_%d_%s.txt", strcase.ToSnake(w.Name), w.ID, time.Now().Format("20060102_1504")))
return w
}
// Meta return work request meta
func (wr *WorkRequest) Meta() []string {
return []string{
"hook_id=" + strconv.FormatUint(wr.ID, 10),
"hook_name=" + wr.Name,
}
}
// Terminate set work request as terminated
func (wr *WorkRequest) Terminate(err error) error {
wr.mutex.Lock()
defer wr.mutex.Unlock()
if err != nil {
wr.Status = Error
wr.Err = err
logger.Info.Printf("hook %s#%d done [ERROR]\n", wr.Name, wr.ID)
return err
}
wr.Status = Success
logger.Info.Printf("hook %s#%d done [SUCCESS]\n", wr.Name, wr.ID)
return nil
}
// IsTerminated ask if the work request is terminated
func (wr *WorkRequest) IsTerminated() bool {
wr.mutex.Lock()
defer wr.mutex.Unlock()
return wr.Status == Success || wr.Status == Error
}
// StatusLabel return work status as string
func (wr *WorkRequest) StatusLabel() string {
switch wr.Status {
case Error:
return "error"
case Success:
return "success"
case Running:
return "running"
default:
return "idle"
}
}
// GetLogContent returns work logs filtered with the prefix
func (wr *WorkRequest) GetLogContent(prefixFilter string) string {
file, err := os.Open(wr.LogFilename)
if err != nil {
return err.Error()
}
defer file.Close()
var result bytes.Buffer
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, prefixFilter) {
line = strings.TrimPrefix(line, prefixFilter)
line = strings.TrimLeft(line, " ")
result.WriteString(line + "\n")
}
}
if err := scanner.Err(); err != nil {
return err.Error()
}
return result.String()
}

View File

@ -9,7 +9,6 @@ import (
"strings" "strings"
"github.com/ncarlier/webhookd/pkg/logger" "github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
) )
type notifPayload struct { type notifPayload struct {
@ -34,18 +33,18 @@ func newHTTPNotifier(uri *url.URL) *HTTPNotifier {
} }
// Notify send a notification to a HTTP endpoint. // Notify send a notification to a HTTP endpoint.
func (n *HTTPNotifier) Notify(work *model.WorkRequest) error { func (n *HTTPNotifier) Notify(result HookResult) error {
payload := work.GetLogContent(n.PrefixFilter) payload := result.Logs(n.PrefixFilter)
if strings.TrimSpace(payload) == "" { if strings.TrimSpace(payload) == "" {
// Nothing to notify, abort // Nothing to notify, abort
return nil return nil
} }
notif := &notifPayload{ notif := &notifPayload{
ID: strconv.FormatUint(work.ID, 10), ID: strconv.FormatUint(result.ID(), 10),
Name: work.Name, Name: result.Name(),
Text: payload, Text: payload,
Error: work.Err, Error: result.Err(),
} }
notifJSON, err := json.Marshal(notif) notifJSON, err := json.Marshal(notif)
if err != nil { if err != nil {
@ -64,6 +63,6 @@ func (n *HTTPNotifier) Notify(work *model.WorkRequest) error {
return err return err
} }
resp.Body.Close() resp.Body.Close()
logger.Info.Printf("job %s#%d notification sent to %s\n", work.Name, work.ID, n.URL.String()) logger.Info.Printf("job %s#%d notification sent to %s\n", result.Name(), result.ID(), n.URL.String())
return nil return nil
} }

View File

@ -6,23 +6,22 @@ import (
"strings" "strings"
"github.com/ncarlier/webhookd/pkg/logger" "github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
) )
// Notifier is able to send a notification. // Notifier is able to send a notification.
type Notifier interface { type Notifier interface {
Notify(work *model.WorkRequest) error Notify(result HookResult) error
} }
var notifier Notifier var notifier Notifier
// Notify is the global method to notify work // Notify is the global method to notify hook result
func Notify(work *model.WorkRequest) { func Notify(result HookResult) {
if notifier == nil { if notifier == nil {
return return
} }
if err := notifier.Notify(work); err != nil { if err := notifier.Notify(result); err != nil {
logger.Error.Printf("unable to send notification for webhook %s#%d: %v\n", work.Name, work.ID, err) logger.Error.Printf("unable to send notification for webhook %s#%d: %v\n", result.Name(), result.ID(), err)
} }
} }

View File

@ -11,7 +11,6 @@ import (
"time" "time"
"github.com/ncarlier/webhookd/pkg/logger" "github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
) )
// SMTPNotifier is able to send notification to a email destination. // SMTPNotifier is able to send notification to a email destination.
@ -41,15 +40,15 @@ func newSMTPNotifier(uri *url.URL) *SMTPNotifier {
} }
} }
func (n *SMTPNotifier) buildEmailPayload(work *model.WorkRequest) string { func (n *SMTPNotifier) buildEmailPayload(result HookResult) string {
// Get email body // Get email body
body := work.GetLogContent(n.PrefixFilter) body := result.Logs(n.PrefixFilter)
if strings.TrimSpace(body) == "" { if strings.TrimSpace(body) == "" {
return "" return ""
} }
// Build email subject // Build email subject
subject := buildSubject(n.Subject, work) subject := buildSubject(n.Subject, result)
// Build email headers // Build email headers
headers := make(map[string]string) headers := make(map[string]string)
@ -67,9 +66,9 @@ func (n *SMTPNotifier) buildEmailPayload(work *model.WorkRequest) string {
} }
// Notify send a notification to a email destination. // Notify send a notification to a email destination.
func (n *SMTPNotifier) Notify(work *model.WorkRequest) error { func (n *SMTPNotifier) Notify(result HookResult) error {
hostname, _, _ := net.SplitHostPort(n.Host) hostname, _, _ := net.SplitHostPort(n.Host)
payload := n.buildEmailPayload(work) payload := n.buildEmailPayload(result)
if payload == "" { if payload == "" {
// Nothing to notify, abort // Nothing to notify, abort
return nil return nil
@ -127,15 +126,15 @@ func (n *SMTPNotifier) Notify(work *model.WorkRequest) error {
return err return err
} }
logger.Info.Printf("job %s#%d notification sent to %s\n", work.Name, work.ID, n.To) logger.Info.Printf("job %s#%d notification sent to %s\n", result.Name(), result.ID(), n.To)
// Send the QUIT command and close the connection. // Send the QUIT command and close the connection.
return client.Quit() return client.Quit()
} }
func buildSubject(template string, work *model.WorkRequest) string { func buildSubject(template string, result HookResult) string {
result := strings.ReplaceAll(template, "{name}", work.Name) subject := strings.ReplaceAll(template, "{name}", result.Name())
result = strings.ReplaceAll(result, "{id}", strconv.FormatUint(uint64(work.ID), 10)) subject = strings.ReplaceAll(subject, "{id}", strconv.FormatUint(uint64(result.ID()), 10))
result = strings.ReplaceAll(result, "{status}", work.StatusLabel()) subject = strings.ReplaceAll(subject, "{status}", result.StatusLabel())
return result return subject
} }

View File

@ -0,0 +1,9 @@
package notification
type HookResult interface {
ID() uint64
Name() string
Logs(filter string) string
StatusLabel() string
Err() error
}

View File

@ -2,19 +2,18 @@ package worker
import ( import (
"github.com/ncarlier/webhookd/pkg/logger" "github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
) )
// WorkerQueue is the global queue of Workers // WorkerQueue is the global queue of Workers
var WorkerQueue chan chan model.WorkRequest var WorkerQueue chan chan Work
// WorkQueue is the global queue of work to dispatch // WorkQueue is the global queue of work to dispatch
var WorkQueue = make(chan model.WorkRequest, 100) var WorkQueue = make(chan Work, 100)
// StartDispatcher is charged to start n workers. // StartDispatcher is charged to start n workers.
func StartDispatcher(nworkers int) { func StartDispatcher(nworkers int) {
// First, initialize the channel we are going to but the workers' work channels into. // First, initialize the channel we are going to but the workers' work channels into.
WorkerQueue = make(chan chan model.WorkRequest, nworkers) WorkerQueue = make(chan chan Work, nworkers)
// Now, create all of our workers. // Now, create all of our workers.
for i := 0; i < nworkers; i++ { for i := 0; i < nworkers; i++ {
@ -30,7 +29,7 @@ func StartDispatcher(nworkers int) {
go func() { go func() {
worker := <-WorkerQueue worker := <-WorkerQueue
logger.Debug.Printf("dispatching hook request: %s#%d", work.Name, work.ID) logger.Debug.Printf("dispatching hook request: %s#%d", work.Name(), work.ID())
worker <- work worker <- work
}() }()
} }

View File

@ -1,72 +0,0 @@
package test
import (
"os"
"strconv"
"testing"
"github.com/ncarlier/webhookd/pkg/assert"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
"github.com/ncarlier/webhookd/pkg/worker"
)
func printWorkMessages(work *model.WorkRequest) {
go func() {
for {
msg, open := <-work.MessageChan
if !open {
break
}
logger.Info.Println(string(msg))
}
}()
}
func TestWorkRunner(t *testing.T) {
logger.Init("debug", "out")
script := "./test_simple.sh"
args := []string{
"name=foo",
"user_agent=test",
}
payload := "{\"foo\": \"bar\"}"
work := model.NewWorkRequest("test", script, payload, os.TempDir(), args, 5)
assert.NotNil(t, work, "")
printWorkMessages(work)
err := worker.Run(work)
assert.Nil(t, err, "")
assert.Equal(t, work.Status, model.Success, "")
assert.Equal(t, work.GetLogContent("notify:"), "OK\n", "")
// Test that we can retrieve log file afterward
id := strconv.FormatUint(work.ID, 10)
logFile, err := worker.RetrieveLogFile(id, "test", os.TempDir())
defer logFile.Close()
assert.Nil(t, err, "Log file should exists")
assert.NotNil(t, logFile, "Log file should be retrieve")
}
func TestWorkRunnerWithError(t *testing.T) {
logger.Init("debug")
script := "./test_error.sh"
work := model.NewWorkRequest("test", script, "", os.TempDir(), []string{}, 5)
assert.NotNil(t, work, "")
printWorkMessages(work)
err := worker.Run(work)
assert.NotNil(t, err, "")
assert.Equal(t, work.Status, model.Error, "")
assert.Equal(t, "exit status 1", err.Error(), "")
}
func TestWorkRunnerWithTimeout(t *testing.T) {
logger.Init("debug")
script := "./test_timeout.sh"
work := model.NewWorkRequest("test", script, "", os.TempDir(), []string{}, 1)
assert.NotNil(t, work, "")
printWorkMessages(work)
err := worker.Run(work)
assert.NotNil(t, err, "")
assert.Equal(t, work.Status, model.Error, "")
assert.Equal(t, "signal: killed", err.Error(), "")
}

22
pkg/worker/types.go Normal file
View File

@ -0,0 +1,22 @@
package worker
// ChanWriter is a simple writer to a channel of byte.
type ChanWriter struct {
ByteChan chan []byte
}
func (c *ChanWriter) Write(p []byte) (int, error) {
c.ByteChan <- p
return len(p), nil
}
type Work interface {
ID() uint64
Name() string
Run() error
Close()
SendMessage(message string)
Logs(filter string) string
StatusLabel() string
Err() error
}

View File

@ -1,122 +0,0 @@
package worker
import (
"bufio"
"io"
"os"
"os/exec"
"sync"
"syscall"
"time"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
)
// ChanWriter is a simple writer to a channel of byte.
type ChanWriter struct {
ByteChan chan []byte
}
func (c *ChanWriter) Write(p []byte) (int, error) {
c.ByteChan <- p
return len(p), nil
}
// Run work request
func Run(work *model.WorkRequest) error {
work.Status = model.Running
logger.Info.Printf("hook %s#%d started...\n", work.Name, work.ID)
logger.Debug.Printf("hook %s#%d script: %s\n", work.Name, work.ID, work.Script)
logger.Debug.Printf("hook %s#%d parameter: %v\n", work.Name, work.ID, work.Args)
binary, err := exec.LookPath(work.Script)
if err != nil {
return work.Terminate(err)
}
// Exec script with parameter...
cmd := exec.Command(binary, work.Payload)
// with env variables and hook arguments...
cmd.Env = append(os.Environ(), work.Args...)
// and hook meta...
cmd.Env = append(cmd.Env, work.Meta()...)
// using a process group...
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
// Open the log file for writing
logFile, err := os.Create(work.LogFilename)
if err != nil {
return work.Terminate(err)
}
defer logFile.Close()
logger.Debug.Printf("hook %s#%d output file: %s\n", work.Name, work.ID, logFile.Name())
wLogFile := bufio.NewWriter(logFile)
defer wLogFile.Flush()
// Combine cmd stdout and stderr
outReader, err := cmd.StdoutPipe()
if err != nil {
return work.Terminate(err)
}
errReader, err := cmd.StderrPipe()
if err != nil {
return work.Terminate(err)
}
cmdReader := io.MultiReader(outReader, errReader)
// Start the script...
err = cmd.Start()
if err != nil {
return work.Terminate(err)
}
// Create wait group to wait for command output completion
var wg sync.WaitGroup
wg.Add(1)
// Write script output to log file and the work message channel
go func(reader io.Reader) {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
// writing to the work channel
if !work.IsTerminated() {
work.MessageChan <- []byte(line)
} else {
logger.Error.Printf("hook %s#%d is over ; unable to write more data into the channel: %s\n", work.Name, work.ID, line)
break
}
// write to stdout if configured
logger.Output.Println(line)
// writing to outfile
if _, err := wLogFile.WriteString(line + "\n"); err != nil {
logger.Error.Println("error while writing into the log file:", logFile.Name(), err)
break
}
}
if err := scanner.Err(); err != nil {
logger.Error.Printf("hook %s#%d is unable to read script stdout: %v\n", work.Name, work.ID, err)
}
wg.Done()
}(cmdReader)
// Start timeout timer
timer := time.AfterFunc(time.Duration(work.Timeout)*time.Second, func() {
logger.Warning.Printf("hook %s#%d has timed out (%ds): killing process #%d ...\n", work.Name, work.ID, work.Timeout, cmd.Process.Pid)
syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
})
// Wait for command output completion
wg.Wait()
// Wait for command completion
err = cmd.Wait()
// Stop timeout timer
timer.Stop()
// Mark work as terminated
return work.Terminate(err)
}

View File

@ -6,16 +6,15 @@ import (
"github.com/ncarlier/webhookd/pkg/metric" "github.com/ncarlier/webhookd/pkg/metric"
"github.com/ncarlier/webhookd/pkg/logger" "github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
"github.com/ncarlier/webhookd/pkg/notification" "github.com/ncarlier/webhookd/pkg/notification"
) )
// NewWorker creates, and returns a new Worker object. // NewWorker creates, and returns a new Worker object.
func NewWorker(id int, workerQueue chan chan model.WorkRequest) Worker { func NewWorker(id int, workerQueue chan chan Work) Worker {
// Create, and return the worker. // Create, and return the worker.
worker := Worker{ worker := Worker{
ID: id, ID: id,
Work: make(chan model.WorkRequest), Work: make(chan Work),
WorkerQueue: workerQueue, WorkerQueue: workerQueue,
QuitChan: make(chan bool), QuitChan: make(chan bool),
} }
@ -26,8 +25,8 @@ func NewWorker(id int, workerQueue chan chan model.WorkRequest) Worker {
// Worker is a go routine in charge of executing a work. // Worker is a go routine in charge of executing a work.
type Worker struct { type Worker struct {
ID int ID int
Work chan model.WorkRequest Work chan Work
WorkerQueue chan chan model.WorkRequest WorkerQueue chan chan Work
QuitChan chan bool QuitChan chan bool
} }
@ -42,17 +41,17 @@ func (w Worker) Start() {
select { select {
case work := <-w.Work: case work := <-w.Work:
// Receive a work request. // Receive a work request.
logger.Debug.Printf("worker #%d received hook request: %s#%d\n", w.ID, work.Name, work.ID) logger.Debug.Printf("worker #%d received hook request: %s#%d\n", w.ID, work.Name(), work.ID())
metric.Requests.Add(1) metric.Requests.Add(1)
err := Run(&work) err := work.Run()
if err != nil { if err != nil {
metric.RequestsFailed.Add(1) metric.RequestsFailed.Add(1)
work.MessageChan <- []byte(fmt.Sprintf("error: %s", err.Error())) work.SendMessage(fmt.Sprintf("error: %s", err.Error()))
} }
// Send notification // Send notification
go notification.Notify(&work) go notification.Notify(work)
close(work.MessageChan) work.Close()
case <-w.QuitChan: case <-w.QuitChan:
logger.Debug.Printf("stopping worker #%d...\n", w.ID) logger.Debug.Printf("stopping worker #%d...\n", w.ID)
return return

10
scripts/async.sh Executable file
View File

@ -0,0 +1,10 @@
#!/bin/bash
echo "Starting background job..."
nohup ./scripts/long.sh >/tmp/long.log 2>&1 &
echo "Background job started."

View File

@ -4,7 +4,7 @@
echo "This is a simple echo hook." echo "This is a simple echo hook."
echo "Hook information: name=$hook_name, id=$hook_id" echo "Hook information: name=$hook_name, id=$hook_id, method=$hook_method"
echo "Command result: hostname=`hostname`" echo "Command result: hostname=`hostname`"

12
scripts/long.sh Executable file
View File

@ -0,0 +1,12 @@
#!/bin/bash
echo "Running long script..."
for i in {1..20}; do
sleep 1
echo "running ${i} ..."
done
echo "Long script end"
exit 0

16
tooling/bench/simple.js Normal file
View File

@ -0,0 +1,16 @@
import http from 'k6/http';
import { check, sleep } from 'k6';
export const options = {
stages: [
{ duration: '30s', target: 20 },
{ duration: '1m', target: 10 },
{ duration: '20s', target: 0 },
],
};
export default function () {
const res = http.get('http://localhost:8080/echo');
check(res, { 'status was 200': (r) => r.status == 200 });
sleep(1);
}