feat(notification): complete refactoring of the notification system

- URL based configuration
- Only prefixed output lines are notified
- HTTP notifier: send a JSON with notification in the text attribute
- SMTP notifier: send an email with notification text in body
This commit is contained in:
Nicolas Carlier 2018-12-24 13:40:00 +00:00
parent adead6d3b3
commit 1dab1e968d
19 changed files with 363 additions and 380 deletions

48
.env
View File

@ -1,48 +0,0 @@
###
# Webhookd configuration.
###
# Working directory.
# Defaults: /tmp
APP_WORKING_DIR=/var/opt/webhookd/work
# Scripts directory.
# Defaults: ./scripts
APP_SCRIPTS_DIR=/var/opt/webhookd/scripts
# Notifier.
# Notify script execution result and logs.
# Values:
# - "http": Send notification with an HTTP hook (compatible with Mailgun API).
# - "smtp": Send notification by mail.
# - "": No notification (defaults).
APP_NOTIFIER=http
# Notifier FROM directive.
# Defaults: "webhookd <noreply@nunux.org>"
APP_NOTIFIER_FROM=Mailgun Sandbox <postmaster@sandboxdexxxxxxxxxxxxxxxxxx.mailgun.org>
# Notifier TO directive.
# Defaults: "hostmaster@nunux.org"
APP_NOTIFIER_TO=foo@bar.org
# HTTP notifier URL.
APP_HTTP_NOTIFIER_URL=http://requestb.in/1gd3ond1
#APP_HTTP_NOTIFIER_URL=https://api.mailgun.net/v2/sandboxdexxxxxxxxxxxxxxxxx.mailgun.org/messages
# HTTP notifier user:password.
APP_HTTP_NOTIFIER_USER=api:key-xxxxxxxxxxxxxxxxxxxxxxxxxx
# SMTP notifier host.
# Defaults: localhost:25
APP_SMTP_NOTIFIER_HOST=localhost:25
# Authentication Method
# Defaults: none
# Values:
# - "none" : No authentication (defaults).
# - "basic": HTTP Basic authentication.
AUTH=none
# Authentication Parameter
AUTH_PARAM=username:password

View File

@ -26,16 +26,13 @@ $ sudo curl -s https://raw.githubusercontent.com/ncarlier/webhookd/master/instal
```bash
$ docker run -d --name=webhookd \
--env-file .env \
-v ${PWD}/scripts:/var/opt/webhookd/scripts \
-p 8080:8080 \
ncarlier/webhookd
```
Check the provided environment file [.env](.env) for configuration details.
> Note that this image extends `docker:dind` Docker image. Therefore you are
> able to interact with a Docker daemon with yours shell scripts.
> Note that this image extends `docker:dind` Docker image.
> Therefore you are able to interact with a Docker daemon with yours shell scripts.
## Configuration
@ -53,11 +50,7 @@ You can configure the daemon by:
| `APP_SCRIPTS_GIT_URL` | none | GIT repository that contains scripts (Note: this is only used by the Docker image or by using the Docker entrypoint script) |
| `APP_SCRIPTS_GIT_KEY` | none | GIT SSH private key used to clone the repository (Note: this is only used by the Docker image or by using the Docker entrypoint script) |
| `APP_LOG_DIR` | `/tmp` (OS temp dir) | Directory to store execution logs |
| `APP_NOTIFIER` | none | Post script notification (`http` or `smtp`) |
| `APP_NOTIFIER_FROM` | none | Sender of the notification |
| `APP_NOTIFIER_TO` | none | Recipient of the notification |
| `APP_HTTP_NOTIFIER_URL` | none | URL of the HTTP notifier |
| `APP_SMTP_NOTIFIER_HOST` | none | Hostname of the SMTP relay |
| `APP_NOTIFICATION_URI` | none | Notification configuration URI |
| `APP_DEBUG` | `false` | Output debug logs |
### Using command parameters:
@ -70,6 +63,8 @@ You can configure the daemon by:
| `--nb-workers <workers>` | `2` | The number of workers to start |
| `--scripts <dir>` | `./scripts` | Scripts directory |
| `--timeout <timeout>` | `10` | Hook maximum delay before timeout (in second) |
| `--notification-uri <uri>` | | Notification configuration URI |
| `--log-dir <dir>` | `/tmp` | Directory to store execution logs |
## Usage
@ -95,8 +90,7 @@ You can override the default using the `APP_SCRIPTS_DIR` environment variable.
The directory structure define the webhook URL.
The Webhook can only be call with HTTP POST verb.
If the script exists, the HTTP response will be a `text/event-stream` content
type (Server-sent events).
If the script exists, the HTTP response will be a `text/event-stream` content type (Server-sent events).
*Example:*
@ -122,15 +116,13 @@ data: done
You have several way to provide parameters to your webhook script:
- URL query parameters and HTTP headers are converted into environment
variables.
- URL query parameters and HTTP headers are converted into environment variables.
Variable names follows "snakecase" naming convention.
Therefore the name can be altered.
*ex: `CONTENT-TYPE` will become `content_type`.*
- Body content (text/plain or application/json) is transmit to the script as
parameter.
- Body content (text/plain or application/json) is transmit to the script as parameter.
*Example:*
@ -195,32 +187,68 @@ $ curl http://localhost:8080/echo/2
### Post hook notifications
The script's output is collected and stored into a log file (configured by the
`APP_WORKING_DIR` environment variable).
The output of the script is collected and stored into a log file
(configured by the `APP_LOG_DIR` environment variable).
Once the script executed, you can send the result and this log file to a
notification channel. Currently only two channels are supported: Email and HTTP.
Once the script is executed, you can send the result and this log file to a notification channel.
Currently, only two channels are supported: `Email` and `HTTP`.
Notifications configuration can be done as follow:
```bash
$ export APP_NOTIFICATION_URI=http://requestb.in/v9b229v9
$ # or
$ webhookd --notification-uri=http://requestb.in/v9b229v9
```
Note that only the output of the script prefixed by "notify:" is sent to the notification channel.
If the output does not contain a prefixed line, no notification will be sent.
**Example:**
```bash
#!/bin/bash
echo "notify: Hello World" # Will be notified
echo "Goodbye" # Will not be notified
```
You can overide the notification prefix by adding `prefix` as a query parameter to the configuration URL.
**Example:** http://requestb.in/v9b229v9?prefix="foo:"
#### HTTP notification
HTTP notification configuration:
Configuration URI: `http://example.org`
- **APP_NOTIFIER**=http
- **APP_NOTIFIER_FROM**=webhookd <noreply@nunux.org>
- **APP_NOTIFIER_TO**=hostmaster@nunux.org
- **APP_HTTP_NOTIFIER_URL**=http://requestb.in/v9b229v9
Options (using query parameters):
> Note that the HTTP notification is compatible with
[Mailgun](https://mailgun.com) API.
- `prefix`: Prefix to filter output log
The following JSON payload is POST to the target URL:
```json
{
"id": "42",
"name": "echo",
"text": "foo\nbar...\n",
"error": "Error cause... if present",
}
```
Note that because the payload have a `text` attribute, you can use a [Mattermost][mattermost] webhook endpoint.
[mattermost]: https://docs.mattermost.com/developer/webhooks-incoming.html
#### Email notification
SMTP notification configuration:
Configuration URI: `mailto:foo@bar.com`
- **APP_NOTIFIER**=smtp
- **APP_SMTP_NOTIFIER_HOST**=localhost:25
Options (using query parameters):
The log file will be sent as an GZIP attachment.
- `prefix`: Prefix to filter output log
- `smtp`: SMTP host to use (by default: `localhost:25`)
- `from`: Sender email (by default: `webhookd <noreply@nunux.org>`)
### Authentication

6
go.mod
View File

@ -1,3 +1,7 @@
module github.com/ncarlier/webhookd
require golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9
require (
github.com/derekparker/delve v1.1.0 // indirect
github.com/ncarlier/feedpushr v0.0.0-20181216134954-14f9db944a7c // indirect
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9
)

56
go.sum
View File

@ -1,2 +1,58 @@
github.com/PuerkitoBio/goquery v0.0.0-20180128195650-09540e565986/go.mod h1:T9ezsOHcCrDCgA8aF1Cqr3sSYbO/xgdy8/R/XiIMAhA=
github.com/RadhiFadlillah/go-readability v0.0.0-20180309001606-c5ea8f8edeeb/go.mod h1:O85PMgDnPPdxtlnpDVoCJsZT+xgdcJpJ9dl2q5IdcSA=
github.com/abadojack/whatlanggo v0.0.0-20180210192521-9f160b17f219/go.mod h1:JdU7lKuvX8qqAFrjw25JGWCVmbEzBknb3+L69+8BRJo=
github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/andybalholm/cascadia v0.0.0-20180220184336-901648c87902/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/armon/go-metrics v0.0.0-20180221182744-783273d70314/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/certifi/gocertifi v0.0.0-20180118203423-deb3ae2ef261/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4=
github.com/coreos/bbolt v0.0.0-20160818170152-583e8937c61f/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/derekparker/delve v1.1.0 h1:icd65nMp7s2HiLz6y/6RCVXBdoED3xxYLwX09EMaRCc=
github.com/derekparker/delve v1.1.0/go.mod h1:pMSZMfp0Nhbm8qdZJkuE/yPGOkLpGXLS1I4poXQpuJU=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dimfeld/httppath v0.0.0-20170720192232-ee938bf73598/go.mod h1:0FpDmbrt36utu8jEmeU05dPC9AB5tsLYVVi+ZHfyuwI=
github.com/dimfeld/httptreemux v0.0.0-20180213074414-7f532489e773/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
github.com/getsentry/raven-go v0.0.0-20181128221106-f04e7487e9a6/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/goadesign/goa v1.4.0/go.mod h1:d/9lpuZBK7HFi/7O0oXfwvdoIl+nx2bwKqctZe/lQao=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-immutable-radix v0.0.0-20180129170900-7f3cd4390caa/go.mod h1:6ij3Z20p+OhOkCSrA0gImAWoHYQRGbnlcuk6XYTiaRw=
github.com/hashicorp/go-multierror v0.0.0-20171204182908-b7773ae21874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I=
github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/inconshreveable/log15 v0.0.0-20180818164646-67afb5ed74ec/go.mod h1:cOaXtrgN4ScfRrD9Bre7U1thNq5RtJ8ZoP4iXVGRj6o=
github.com/inconshreveable/mousetrap v0.0.0-20141017200713-76626ae9c91c/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/kelseyhightower/envconfig v0.0.0-20170124162813-f611eb38b387/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d/go.mod h1:WZy8Q5coAB1zhY9AOBJP0O6J4BuDfbupUDavKY+I3+s=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mmcdole/gofeed v0.0.0-20180103222911-42010a154d24/go.mod h1:/BF9JneEL2/flujm8XHoxUcghdTV6vvb3xx/vKyChFU=
github.com/mmcdole/goxpp v0.0.0-20170720115402-77e4a51a73ed/go.mod h1:pasqhqstspkosTneA62Nc+2p9SOBBYAPbnmRRWPQ0V8=
github.com/ncarlier/feedpushr v0.0.0-20181216134954-14f9db944a7c h1:/Om0kMR5zqqAqyR9IK1MJ0YC/PBfiCCBUJDZIKGNdFo=
github.com/ncarlier/feedpushr v0.0.0-20181216134954-14f9db944a7c/go.mod h1:JlYQpiG4XglLo+qK5Z2jfyaCH7Q+f7lutJYCbol5e5Q=
github.com/pkg/errors v0.0.0-20160929014801-645ef00459ed/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rakyll/statik v0.1.5/go.mod h1:OEi9wJV/fMUAGx1eNjq75DKDsJVuEv1U0oYdX6GX8Zs=
github.com/rs/zerolog v0.0.0-20180213000527-56a970de5102/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
github.com/satori/go.uuid v0.0.0-20180103174451-36e9d2ebbde5/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/spf13/cobra v0.0.0-20171012182533-7b2c5ac9fc04/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v0.0.0-20170508184408-e57e3eeb33f7/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/ugorji/go/codec v0.0.0-20181209151446-772ced7fd4c2/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/zach-klippenstein/goregen v0.0.0-20160303162051-795b5e3961ea/go.mod h1:eNr558nEUjP8acGw8FFjTeWvSgU1stO7FAO6eknhHe4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.0.0-20171214130843-f21a4dfb5e38/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20181214171254-3c39ce7b6105/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@ -11,6 +11,7 @@ import (
"github.com/ncarlier/webhookd/pkg/api"
"github.com/ncarlier/webhookd/pkg/config"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/notification"
"github.com/ncarlier/webhookd/pkg/worker"
)
@ -38,6 +39,11 @@ func main() {
ErrorLog: logger.Error,
}
// Configure notification
if err := notification.Init(*conf.NotificationURI); err != nil {
logger.Error.Fatalf("Unable to create notification channel: %v\n", err)
}
// Start the dispatcher.
logger.Debug.Printf("Starting the dispatcher (%d workers)...\n", *conf.NbWorkers)
worker.StartDispatcher(*conf.NbWorkers)

View File

@ -11,6 +11,7 @@ import (
"github.com/ncarlier/webhookd/pkg/config"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
"github.com/ncarlier/webhookd/pkg/tools"
"github.com/ncarlier/webhookd/pkg/worker"
)
@ -76,7 +77,7 @@ func triggerWebhook(w http.ResponseWriter, r *http.Request) {
// Create work
timeout := atoiFallback(r.Header.Get("X-Hook-Timeout"), defaultTimeout)
work := worker.NewWorkRequest(p, script, string(body), params, timeout)
work := model.NewWorkRequest(p, script, string(body), params, timeout)
// Put work in queue
worker.WorkQueue <- *work
@ -114,8 +115,8 @@ func getWebhookLog(w http.ResponseWriter, r *http.Request) {
return
}
// Get log file
logFile, err := worker.GetLogFile(id, name)
// Retrieve log file
logFile, err := worker.RetrieveLogFile(id, name)
if err != nil {
logger.Error.Println(err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)

View File

@ -8,21 +8,25 @@ import (
// Config contain global configuration
type Config struct {
ListenAddr *string
NbWorkers *int
Debug *bool
Timeout *int
ScriptDir *string
PasswdFile *string
ListenAddr *string
NbWorkers *int
Debug *bool
Timeout *int
ScriptDir *string
PasswdFile *string
LogDir *string
NotificationURI *string
}
var config = &Config{
ListenAddr: flag.String("listen", getEnv("LISTEN_ADDR", ":8080"), "HTTP service address (e.g.address, ':8080')"),
NbWorkers: flag.Int("nb-workers", getIntEnv("NB_WORKERS", 2), "The number of workers to start"),
Debug: flag.Bool("debug", getBoolEnv("DEBUG", false), "Output debug logs"),
Timeout: flag.Int("timeout", getIntEnv("HOOK_TIMEOUT", 10), "Hook maximum delay before timeout (in second)"),
ScriptDir: flag.String("scripts", getEnv("SCRIPTS_DIR", "scripts"), "Scripts directory"),
PasswdFile: flag.String("passwd", getEnv("PASSWD_FILE", ".htpasswd"), "Password file (encoded with htpasswd)"),
ListenAddr: flag.String("listen", getEnv("LISTEN_ADDR", ":8080"), "HTTP service address (e.g.address, ':8080')"),
NbWorkers: flag.Int("nb-workers", getIntEnv("NB_WORKERS", 2), "The number of workers to start"),
Debug: flag.Bool("debug", getBoolEnv("DEBUG", false), "Output debug logs"),
Timeout: flag.Int("timeout", getIntEnv("HOOK_TIMEOUT", 10), "Hook maximum delay before timeout (in second)"),
ScriptDir: flag.String("scripts", getEnv("SCRIPTS_DIR", "scripts"), "Scripts directory"),
PasswdFile: flag.String("passwd", getEnv("PASSWD_FILE", ".htpasswd"), "Password file (encoded with htpasswd)"),
LogDir: flag.String("log-dir", getEnv("LOG_DIR", os.TempDir()), "Webhooks execution log directory"),
NotificationURI: flag.String("notification-uri", getEnv("NOTIFICATION_URI", ""), "Notification URI"),
}
func init() {

View File

@ -1,10 +1,19 @@
package worker
package model
import (
"bufio"
"bytes"
"fmt"
"os"
"path"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/ncarlier/webhookd/pkg/config"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/tools"
)
var workID uint64
@ -33,12 +42,14 @@ type WorkRequest struct {
MessageChan chan []byte
Timeout int
Status WorkStatus
LogFilename string
Err error
mutex sync.Mutex
}
// NewWorkRequest creats new work request
func NewWorkRequest(name, script, payload string, args []string, timeout int) *WorkRequest {
return &WorkRequest{
w := &WorkRequest{
ID: atomic.AddUint64(&workID, 1),
Name: name,
Script: script,
@ -48,6 +59,8 @@ func NewWorkRequest(name, script, payload string, args []string, timeout int) *W
MessageChan: make(chan []byte),
Status: Idle,
}
w.LogFilename = path.Join(*config.Get().LogDir, fmt.Sprintf("%s_%d_%s.txt", tools.ToSnakeCase(w.Name), w.ID, time.Now().Format("20060102_1504")))
return w
}
// Terminate set work request as terminated
@ -56,6 +69,7 @@ func (wr *WorkRequest) Terminate(err error) error {
defer wr.mutex.Unlock()
if err != nil {
wr.Status = Error
wr.Err = err
logger.Info.Printf("Work %s#%d done [ERROR]\n", wr.Name, wr.ID)
return err
}
@ -70,3 +84,27 @@ func (wr *WorkRequest) IsTerminated() bool {
defer wr.mutex.Unlock()
return wr.Status == Success || wr.Status == Error
}
// GetLogContent returns work logs filtered with the prefix
func (wr *WorkRequest) GetLogContent(prefixFilter string) string {
file, err := os.Open(wr.LogFilename)
if err != nil {
return err.Error()
}
defer file.Close()
var result bytes.Buffer
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, prefixFilter) {
line = strings.TrimPrefix(line, prefixFilter)
line = strings.TrimLeft(line, " ")
result.WriteString(line + "\n")
}
}
if err := scanner.Err(); err != nil {
return err.Error()
}
return result.String()
}

View File

@ -2,139 +2,65 @@ package notification
import (
"bytes"
"fmt"
"io"
"mime/multipart"
"encoding/json"
"net/http"
"net/textproto"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
)
type notifPayload struct {
ID string `json:"id"`
Name string `json:"name"`
Text string `json:"text"`
Error error `json:"error,omitempty"`
}
// HTTPNotifier is able to send a notification to a HTTP endpoint.
type HTTPNotifier struct {
URL string
From string
To string
User []string
URL *url.URL
PrefixFilter string
}
func newHTTPNotifier() *HTTPNotifier {
notifier := new(HTTPNotifier)
notifier.URL = os.Getenv("APP_HTTP_NOTIFIER_URL")
if notifier.URL == "" {
logger.Error.Println("Unable to create HTTP notifier. APP_HTTP_NOTIFIER_URL not set.")
return nil
func newHTTPNotifier(uri *url.URL) *HTTPNotifier {
logger.Info.Println("Using HTTP notification system: ", uri.String())
return &HTTPNotifier{
URL: uri,
PrefixFilter: getValueOrAlt(uri.Query(), "prefix", "notify:"),
}
user := os.Getenv("APP_HTTP_NOTIFIER_USER")
if user != "" {
notifier.User = strings.Split(user, ":")
}
notifier.From = os.Getenv("APP_NOTIFIER_FROM")
if notifier.From == "" {
notifier.From = "webhookd <noreply@nunux.org>"
}
notifier.To = os.Getenv("APP_NOTIFIER_TO")
if notifier.To == "" {
notifier.To = "hostmaster@nunux.org"
}
return notifier
}
// Notify send a notification to a HTTP endpoint.
func (n *HTTPNotifier) Notify(subject string, text string, attachfile string) {
logger.Debug.Println("Sending notification '" + subject + "' to " + n.URL + " ...")
data := make(url.Values)
data.Set("from", n.From)
data.Set("to", n.To)
data.Set("subject", subject)
data.Set("text", text)
if attachfile != "" {
file, err := os.Open(attachfile)
if err != nil {
logger.Error.Println("Unable to open notification attachment file", err)
return
}
defer file.Close()
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)
mh := make(textproto.MIMEHeader)
mh.Set("Content-Type", "application/x-gzip")
mh.Set("Content-Disposition", fmt.Sprintf("form-data; name=\"attachment\"; filename=\"%s\"", filepath.Base(attachfile)))
part, err := writer.CreatePart(mh)
if err != nil {
logger.Error.Println("Unable to create HTTP notification attachment", err)
return
}
_, err = io.Copy(part, file)
for key, val := range data {
_ = writer.WriteField(key, val[0])
}
err = writer.Close()
if err != nil {
logger.Error.Println("Unable to close the gzip writer", err)
return
}
req, err := http.NewRequest("POST", n.URL, body)
if err != nil {
logger.Error.Println("Unable to post HTTP notification", err)
}
defer req.Body.Close()
req.Header.Set("Content-Type", writer.FormDataContentType())
if len(n.User) == 2 {
req.SetBasicAuth(n.User[0], n.User[1])
}
// Submit the request
client := &http.Client{}
res, err := client.Do(req)
if err != nil {
logger.Error.Println("Unable to do HTTP notification request", err)
return
}
// Check the response
if res.StatusCode != http.StatusOK {
logger.Error.Println("HTTP notification bad response: ", res.Status)
logger.Debug.Println(res.Body)
return
}
logger.Info.Println("HTTP notification sent with attachment: ", attachfile)
} else {
req, err := http.NewRequest("POST", n.URL, bytes.NewBufferString(data.Encode()))
if err != nil {
logger.Error.Println("Unable to post HTTP notification request", err)
}
defer req.Body.Close()
if len(n.User) == 2 {
req.SetBasicAuth(n.User[0], n.User[1])
}
// Submit the request
client := &http.Client{}
res, err := client.Do(req)
if err != nil {
logger.Error.Println("Unable to do the HTTP notification request", err)
return
}
// Check the response
if res.StatusCode != http.StatusOK {
logger.Error.Println("HTTP notification bad response: ", res.Status)
logger.Debug.Println(res.Body)
return
}
logger.Info.Println("HTTP notification sent.")
func (n *HTTPNotifier) Notify(work *model.WorkRequest) error {
payload := work.GetLogContent(n.PrefixFilter)
if strings.TrimSpace(payload) == "" {
// Nothing to notify, abort
return nil
}
notif := &notifPayload{
ID: strconv.FormatUint(work.ID, 10),
Name: work.Name,
Text: payload,
Error: work.Err,
}
notifJSON, err := json.Marshal(notif)
if err != nil {
return err
}
req, err := http.NewRequest("POST", n.URL.String(), bytes.NewBuffer(notifJSON))
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
resp.Body.Close()
logger.Info.Printf("Work %s#%d notified to %s\n", work.Name, work.ID, n.URL.String())
return nil
}

55
pkg/notification/main.go Normal file
View File

@ -0,0 +1,55 @@
package notification
import (
"fmt"
"net/url"
"strings"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
)
// Notifier is able to send a notification.
type Notifier interface {
Notify(work *model.WorkRequest) error
}
var notifier Notifier
// Notify is the global method to notify work
func Notify(work *model.WorkRequest) {
if notifier == nil {
return
}
if err := notifier.Notify(work); err != nil {
logger.Error.Printf("Unable to send notification of work %s#%d: %v\n", work.Name, work.ID, err)
}
}
// Init creates a notifier regarding the URI.
func Init(uri string) error {
if uri == "" {
return nil
}
u, err := url.Parse(uri)
if err != nil {
return fmt.Errorf("invalid notification URL: %s", uri)
}
switch u.Scheme {
case "mailto":
notifier = newSMTPNotifier(u)
case "http", "https":
notifier = newHTTPNotifier(u)
default:
return fmt.Errorf("unable to create notifier: %v", err)
}
return nil
}
func getValueOrAlt(values url.Values, key, alt string) string {
if val, ok := values[key]; ok {
return strings.Join(val[:], ",")
}
return alt
}

View File

@ -1,27 +0,0 @@
package notification
import (
"errors"
"os"
)
// Notifier is able to send a notification.
type Notifier interface {
Notify(subject string, text string, attachfile string)
}
// NotifierFactory creates a notifier regarding the configuration.
func NotifierFactory() (Notifier, error) {
notifier := os.Getenv("APP_NOTIFIER")
switch notifier {
case "http":
return newHTTPNotifier(), nil
case "smtp":
return newSMTPNotifier(), nil
default:
if notifier == "" {
return nil, errors.New("notification provider not configured")
}
return nil, errors.New("unknown notification provider: " + notifier)
}
}

View File

@ -4,75 +4,80 @@ import (
"fmt"
"log"
"net/smtp"
"os"
"net/url"
"strings"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
)
// SMTPNotifier is able to send notifcation to a email destination.
type SMTPNotifier struct {
Host string
From string
To string
Host string
From string
To string
PrefixFilter string
}
func newSMTPNotifier() *SMTPNotifier {
notifier := new(SMTPNotifier)
notifier.Host = os.Getenv("APP_SMTP_NOTIFIER_HOST")
if notifier.Host == "" {
notifier.Host = "localhost:25"
func newSMTPNotifier(uri *url.URL) *SMTPNotifier {
logger.Info.Println("Using SMTP notification system: ", uri.Opaque)
return &SMTPNotifier{
Host: getValueOrAlt(uri.Query(), "smtp", "localhost:25"),
From: getValueOrAlt(uri.Query(), "from", "noreply@nunux.org"),
To: uri.Opaque,
PrefixFilter: getValueOrAlt(uri.Query(), "prefix", "notify:"),
}
notifier.From = os.Getenv("APP_NOTIFIER_FROM")
if notifier.From == "" {
notifier.From = "webhookd <noreply@nunux.org>"
}
notifier.To = os.Getenv("APP_NOTIFIER_TO")
if notifier.To == "" {
notifier.To = "hostmaster@nunux.org"
}
return notifier
}
// Notify send a notification to a email destination.
func (n *SMTPNotifier) Notify(subject string, text string, attachfile string) {
logger.Debug.Println("SMTP notification: ", subject)
func (n *SMTPNotifier) Notify(work *model.WorkRequest) error {
// Get email body
payload := work.GetLogContent(n.PrefixFilter)
if strings.TrimSpace(payload) == "" {
// Nothing to notify, abort
return nil
}
// Buidl subject
var subject string
if work.Status == model.Success {
subject = fmt.Sprintf("Webhook %s#%d SUCCESS.", work.Name, work.ID)
} else {
subject = fmt.Sprintf("Webhook %s#%d FAILED.", work.Name, work.ID)
}
// Connect to the remote SMTP server.
c, err := smtp.Dial(n.Host)
if err != nil {
log.Println(err)
return
return err
}
// Set the sender and recipient first
if err := c.Mail(n.From); err != nil {
log.Println(err)
return
return err
}
if err := c.Rcpt(n.To); err != nil {
log.Println(err)
return
return err
}
// Send the email body.
wc, err := c.Data()
if err != nil {
log.Println(err)
return
return err
}
_, err = fmt.Fprintf(wc, text)
_, err = fmt.Fprintf(wc, "Subject: %s\r\n\r\n%s\r\n\r\n", subject, payload)
if err != nil {
log.Println(err)
return
return err
}
err = wc.Close()
if err != nil {
log.Println(err)
return
return err
}
logger.Info.Printf("Work %s#%d notified to %s\n", work.Name, work.ID, n.To)
// Send the QUIT command and close the connection.
err = c.Quit()
if err != nil {
log.Fatal(err)
}
return c.Quit()
}

View File

@ -1,39 +0,0 @@
package tools
import (
"bufio"
"compress/gzip"
"fmt"
"os"
"github.com/ncarlier/webhookd/pkg/logger"
)
// CompressFile is a simple file gzipper.
func CompressFile(filename string) (zipfile string, err error) {
zipfile = fmt.Sprintf("%s.gz", filename)
in, err := os.Open(filename)
if err != nil {
return
}
out, err := os.Create(zipfile)
if err != nil {
logger.Error.Println("Unable to create gzip file", err)
return
}
// buffer readers from file, writes to pipe
bufin := bufio.NewReader(in)
// gzip wraps buffer writer and wr
gw := gzip.NewWriter(out)
defer gw.Close()
_, err = bufin.WriteTo(gw)
if err != nil {
logger.Error.Println("Unable to write into the gzip file", err)
return
}
logger.Debug.Println("Gzip file created: ", zipfile)
return
}

View File

@ -2,15 +2,19 @@ package worker
import (
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
)
var WorkerQueue chan chan WorkRequest
var WorkQueue = make(chan WorkRequest, 100)
// WorkerQueue is the gloabl queue of Workers
var WorkerQueue chan chan model.WorkRequest
// WorkQueue is the global queue of work to dispatch
var WorkQueue = make(chan model.WorkRequest, 100)
// StartDispatcher is charged to start n workers.
func StartDispatcher(nworkers int) {
// First, initialize the channel we are going to but the workers' work channels into.
WorkerQueue = make(chan chan WorkRequest, nworkers)
WorkerQueue = make(chan chan model.WorkRequest, nworkers)
// Now, create all of our workers.
for i := 0; i < nworkers; i++ {

View File

@ -5,27 +5,14 @@ import (
"os"
"path"
"path/filepath"
"time"
"github.com/ncarlier/webhookd/pkg/config"
"github.com/ncarlier/webhookd/pkg/tools"
)
// getLogDir returns log directory
func getLogDir() string {
if value, ok := os.LookupEnv("APP_LOG_DIR"); ok {
return value
}
return os.TempDir()
}
func createLogFile(work *WorkRequest) (*os.File, error) {
logFilename := path.Join(getLogDir(), fmt.Sprintf("%s_%d_%s.txt", tools.ToSnakeCase(work.Name), work.ID, time.Now().Format("20060102_1504")))
return os.Create(logFilename)
}
// GetLogFile retrieve work log with its name and id
func GetLogFile(id, name string) (*os.File, error) {
logPattern := path.Join(getLogDir(), fmt.Sprintf("%s_%s_*.txt", tools.ToSnakeCase(name), id))
// RetrieveLogFile retrieve work log with its name and id
func RetrieveLogFile(id, name string) (*os.File, error) {
logPattern := path.Join(*config.Get().LogDir, fmt.Sprintf("%s_%s_*.txt", tools.ToSnakeCase(name), id))
files, err := filepath.Glob(logPattern)
if err != nil {
return nil, err

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
)
// ChanWriter is a simple writer to a channel of byte.
@ -22,8 +23,8 @@ func (c *ChanWriter) Write(p []byte) (int, error) {
return len(p), nil
}
func run(work *WorkRequest) error {
work.Status = Running
func run(work *model.WorkRequest) error {
work.Status = model.Running
logger.Info.Printf("Work %s#%d started...\n", work.Name, work.ID)
logger.Debug.Printf("Work %s#%d script: %s\n", work.Name, work.ID, work.Script)
logger.Debug.Printf("Work %s#%d parameter: %v\n", work.Name, work.ID, work.Args)
@ -41,7 +42,7 @@ func run(work *WorkRequest) error {
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
// Open the log file for writing
logFile, err := createLogFile(work)
logFile, err := os.Create(work.LogFilename)
if err != nil {
return work.Terminate(err)
}

View File

@ -6,9 +6,10 @@ import (
"github.com/ncarlier/webhookd/pkg/assert"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
)
func printWorkMessages(work *WorkRequest) {
func printWorkMessages(work *model.WorkRequest) {
go func() {
for {
msg, open := <-work.MessageChan
@ -28,16 +29,17 @@ func TestWorkRunner(t *testing.T) {
"user_agent=test",
}
payload := "{\"foo\": \"bar\"}"
work := NewWorkRequest("test", script, payload, args, 5)
work := model.NewWorkRequest("test", script, payload, args, 5)
assert.NotNil(t, work, "")
printWorkMessages(work)
err := run(work)
assert.Nil(t, err, "")
assert.Equal(t, work.Status, Success, "")
assert.Equal(t, work.Status, model.Success, "")
assert.Equal(t, work.GetLogContent("notify:"), "OK\n", "")
// Test that log file is ok
// Test that we can retrieve log file afterward
id := strconv.FormatUint(work.ID, 10)
logFile, err := GetLogFile(id, "test")
logFile, err := RetrieveLogFile(id, "test")
defer logFile.Close()
assert.Nil(t, err, "Log file should exists")
assert.NotNil(t, logFile, "Log file should be retrieve")
@ -46,23 +48,23 @@ func TestWorkRunner(t *testing.T) {
func TestWorkRunnerWithError(t *testing.T) {
logger.Init("debug")
script := "../../tests/test_error.sh"
work := NewWorkRequest("test", script, "", []string{}, 5)
work := model.NewWorkRequest("test", script, "", []string{}, 5)
assert.NotNil(t, work, "")
printWorkMessages(work)
err := run(work)
assert.NotNil(t, err, "")
assert.Equal(t, work.Status, Error, "")
assert.Equal(t, work.Status, model.Error, "")
assert.Equal(t, "exit status 1", err.Error(), "")
}
func TestWorkRunnerWithTimeout(t *testing.T) {
logger.Init("debug")
script := "../../tests/test_timeout.sh"
work := NewWorkRequest("test", script, "", []string{}, 1)
work := model.NewWorkRequest("test", script, "", []string{}, 1)
assert.NotNil(t, work, "")
printWorkMessages(work)
err := run(work)
assert.NotNil(t, err, "")
assert.Equal(t, work.Status, Error, "")
assert.Equal(t, work.Status, model.Error, "")
assert.Equal(t, "signal: killed", err.Error(), "")
}

View File

@ -4,18 +4,18 @@ import (
"fmt"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
"github.com/ncarlier/webhookd/pkg/notification"
"github.com/ncarlier/webhookd/pkg/tools"
)
// NewWorker creates, and returns a new Worker object. Its only argument
// is a channel that the worker can add itself to whenever it is done its
// work.
func NewWorker(id int, workerQueue chan chan WorkRequest) Worker {
func NewWorker(id int, workerQueue chan chan model.WorkRequest) Worker {
// Create, and return the worker.
worker := Worker{
ID: id,
Work: make(chan WorkRequest),
Work: make(chan model.WorkRequest),
WorkerQueue: workerQueue,
QuitChan: make(chan bool)}
@ -25,8 +25,8 @@ func NewWorker(id int, workerQueue chan chan WorkRequest) Worker {
// Worker is a go routine in charge of executing a work.
type Worker struct {
ID int
Work chan WorkRequest
WorkerQueue chan chan WorkRequest
Work chan model.WorkRequest
WorkerQueue chan chan model.WorkRequest
QuitChan chan bool
}
@ -45,11 +45,12 @@ func (w Worker) Start() {
err := run(&work)
if err != nil {
work.MessageChan <- []byte(fmt.Sprintf("error: %s", err.Error()))
// notify(subject, err.Error(), filename)
} else {
work.MessageChan <- []byte("done")
// notify(subject, "See attachment.", filename)
}
// Send notification
notification.Notify(&work)
close(work.MessageChan)
case <-w.QuitChan:
logger.Debug.Printf("Stopping worker #%d...\n", w.ID)
@ -66,26 +67,3 @@ func (w Worker) Stop() {
w.QuitChan <- true
}()
}
func notify(subject string, text string, outfilename string) {
var notifier, err = notification.NotifierFactory()
if err != nil {
logger.Debug.Println("Unable to get the notifier. Notification skipped:", err)
return
}
if notifier == nil {
logger.Debug.Println("Notification provider not found. Notification skipped.")
return
}
var zipfile string
if outfilename != "" {
zipfile, err = tools.CompressFile(outfilename)
if err != nil {
fmt.Println(err)
zipfile = outfilename
}
}
notifier.Notify(subject, text, zipfile)
}

View File

@ -11,4 +11,6 @@ echo "Testing payload..."
[ -z "$1" ] && echo "Payload undefined" && exit 1
[ "$1" != "{\"foo\": \"bar\"}" ] && echo "Invalid payload: $1" && exit 1
echo "notify: OK"
exit 0