diff --git a/src/jobservice_v2/api/handler.go b/src/jobservice_v2/api/handler.go index ea3656e5b..13c34a46e 100644 --- a/src/jobservice_v2/api/handler.go +++ b/src/jobservice_v2/api/handler.go @@ -6,6 +6,8 @@ import ( "io/ioutil" "net/http" + "github.com/vmware/harbor/src/jobservice_v2/opm" + "github.com/gorilla/mux" "github.com/vmware/harbor/src/jobservice_v2/core" @@ -77,6 +79,10 @@ func (dh *DefaultHandler) HandleLaunchJobReq(w http.ResponseWriter, req *http.Re //HandleGetJobReq is implementation of method defined in interface 'Handler' func (dh *DefaultHandler) HandleGetJobReq(w http.ResponseWriter, req *http.Request) { + if !dh.preCheck(w) { + return + } + vars := mux.Vars(req) jobID := vars["job_id"] @@ -97,11 +103,50 @@ func (dh *DefaultHandler) HandleGetJobReq(w http.ResponseWriter, req *http.Reque //HandleJobActionReq is implementation of method defined in interface 'Handler' func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Request) { + if !dh.preCheck(w) { + return + } + + vars := mux.Vars(req) + jobID := vars["job_id"] + + data, err := ioutil.ReadAll(req.Body) + if err != nil { + dh.handleError(w, http.StatusInternalServerError, errs.ReadRequestBodyError(err)) + return + } + + //unmarshal the requested action from the request body + jobActionReq := models.JobActionRequest{} + if err = json.Unmarshal(data, &jobActionReq); err != nil { + dh.handleError(w, http.StatusInternalServerError, errs.HandleJSONDataError(err)) + return + } + + if jobActionReq.Action == opm.CtlCommandStop { + if err := dh.controller.StopJob(jobID); err != nil { + dh.handleError(w, http.StatusInternalServerError, errs.StopJobError(err)) + return + } + } else if jobActionReq.Action == opm.CtlCommandCancel { + if err := dh.controller.CancelJob(jobID); err != nil { + dh.handleError(w, http.StatusInternalServerError, errs.CancelJobError(err)) + return + } + } else { + dh.handleError(w, http.StatusNotImplemented, errs.UnknownActionNameError(fmt.Errorf("%s", jobActionReq.Action))) + return + } + 
w.WriteHeader(http.StatusOK) } //HandleCheckStatusReq is implementation of method defined in interface 'Handler' func (dh *DefaultHandler) HandleCheckStatusReq(w http.ResponseWriter, req *http.Request) { + if !dh.preCheck(w) { + return + } + stats, err := dh.controller.CheckStatus() if err != nil { dh.handleError(w, http.StatusInternalServerError, errs.CheckStatsError(err)) diff --git a/src/jobservice_v2/core/controller.go b/src/jobservice_v2/core/controller.go index 878d23c55..6a0d150f0 100644 --- a/src/jobservice_v2/core/controller.go +++ b/src/jobservice_v2/core/controller.go @@ -77,12 +77,29 @@ func (c *Controller) GetJob(jobID string) (models.JobStats, error) { //StopJob is implementation of same method in core interface. func (c *Controller) StopJob(jobID string) error { - return nil + if utils.IsEmptyStr(jobID) { + return errors.New("empty job ID") + } + + return c.backendPool.StopJob(jobID) +} + +//CancelJob is implementation of same method in core interface. +func (c *Controller) CancelJob(jobID string) error { + if utils.IsEmptyStr(jobID) { + return errors.New("empty job ID") + } + + return c.backendPool.CancelJob(jobID) } //RetryJob is implementation of same method in core interface. -func (c *Controller) RetryJob(jonID string) error { - return nil +func (c *Controller) RetryJob(jobID string) error { + if utils.IsEmptyStr(jobID) { + return errors.New("empty job ID") + } + + return c.backendPool.RetryJob(jobID) } //CheckStatus is implementation of same method in core interface. diff --git a/src/jobservice_v2/core/interface.go b/src/jobservice_v2/core/interface.go index 71e93d334..f123365c2 100644 --- a/src/jobservice_v2/core/interface.go +++ b/src/jobservice_v2/core/interface.go @@ -43,6 +43,14 @@ type Interface interface { // error : Error returned if failed to retry the specified job. 
RetryJob(jonID string) error + //Cancel the job + // + //jobID string : ID of the enqueued job + // + //Returns: + // error : error returned if meet any problems + CancelJob(jobID string) error + //CheckStatus is used to handle the job service healthy status checking request. CheckStatus() (models.JobPoolStats, error) } diff --git a/src/jobservice_v2/env/job_context.go b/src/jobservice_v2/env/job_context.go index 55f27f7ba..cc498c66a 100644 --- a/src/jobservice_v2/env/job_context.go +++ b/src/jobservice_v2/env/job_context.go @@ -30,11 +30,27 @@ type JobContext interface { //Returns: // context.Context SystemContext() context.Context + + //Checkin is bridge func for reporting detailed status + // + //status string : detailed status + // + //Returns: + // error if meet any problems + Checkin(status string) error + + //OPCommand return the control operational command like stop/cancel if have + // + //Returns: + // op command if have + // flag to indicate if have command + OPCommand() (string, bool) } //JobData defines job context dependencies. 
type JobData struct { - ID string - Name string - Args map[string]interface{} + ID string + Name string + Args map[string]interface{} + ExtraData map[string]interface{} } diff --git a/src/jobservice_v2/errs/errors.go b/src/jobservice_v2/errs/errors.go index 5287f19f3..df245d613 100644 --- a/src/jobservice_v2/errs/errors.go +++ b/src/jobservice_v2/errs/errors.go @@ -10,6 +10,8 @@ import ( const ( //JobStoppedErrorCode is code for jobStoppedError JobStoppedErrorCode = 10000 + iota + //JobCancelledErrorCode is code for jobCancelledError + JobCancelledErrorCode //ReadRequestBodyErrorCode is code for the error of reading http request body error ReadRequestBodyErrorCode //HandleJSONDataErrorCode is code for the error of handling json data error @@ -22,13 +24,19 @@ const ( CheckStatsErrorCode //GetJobStatsErrorCode is code for the error of getting stats of enqueued job GetJobStatsErrorCode + //StopJobErrorCode is code for the error of stopping job + StopJobErrorCode + //CancelJobErrorCode is code for the error of cancelling job + CancelJobErrorCode + //UnknownActionNameErrorCode is code for the case of unknown action name + UnknownActionNameErrorCode ) //baseError ... type baseError struct { Code uint16 `json:"code"` Err string `json:"message"` - Description string `json:"details"` + Description string `json:"details,omitempty"` } //Error is implementation of error interface. 
@@ -79,12 +87,59 @@ func GetJobStatsError(err error) error { return New(GetJobStatsErrorCode, "Get job stats failed with error", err.Error()) } +//StopJobError is error for the case of stopping job failed +func StopJobError(err error) error { + return New(StopJobErrorCode, "Stop job failed with error", err.Error()) +} + +//CancelJobError is error for the case of cancelling job failed +func CancelJobError(err error) error { + return New(CancelJobErrorCode, "Cancel job failed with error", err.Error()) +} + +//UnknownActionNameError is error for the case of getting unknown job action +func UnknownActionNameError(err error) error { + return New(UnknownActionNameErrorCode, "Unknown job action name", err.Error()) +} + //jobStoppedError is designed for the case of stopping job. type jobStoppedError struct { baseError } +//JobStoppedError is error wrapper for the case of stopping job. +func JobStoppedError() error { + return jobStoppedError{ + baseError{ + Code: JobStoppedErrorCode, + Err: "Job is stopped", + }, + } +} + //jobCancelledError is designed for the case of cancelling job. type jobCancelledError struct { baseError } + +//JobCancelledError is error wrapper for the case of cancelling job. 
+func JobCancelledError() error { + return jobCancelledError{ + baseError{ + Code: JobCancelledErrorCode, + Err: "Job is cancelled", + }, + } +} + +//IsJobStoppedError returns true if the error is a jobStoppedError +func IsJobStoppedError(err error) bool { + _, ok := err.(jobStoppedError) + return ok +} + +//IsJobCancelledError returns true if the error is a jobCancelledError +func IsJobCancelledError(err error) bool { + _, ok := err.(jobCancelledError) + return ok +} diff --git a/src/jobservice_v2/job/impl/context.go b/src/jobservice_v2/job/impl/context.go index cfaa31180..b3b072d27 100644 --- a/src/jobservice_v2/job/impl/context.go +++ b/src/jobservice_v2/job/impl/context.go @@ -4,9 +4,11 @@ package impl import ( "context" + "reflect" hlog "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/jobservice_v2/env" + "github.com/vmware/harbor/src/jobservice_v2/job" ) //Context ... @@ -16,6 +18,9 @@ type Context struct { //Logger for job logger *hlog.Logger + + //op command func + opCommandFunc job.CheckOPCmdFunc } //NewContext ... 
@@ -33,9 +38,20 @@ func (c *Context) InitDao() error { //Build implements the same method in env.JobContext interface //This func will build the job execution context before running func (c *Context) Build(dep env.JobData) (env.JobContext, error) { - return &Context{ + jContext := &Context{ sysContext: c.sysContext, - }, nil + } + + //TODO:Init logger here + if opCommandFunc, ok := dep.ExtraData["opCommandFunc"]; ok { + if reflect.TypeOf(opCommandFunc).Kind() == reflect.Func { + if funcRef, ok := opCommandFunc.(job.CheckOPCmdFunc); ok { + jContext.opCommandFunc = funcRef + } + } + } + + return jContext, nil } //Get implements the same method in env.JobContext interface @@ -47,3 +63,17 @@ func (c *Context) Get(prop string) interface{} { func (c *Context) SystemContext() context.Context { return c.sysContext } + +//Checkin is bridge func for reporting detailed status +func (c *Context) Checkin(status string) error { + return nil +} + +//OPCommand return the control operational command like stop/cancel if have +func (c *Context) OPCommand() (string, bool) { + if c.opCommandFunc != nil { + return c.opCommandFunc() + } + + return "", false +} diff --git a/src/jobservice_v2/job/impl/replication_job.go b/src/jobservice_v2/job/impl/replication_job.go index f5af4f902..dfb9ee9e0 100644 --- a/src/jobservice_v2/job/impl/replication_job.go +++ b/src/jobservice_v2/job/impl/replication_job.go @@ -8,8 +8,9 @@ import ( "strings" "time" + "github.com/vmware/harbor/src/jobservice_v2/errs" + "github.com/vmware/harbor/src/jobservice_v2/env" - "github.com/vmware/harbor/src/jobservice_v2/job" ) //ReplicationJob is the job for replicating repositories. @@ -38,13 +39,23 @@ func (rj *ReplicationJob) Validate(params map[string]interface{}) error { } //Run the replication logic here. 
-func (rj *ReplicationJob) Run(ctx env.JobContext, params map[string]interface{}, f job.CheckOPCmdFunc) error { +func (rj *ReplicationJob) Run(ctx env.JobContext, params map[string]interface{}) error { + defer func() { + fmt.Println("I'm finished, exit!") + }() fmt.Println("=======Replication job running=======") fmt.Printf("params: %#v\n", params) fmt.Printf("context: %#v\n", ctx) //HOLD ON FOR A WHILE - fmt.Println("Holding for 10 sec") - <-time.After(10 * time.Second) + fmt.Println("Holding for 20 sec") + <-time.After(20 * time.Second) + fmt.Println("I'm back, check if I'm stopped") + + if cmd, ok := ctx.OPCommand(); ok { + fmt.Printf("cmd=%s\n", cmd) + return errs.JobStoppedError() + } + return nil } diff --git a/src/jobservice_v2/job/interface.go b/src/jobservice_v2/job/interface.go index e952cf3b6..5662c6a11 100644 --- a/src/jobservice_v2/job/interface.go +++ b/src/jobservice_v2/job/interface.go @@ -6,8 +6,8 @@ import "github.com/vmware/harbor/src/jobservice_v2/env" //CheckOPCmdFunc is the function to check if the related operation commands //like STOP or CANCEL is fired for the specified job. If yes, return the -//command code for job to determin if take corresponding action. -type CheckOPCmdFunc func(string) (uint, bool) +//command code for job to determine if take corresponding action. +type CheckOPCmdFunc func() (string, bool) //Interface defines the related injection and run entry methods. type Interface interface { @@ -28,10 +28,9 @@ type Interface interface { // //ctx env.JobContext : Job execution context. //params map[string]interface{} : parameters with key-pair style for the job execution. - //f CheckOPCmdFunc: check function reference. // //Returns: // error if failed to run. 
NOTES: If job is stopped or cancelled, a specified error should be returned // - Run(ctx env.JobContext, params map[string]interface{}, f CheckOPCmdFunc) error + Run(ctx env.JobContext, params map[string]interface{}) error } diff --git a/src/jobservice_v2/job/redis_job_wrapper.go b/src/jobservice_v2/job/redis_job_wrapper.go index c18d7883e..ddb5d2e7e 100644 --- a/src/jobservice_v2/job/redis_job_wrapper.go +++ b/src/jobservice_v2/job/redis_job_wrapper.go @@ -5,31 +5,38 @@ package job import ( "github.com/gocraft/work" "github.com/vmware/harbor/src/jobservice_v2/env" + "github.com/vmware/harbor/src/jobservice_v2/errs" ) //StatusChangeCallback is the func called when job status changed type StatusChangeCallback func(jobID string, status string) +//CheckOPCmdFuncFactoryFunc is used to generate CheckOPCmdFunc func for the specified job +type CheckOPCmdFuncFactoryFunc func(jobID string) CheckOPCmdFunc + //RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool. 
type RedisJob struct { - job interface{} - context *env.Context - callback StatusChangeCallback + job interface{} + context *env.Context + callback StatusChangeCallback + opCmdFuncFactory CheckOPCmdFuncFactoryFunc } //NewRedisJob is constructor of RedisJob -func NewRedisJob(j interface{}, ctx *env.Context, statusChangeCallback StatusChangeCallback) *RedisJob { - return &RedisJob{j, ctx, statusChangeCallback} +func NewRedisJob(j interface{}, ctx *env.Context, statusChangeCallback StatusChangeCallback, opCmdFuncFactory CheckOPCmdFuncFactoryFunc) *RedisJob { + return &RedisJob{j, ctx, statusChangeCallback, opCmdFuncFactory} } //Run the job func (rj *RedisJob) Run(j *work.Job) error { //Build job execution context jData := env.JobData{ - ID: j.ID, - Name: j.Name, - Args: j.Args, + ID: j.ID, + Name: j.Name, + Args: j.Args, + ExtraData: make(map[string]interface{}), } + jData.ExtraData["opCommandFunc"] = rj.opCmdFuncFactory(j.ID) execContext, err := rj.context.JobContext.Build(jData) if err != nil { return err @@ -41,18 +48,26 @@ func (rj *RedisJob) Run(j *work.Job) error { rj.callback(j.ID, JobStatusRunning) //TODO: Check function should be defined - err = runningJob.Run(execContext, j.Args, nil) + err = runningJob.Run(execContext, j.Args) if err == nil { rj.callback(j.ID, JobStatusSuccess) - } else { - rj.callback(j.ID, JobStatusError) + return nil } - //TODO: - //If error is stopped error, update status to 'Stopped' and return nil - //If error is cancelled error, update status to 'Cancelled' and return err - //Need to consider how to rm the retry option + if errs.IsJobStoppedError(err) { + rj.callback(j.ID, JobStatusStopped) + return nil // no need to put it into the dead queue for resume + } + if errs.IsJobCancelledError(err) { + rj.callback(j.ID, JobStatusCancelled) + return err //need to resume + } + + rj.callback(j.ID, JobStatusError) return err + + //TODO: + //Need to consider how to rm the retry option } diff --git a/src/jobservice_v2/models/models.go 
b/src/jobservice_v2/models/models.go index ce6d4f733..c48794428 100644 --- a/src/jobservice_v2/models/models.go +++ b/src/jobservice_v2/models/models.go @@ -38,6 +38,7 @@ type JobStatData struct { JobKind string `json:"kind"` IsUnique bool `json:"unique"` RefLink string `json:"ref_link,omitempty"` + CronSpec string `json:"cron_spec,omitempty"` EnqueueTime int64 `json:"enqueue_time"` UpdateTime int64 `json:"update_time"` RunAt int64 `json:"run_at,omitempty"` @@ -54,3 +55,8 @@ type JobPoolStats struct { Concurrency uint `json:"concurrency"` Status string `json:"status"` } + +//JobActionRequest defines for triggering job action like stop/cancel. +type JobActionRequest struct { + Action string `json:"action"` +} diff --git a/src/jobservice_v2/opm/job_operator.go b/src/jobservice_v2/opm/job_operator.go deleted file mode 100644 index cb4e8f22c..000000000 --- a/src/jobservice_v2/opm/job_operator.go +++ /dev/null @@ -1,3 +0,0 @@ -// Copyright 2018 The Harbor Authors. All rights reserved. - -package opm diff --git a/src/jobservice_v2/opm/job_stats_mgr.go b/src/jobservice_v2/opm/job_stats_mgr.go index 4d3e308ee..4ea2d182a 100644 --- a/src/jobservice_v2/opm/job_stats_mgr.go +++ b/src/jobservice_v2/opm/job_stats_mgr.go @@ -9,8 +9,8 @@ type JobStatsManager interface { //Start to serve Start() - //Stop to serve - Stop() + //Shutdown the manager + Shutdown() //Save the job stats //Async method to retry and improve performance @@ -29,4 +29,37 @@ type JobStatsManager interface { //SetJobStatus will mark the status of job to the specified one //Async method to retry SetJobStatus(jobID string, status string) + + //Stop the job + // + //jobID string : ID of the being stopped job + // + //Returns: + // error if meet any problems + Stop(jobID string) error + + //Cancel the job + // + //jobID string : ID of the being cancelled job + // + //Returns: + // error if meet any problems + Cancel(jobID string) error + + //Retry the job + // + //jobID string : ID of the being retried job + // + 
//Returns: + // error if meet any problems + Retry(jobID string) error + + //CtlCommand check if control command is fired for the specified job. + // + //jobID string : ID of the job + // + //Returns: + // the command if it was fired + // error if it was not fired yet to meet some other problems + CtlCommand(jobID string) (string, error) } diff --git a/src/jobservice_v2/opm/redis_job_stats_mgr.go b/src/jobservice_v2/opm/redis_job_stats_mgr.go index b00b608a2..216db0637 100644 --- a/src/jobservice_v2/opm/redis_job_stats_mgr.go +++ b/src/jobservice_v2/opm/redis_job_stats_mgr.go @@ -4,10 +4,17 @@ package opm import ( "context" + "errors" "math/rand" "strconv" "time" + "github.com/vmware/harbor/src/jobservice_v2/period" + + "github.com/robfig/cron" + + "github.com/gocraft/work" + "github.com/garyburd/redigo/redis" "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/jobservice_v2/job" @@ -20,6 +27,14 @@ const ( opSaveStats = "save_job_stats" opUpdateStatus = "update_job_status" maxFails = 3 + + //CtlCommandStop : command stop + CtlCommandStop = "stop" + //CtlCommandCancel : command cancel + CtlCommandCancel = "cancel" + + //Copy from period.enqueuer + periodicEnqueuerHorizon = 4 * time.Minute ) type queueItem struct { @@ -33,6 +48,8 @@ type RedisJobStatsManager struct { namespace string redisPool *redis.Pool context context.Context + client *work.Client + scheduler period.Interface stopChan chan struct{} doneChan chan struct{} @@ -41,11 +58,13 @@ type RedisJobStatsManager struct { } //NewRedisJobStatsManager is constructor of RedisJobStatsManager -func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *redis.Pool) *RedisJobStatsManager { +func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *redis.Pool, client *work.Client, scheduler period.Interface) *RedisJobStatsManager { return &RedisJobStatsManager{ namespace: namespace, context: ctx, redisPool: redisPool, + client: client, + scheduler: 
scheduler, stopChan: make(chan struct{}, 1), doneChan: make(chan struct{}, 1), processChan: make(chan *queueItem, processBufferSize), @@ -61,8 +80,8 @@ func (rjs *RedisJobStatsManager) Start() { rjs.isRunning = true } -//Stop is implementation of same method in JobStatsManager interface. -func (rjs *RedisJobStatsManager) Stop() { +//Shutdown is implementation of same method in JobStatsManager interface. +func (rjs *RedisJobStatsManager) Shutdown() { if !rjs.isRunning { return } @@ -82,6 +101,212 @@ func (rjs *RedisJobStatsManager) Save(jobStats models.JobStats) { //Retrieve is implementation of same method in JobStatsManager interface. func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error) { + if utils.IsEmptyStr(jobID) { + return models.JobStats{}, errors.New("empty job ID") + } + + return rjs.getJobStats(jobID) +} + +//SetJobStatus is implementation of same method in JobStatsManager interface. +func (rjs *RedisJobStatsManager) SetJobStatus(jobID string, status string) { + if utils.IsEmptyStr(jobID) || utils.IsEmptyStr(status) { + return + } + + item := &queueItem{ + op: opUpdateStatus, + data: []string{jobID, status}, + } + + rjs.processChan <- item +} + +func (rjs *RedisJobStatsManager) loop() { + controlChan := make(chan struct{}) + + defer func() { + rjs.isRunning = false + //Notify other sub goroutines + close(controlChan) + log.Info("Redis job stats manager is stopped") + }() + + for { + select { + case item := <-rjs.processChan: + if err := rjs.process(item); err != nil { + item.fails++ + if item.fails < maxFails { + //Retry after a random interval + go func() { + timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second) + defer timer.Stop() + + select { + case <-timer.C: + rjs.processChan <- item + return + case <-controlChan: + } + }() + } else { + log.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails) + } + } + break + case <-rjs.stopChan: + rjs.doneChan <- struct{}{} + 
return + case <-rjs.context.Done(): + return + } + } +} + +//Stop the specified job. +//Async method, not blocking +func (rjs *RedisJobStatsManager) Stop(jobID string) error { + if utils.IsEmptyStr(jobID) { + return errors.New("empty job ID") + } + + theJob, err := rjs.getJobStats(jobID) + if err != nil { + return err + } + + switch theJob.Stats.JobKind { + case job.JobKindGeneric: + //nothing need to do + case job.JobKindScheduled: + //we need to delete the scheduled job in the queue if it is not running yet + //otherwise, nothing need to do + if theJob.Stats.Status == job.JobStatusScheduled { + if err := rjs.client.DeleteScheduledJob(theJob.Stats.RunAt, jobID); err != nil { + return err + } + } + case job.JobKindPeriodic: + //firstly we need try to delete the job instances scheduled for this periodic job, a try best action + rjs.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) //ignore error as we have logged + //secondly delete the periodic job policy + if err := rjs.scheduler.UnSchedule(jobID); err != nil { + return err + } + default: + break + } + + //Check if the job has 'running' instance + if theJob.Stats.Status == job.JobStatusRunning { + //Send 'stop' ctl command to the running instance + if err := rjs.writeCtlCommand(jobID, CtlCommandStop); err != nil { + return err + } + } + + return nil +} + +//Cancel the specified job. +//Async method, not blocking +func (rjs *RedisJobStatsManager) Cancel(jobID string) error { + return nil +} + +//Retry the specified job. +//Async method, not blocking +func (rjs *RedisJobStatsManager) Retry(jobID string) error { + return nil +} + +//CtlCommand checks if control command is fired for the specified job. 
+func (rjs *RedisJobStatsManager) CtlCommand(jobID string) (string, error) { + if utils.IsEmptyStr(jobID) { + return "", errors.New("empty job ID") + } + + return rjs.getCtlCommand(jobID) +} + +func (rjs *RedisJobStatsManager) deleteScheduledJobsOfPeriodicPolicy(policyID string, cronSpec string) error { + schedule, err := cron.Parse(cronSpec) + if err != nil { + log.Errorf("cron spec '%s' is not valid", cronSpec) + return err + } + + now := utils.NowEpochSeconds() + nowTime := time.Unix(now, 0) + horizon := nowTime.Add(periodicEnqueuerHorizon) + + //try best to delete every instance scheduled within the horizon + //return the most recent deletion error if any occurred + for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) { + epoch := t.Unix() + log.Infof("epoch=%d\n", epoch) + if delErr := rjs.client.DeleteScheduledJob(epoch, policyID); delErr != nil { + err = delErr //keep the latest failure so a later successful delete does not clear it; only logged here + log.Errorf("delete scheduled instance for periodic job %s failed with error: %s\n", policyID, delErr) + } + } + + return err +} + +func (rjs *RedisJobStatsManager) getCtlCommand(jobID string) (string, error) { + conn := rjs.redisPool.Get() + defer conn.Close() + + key := utils.KeyJobCtlCommands(rjs.namespace, jobID) + cmd, err := redis.String(conn.Do("HGET", key, "command")) + if err != nil { + return "", err + } + //try to DEL it after getting the command + //Ignore the error,leave it as dirty data + _, err = conn.Do("DEL", key) + if err != nil { + //only logged + log.Errorf("del key %s failed with error: %s\n", key, err) + } + + return cmd, nil +} + +func (rjs *RedisJobStatsManager) writeCtlCommand(jobID string, command string) error { + conn := rjs.redisPool.Get() + defer conn.Close() + + key := utils.KeyJobCtlCommands(rjs.namespace, jobID) + args := make([]interface{}, 0, 5) + args = append(args, key, "command", command, "fire_time", time.Now().Unix()) + if err := conn.Send("HMSET", args...); err != nil { + return err + } + + expireTime := 24*60*60 + rand.Int63n(10) + if err := conn.Send("EXPIRE", key, expireTime); err != nil { + return err + } + 
+ return conn.Flush() +} + +func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) error { + conn := rjs.redisPool.Get() + defer conn.Close() + + key := utils.KeyJobStats(rjs.namespace, jobID) + args := make([]interface{}, 0, 5) + args = append(args, key, "status", status, "update_time", time.Now().Unix()) + _, err := conn.Do("HMSET", args...) + + return err +} + +func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, error) { conn := rjs.redisPool.Get() defer conn.Close() @@ -137,6 +362,9 @@ func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error) case "check_in": res.Stats.CheckIn = value break + case "cron_spec": + res.Stats.CronSpec = value + break default: } } @@ -144,70 +372,6 @@ func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error) return res, nil } -//SetJobStatus is implementation of same method in JobStatsManager interface. -func (rjs *RedisJobStatsManager) SetJobStatus(jobID string, status string) { - item := &queueItem{ - op: opUpdateStatus, - data: []string{jobID, status}, - } - - rjs.processChan <- item -} - -func (rjs *RedisJobStatsManager) loop() { - controlChan := make(chan struct{}) - - defer func() { - rjs.isRunning = false - //Notify other sub goroutines - close(controlChan) - log.Info("Redis job stats manager is stopped") - }() - - for { - select { - case item := <-rjs.processChan: - if err := rjs.process(item); err != nil { - item.fails++ - if item.fails < maxFails { - //Retry after a random interval - go func() { - timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second) - defer timer.Stop() - - select { - case <-timer.C: - rjs.processChan <- item - return - case <-controlChan: - } - }() - } else { - log.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails) - } - } - break - case <-rjs.stopChan: - rjs.doneChan <- struct{}{} - return - case <-rjs.context.Done(): - return - } - } -} - -func (rjs 
*RedisJobStatsManager) updateJobStatus(jobID string, status string) error { - conn := rjs.redisPool.Get() - defer conn.Close() - - key := utils.KeyJobStats(rjs.namespace, jobID) - args := make([]interface{}, 0, 3) - args = append(args, key, "status", status, "update_time", time.Now().Unix()) - _, err := conn.Do("HMSET", args...) - - return err -} - func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error { conn := rjs.redisPool.Get() defer conn.Close() @@ -225,6 +389,7 @@ func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error { "enqueue_time", jobStats.Stats.EnqueueTime, "update_time", jobStats.Stats.UpdateTime, "run_at", jobStats.Stats.RunAt, + "cron_spec", jobStats.Stats.CronSpec, ) if jobStats.Stats.CheckInAt > 0 && !utils.IsEmptyStr(jobStats.Stats.CheckIn) { args = append(args, @@ -245,6 +410,7 @@ func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error { expireTime += future } } + expireTime += rand.Int63n(30) //Avoid lots of keys being expired at the same time conn.Send("EXPIRE", key, expireTime) } diff --git a/src/jobservice_v2/pool/interface.go b/src/jobservice_v2/pool/interface.go index e95ad3eef..49f2cd9b7 100644 --- a/src/jobservice_v2/pool/interface.go +++ b/src/jobservice_v2/pool/interface.go @@ -91,4 +91,28 @@ type Interface interface { // models.JobStats : job stats data // error : error returned if meet any problems GetJobStats(jobID string) (models.JobStats, error) + + //Stop the job + // + //jobID string : ID of the enqueued job + // + //Returns: + // error : error returned if meet any problems + StopJob(jobID string) error + + //Cancel the job + // + //jobID string : ID of the enqueued job + // + //Returns: + // error : error returned if meet any problems + CancelJob(jobID string) error + + //Retry the job + // + //jobID string : ID of the enqueued job + // + //Returns: + // error : error returned if meet any problems + RetryJob(jobID string) error } diff --git 
a/src/jobservice_v2/pool/redis_pool.go b/src/jobservice_v2/pool/redis_pool.go index 93c31abce..d782e0503 100644 --- a/src/jobservice_v2/pool/redis_pool.go +++ b/src/jobservice_v2/pool/redis_pool.go @@ -83,7 +83,7 @@ func NewGoCraftWorkPool(ctx *env.Context, cfg RedisPoolConfig) *GoCraftWorkPool client := work.NewClient(cfg.Namespace, redisPool) scheduler := period.NewRedisPeriodicScheduler(ctx.SystemContext, cfg.Namespace, redisPool) sweeper := period.NewSweeper(cfg.Namespace, redisPool, client) - statsMgr := opm.NewRedisJobStatsManager(ctx.SystemContext, cfg.Namespace, redisPool) + statsMgr := opm.NewRedisJobStatsManager(ctx.SystemContext, cfg.Namespace, redisPool, client, scheduler) return &GoCraftWorkPool{ namespace: cfg.Namespace, redisPool: redisPool, @@ -115,7 +115,7 @@ func (gcwp *GoCraftWorkPool) Start() { go func() { defer func() { gcwp.context.WG.Done() - gcwp.statsManager.Stop() + gcwp.statsManager.Shutdown() }() //Start stats manager //None-blocking @@ -176,7 +176,17 @@ func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error { statusChangeCallback := func(jobID string, status string) { gcwp.statsManager.SetJobStatus(jobID, status) } - redisJob := job.NewRedisJob(j, gcwp.context, statusChangeCallback) + //Define the concrete factory method for creating 'job.CheckOPCmdFunc'. 
+ checkOPCmdFuncFactory := func(jobID string) job.CheckOPCmdFunc { + return func() (string, bool) { + cmd, err := gcwp.statsManager.CtlCommand(jobID) + if err != nil { + return "", false + } + return cmd, true + } + } + redisJob := job.NewRedisJob(j, gcwp.context, statusChangeCallback, checkOPCmdFuncFactory) //Get more info from j theJ := job.Wrap(j) @@ -274,6 +284,7 @@ func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.P JobName: jobName, Status: job.JobStatusPending, JobKind: job.JobKindPeriodic, + CronSpec: cronSetting, EnqueueTime: time.Now().Unix(), UpdateTime: time.Now().Unix(), RefLink: fmt.Sprintf("/api/v1/jobs/%s", id), @@ -327,6 +338,21 @@ func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) { return models.JobPoolStats{}, errors.New("Failed to get stats of worker pool") } +//StopJob will stop the job +func (gcwp *GoCraftWorkPool) StopJob(jobID string) error { + return gcwp.statsManager.Stop(jobID) +} + +//CancelJob will cancel the job +func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error { + return gcwp.statsManager.Cancel(jobID) +} + +//RetryJob will retry the job by delegating to the stats manager, like StopJob and CancelJob +func (gcwp *GoCraftWorkPool) RetryJob(jobID string) error { + return gcwp.statsManager.Retry(jobID) +} + //IsKnownJob ... func (gcwp *GoCraftWorkPool) IsKnownJob(name string) (interface{}, bool) { v, ok := gcwp.knownJobs[name] diff --git a/src/jobservice_v2/utils/keys.go b/src/jobservice_v2/utils/keys.go index 45ac9a8aa..888d7b16b 100644 --- a/src/jobservice_v2/utils/keys.go +++ b/src/jobservice_v2/utils/keys.go @@ -60,3 +60,8 @@ func KeyPeriodicLock(namespace string) string { func KeyJobStats(namespace string, jobID string) string { return fmt.Sprintf("%s%s:%s", KeyNamespacePrefix(namespace), "job_stats", jobID) } + +//KeyJobCtlCommands gives the key for publishing ctl commands like 'stop' etc. +func KeyJobCtlCommands(namespace string, jobID string) string { + return fmt.Sprintf("%s%s:%s", KeyNamespacePrefix(namespace), "ctl_commands", jobID) +}