diff --git a/src/jobservice_v2/api/handler.go b/src/jobservice_v2/api/handler.go index 13c34a46e..4c7c2d8ed 100644 --- a/src/jobservice_v2/api/handler.go +++ b/src/jobservice_v2/api/handler.go @@ -123,22 +123,28 @@ func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Re return } - if jobActionReq.Action == opm.CtlCommandStop { + switch jobActionReq.Action { + case opm.CtlCommandStop: if err := dh.controller.StopJob(jobID); err != nil { dh.handleError(w, http.StatusInternalServerError, errs.StopJobError(err)) return } - } else if jobActionReq.Action == opm.CtlCommandCancel { - if err := dh.controller.StopJob(jobID); err != nil { - dh.handleError(w, http.StatusInternalServerError, errs.StopJobError(err)) + case opm.CtlCommandCancel: + if err := dh.controller.CancelJob(jobID); err != nil { + dh.handleError(w, http.StatusInternalServerError, errs.CancelJobError(err)) return } - } else { + case opm.CtlCommandRetry: + if err := dh.controller.RetryJob(jobID); err != nil { + dh.handleError(w, http.StatusInternalServerError, errs.RetryJobError(err)) + return + } + default: dh.handleError(w, http.StatusNotImplemented, errs.UnknownActionNameError(fmt.Errorf("%s", jobID))) return } - w.WriteHeader(http.StatusOK) + w.WriteHeader(http.StatusNoContent) //only header, no content returned } //HandleCheckStatusReq is implementation of method defined in interface 'Handler' diff --git a/src/jobservice_v2/errs/errors.go b/src/jobservice_v2/errs/errors.go index df245d613..5a472d526 100644 --- a/src/jobservice_v2/errs/errors.go +++ b/src/jobservice_v2/errs/errors.go @@ -28,6 +28,8 @@ const ( StopJobErrorCode //CancelJobErrorCode is code for the error of cancelling job CancelJobErrorCode + //RetryJobErrorCode is code for the error of retry job + RetryJobErrorCode //UnknownActionNameErrorCode is code for the case of unknown action name UnknownActionNameErrorCode ) @@ -97,6 +99,11 @@ func CancelJobError(err error) error { return New(CancelJobErrorCode, "Cancel job failed with error", err.Error()) } +//RetryJobError is error for the case of retrying job failed +func RetryJobError(err error) error { + return New(RetryJobErrorCode, "Retry job failed with error", err.Error()) +} + //UnknownActionNameError is error for the case of getting unknown job action func UnknownActionNameError(err error) error { return New(UnknownActionNameErrorCode, "Unknown job action name", err.Error()) @@ -124,7 +131,7 @@ type jobCancelledError struct { //JobCancelledError is error wrapper for the case of cancelling job. func JobCancelledError() error { - return jobStoppedError{ + return jobCancelledError{ baseError{ Code: JobStoppedErrorCode, Err: "Job is cancelled", diff --git a/src/jobservice_v2/job/impl/context.go b/src/jobservice_v2/job/impl/context.go index b3b072d27..7cf9cc0db 100644 --- a/src/jobservice_v2/job/impl/context.go +++ b/src/jobservice_v2/job/impl/context.go @@ -4,6 +4,7 @@ package impl import ( "context" + "errors" "reflect" hlog "github.com/vmware/harbor/src/common/utils/log" @@ -21,6 +22,9 @@ type Context struct { //op command func opCommandFunc job.CheckOPCmdFunc + + //checkin func + checkInFunc job.CheckInFunc } //NewContext ... @@ -51,6 +55,14 @@ func (c *Context) Build(dep env.JobData) (env.JobContext, error) { } } + if checkInFunc, ok := dep.ExtraData["checkInFunc"]; ok { + if reflect.TypeOf(checkInFunc).Kind() == reflect.Func { + if funcRef, ok := checkInFunc.(job.CheckInFunc); ok { + jContext.checkInFunc = funcRef + } + } + } + return jContext, nil } @@ -66,6 +78,12 @@ func (c *Context) SystemContext() context.Context { //Checkin is bridge func for reporting detailed status func (c *Context) Checkin(status string) error { + if c.checkInFunc != nil { + c.checkInFunc(status) + } else { + return errors.New("nil check in function") + } + return nil } diff --git a/src/jobservice_v2/job/impl/replication_job.go b/src/jobservice_v2/job/impl/replication_job.go index dfb9ee9e0..5feb03cfa 100644 --- a/src/jobservice_v2/job/impl/replication_job.go +++ b/src/jobservice_v2/job/impl/replication_job.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "github.com/vmware/harbor/src/jobservice_v2/opm" + "github.com/vmware/harbor/src/jobservice_v2/errs" "github.com/vmware/harbor/src/jobservice_v2/env" @@ -18,7 +20,12 @@ type ReplicationJob struct{} //MaxFails is implementation of same method in Interface. func (rj *ReplicationJob) MaxFails() uint { - return 2 + return 3 +} + +//ShouldRetry ... +func (rj *ReplicationJob) ShouldRetry() bool { + return true } //Validate is implementation of same method in Interface. @@ -47,13 +54,33 @@ func (rj *ReplicationJob) Run(ctx env.JobContext, params map[string]interface{}) fmt.Printf("params: %#v\n", params) fmt.Printf("context: %#v\n", ctx) + /*if 1 != 0 { + return errors.New("I suicide") + }*/ + + fmt.Println("check in 30%") + ctx.Checkin("30%") + time.Sleep(5 * time.Second) + fmt.Println("check in 60%") + ctx.Checkin("60%") + time.Sleep(5 * time.Second) + fmt.Println("check in 100%") + ctx.Checkin("100%") + time.Sleep(1 * time.Second) + //HOLD ON FOR A WHILE fmt.Println("Holding for 20 sec") <-time.After(20 * time.Second) - fmt.Println("I'm back, check if I'm stopped") + fmt.Println("I'm back, check if I'm stopped/cancelled") if cmd, ok := ctx.OPCommand(); ok { fmt.Printf("cmd=%s\n", cmd) + if cmd == opm.CtlCommandCancel { + fmt.Println("exit for receiving cancel signal") + return errs.JobCancelledError() + } + + fmt.Println("exit for receiving stop signal") return errs.JobStoppedError() } diff --git a/src/jobservice_v2/job/interface.go b/src/jobservice_v2/job/interface.go index 5662c6a11..15d21703e 100644 --- a/src/jobservice_v2/job/interface.go +++ b/src/jobservice_v2/job/interface.go @@ -9,14 +9,24 @@ import "github.com/vmware/harbor/src/jobservice_v2/env" //command code for job to determine if take corresponding action. type CheckOPCmdFunc func() (string, bool) +//CheckInFunc is designed for job to report more detailed progress info +type CheckInFunc func(message string) + //Interface defines the related injection and run entry methods. type Interface interface { //Declare how many times the job can be retried if failed. // //Return: - // uint: the failure count allowed + // uint: the failure count allowed. If it is set to 0, then default value 4 is used. MaxFails() uint + //Tell the worker pool if retry the failed job when the fails is + //still less that the number declared by the method 'MaxFails'. + // + //Returns: + // true for retry and false for none-retry + ShouldRetry() bool + //Indicate whether the parameters of job are valid. // //Return: diff --git a/src/jobservice_v2/job/redis_job_wrapper.go b/src/jobservice_v2/job/redis_job_wrapper.go deleted file mode 100644 index ddb5d2e7e..000000000 --- a/src/jobservice_v2/job/redis_job_wrapper.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2018 The Harbor Authors. All rights reserved. - -package job - -import ( - "github.com/gocraft/work" - "github.com/vmware/harbor/src/jobservice_v2/env" - "github.com/vmware/harbor/src/jobservice_v2/errs" -) - -//StatusChangeCallback is the func called when job status changed -type StatusChangeCallback func(jobID string, status string) - -//CheckOPCmdFuncFactoryFunc is used to generate CheckOPCmdFunc func for the specified job -type CheckOPCmdFuncFactoryFunc func(jobID string) CheckOPCmdFunc - -//RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool. -type RedisJob struct { - job interface{} - context *env.Context - callback StatusChangeCallback - opCmdFuncFactory CheckOPCmdFuncFactoryFunc -} - -//NewRedisJob is constructor of RedisJob -func NewRedisJob(j interface{}, ctx *env.Context, statusChangeCallback StatusChangeCallback, opCmdFuncFactory CheckOPCmdFuncFactoryFunc) *RedisJob { - return &RedisJob{j, ctx, statusChangeCallback, opCmdFuncFactory} -} - -//Run the job -func (rj *RedisJob) Run(j *work.Job) error { - //Build job execution context - jData := env.JobData{ - ID: j.ID, - Name: j.Name, - Args: j.Args, - ExtraData: make(map[string]interface{}), - } - jData.ExtraData["opCommandFunc"] = rj.opCmdFuncFactory(j.ID) - execContext, err := rj.context.JobContext.Build(jData) - if err != nil { - return err - } - - //Inject data - runningJob := Wrap(rj.job) - //Start to run - rj.callback(j.ID, JobStatusRunning) - - //TODO: Check function should be defined - err = runningJob.Run(execContext, j.Args) - - if err == nil { - rj.callback(j.ID, JobStatusSuccess) - return nil - } - - if errs.IsJobStoppedError(err) { - rj.callback(j.ID, JobStatusStopped) - return nil // no need to put it into the dead queue for resume - } - - if errs.IsJobCancelledError(err) { - rj.callback(j.ID, JobStatusCancelled) - return err //need to resume - } - - rj.callback(j.ID, JobStatusError) - return err - - //TODO: - //Need to consider how to rm the retry option -} diff --git a/src/jobservice_v2/job/runner.go b/src/jobservice_v2/job/runner.go deleted file mode 100644 index 1a224d728..000000000 --- a/src/jobservice_v2/job/runner.go +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2018 The Harbor Authors. All rights reserved. - -package job - -import "reflect" - -//Wrap returns a new (job.)Interface based on the wrapped job handler reference. -func Wrap(j interface{}) Interface { - theType := reflect.TypeOf(j) - - if theType.Kind() == reflect.Ptr { - theType = theType.Elem() - } - - //Crate new - v := reflect.New(theType).Elem() - return v.Addr().Interface().(Interface) -} diff --git a/src/jobservice_v2/models/models.go b/src/jobservice_v2/models/models.go index c48794428..248ac66be 100644 --- a/src/jobservice_v2/models/models.go +++ b/src/jobservice_v2/models/models.go @@ -44,6 +44,7 @@ type JobStatData struct { RunAt int64 `json:"run_at,omitempty"` CheckIn string `json:"check_in,omitempty"` CheckInAt int64 `json:"check_in_at,omitempty"` + DieAt int64 `json:"die_at,omitempty"` } //JobPoolStats represent the healthy and status of the job service. diff --git a/src/jobservice_v2/opm/job_stats_mgr.go b/src/jobservice_v2/opm/job_stats_mgr.go index 4ea2d182a..ec7a0d713 100644 --- a/src/jobservice_v2/opm/job_stats_mgr.go +++ b/src/jobservice_v2/opm/job_stats_mgr.go @@ -54,7 +54,7 @@ type JobStatsManager interface { // error if meet any problems Retry(jobID string) error - //CtlCommand check if control command is fired for the specified job. + //CtlCommand checks if control command is fired for the specified job. // //jobID string : ID of the job // @@ -62,4 +62,18 @@ type JobStatsManager interface { // the command if it was fired // error if it was not fired yet to meet some other problems CtlCommand(jobID string) (string, error) + + //CheckIn message for the specified job like detailed progress info. + // + //jobID string : ID of the job + //message string : The message being checked in + // + CheckIn(jobID string, message string) + + //DieAt marks the failed jobs with the time they put into dead queue. + // + //jobID string : ID of the job + //message string : The message being checked in + // + DieAt(jobID string, dieAt int64) } diff --git a/src/jobservice_v2/opm/redis_job_stats_mgr.go b/src/jobservice_v2/opm/redis_job_stats_mgr.go index 216db0637..b26b7d579 100644 --- a/src/jobservice_v2/opm/redis_job_stats_mgr.go +++ b/src/jobservice_v2/opm/redis_job_stats_mgr.go @@ -5,6 +5,7 @@ package opm import ( "context" "errors" + "fmt" "math/rand" "strconv" "time" @@ -26,12 +27,16 @@ const ( processBufferSize = 1024 opSaveStats = "save_job_stats" opUpdateStatus = "update_job_status" + opCheckIn = "check_in" + opDieAt = "mark_die_at" maxFails = 3 //CtlCommandStop : command stop CtlCommandStop = "stop" //CtlCommandCancel : command cancel CtlCommandCancel = "cancel" + //CtlCommandRetry : command retry + CtlCommandRetry = "retry" //Copy from period.enqueuer periodicEnqueuerHorizon = 4 * time.Minute @@ -90,6 +95,7 @@ func (rjs *RedisJobStatsManager) Shutdown() { } //Save is implementation of same method in JobStatsManager interface. +//Async method func (rjs *RedisJobStatsManager) Save(jobStats models.JobStats) { item := &queueItem{ op: opSaveStats, @@ -100,6 +106,7 @@ func (rjs *RedisJobStatsManager) Save(jobStats models.JobStats) { } //Retrieve is implementation of same method in JobStatsManager interface. +//Sync method func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error) { if utils.IsEmptyStr(jobID) { return models.JobStats{}, errors.New("empty job ID") @@ -109,6 +116,7 @@ func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error) } //SetJobStatus is implementation of same method in JobStatsManager interface. +//Async method func (rjs *RedisJobStatsManager) SetJobStatus(jobID string, status string) { if utils.IsEmptyStr(jobID) || utils.IsEmptyStr(status) { return @@ -135,25 +143,27 @@ func (rjs *RedisJobStatsManager) loop() { for { select { case item := <-rjs.processChan: - if err := rjs.process(item); err != nil { - item.fails++ - if item.fails < maxFails { - //Retry after a random interval - go func() { - timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second) - defer timer.Stop() + go func(item *queueItem) { + if err := rjs.process(item); err != nil { + item.fails++ + if item.fails < maxFails { + //Retry after a random interval + go func() { + timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second) + defer timer.Stop() - select { - case <-timer.C: - rjs.processChan <- item - return - case <-controlChan: - } - }() - } else { - log.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails) + select { + case <-timer.C: + rjs.processChan <- item + return + case <-controlChan: + } + }() + } else { + log.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails) + } } - } + }(item) break case <-rjs.stopChan: rjs.doneChan <- struct{}{} @@ -165,7 +175,6 @@ func (rjs *RedisJobStatsManager) loop() { } //Stop the specified job. -//Async method, not blocking func (rjs *RedisJobStatsManager) Stop(jobID string) error { if utils.IsEmptyStr(jobID) { return errors.New("empty job ID") @@ -188,12 +197,17 @@ func (rjs *RedisJobStatsManager) Stop(jobID string) error { } } case job.JobKindPeriodic: - //firstly we need try to delete the job instances scheduled for this periodic job, a try best action - rjs.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) //ignore error as we have logged - //secondly delete the periodic job policy + //firstly delete the periodic job policy if err := rjs.scheduler.UnSchedule(jobID); err != nil { return err } + //secondly we need try to delete the job instances scheduled for this periodic job, a try best action + rjs.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) //ignore error as we have logged + //thirdly expire the job stats of this periodic job if exists + if err := rjs.expirePeriodicJobStats(theJob.Stats.JobID); err != nil { + //only logged + log.Errorf("Expire the stats of job %s failed with error: %s\n", theJob.Stats.JobID, err) + } default: break } @@ -212,13 +226,64 @@ func (rjs *RedisJobStatsManager) Stop(jobID string) error { //Cancel the specified job. //Async method, not blocking func (rjs *RedisJobStatsManager) Cancel(jobID string) error { + if utils.IsEmptyStr(jobID) { + return errors.New("empty job ID") + } + + theJob, err := rjs.getJobStats(jobID) + if err != nil { + return err + } + + switch theJob.Stats.JobKind { + case job.JobKindGeneric: + if theJob.Stats.Status != job.JobStatusRunning { + return fmt.Errorf("only running job can be cancelled, job '%s' seems not running now", theJob.Stats.JobID) + } + + //Send 'cancel' ctl command to the running instance + if err := rjs.writeCtlCommand(jobID, CtlCommandCancel); err != nil { + return err + } + break + default: + return fmt.Errorf("job kind '%s' does not support 'cancel' operation", theJob.Stats.JobKind) + } + return nil } //Retry the specified job. //Async method, not blocking func (rjs *RedisJobStatsManager) Retry(jobID string) error { - return nil + if utils.IsEmptyStr(jobID) { + return errors.New("empty job ID") + } + + theJob, err := rjs.getJobStats(jobID) + if err != nil { + return err + } + + if theJob.Stats.DieAt == 0 { + return fmt.Errorf("job '%s' is not a retryable job", jobID) + } + + return rjs.client.RetryDeadJob(theJob.Stats.DieAt, jobID) +} + +//CheckIn mesage +func (rjs *RedisJobStatsManager) CheckIn(jobID string, message string) { + if utils.IsEmptyStr(jobID) || utils.IsEmptyStr(message) { + return + } + + item := &queueItem{ + op: opCheckIn, + data: []string{jobID, message}, + } + + rjs.processChan <- item } //CtlCommand checks if control command is fired for the specified job. @@ -230,6 +295,33 @@ func (rjs *RedisJobStatsManager) CtlCommand(jobID string) (string, error) { return rjs.getCrlCommand(jobID) } +//DieAt marks the failed jobs with the time they put into dead queue. +func (rjs *RedisJobStatsManager) DieAt(jobID string, dieAt int64) { + if utils.IsEmptyStr(jobID) || dieAt == 0 { + return + } + + item := &queueItem{ + op: opDieAt, + data: []interface{}{jobID, dieAt}, + } + + rjs.processChan <- item +} + +func (rjs *RedisJobStatsManager) expirePeriodicJobStats(jobID string) error { + conn := rjs.redisPool.Get() + defer conn.Close() + + //The periodic job (policy) is stopped/unscheduled and then + //the stats of periodic job now can be expired + key := utils.KeyJobStats(rjs.namespace, jobID) + expireTime := 24 * 60 * 60 //1 day + _, err := conn.Do("EXPIRE", key, expireTime) + + return err +} + func (rjs *RedisJobStatsManager) deleteScheduledJobsOfPeriodicPolicy(policyID string, cronSpec string) error { schedule, err := cron.Parse(cronSpec) if err != nil { @@ -245,7 +337,6 @@ func (rjs *RedisJobStatsManager) deleteScheduledJobsOfPeriodicPolicy(policyID st //return the last error if occurred for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) { epoch := t.Unix() - log.Infof("epoch=%d\n", epoch) if err = rjs.client.DeleteScheduledJob(epoch, policyID); err != nil { //only logged log.Errorf("delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err) @@ -301,11 +392,55 @@ func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) er key := utils.KeyJobStats(rjs.namespace, jobID) args := make([]interface{}, 0, 5) args = append(args, key, "status", status, "update_time", time.Now().Unix()) + if status == job.JobStatusSuccess { + //make sure the 'die_at' is reset in case it's a retrying job + args = append(args, "die_at", 0) + } _, err := conn.Do("HMSET", args...) return err } +func (rjs *RedisJobStatsManager) checkIn(jobID string, message string) error { + conn := rjs.redisPool.Get() + defer conn.Close() + + now := time.Now().Unix() + key := utils.KeyJobStats(rjs.namespace, jobID) + args := make([]interface{}, 0, 7) + args = append(args, key, "check_in", message, "check_in_at", now, "update_time", now) + _, err := conn.Do("HMSET", args...) + + return err +} + +func (rjs *RedisJobStatsManager) dieAt(jobID string, baseTime int64) error { + conn := rjs.redisPool.Get() + defer conn.Close() + + //Query the dead job in the time scope of [baseTime,baseTime+5] + key := utils.RedisKeyDead(rjs.namespace) + jobWithScores, err := utils.GetZsetByScore(rjs.redisPool, key, []int64{baseTime, baseTime + 5}) + if err != nil { + return err + } + + for _, jws := range jobWithScores { + if j, err := utils.DeSerializeJob(jws.JobBytes); err == nil { + if j.ID == jobID { + //Found + statsKey := utils.KeyJobStats(rjs.namespace, jobID) + args := make([]interface{}, 0, 7) + args = append(args, statsKey, "die_at", jws.Score, "update_time", time.Now().Unix()) + _, err := conn.Do("HMSET", args...) + return err + } + } + } + + return fmt.Errorf("seems %s is not a dead job", jobID) +} + func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, error) { conn := rjs.redisPool.Get() defer conn.Close() @@ -365,6 +500,9 @@ func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, err case "cron_spec": res.Stats.CronSpec = value break + case "die_at": + v, _ := strconv.ParseInt(value, 10, 64) + res.Stats.DieAt = v default: } } @@ -397,6 +535,9 @@ func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error { "check_in_at", jobStats.Stats.CheckInAt, ) } + if jobStats.Stats.DieAt > 0 { + args = append(args, "die_at", jobStats.Stats.DieAt) + } conn.Send("HMSET", args...) //If job kind is periodic job, expire time should not be set @@ -425,6 +566,12 @@ func (rjs *RedisJobStatsManager) process(item *queueItem) error { case opUpdateStatus: data := item.data.([]string) return rjs.updateJobStatus(data[0], data[1]) + case opCheckIn: + data := item.data.([]string) + return rjs.checkIn(data[0], data[1]) + case opDieAt: + data := item.data.([]interface{}) + return rjs.dieAt(data[0].(string), data[1].(int64)) default: break } diff --git a/src/jobservice_v2/period/sweeper.go b/src/jobservice_v2/period/sweeper.go index bc22870a4..36c03b181 100644 --- a/src/jobservice_v2/period/sweeper.go +++ b/src/jobservice_v2/period/sweeper.go @@ -3,7 +3,6 @@ package period import ( - "errors" "fmt" "time" @@ -55,7 +54,7 @@ func (s *Sweeper) ClearOutdatedScheduledJobs() error { } nowEpoch := time.Now().Unix() - jobScores, err := GetZsetByScore(s.redisPool, utils.RedisKeyScheduled(s.namespace), []int64{0, nowEpoch}) + jobScores, err := utils.GetZsetByScore(s.redisPool, utils.RedisKeyScheduled(s.namespace), []int64{0, nowEpoch}) if err != nil { return err } @@ -94,32 +93,3 @@ func (s *Sweeper) ClearOutdatedScheduledJobs() error { } return fmt.Errorf("%s", errorSummary) } - -//JobScore represents the data item with score in the redis db. -type JobScore struct { - JobBytes []byte - Score int64 -} - -//GetZsetByScore get the items from the zset filtered by the specified score scope. -func GetZsetByScore(pool *redis.Pool, key string, scores []int64) ([]JobScore, error) { - if pool == nil || utils.IsEmptyStr(key) || len(scores) < 2 { - return nil, errors.New("bad arguments") - } - - conn := pool.Get() - defer conn.Close() - - values, err := redis.Values(conn.Do("ZRANGEBYSCORE", key, scores[0], scores[1], "WITHSCORES")) - if err != nil { - return nil, err - } - - var jobsWithScores []JobScore - - if err := redis.ScanSlice(values, &jobsWithScores); err != nil { - return nil, err - } - - return jobsWithScores, nil -} diff --git a/src/jobservice_v2/pool/redis_job_wrapper.go b/src/jobservice_v2/pool/redis_job_wrapper.go new file mode 100644 index 000000000..146dd22dd --- /dev/null +++ b/src/jobservice_v2/pool/redis_job_wrapper.go @@ -0,0 +1,151 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package pool + +import ( + "time" + + "github.com/gocraft/work" + "github.com/vmware/harbor/src/jobservice_v2/env" + "github.com/vmware/harbor/src/jobservice_v2/errs" + "github.com/vmware/harbor/src/jobservice_v2/job" + "github.com/vmware/harbor/src/jobservice_v2/opm" +) + +//RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool. +type RedisJob struct { + job interface{} // the real job implementation + context *env.Context //context + statsManager opm.JobStatsManager //job stats manager +} + +//NewRedisJob is constructor of RedisJob +func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManager) *RedisJob { + return &RedisJob{ + job: j, + context: ctx, + statsManager: statsManager, + } +} + +//Run the job +func (rj *RedisJob) Run(j *work.Job) error { + var cancelled = false + + execContext, err := rj.buildContext(j) + if err != nil { + return err + } + + runningJob := Wrap(rj.job) + defer func() { + if rj.shouldDisableRetry(runningJob, j, cancelled) { + j.Fails = 10000000000 //Make it big enough to avoid retrying + now := time.Now().Unix() + go func() { + timer := time.NewTimer(2 * time.Second) //make sure the failed job is already put into the dead queue + defer timer.Stop() + + <-timer.C + + rj.statsManager.DieAt(j.ID, now) + }() + } + }() + + //Start to run + rj.jobRunning(j.ID) + //Inject data + err = runningJob.Run(execContext, j.Args) + + //update the proper status + if err == nil { + rj.jobSucceed(j.ID) + return nil + } + + if errs.IsJobStoppedError(err) { + rj.jobStopped(j.ID) + return nil // no need to put it into the dead queue for resume + } + + if errs.IsJobCancelledError(err) { + rj.jobCancelled(j.ID) + cancelled = true + return err //need to resume + } + + rj.jobFailed(j.ID) + return err +} + +func (rj *RedisJob) jobRunning(jobID string) { + rj.statsManager.SetJobStatus(jobID, job.JobStatusRunning) +} + +func (rj *RedisJob) jobFailed(jobID string) { + rj.statsManager.SetJobStatus(jobID, job.JobStatusError) +} + +func (rj *RedisJob) jobStopped(jobID string) { + rj.statsManager.SetJobStatus(jobID, job.JobStatusStopped) +} + +func (rj *RedisJob) jobCancelled(jobID string) { + rj.statsManager.SetJobStatus(jobID, job.JobStatusCancelled) +} + +func (rj *RedisJob) jobSucceed(jobID string) { + rj.statsManager.SetJobStatus(jobID, job.JobStatusSuccess) +} + +func (rj *RedisJob) buildContext(j *work.Job) (env.JobContext, error) { + //Build job execution context + jData := env.JobData{ + ID: j.ID, + Name: j.Name, + Args: j.Args, + ExtraData: make(map[string]interface{}), + } + + checkOPCmdFuncFactory := func(jobID string) job.CheckOPCmdFunc { + return func() (string, bool) { + cmd, err := rj.statsManager.CtlCommand(jobID) + if err != nil { + return "", false + } + return cmd, true + } + } + + jData.ExtraData["opCommandFunc"] = checkOPCmdFuncFactory(j.ID) + + checkInFuncFactory := func(jobID string) job.CheckInFunc { + return func(message string) { + rj.statsManager.CheckIn(jobID, message) + } + } + + jData.ExtraData["checkInFunc"] = checkInFuncFactory(j.ID) + + return rj.context.JobContext.Build(jData) +} + +func (rj *RedisJob) shouldDisableRetry(j job.Interface, wj *work.Job, cancelled bool) bool { + maxFails := j.MaxFails() + if maxFails == 0 { + maxFails = 4 //Consistent with backend worker pool + } + fails := wj.Fails + fails++ //as the fail is not returned to backend pool yet + + if cancelled && fails < int64(maxFails) { + return true + } + + if !cancelled && fails < int64(maxFails) && !j.ShouldRetry() { + return true + } + + return false +} diff --git a/src/jobservice_v2/pool/redis_pool.go b/src/jobservice_v2/pool/redis_pool.go index d782e0503..79afea53e 100644 --- a/src/jobservice_v2/pool/redis_pool.go +++ b/src/jobservice_v2/pool/redis_pool.go @@ -172,24 +172,10 @@ func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error { return errors.New("job must implement the job.Interface") } - //Use redis job wrapper pointer to keep the data required by the job.Interface. - statusChangeCallback := func(jobID string, status string) { - gcwp.statsManager.SetJobStatus(jobID, status) - } - //Define the concrete factory method for creating 'job.CheckOPCmdFunc'. - checkOPCmdFuncFactory := func(jobID string) job.CheckOPCmdFunc { - return func() (string, bool) { - cmd, err := gcwp.statsManager.CtlCommand(jobID) - if err != nil { - return "", false - } - return cmd, true - } - } - redisJob := job.NewRedisJob(j, gcwp.context, statusChangeCallback, checkOPCmdFuncFactory) + redisJob := NewRedisJob(j, gcwp.context, gcwp.statsManager) //Get more info from j - theJ := job.Wrap(j) + theJ := Wrap(j) gcwp.pool.JobWithOptions(name, work.JobOptions{MaxFails: theJ.MaxFails()}, @@ -350,7 +336,7 @@ func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error { //RetryJob retry the job func (gcwp *GoCraftWorkPool) RetryJob(jobID string) error { - return nil + return gcwp.statsManager.Retry(jobID) } //IsKnownJob ... @@ -365,13 +351,13 @@ func (gcwp *GoCraftWorkPool) ValidateJobParameters(jobType interface{}, params m return errors.New("nil job type") } - theJ := job.Wrap(jobType) + theJ := Wrap(jobType) return theJ.Validate(params) } //log the job func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error { - log.Infof("Job incoming: %s:%s", job.ID, job.Name) + log.Infof("Job incoming: %s:%s", job.Name, job.ID) return next() } diff --git a/src/jobservice_v2/pool/runner.go b/src/jobservice_v2/pool/runner.go new file mode 100644 index 000000000..93479e40e --- /dev/null +++ b/src/jobservice_v2/pool/runner.go @@ -0,0 +1,22 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package pool + +import ( + "reflect" + + "github.com/vmware/harbor/src/jobservice_v2/job" +) + +//Wrap returns a new job.Interface based on the wrapped job handler reference. +func Wrap(j interface{}) job.Interface { + theType := reflect.TypeOf(j) + + if theType.Kind() == reflect.Ptr { + theType = theType.Elem() + } + + //Crate new + v := reflect.New(theType).Elem() + return v.Addr().Interface().(job.Interface) +} diff --git a/src/jobservice_v2/utils/gocarft_work.go b/src/jobservice_v2/utils/gocarft_work.go index 4254f5ad8..0dc8bf0bc 100644 --- a/src/jobservice_v2/utils/gocarft_work.go +++ b/src/jobservice_v2/utils/gocarft_work.go @@ -43,6 +43,11 @@ func RedisKeyLastPeriodicEnqueue(namespace string) string { return RedisNamespacePrefix(namespace) + "last_periodic_enqueue" } +//RedisKeyDead returns key of the dead jobs. +func RedisKeyDead(namespace string) string { + return RedisNamespacePrefix(namespace) + "dead" +} + var nowMock int64 //NowEpochSeconds ... diff --git a/src/jobservice_v2/utils/utils.go b/src/jobservice_v2/utils/utils.go index e7de0a5ae..b7f7fe0bc 100644 --- a/src/jobservice_v2/utils/utils.go +++ b/src/jobservice_v2/utils/utils.go @@ -4,8 +4,11 @@ package utils import ( + "errors" "os" "strings" + + "github.com/garyburd/redigo/redis" ) //IsEmptyStr check if the specified str is empty (len ==0) after triming prefix and suffix spaces. @@ -33,3 +36,32 @@ func FileExists(file string) bool { func IsValidPort(port uint) bool { return port != 0 && port < 65536 } + +//JobScore represents the data item with score in the redis db. +type JobScore struct { + JobBytes []byte + Score int64 +} + +//GetZsetByScore get the items from the zset filtered by the specified score scope. +func GetZsetByScore(pool *redis.Pool, key string, scores []int64) ([]JobScore, error) { + if pool == nil || IsEmptyStr(key) || len(scores) < 2 { + return nil, errors.New("bad arguments") + } + + conn := pool.Get() + defer conn.Close() + + values, err := redis.Values(conn.Do("ZRANGEBYSCORE", key, scores[0], scores[1], "WITHSCORES")) + if err != nil { + return nil, err + } + + var jobsWithScores []JobScore + + if err := redis.ScanSlice(values, &jobsWithScores); err != nil { + return nil, err + } + + return jobsWithScores, nil +}