Merge branch 'job_service' of https://github.com/vmware/harbor into scan-job-migrate

This commit is contained in:
Tan Jiang 2018-03-20 17:37:27 +08:00
commit 00825d3ca4
16 changed files with 560 additions and 107 deletions

View File

@ -6,6 +6,8 @@ import (
"io/ioutil"
"net/http"
"github.com/vmware/harbor/src/jobservice_v2/opm"
"github.com/gorilla/mux"
"github.com/vmware/harbor/src/jobservice_v2/core"
@ -77,6 +79,10 @@ func (dh *DefaultHandler) HandleLaunchJobReq(w http.ResponseWriter, req *http.Re
//HandleGetJobReq is implementation of method defined in interface 'Handler'
func (dh *DefaultHandler) HandleGetJobReq(w http.ResponseWriter, req *http.Request) {
if !dh.preCheck(w) {
return
}
vars := mux.Vars(req)
jobID := vars["job_id"]
@ -97,11 +103,50 @@ func (dh *DefaultHandler) HandleGetJobReq(w http.ResponseWriter, req *http.Reque
//HandleJobActionReq is implementation of method defined in interface 'Handler'
func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Request) {
if !dh.preCheck(w) {
return
}
vars := mux.Vars(req)
jobID := vars["job_id"]
data, err := ioutil.ReadAll(req.Body)
if err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.ReadRequestBodyError(err))
return
}
//unmarshal data
jobActionReq := models.JobActionRequest{}
if err = json.Unmarshal(data, &jobActionReq); err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.HandleJSONDataError(err))
return
}
if jobActionReq.Action == opm.CtlCommandStop {
if err := dh.controller.StopJob(jobID); err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.StopJobError(err))
return
}
} else if jobActionReq.Action == opm.CtlCommandCancel {
if err := dh.controller.StopJob(jobID); err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.StopJobError(err))
return
}
} else {
dh.handleError(w, http.StatusNotImplemented, errs.UnknownActionNameError(fmt.Errorf("%s", jobID)))
return
}
w.WriteHeader(http.StatusOK)
}
//HandleCheckStatusReq is implementation of method defined in interface 'Handler'
func (dh *DefaultHandler) HandleCheckStatusReq(w http.ResponseWriter, req *http.Request) {
if !dh.preCheck(w) {
return
}
stats, err := dh.controller.CheckStatus()
if err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.CheckStatsError(err))

View File

@ -77,12 +77,29 @@ func (c *Controller) GetJob(jobID string) (models.JobStats, error) {
//StopJob is implementation of same method in core interface.
func (c *Controller) StopJob(jobID string) error {
return nil
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
return c.backendPool.StopJob(jobID)
}
//CancelJob is implementation of same method in core interface.
func (c *Controller) CancelJob(jobID string) error {
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
return c.backendPool.CancelJob(jobID)
}
//RetryJob is implementation of same method in core interface.
func (c *Controller) RetryJob(jonID string) error {
return nil
func (c *Controller) RetryJob(jobID string) error {
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
return c.backendPool.RetryJob(jobID)
}
//CheckStatus is implementation of same method in core interface.

View File

@ -43,6 +43,14 @@ type Interface interface {
// error : Error returned if failed to retry the specified job.
RetryJob(jonID string) error
//Cancel the job
//
//jobID string : ID of the enqueued job
//
//Returns:
// error : error returned if meet any problems
CancelJob(jobID string) error
//CheckStatus is used to handle the job service healthy status checking request.
CheckStatus() (models.JobPoolStats, error)
}

View File

@ -30,11 +30,27 @@ type JobContext interface {
//Returns:
// context.Context
SystemContext() context.Context
//Checkin is bridge func for reporting detailed status
//
//status string : detailed status
//
//Returns:
// error if meet any problems
Checkin(status string) error
//OPCommand return the control operational command like stop/cancel if have
//
//Returns:
// op command if have
// flag to indicate if have command
OPCommand() (string, bool)
}
//JobData defines job context dependencies.
type JobData struct {
ID string
Name string
Args map[string]interface{}
ID string
Name string
Args map[string]interface{}
ExtraData map[string]interface{}
}

View File

@ -10,6 +10,8 @@ import (
const (
//JobStoppedErrorCode is code for jobStoppedError
JobStoppedErrorCode = 10000 + iota
//JobCancelledErrorCode is code for jobCancelledError
JobCancelledErrorCode
//ReadRequestBodyErrorCode is code for the error of reading http request body error
ReadRequestBodyErrorCode
//HandleJSONDataErrorCode is code for the error of handling json data error
@ -22,13 +24,19 @@ const (
CheckStatsErrorCode
//GetJobStatsErrorCode is code for the error of getting stats of enqueued job
GetJobStatsErrorCode
//StopJobErrorCode is code for the error of stopping job
StopJobErrorCode
//CancelJobErrorCode is code for the error of cancelling job
CancelJobErrorCode
//UnknownActionNameErrorCode is code for the case of unknown action name
UnknownActionNameErrorCode
)
//baseError ...
type baseError struct {
Code uint16 `json:"code"`
Err string `json:"message"`
Description string `json:"details"`
Description string `json:"details,omitempty"`
}
//Error is implementation of error interface.
@ -79,12 +87,59 @@ func GetJobStatsError(err error) error {
return New(GetJobStatsErrorCode, "Get job stats failed with error", err.Error())
}
//StopJobError is error for the case of stopping job failed
func StopJobError(err error) error {
return New(StopJobErrorCode, "Stop job failed with error", err.Error())
}
//CancelJobError is error for the case of cancelling job failed
func CancelJobError(err error) error {
return New(CancelJobErrorCode, "Cancel job failed with error", err.Error())
}
//UnknownActionNameError is error for the case of getting unknown job action
func UnknownActionNameError(err error) error {
return New(UnknownActionNameErrorCode, "Unknown job action name", err.Error())
}
//jobStoppedError is designed for the case of stopping job.
type jobStoppedError struct {
baseError
}
//JobStoppedError is error wrapper for the case of stopping job.
func JobStoppedError() error {
return jobStoppedError{
baseError{
Code: JobStoppedErrorCode,
Err: "Job is stopped",
},
}
}
//jobCancelledError is designed for the case of cancelling job.
type jobCancelledError struct {
baseError
}
//JobCancelledError is error wrapper for the case of cancelling job.
func JobCancelledError() error {
return jobStoppedError{
baseError{
Code: JobStoppedErrorCode,
Err: "Job is cancelled",
},
}
}
//IsJobStoppedError return true if the error is jobStoppedError
func IsJobStoppedError(err error) bool {
_, ok := err.(jobStoppedError)
return ok
}
//IsJobCancelledError return true if the error is jobCancelledError
func IsJobCancelledError(err error) bool {
_, ok := err.(jobCancelledError)
return ok
}

View File

@ -4,9 +4,11 @@ package impl
import (
"context"
"reflect"
hlog "github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/job"
)
//Context ...
@ -16,6 +18,9 @@ type Context struct {
//Logger for job
logger *hlog.Logger
//op command func
opCommandFunc job.CheckOPCmdFunc
}
//NewContext ...
@ -33,9 +38,20 @@ func (c *Context) InitDao() error {
//Build implements the same method in env.JobContext interface
//This func will build the job execution context before running
func (c *Context) Build(dep env.JobData) (env.JobContext, error) {
return &Context{
jContext := &Context{
sysContext: c.sysContext,
}, nil
}
//TODO:Init logger here
if opCommandFunc, ok := dep.ExtraData["opCommandFunc"]; ok {
if reflect.TypeOf(opCommandFunc).Kind() == reflect.Func {
if funcRef, ok := opCommandFunc.(job.CheckOPCmdFunc); ok {
jContext.opCommandFunc = funcRef
}
}
}
return jContext, nil
}
//Get implements the same method in env.JobContext interface
@ -47,3 +63,17 @@ func (c *Context) Get(prop string) interface{} {
func (c *Context) SystemContext() context.Context {
return c.sysContext
}
//Checkin is bridge func for reporting detailed status
func (c *Context) Checkin(status string) error {
return nil
}
//OPCommand return the control operational command like stop/cancel if have
func (c *Context) OPCommand() (string, bool) {
if c.opCommandFunc != nil {
return c.opCommandFunc()
}
return "", false
}

View File

@ -8,8 +8,9 @@ import (
"strings"
"time"
"github.com/vmware/harbor/src/jobservice_v2/errs"
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/job"
)
//ReplicationJob is the job for replicating repositories.
@ -38,13 +39,23 @@ func (rj *ReplicationJob) Validate(params map[string]interface{}) error {
}
//Run the replication logic here.
func (rj *ReplicationJob) Run(ctx env.JobContext, params map[string]interface{}, f job.CheckOPCmdFunc) error {
func (rj *ReplicationJob) Run(ctx env.JobContext, params map[string]interface{}) error {
defer func() {
fmt.Println("I'm finished, exit!")
}()
fmt.Println("=======Replication job running=======")
fmt.Printf("params: %#v\n", params)
fmt.Printf("context: %#v\n", ctx)
//HOLD ON FOR A WHILE
fmt.Println("Holding for 10 sec")
<-time.After(10 * time.Second)
fmt.Println("Holding for 20 sec")
<-time.After(20 * time.Second)
fmt.Println("I'm back, check if I'm stopped")
if cmd, ok := ctx.OPCommand(); ok {
fmt.Printf("cmd=%s\n", cmd)
return errs.JobStoppedError()
}
return nil
}

View File

@ -6,8 +6,8 @@ import "github.com/vmware/harbor/src/jobservice_v2/env"
//CheckOPCmdFunc is the function to check if the related operation commands
//like STOP or CANCEL is fired for the specified job. If yes, return the
//command code for job to determin if take corresponding action.
type CheckOPCmdFunc func(string) (uint, bool)
//command code for job to determine if take corresponding action.
type CheckOPCmdFunc func() (string, bool)
//Interface defines the related injection and run entry methods.
type Interface interface {
@ -28,10 +28,9 @@ type Interface interface {
//
//ctx env.JobContext : Job execution context.
//params map[string]interface{} : parameters with key-pair style for the job execution.
//f CheckOPCmdFunc: check function reference.
//
//Returns:
// error if failed to run. NOTES: If job is stopped or cancelled, a specified error should be returned
//
Run(ctx env.JobContext, params map[string]interface{}, f CheckOPCmdFunc) error
Run(ctx env.JobContext, params map[string]interface{}) error
}

View File

@ -5,31 +5,38 @@ package job
import (
"github.com/gocraft/work"
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/errs"
)
//StatusChangeCallback is the func called when job status changed
type StatusChangeCallback func(jobID string, status string)
//CheckOPCmdFuncFactoryFunc is used to generate CheckOPCmdFunc func for the specified job
type CheckOPCmdFuncFactoryFunc func(jobID string) CheckOPCmdFunc
//RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool.
type RedisJob struct {
job interface{}
context *env.Context
callback StatusChangeCallback
job interface{}
context *env.Context
callback StatusChangeCallback
opCmdFuncFactory CheckOPCmdFuncFactoryFunc
}
//NewRedisJob is constructor of RedisJob
func NewRedisJob(j interface{}, ctx *env.Context, statusChangeCallback StatusChangeCallback) *RedisJob {
return &RedisJob{j, ctx, statusChangeCallback}
func NewRedisJob(j interface{}, ctx *env.Context, statusChangeCallback StatusChangeCallback, opCmdFuncFactory CheckOPCmdFuncFactoryFunc) *RedisJob {
return &RedisJob{j, ctx, statusChangeCallback, opCmdFuncFactory}
}
//Run the job
func (rj *RedisJob) Run(j *work.Job) error {
//Build job execution context
jData := env.JobData{
ID: j.ID,
Name: j.Name,
Args: j.Args,
ID: j.ID,
Name: j.Name,
Args: j.Args,
ExtraData: make(map[string]interface{}),
}
jData.ExtraData["opCommandFunc"] = rj.opCmdFuncFactory(j.ID)
execContext, err := rj.context.JobContext.Build(jData)
if err != nil {
return err
@ -41,18 +48,26 @@ func (rj *RedisJob) Run(j *work.Job) error {
rj.callback(j.ID, JobStatusRunning)
//TODO: Check function should be defined
err = runningJob.Run(execContext, j.Args, nil)
err = runningJob.Run(execContext, j.Args)
if err == nil {
rj.callback(j.ID, JobStatusSuccess)
} else {
rj.callback(j.ID, JobStatusError)
return nil
}
//TODO:
//If error is stopped error, update status to 'Stopped' and return nil
//If error is cancelled error, update status to 'Cancelled' and return err
//Need to consider how to rm the retry option
if errs.IsJobStoppedError(err) {
rj.callback(j.ID, JobStatusStopped)
return nil // no need to put it into the dead queue for resume
}
if errs.IsJobCancelledError(err) {
rj.callback(j.ID, JobStatusCancelled)
return err //need to resume
}
rj.callback(j.ID, JobStatusError)
return err
//TODO:
//Need to consider how to rm the retry option
}

View File

@ -38,6 +38,7 @@ type JobStatData struct {
JobKind string `json:"kind"`
IsUnique bool `json:"unique"`
RefLink string `json:"ref_link,omitempty"`
CronSpec string `json:"cron_spec,omitempty"`
EnqueueTime int64 `json:"enqueue_time"`
UpdateTime int64 `json:"update_time"`
RunAt int64 `json:"run_at,omitempty"`
@ -54,3 +55,8 @@ type JobPoolStats struct {
Concurrency uint `json:"concurrency"`
Status string `json:"status"`
}
//JobActionRequest defines for triggering job action like stop/cancel.
type JobActionRequest struct {
Action string `json:"action"`
}

View File

@ -1,3 +0,0 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package opm

View File

@ -9,8 +9,8 @@ type JobStatsManager interface {
//Start to serve
Start()
//Stop to serve
Stop()
//Shutdown the manager
Shutdown()
//Save the job stats
//Async method to retry and improve performance
@ -29,4 +29,37 @@ type JobStatsManager interface {
//SetJobStatus will mark the status of job to the specified one
//Async method to retry
SetJobStatus(jobID string, status string)
//Stop the job
//
//jobID string : ID of the being stopped job
//
//Returns:
// error if meet any problems
Stop(jobID string) error
//Cancel the job
//
//jobID string : ID of the being cancelled job
//
//Returns:
// error if meet any problems
Cancel(jobID string) error
//Retry the job
//
//jobID string : ID of the being retried job
//
//Returns:
// error if meet any problems
Retry(jobID string) error
//CtlCommand check if control command is fired for the specified job.
//
//jobID string : ID of the job
//
//Returns:
// the command if it was fired
// error if it was not fired yet to meet some other problems
CtlCommand(jobID string) (string, error)
}

View File

@ -4,10 +4,17 @@ package opm
import (
"context"
"errors"
"math/rand"
"strconv"
"time"
"github.com/vmware/harbor/src/jobservice_v2/period"
"github.com/robfig/cron"
"github.com/gocraft/work"
"github.com/garyburd/redigo/redis"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice_v2/job"
@ -20,6 +27,14 @@ const (
opSaveStats = "save_job_stats"
opUpdateStatus = "update_job_status"
maxFails = 3
//CtlCommandStop : command stop
CtlCommandStop = "stop"
//CtlCommandCancel : command cancel
CtlCommandCancel = "cancel"
//Copy from period.enqueuer
periodicEnqueuerHorizon = 4 * time.Minute
)
type queueItem struct {
@ -33,6 +48,8 @@ type RedisJobStatsManager struct {
namespace string
redisPool *redis.Pool
context context.Context
client *work.Client
scheduler period.Interface
stopChan chan struct{}
doneChan chan struct{}
@ -41,11 +58,13 @@ type RedisJobStatsManager struct {
}
//NewRedisJobStatsManager is constructor of RedisJobStatsManager
func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *redis.Pool) *RedisJobStatsManager {
func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *redis.Pool, client *work.Client, scheduler period.Interface) *RedisJobStatsManager {
return &RedisJobStatsManager{
namespace: namespace,
context: ctx,
redisPool: redisPool,
client: client,
scheduler: scheduler,
stopChan: make(chan struct{}, 1),
doneChan: make(chan struct{}, 1),
processChan: make(chan *queueItem, processBufferSize),
@ -61,8 +80,8 @@ func (rjs *RedisJobStatsManager) Start() {
rjs.isRunning = true
}
//Stop is implementation of same method in JobStatsManager interface.
func (rjs *RedisJobStatsManager) Stop() {
//Shutdown is implementation of same method in JobStatsManager interface.
func (rjs *RedisJobStatsManager) Shutdown() {
if !rjs.isRunning {
return
}
@ -82,6 +101,212 @@ func (rjs *RedisJobStatsManager) Save(jobStats models.JobStats) {
//Retrieve is implementation of same method in JobStatsManager interface.
func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error) {
if utils.IsEmptyStr(jobID) {
return models.JobStats{}, errors.New("empty job ID")
}
return rjs.getJobStats(jobID)
}
//SetJobStatus is implementation of same method in JobStatsManager interface.
func (rjs *RedisJobStatsManager) SetJobStatus(jobID string, status string) {
if utils.IsEmptyStr(jobID) || utils.IsEmptyStr(status) {
return
}
item := &queueItem{
op: opUpdateStatus,
data: []string{jobID, status},
}
rjs.processChan <- item
}
func (rjs *RedisJobStatsManager) loop() {
controlChan := make(chan struct{})
defer func() {
rjs.isRunning = false
//Notify other sub goroutines
close(controlChan)
log.Info("Redis job stats manager is stopped")
}()
for {
select {
case item := <-rjs.processChan:
if err := rjs.process(item); err != nil {
item.fails++
if item.fails < maxFails {
//Retry after a random interval
go func() {
timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second)
defer timer.Stop()
select {
case <-timer.C:
rjs.processChan <- item
return
case <-controlChan:
}
}()
} else {
log.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails)
}
}
break
case <-rjs.stopChan:
rjs.doneChan <- struct{}{}
return
case <-rjs.context.Done():
return
}
}
}
//Stop the specified job.
//Async method, not blocking
func (rjs *RedisJobStatsManager) Stop(jobID string) error {
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := rjs.getJobStats(jobID)
if err != nil {
return err
}
switch theJob.Stats.JobKind {
case job.JobKindGeneric:
//nothing need to do
case job.JobKindScheduled:
//we need to delete the scheduled job in the queue if it is not running yet
//otherwise, nothing need to do
if theJob.Stats.Status == job.JobStatusScheduled {
if err := rjs.client.DeleteScheduledJob(theJob.Stats.RunAt, jobID); err != nil {
return err
}
}
case job.JobKindPeriodic:
//firstly we need try to delete the job instances scheduled for this periodic job, a try best action
rjs.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) //ignore error as we have logged
//secondly delete the periodic job policy
if err := rjs.scheduler.UnSchedule(jobID); err != nil {
return err
}
default:
break
}
//Check if the job has 'running' instance
if theJob.Stats.Status == job.JobStatusRunning {
//Send 'stop' ctl command to the running instance
if err := rjs.writeCtlCommand(jobID, CtlCommandStop); err != nil {
return err
}
}
return nil
}
//Cancel the specified job.
//Async method, not blocking
func (rjs *RedisJobStatsManager) Cancel(jobID string) error {
return nil
}
//Retry the specified job.
//Async method, not blocking
func (rjs *RedisJobStatsManager) Retry(jobID string) error {
return nil
}
//CtlCommand checks if control command is fired for the specified job.
func (rjs *RedisJobStatsManager) CtlCommand(jobID string) (string, error) {
if utils.IsEmptyStr(jobID) {
return "", errors.New("empty job ID")
}
return rjs.getCrlCommand(jobID)
}
func (rjs *RedisJobStatsManager) deleteScheduledJobsOfPeriodicPolicy(policyID string, cronSpec string) error {
schedule, err := cron.Parse(cronSpec)
if err != nil {
log.Errorf("cron spec '%s' is not valid", cronSpec)
return err
}
now := utils.NowEpochSeconds()
nowTime := time.Unix(now, 0)
horizon := nowTime.Add(periodicEnqueuerHorizon)
//try to delete more
//return the last error if occurred
for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
epoch := t.Unix()
log.Infof("epoch=%d\n", epoch)
if err = rjs.client.DeleteScheduledJob(epoch, policyID); err != nil {
//only logged
log.Errorf("delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err)
}
}
return err
}
func (rjs *RedisJobStatsManager) getCrlCommand(jobID string) (string, error) {
conn := rjs.redisPool.Get()
defer conn.Close()
key := utils.KeyJobCtlCommands(rjs.namespace, jobID)
cmd, err := redis.String(conn.Do("HGET", key, "command"))
if err != nil {
return "", err
}
//try to DEL it after getting the command
//Ignore the error,leave it as dirty data
_, err = conn.Do("DEL", key)
if err != nil {
//only logged
log.Errorf("del key %s failed with error: %s\n", key, err)
}
return cmd, nil
}
func (rjs *RedisJobStatsManager) writeCtlCommand(jobID string, command string) error {
conn := rjs.redisPool.Get()
defer conn.Close()
key := utils.KeyJobCtlCommands(rjs.namespace, jobID)
args := make([]interface{}, 0, 5)
args = append(args, key, "command", command, "fire_time", time.Now().Unix())
if err := conn.Send("HMSET", args...); err != nil {
return err
}
expireTime := 24*60*60 + rand.Int63n(10)
if err := conn.Send("EXPIRE", key, expireTime); err != nil {
return err
}
return conn.Flush()
}
func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) error {
conn := rjs.redisPool.Get()
defer conn.Close()
key := utils.KeyJobStats(rjs.namespace, jobID)
args := make([]interface{}, 0, 5)
args = append(args, key, "status", status, "update_time", time.Now().Unix())
_, err := conn.Do("HMSET", args...)
return err
}
func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, error) {
conn := rjs.redisPool.Get()
defer conn.Close()
@ -137,6 +362,9 @@ func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error)
case "check_in":
res.Stats.CheckIn = value
break
case "cron_spec":
res.Stats.CronSpec = value
break
default:
}
}
@ -144,70 +372,6 @@ func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error)
return res, nil
}
//SetJobStatus is implementation of same method in JobStatsManager interface.
func (rjs *RedisJobStatsManager) SetJobStatus(jobID string, status string) {
item := &queueItem{
op: opUpdateStatus,
data: []string{jobID, status},
}
rjs.processChan <- item
}
func (rjs *RedisJobStatsManager) loop() {
controlChan := make(chan struct{})
defer func() {
rjs.isRunning = false
//Notify other sub goroutines
close(controlChan)
log.Info("Redis job stats manager is stopped")
}()
for {
select {
case item := <-rjs.processChan:
if err := rjs.process(item); err != nil {
item.fails++
if item.fails < maxFails {
//Retry after a random interval
go func() {
timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second)
defer timer.Stop()
select {
case <-timer.C:
rjs.processChan <- item
return
case <-controlChan:
}
}()
} else {
log.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails)
}
}
break
case <-rjs.stopChan:
rjs.doneChan <- struct{}{}
return
case <-rjs.context.Done():
return
}
}
}
func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) error {
conn := rjs.redisPool.Get()
defer conn.Close()
key := utils.KeyJobStats(rjs.namespace, jobID)
args := make([]interface{}, 0, 3)
args = append(args, key, "status", status, "update_time", time.Now().Unix())
_, err := conn.Do("HMSET", args...)
return err
}
func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error {
conn := rjs.redisPool.Get()
defer conn.Close()
@ -225,6 +389,7 @@ func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error {
"enqueue_time", jobStats.Stats.EnqueueTime,
"update_time", jobStats.Stats.UpdateTime,
"run_at", jobStats.Stats.RunAt,
"cron_spec", jobStats.Stats.CronSpec,
)
if jobStats.Stats.CheckInAt > 0 && !utils.IsEmptyStr(jobStats.Stats.CheckIn) {
args = append(args,
@ -245,6 +410,7 @@ func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error {
expireTime += future
}
}
expireTime += rand.Int63n(30) //Avoid lots of keys being expired at the same time
conn.Send("EXPIRE", key, expireTime)
}

View File

@ -91,4 +91,28 @@ type Interface interface {
// models.JobStats : job stats data
// error : error returned if meet any problems
GetJobStats(jobID string) (models.JobStats, error)
//Stop the job
//
//jobID string : ID of the enqueued job
//
//Returns:
// error : error returned if meet any problems
StopJob(jobID string) error
//Cancel the job
//
//jobID string : ID of the enqueued job
//
//Returns:
// error : error returned if meet any problems
CancelJob(jobID string) error
//Retry the job
//
//jobID string : ID of the enqueued job
//
//Returns:
// error : error returned if meet any problems
RetryJob(jobID string) error
}

View File

@ -83,7 +83,7 @@ func NewGoCraftWorkPool(ctx *env.Context, cfg RedisPoolConfig) *GoCraftWorkPool
client := work.NewClient(cfg.Namespace, redisPool)
scheduler := period.NewRedisPeriodicScheduler(ctx.SystemContext, cfg.Namespace, redisPool)
sweeper := period.NewSweeper(cfg.Namespace, redisPool, client)
statsMgr := opm.NewRedisJobStatsManager(ctx.SystemContext, cfg.Namespace, redisPool)
statsMgr := opm.NewRedisJobStatsManager(ctx.SystemContext, cfg.Namespace, redisPool, client, scheduler)
return &GoCraftWorkPool{
namespace: cfg.Namespace,
redisPool: redisPool,
@ -115,7 +115,7 @@ func (gcwp *GoCraftWorkPool) Start() {
go func() {
defer func() {
gcwp.context.WG.Done()
gcwp.statsManager.Stop()
gcwp.statsManager.Shutdown()
}()
//Start stats manager
//None-blocking
@ -176,7 +176,17 @@ func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error {
statusChangeCallback := func(jobID string, status string) {
gcwp.statsManager.SetJobStatus(jobID, status)
}
redisJob := job.NewRedisJob(j, gcwp.context, statusChangeCallback)
//Define the concrete factory method for creating 'job.CheckOPCmdFunc'.
checkOPCmdFuncFactory := func(jobID string) job.CheckOPCmdFunc {
return func() (string, bool) {
cmd, err := gcwp.statsManager.CtlCommand(jobID)
if err != nil {
return "", false
}
return cmd, true
}
}
redisJob := job.NewRedisJob(j, gcwp.context, statusChangeCallback, checkOPCmdFuncFactory)
//Get more info from j
theJ := job.Wrap(j)
@ -274,6 +284,7 @@ func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.P
JobName: jobName,
Status: job.JobStatusPending,
JobKind: job.JobKindPeriodic,
CronSpec: cronSetting,
EnqueueTime: time.Now().Unix(),
UpdateTime: time.Now().Unix(),
RefLink: fmt.Sprintf("/api/v1/jobs/%s", id),
@ -327,6 +338,21 @@ func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) {
return models.JobPoolStats{}, errors.New("Failed to get stats of worker pool")
}
//StopJob will stop the job
func (gcwp *GoCraftWorkPool) StopJob(jobID string) error {
return gcwp.statsManager.Stop(jobID)
}
//CancelJob will cancel the job
func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error {
return gcwp.statsManager.Cancel(jobID)
}
//RetryJob retry the job
func (gcwp *GoCraftWorkPool) RetryJob(jobID string) error {
return nil
}
//IsKnownJob ...
func (gcwp *GoCraftWorkPool) IsKnownJob(name string) (interface{}, bool) {
v, ok := gcwp.knownJobs[name]

View File

@ -60,3 +60,8 @@ func KeyPeriodicLock(namespace string) string {
func KeyJobStats(namespace string, jobID string) string {
return fmt.Sprintf("%s%s:%s", KeyNamespacePrefix(namespace), "job_stats", jobID)
}
//KeyJobCtlCommands give the key for publishing ctl commands like 'stop' etc.
func KeyJobCtlCommands(namespace string, jobID string) string {
return fmt.Sprintf("%s%s:%s", KeyNamespacePrefix(namespace), "ctl_commands", jobID)
}