From 5c5643b9cea78e951619d286499c4f47545d99b8 Mon Sep 17 00:00:00 2001 From: Tan Jiang Date: Thu, 12 May 2016 16:11:28 +0800 Subject: [PATCH] introduce worker pool to handle jobs --- job/scheduler.go | 5 +-- jobservice/main.go | 74 ++++++++++++++++++++++++++++++++++++++++-- jobservice/my_start.sh | 1 + 3 files changed, 76 insertions(+), 4 deletions(-) diff --git a/job/scheduler.go b/job/scheduler.go index 80d9859fa..566e18242 100644 --- a/job/scheduler.go +++ b/job/scheduler.go @@ -6,9 +6,10 @@ import ( "github.com/vmware/harbor/utils/log" ) +var JobQueue chan int64 = make(chan int64) + func Schedule(jobID int64) { - //TODO: introduce jobqueue to better control concurrent job numbers - go HandleRepJob(jobID) + JobQueue <- jobID } func HandleRepJob(id int64) { diff --git a/jobservice/main.go b/jobservice/main.go index 51efa3046..322b73a88 100644 --- a/jobservice/main.go +++ b/jobservice/main.go @@ -3,12 +3,82 @@ package main import ( "github.com/astaxie/beego" "github.com/vmware/harbor/dao" - _ "github.com/vmware/harbor/job/imgout" - // "github.com/vmware/harbor/utils/log" + "github.com/vmware/harbor/job" + "github.com/vmware/harbor/utils/log" + "os" + "strconv" ) +const defaultMaxWorkers int = 10 + +type Worker struct { + ID int + RepJobs chan int64 + quit chan bool +} + +func (w *Worker) Start() { + go func() { + for { + WorkerPool <- w + select { + case jobID := <-w.RepJobs: + log.Debugf("worker: %d, will handle job: %d", w.ID, jobID) + job.HandleRepJob(jobID) + case q := <-w.quit: + if q { + log.Debugf("worker: %d, will stop.", w.ID) + return + } + } + } + }() +} + +func (w *Worker) Stop() { + go func() { + w.quit <- true + }() +} + +var WorkerPool chan *Worker + func main() { dao.InitDB() initRouters() + initWorkerPool() + go dispatch() beego.Run() } + +func initWorkerPool() { + maxWorkersEnv := os.Getenv("MAX_JOB_WORKERS") + maxWorkers64, err := strconv.ParseInt(maxWorkersEnv, 10, 32) + maxWorkers := int(maxWorkers64) + if err != nil { + log.Warningf("Failed to parse max works setting, error: %v, the default value: %d will be used", err, defaultMaxWorkers) + maxWorkers = defaultMaxWorkers + } + WorkerPool = make(chan *Worker, maxWorkers) + for i := 0; i < maxWorkers; i++ { + worker := &Worker{ + ID: i, + RepJobs: make(chan int64), + quit: make(chan bool), + } + worker.Start() + } +} + +func dispatch() { + for { + select { + case job := <-job.JobQueue: + go func(jobID int64) { + log.Debugf("Trying to dispatch job: %d", jobID) + worker := <-WorkerPool + worker.RepJobs <- jobID + }(job) + } + } +} diff --git a/jobservice/my_start.sh b/jobservice/my_start.sh index bbcff279c..a2343840d 100755 --- a/jobservice/my_start.sh +++ b/jobservice/my_start.sh @@ -6,5 +6,6 @@ export LOG_LEVEL=debug export UI_URL=http://127.0.0.1/ export UI_USR=admin export UI_PWD=Harbor12345 +export MAX_JOB_WORKERS=1 ./jobservice