From b71c506587571dc3dcdbb9d5cc14311f5cd2ddd4 Mon Sep 17 00:00:00 2001 From: Nicolas Carlier Date: Fri, 31 Oct 2014 23:18:38 +0000 Subject: [PATCH] feat: Create worker queue. --- Makefile | 10 +-- src/notification/http_notifier.go | 2 +- src/tools/compress.go | 6 +- src/webhookd.go | 120 +++++++----------------------- src/worker/dispatcher.go | 35 +++++++++ src/worker/script_runner.go | 50 +++++++++++++ src/worker/work_request.go | 7 ++ src/worker/worker.go | 89 ++++++++++++++++++++++ 8 files changed, 216 insertions(+), 103 deletions(-) create mode 100644 src/worker/dispatcher.go create mode 100644 src/worker/script_runner.go create mode 100644 src/worker/work_request.go create mode 100644 src/worker/worker.go diff --git a/Makefile b/Makefile index 6076e6c..3be2ff6 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ .SILENT : -.PHONY : volume build clean run shell test +.PHONY : volume dev build clean run shell test dist USERNAME:=ncarlier APPNAME:=webhookd @@ -18,16 +18,16 @@ define docker_run_flags -i -t endef -ifdef DEVMODE - docker_run_flags += --volumes-from $(APPNAME)_volumes -endif - all: build volume: echo "Building $(APPNAME) volumes..." sudo docker run -v $(PWD):/opt/$(APPNAME) -v ~/var/$(APPNAME):/var/opt/$(APPNAME) --name $(APPNAME)_volumes busybox true +dev: + $(eval docker_run_flags += --volumes-from $(APPNAME)_volumes) + echo "DEVMODE: Using volumes from $(APPNAME)_volumes" + build: echo "Building $(IMAGE) docker image..." sudo docker build --rm -t $(IMAGE) . diff --git a/src/notification/http_notifier.go b/src/notification/http_notifier.go index 64485b0..d88e068 100644 --- a/src/notification/http_notifier.go +++ b/src/notification/http_notifier.go @@ -102,7 +102,7 @@ func (n *HttpNotifier) Notify(subject string, text string, attachfile string) { // Check the response if res.StatusCode != http.StatusOK { - log.Println("bad status: %s", res.Status) + log.Println("bad status: ", res.Status) log.Println(res.Body) return } diff --git a/src/tools/compress.go b/src/tools/compress.go index f1e8479..c6b14cb 100644 --- a/src/tools/compress.go +++ b/src/tools/compress.go @@ -16,7 +16,7 @@ func CompressFile(filename string) (zipfile string, err error) { } out, err := os.Create(zipfile) if err != nil { - log.Println("Unable to create zip file", err) + log.Println("Unable to create gzip file", err) return } @@ -29,9 +29,9 @@ func CompressFile(filename string) (zipfile string, err error) { _, err = bufin.WriteTo(gw) if err != nil { - log.Println("Unable to write into the zip file", err) + log.Println("Unable to write into the gzip file", err) return } - log.Println("Zip file created: ", zipfile) + log.Println("Gzip file created: ", zipfile) return } diff --git a/src/webhookd.go b/src/webhookd.go index 01fedff..5ab39cd 100644 --- a/src/webhookd.go +++ b/src/webhookd.go @@ -5,105 +5,32 @@ import ( "fmt" "github.com/gorilla/mux" "github.com/ncarlier/webhookd/hook" - "github.com/ncarlier/webhookd/notification" - "github.com/ncarlier/webhookd/tools" + "github.com/ncarlier/webhookd/worker" "log" "net/http" - "os" - "os/exec" - "path" ) var ( - laddr = flag.String("l", ":8080", "HTTP service address (e.g.address, ':8080')") - workingdir = os.Getenv("APP_WORKING_DIR") - scriptsdir = os.Getenv("APP_SCRIPTS_DIR") + LAddr = flag.String("l", ":8080", "HTTP service address (e.g.address, ':8080')") + NWorkers = flag.Int("n", 2, "The number of workers to start") ) -type HookContext struct { - Hook string - Action string - args []string -} - -func Notify(subject string, text string, outfilename string) { - var notifier, err = notification.NotifierFactory() - if err != nil { - log.Println(err) - return - } - if notifier == nil { - log.Println("Notification provider not found.") - return - } - - var zipfile string - if outfilename != "" { - zipfile, err = tools.CompressFile(outfilename) - if err != nil { - log.Println(err) - zipfile = outfilename - } - } - - notifier.Notify(subject, text, zipfile) -} - -func RunScript(w http.ResponseWriter, context *HookContext) { - scriptname := path.Join(scriptsdir, context.Hook, fmt.Sprintf("%s.sh", context.Action)) - log.Println("Exec script: ", scriptname) - - cmd := exec.Command(scriptname, context.args...) - var ErrorHandler func(err error, out string) - ErrorHandler = func(err error, out string) { - subject := fmt.Sprintf("Webhook %s/%s FAILED.", context.Hook, context.Action) - Notify(subject, err.Error(), out) - http.Error(w, err.Error(), http.StatusInternalServerError) - } - - // open the out file for writing - outfilename := path.Join(workingdir, fmt.Sprintf("%s-%s.txt", context.Hook, context.Action)) - outfile, err := os.Create(outfilename) - if err != nil { - ErrorHandler(err, "") - return - } - - defer outfile.Close() - cmd.Stdout = outfile - - err = cmd.Start() - if err != nil { - ErrorHandler(err, "") - return - } - - err = cmd.Wait() - if err != nil { - ErrorHandler(err, outfilename) - return - } - - subject := fmt.Sprintf("Webhook %s/%s SUCCEEDED.", context.Hook, context.Action) - Notify(subject, "See attached file for logs.", outfilename) - fmt.Fprintf(w, subject) -} - func Handler(w http.ResponseWriter, r *http.Request) { params := mux.Vars(r) - context := new(HookContext) - context.Hook = params["hookname"] - context.Action = params["action"] + hookname := params["hookname"] + action := params["action"] - var record, err = hook.RecordFactory(context.Hook) + // Get hook decoder + record, err := hook.RecordFactory(hookname) if err != nil { log.Println(err.Error()) http.Error(w, err.Error(), http.StatusNotFound) return } - log.Println("Using hook: ", context.Hook) + fmt.Printf("Using hook %s with action %s.\n", hookname, action) + // Decode request err = record.Decode(r) if err != nil { log.Println(err.Error()) @@ -111,27 +38,32 @@ func Handler(w http.ResponseWriter, r *http.Request) { return } - log.Println("Extracted data: ", record.GetURL(), record.GetName()) - context.args = []string{record.GetURL(), record.GetName()} + // Create work + work := new(worker.WorkRequest) + work.Name = hookname + work.Action = action + fmt.Println("Extracted data: ", record.GetURL(), record.GetName()) + work.Args = []string{record.GetURL(), record.GetName()} - RunScript(w, context) + //Put work in queue + worker.WorkQueue <- *work + fmt.Printf("Work request queued: %s/%s\n", hookname, action) + + fmt.Fprintf(w, "Action %s of hook %s queued.", action, hookname) } func main() { - if workingdir == "" { - workingdir = os.TempDir() - } - if scriptsdir == "" { - scriptsdir = "scripts" - } - flag.Parse() + // Start the dispatcher. + fmt.Println("Starting the dispatcher") + worker.StartDispatcher(*NWorkers) + rtr := mux.NewRouter() rtr.HandleFunc("/{hookname:[a-z]+}/{action:[a-z]+}", Handler).Methods("POST") http.Handle("/", rtr) - log.Println("webhookd server listening...") - log.Fatal(http.ListenAndServe(*laddr, nil)) + fmt.Println("webhookd server listening...") + log.Fatal(http.ListenAndServe(*LAddr, nil)) } diff --git a/src/worker/dispatcher.go b/src/worker/dispatcher.go new file mode 100644 index 0000000..c07de51 --- /dev/null +++ b/src/worker/dispatcher.go @@ -0,0 +1,35 @@ +package worker + +import ( + "fmt" +) + +var WorkerQueue chan chan WorkRequest +var WorkQueue = make(chan WorkRequest, 100) + +func StartDispatcher(nworkers int) { + // First, initialize the channel we are going to but the workers' work channels into. + WorkerQueue = make(chan chan WorkRequest, nworkers) + + // Now, create all of our workers. + for i := 0; i < nworkers; i++ { + fmt.Println("Starting worker", i+1) + worker := NewWorker(i+1, WorkerQueue) + worker.Start() + } + + go func() { + for { + select { + case work := <-WorkQueue: + fmt.Println("Received work requeust") + go func() { + worker := <-WorkerQueue + + fmt.Println("Dispatching work request") + worker <- work + }() + } + } + }() +} diff --git a/src/worker/script_runner.go b/src/worker/script_runner.go new file mode 100644 index 0000000..e562c73 --- /dev/null +++ b/src/worker/script_runner.go @@ -0,0 +1,50 @@ +package worker + +import ( + "fmt" + "os" + "os/exec" + "path" +) + +var ( + workingdir = os.Getenv("APP_WORKING_DIR") + scriptsdir = os.Getenv("APP_SCRIPTS_DIR") +) + +func RunScript(work *WorkRequest) (string, error) { + if workingdir == "" { + workingdir = os.TempDir() + } + if scriptsdir == "" { + scriptsdir = "scripts" + } + + scriptname := path.Join(scriptsdir, work.Name, fmt.Sprintf("%s.sh", work.Action)) + fmt.Println("Exec script: ", scriptname) + + // Exec script... + cmd := exec.Command(scriptname, work.Args...) + + // Open the out file for writing + outfilename := path.Join(workingdir, fmt.Sprintf("%s-%s.txt", work.Name, work.Action)) + outfile, err := os.Create(outfilename) + if err != nil { + return "", err + } + + defer outfile.Close() + cmd.Stdout = outfile + + err = cmd.Start() + if err != nil { + return "", err + } + + err = cmd.Wait() + if err != nil { + return "", err + } + + return outfilename, nil +} diff --git a/src/worker/work_request.go b/src/worker/work_request.go new file mode 100644 index 0000000..0f12e5b --- /dev/null +++ b/src/worker/work_request.go @@ -0,0 +1,7 @@ +package worker + +type WorkRequest struct { + Name string + Action string + Args []string +} diff --git a/src/worker/worker.go b/src/worker/worker.go new file mode 100644 index 0000000..1d66016 --- /dev/null +++ b/src/worker/worker.go @@ -0,0 +1,89 @@ +package worker + +import ( + "fmt" + "github.com/ncarlier/webhookd/notification" + "github.com/ncarlier/webhookd/tools" +) + +// NewWorker creates, and returns a new Worker object. Its only argument +// is a channel that the worker can add itself to whenever it is done its +// work. +func NewWorker(id int, workerQueue chan chan WorkRequest) Worker { + // Create, and return the worker. + worker := Worker{ + ID: id, + Work: make(chan WorkRequest), + WorkerQueue: workerQueue, + QuitChan: make(chan bool)} + + return worker +} + +type Worker struct { + ID int + Work chan WorkRequest + WorkerQueue chan chan WorkRequest + QuitChan chan bool +} + +// This function "starts" the worker by starting a goroutine, that is +// an infinite "for-select" loop. +func (w Worker) Start() { + go func() { + for { + // Add ourselves into the worker queue. + w.WorkerQueue <- w.Work + + select { + case work := <-w.Work: + // Receive a work request. + fmt.Printf("worker%d: Received work request %s/%s\n", w.ID, work.Name, work.Action) + filename, err := RunScript(&work) + if err != nil { + subject := fmt.Sprintf("Webhook %s/%s FAILED.", work.Name, work.Action) + Notify(subject, err.Error(), "") + } else { + subject := fmt.Sprintf("Webhook %s/%s SUCCEEDED.", work.Name, work.Action) + Notify(subject, "See attachment.", filename) + } + case <-w.QuitChan: + // We have been asked to stop. + fmt.Printf("worker%d stopping\n", w.ID) + return + } + } + }() +} + +// Stop tells the worker to stop listening for work requests. +// +// Note that the worker will only stop *after* it has finished its work. +func (w Worker) Stop() { + go func() { + w.QuitChan <- true + }() +} + +func Notify(subject string, text string, outfilename string) { + var notifier, err = notification.NotifierFactory() + if err != nil { + fmt.Println(err) + return + } + if notifier == nil { + fmt.Println("Notification provideri not found.") + return + } + + var zipfile string + if outfilename != "" { + zipfile, err = tools.CompressFile(outfilename) + if err != nil { + fmt.Println(err) + zipfile = outfilename + } + } + + notifier.Notify(subject, text, zipfile) +}