Support job related operational actions

support stop job
support cancel running generic job
support retry failed job
support checkin details progress of job
This commit is contained in:
Steven Zou 2018-03-20 18:03:04 +08:00
parent fbc24d85db
commit ac544b3ead
16 changed files with 480 additions and 175 deletions

View File

@ -123,22 +123,28 @@ func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Re
return
}
if jobActionReq.Action == opm.CtlCommandStop {
switch jobActionReq.Action {
case opm.CtlCommandStop:
if err := dh.controller.StopJob(jobID); err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.StopJobError(err))
return
}
} else if jobActionReq.Action == opm.CtlCommandCancel {
if err := dh.controller.StopJob(jobID); err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.StopJobError(err))
case opm.CtlCommandCancel:
if err := dh.controller.CancelJob(jobID); err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.CancelJobError(err))
return
}
} else {
case opm.CtlCommandRetry:
if err := dh.controller.RetryJob(jobID); err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.RetryJobError(err))
return
}
default:
dh.handleError(w, http.StatusNotImplemented, errs.UnknownActionNameError(fmt.Errorf("%s", jobID)))
return
}
w.WriteHeader(http.StatusOK)
w.WriteHeader(http.StatusNoContent) //only header, no content returned
}
//HandleCheckStatusReq is implementation of method defined in interface 'Handler'

View File

@ -28,6 +28,8 @@ const (
StopJobErrorCode
//CancelJobErrorCode is code for the error of cancelling job
CancelJobErrorCode
//RetryJobErrorCode is code for the error of retry job
RetryJobErrorCode
//UnknownActionNameErrorCode is code for the case of unknown action name
UnknownActionNameErrorCode
)
@ -97,6 +99,11 @@ func CancelJobError(err error) error {
return New(CancelJobErrorCode, "Cancel job failed with error", err.Error())
}
//RetryJobError is error for the case of retrying job failed
func RetryJobError(err error) error {
return New(RetryJobErrorCode, "Retry job failed with error", err.Error())
}
//UnknownActionNameError is error for the case of getting unknown job action
func UnknownActionNameError(err error) error {
return New(UnknownActionNameErrorCode, "Unknown job action name", err.Error())
@ -124,7 +131,7 @@ type jobCancelledError struct {
//JobCancelledError is error wrapper for the case of cancelling job.
func JobCancelledError() error {
return jobStoppedError{
return jobCancelledError{
baseError{
Code: JobStoppedErrorCode,
Err: "Job is cancelled",

View File

@ -4,6 +4,7 @@ package impl
import (
"context"
"errors"
"reflect"
hlog "github.com/vmware/harbor/src/common/utils/log"
@ -21,6 +22,9 @@ type Context struct {
//op command func
opCommandFunc job.CheckOPCmdFunc
//checkin func
checkInFunc job.CheckInFunc
}
//NewContext ...
@ -51,6 +55,14 @@ func (c *Context) Build(dep env.JobData) (env.JobContext, error) {
}
}
if checkInFunc, ok := dep.ExtraData["checkInFunc"]; ok {
if reflect.TypeOf(checkInFunc).Kind() == reflect.Func {
if funcRef, ok := checkInFunc.(job.CheckInFunc); ok {
jContext.checkInFunc = funcRef
}
}
}
return jContext, nil
}
@ -66,6 +78,12 @@ func (c *Context) SystemContext() context.Context {
//Checkin is bridge func for reporting detailed status
func (c *Context) Checkin(status string) error {
if c.checkInFunc != nil {
c.checkInFunc(status)
} else {
return errors.New("nil check in function")
}
return nil
}

View File

@ -8,6 +8,8 @@ import (
"strings"
"time"
"github.com/vmware/harbor/src/jobservice_v2/opm"
"github.com/vmware/harbor/src/jobservice_v2/errs"
"github.com/vmware/harbor/src/jobservice_v2/env"
@ -18,7 +20,12 @@ type ReplicationJob struct{}
//MaxFails is implementation of same method in Interface.
func (rj *ReplicationJob) MaxFails() uint {
return 2
return 3
}
//ShouldRetry ...
func (rj *ReplicationJob) ShouldRetry() bool {
return true
}
//Validate is implementation of same method in Interface.
@ -47,13 +54,33 @@ func (rj *ReplicationJob) Run(ctx env.JobContext, params map[string]interface{})
fmt.Printf("params: %#v\n", params)
fmt.Printf("context: %#v\n", ctx)
/*if 1 != 0 {
return errors.New("I suicide")
}*/
fmt.Println("check in 30%")
ctx.Checkin("30%")
time.Sleep(5 * time.Second)
fmt.Println("check in 60%")
ctx.Checkin("60%")
time.Sleep(5 * time.Second)
fmt.Println("check in 100%")
ctx.Checkin("100%")
time.Sleep(1 * time.Second)
//HOLD ON FOR A WHILE
fmt.Println("Holding for 20 sec")
<-time.After(20 * time.Second)
fmt.Println("I'm back, check if I'm stopped")
fmt.Println("I'm back, check if I'm stopped/cancelled")
if cmd, ok := ctx.OPCommand(); ok {
fmt.Printf("cmd=%s\n", cmd)
if cmd == opm.CtlCommandCancel {
fmt.Println("exit for receiving cancel signal")
return errs.JobCancelledError()
}
fmt.Println("exit for receiving stop signal")
return errs.JobStoppedError()
}

View File

@ -9,14 +9,24 @@ import "github.com/vmware/harbor/src/jobservice_v2/env"
//command code for job to determine if take corresponding action.
type CheckOPCmdFunc func() (string, bool)
//CheckInFunc is designed for job to report more detailed progress info
type CheckInFunc func(message string)
//Interface defines the related injection and run entry methods.
type Interface interface {
//Declare how many times the job can be retried if failed.
//
//Return:
// uint: the failure count allowed
// uint: the failure count allowed. If it is set to 0, then default value 4 is used.
MaxFails() uint
//Tell the worker pool if retry the failed job when the fails is
//still less that the number declared by the method 'MaxFails'.
//
//Returns:
// true for retry and false for none-retry
ShouldRetry() bool
//Indicate whether the parameters of job are valid.
//
//Return:

View File

@ -1,73 +0,0 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package job
import (
"github.com/gocraft/work"
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/errs"
)
//StatusChangeCallback is the func called when job status changed
type StatusChangeCallback func(jobID string, status string)
//CheckOPCmdFuncFactoryFunc is used to generate CheckOPCmdFunc func for the specified job
type CheckOPCmdFuncFactoryFunc func(jobID string) CheckOPCmdFunc
//RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool.
type RedisJob struct {
job interface{}
context *env.Context
callback StatusChangeCallback
opCmdFuncFactory CheckOPCmdFuncFactoryFunc
}
//NewRedisJob is constructor of RedisJob
func NewRedisJob(j interface{}, ctx *env.Context, statusChangeCallback StatusChangeCallback, opCmdFuncFactory CheckOPCmdFuncFactoryFunc) *RedisJob {
return &RedisJob{j, ctx, statusChangeCallback, opCmdFuncFactory}
}
//Run the job
func (rj *RedisJob) Run(j *work.Job) error {
//Build job execution context
jData := env.JobData{
ID: j.ID,
Name: j.Name,
Args: j.Args,
ExtraData: make(map[string]interface{}),
}
jData.ExtraData["opCommandFunc"] = rj.opCmdFuncFactory(j.ID)
execContext, err := rj.context.JobContext.Build(jData)
if err != nil {
return err
}
//Inject data
runningJob := Wrap(rj.job)
//Start to run
rj.callback(j.ID, JobStatusRunning)
//TODO: Check function should be defined
err = runningJob.Run(execContext, j.Args)
if err == nil {
rj.callback(j.ID, JobStatusSuccess)
return nil
}
if errs.IsJobStoppedError(err) {
rj.callback(j.ID, JobStatusStopped)
return nil // no need to put it into the dead queue for resume
}
if errs.IsJobCancelledError(err) {
rj.callback(j.ID, JobStatusCancelled)
return err //need to resume
}
rj.callback(j.ID, JobStatusError)
return err
//TODO:
//Need to consider how to rm the retry option
}

View File

@ -1,18 +0,0 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package job
import "reflect"
//Wrap returns a new (job.)Interface based on the wrapped job handler reference.
func Wrap(j interface{}) Interface {
theType := reflect.TypeOf(j)
if theType.Kind() == reflect.Ptr {
theType = theType.Elem()
}
//Crate new
v := reflect.New(theType).Elem()
return v.Addr().Interface().(Interface)
}

View File

@ -44,6 +44,7 @@ type JobStatData struct {
RunAt int64 `json:"run_at,omitempty"`
CheckIn string `json:"check_in,omitempty"`
CheckInAt int64 `json:"check_in_at,omitempty"`
DieAt int64 `json:"die_at,omitempty"`
}
//JobPoolStats represent the healthy and status of the job service.

View File

@ -54,7 +54,7 @@ type JobStatsManager interface {
// error if meet any problems
Retry(jobID string) error
//CtlCommand check if control command is fired for the specified job.
//CtlCommand checks if control command is fired for the specified job.
//
//jobID string : ID of the job
//
@ -62,4 +62,18 @@ type JobStatsManager interface {
// the command if it was fired
// error if it was not fired yet to meet some other problems
CtlCommand(jobID string) (string, error)
//CheckIn message for the specified job like detailed progress info.
//
//jobID string : ID of the job
//message string : The message being checked in
//
CheckIn(jobID string, message string)
//DieAt marks the failed jobs with the time they put into dead queue.
//
//jobID string : ID of the job
//message string : The message being checked in
//
DieAt(jobID string, dieAt int64)
}

View File

@ -5,6 +5,7 @@ package opm
import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"time"
@ -26,12 +27,16 @@ const (
processBufferSize = 1024
opSaveStats = "save_job_stats"
opUpdateStatus = "update_job_status"
opCheckIn = "check_in"
opDieAt = "mark_die_at"
maxFails = 3
//CtlCommandStop : command stop
CtlCommandStop = "stop"
//CtlCommandCancel : command cancel
CtlCommandCancel = "cancel"
//CtlCommandRetry : command retry
CtlCommandRetry = "retry"
//Copy from period.enqueuer
periodicEnqueuerHorizon = 4 * time.Minute
@ -90,6 +95,7 @@ func (rjs *RedisJobStatsManager) Shutdown() {
}
//Save is implementation of same method in JobStatsManager interface.
//Async method
func (rjs *RedisJobStatsManager) Save(jobStats models.JobStats) {
item := &queueItem{
op: opSaveStats,
@ -100,6 +106,7 @@ func (rjs *RedisJobStatsManager) Save(jobStats models.JobStats) {
}
//Retrieve is implementation of same method in JobStatsManager interface.
//Sync method
func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error) {
if utils.IsEmptyStr(jobID) {
return models.JobStats{}, errors.New("empty job ID")
@ -109,6 +116,7 @@ func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error)
}
//SetJobStatus is implementation of same method in JobStatsManager interface.
//Async method
func (rjs *RedisJobStatsManager) SetJobStatus(jobID string, status string) {
if utils.IsEmptyStr(jobID) || utils.IsEmptyStr(status) {
return
@ -135,25 +143,27 @@ func (rjs *RedisJobStatsManager) loop() {
for {
select {
case item := <-rjs.processChan:
if err := rjs.process(item); err != nil {
item.fails++
if item.fails < maxFails {
//Retry after a random interval
go func() {
timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second)
defer timer.Stop()
go func(item *queueItem) {
if err := rjs.process(item); err != nil {
item.fails++
if item.fails < maxFails {
//Retry after a random interval
go func() {
timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second)
defer timer.Stop()
select {
case <-timer.C:
rjs.processChan <- item
return
case <-controlChan:
}
}()
} else {
log.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails)
select {
case <-timer.C:
rjs.processChan <- item
return
case <-controlChan:
}
}()
} else {
log.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails)
}
}
}
}(item)
break
case <-rjs.stopChan:
rjs.doneChan <- struct{}{}
@ -165,7 +175,6 @@ func (rjs *RedisJobStatsManager) loop() {
}
//Stop the specified job.
//Async method, not blocking
func (rjs *RedisJobStatsManager) Stop(jobID string) error {
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
@ -188,12 +197,17 @@ func (rjs *RedisJobStatsManager) Stop(jobID string) error {
}
}
case job.JobKindPeriodic:
//firstly we need try to delete the job instances scheduled for this periodic job, a try best action
rjs.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) //ignore error as we have logged
//secondly delete the periodic job policy
//firstly delete the periodic job policy
if err := rjs.scheduler.UnSchedule(jobID); err != nil {
return err
}
//secondly we need try to delete the job instances scheduled for this periodic job, a try best action
rjs.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) //ignore error as we have logged
//thirdly expire the job stats of this periodic job if exists
if err := rjs.expirePeriodicJobStats(theJob.Stats.JobID); err != nil {
//only logged
log.Errorf("Expire the stats of job %s failed with error: %s\n", theJob.Stats.JobID, err)
}
default:
break
}
@ -212,13 +226,64 @@ func (rjs *RedisJobStatsManager) Stop(jobID string) error {
//Cancel the specified job.
//Async method, not blocking
func (rjs *RedisJobStatsManager) Cancel(jobID string) error {
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := rjs.getJobStats(jobID)
if err != nil {
return err
}
switch theJob.Stats.JobKind {
case job.JobKindGeneric:
if theJob.Stats.Status != job.JobStatusRunning {
return fmt.Errorf("only running job can be cancelled, job '%s' seems not running now", theJob.Stats.JobID)
}
//Send 'cancel' ctl command to the running instance
if err := rjs.writeCtlCommand(jobID, CtlCommandCancel); err != nil {
return err
}
break
default:
return fmt.Errorf("job kind '%s' does not support 'cancel' operation", theJob.Stats.JobKind)
}
return nil
}
//Retry the specified job.
//Async method, not blocking
func (rjs *RedisJobStatsManager) Retry(jobID string) error {
return nil
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := rjs.getJobStats(jobID)
if err != nil {
return err
}
if theJob.Stats.DieAt == 0 {
return fmt.Errorf("job '%s' is not a retryable job", jobID)
}
return rjs.client.RetryDeadJob(theJob.Stats.DieAt, jobID)
}
//CheckIn mesage
func (rjs *RedisJobStatsManager) CheckIn(jobID string, message string) {
if utils.IsEmptyStr(jobID) || utils.IsEmptyStr(message) {
return
}
item := &queueItem{
op: opCheckIn,
data: []string{jobID, message},
}
rjs.processChan <- item
}
//CtlCommand checks if control command is fired for the specified job.
@ -230,6 +295,33 @@ func (rjs *RedisJobStatsManager) CtlCommand(jobID string) (string, error) {
return rjs.getCrlCommand(jobID)
}
//DieAt marks the failed jobs with the time they put into dead queue.
func (rjs *RedisJobStatsManager) DieAt(jobID string, dieAt int64) {
if utils.IsEmptyStr(jobID) || dieAt == 0 {
return
}
item := &queueItem{
op: opDieAt,
data: []interface{}{jobID, dieAt},
}
rjs.processChan <- item
}
func (rjs *RedisJobStatsManager) expirePeriodicJobStats(jobID string) error {
conn := rjs.redisPool.Get()
defer conn.Close()
//The periodic job (policy) is stopped/unscheduled and then
//the stats of periodic job now can be expired
key := utils.KeyJobStats(rjs.namespace, jobID)
expireTime := 24 * 60 * 60 //1 day
_, err := conn.Do("EXPIRE", key, expireTime)
return err
}
func (rjs *RedisJobStatsManager) deleteScheduledJobsOfPeriodicPolicy(policyID string, cronSpec string) error {
schedule, err := cron.Parse(cronSpec)
if err != nil {
@ -245,7 +337,6 @@ func (rjs *RedisJobStatsManager) deleteScheduledJobsOfPeriodicPolicy(policyID st
//return the last error if occurred
for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
epoch := t.Unix()
log.Infof("epoch=%d\n", epoch)
if err = rjs.client.DeleteScheduledJob(epoch, policyID); err != nil {
//only logged
log.Errorf("delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err)
@ -301,11 +392,55 @@ func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) er
key := utils.KeyJobStats(rjs.namespace, jobID)
args := make([]interface{}, 0, 5)
args = append(args, key, "status", status, "update_time", time.Now().Unix())
if status == job.JobStatusSuccess {
//make sure the 'die_at' is reset in case it's a retrying job
args = append(args, "die_at", 0)
}
_, err := conn.Do("HMSET", args...)
return err
}
func (rjs *RedisJobStatsManager) checkIn(jobID string, message string) error {
conn := rjs.redisPool.Get()
defer conn.Close()
now := time.Now().Unix()
key := utils.KeyJobStats(rjs.namespace, jobID)
args := make([]interface{}, 0, 7)
args = append(args, key, "check_in", message, "check_in_at", now, "update_time", now)
_, err := conn.Do("HMSET", args...)
return err
}
func (rjs *RedisJobStatsManager) dieAt(jobID string, baseTime int64) error {
conn := rjs.redisPool.Get()
defer conn.Close()
//Query the dead job in the time scope of [baseTime,baseTime+5]
key := utils.RedisKeyDead(rjs.namespace)
jobWithScores, err := utils.GetZsetByScore(rjs.redisPool, key, []int64{baseTime, baseTime + 5})
if err != nil {
return err
}
for _, jws := range jobWithScores {
if j, err := utils.DeSerializeJob(jws.JobBytes); err == nil {
if j.ID == jobID {
//Found
statsKey := utils.KeyJobStats(rjs.namespace, jobID)
args := make([]interface{}, 0, 7)
args = append(args, statsKey, "die_at", jws.Score, "update_time", time.Now().Unix())
_, err := conn.Do("HMSET", args...)
return err
}
}
}
return fmt.Errorf("seems %s is not a dead job", jobID)
}
func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, error) {
conn := rjs.redisPool.Get()
defer conn.Close()
@ -365,6 +500,9 @@ func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, err
case "cron_spec":
res.Stats.CronSpec = value
break
case "die_at":
v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.DieAt = v
default:
}
}
@ -397,6 +535,9 @@ func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error {
"check_in_at", jobStats.Stats.CheckInAt,
)
}
if jobStats.Stats.DieAt > 0 {
args = append(args, "die_at", jobStats.Stats.DieAt)
}
conn.Send("HMSET", args...)
//If job kind is periodic job, expire time should not be set
@ -425,6 +566,12 @@ func (rjs *RedisJobStatsManager) process(item *queueItem) error {
case opUpdateStatus:
data := item.data.([]string)
return rjs.updateJobStatus(data[0], data[1])
case opCheckIn:
data := item.data.([]string)
return rjs.checkIn(data[0], data[1])
case opDieAt:
data := item.data.([]interface{})
return rjs.dieAt(data[0].(string), data[1].(int64))
default:
break
}

View File

@ -3,7 +3,6 @@
package period
import (
"errors"
"fmt"
"time"
@ -55,7 +54,7 @@ func (s *Sweeper) ClearOutdatedScheduledJobs() error {
}
nowEpoch := time.Now().Unix()
jobScores, err := GetZsetByScore(s.redisPool, utils.RedisKeyScheduled(s.namespace), []int64{0, nowEpoch})
jobScores, err := utils.GetZsetByScore(s.redisPool, utils.RedisKeyScheduled(s.namespace), []int64{0, nowEpoch})
if err != nil {
return err
}
@ -94,32 +93,3 @@ func (s *Sweeper) ClearOutdatedScheduledJobs() error {
}
return fmt.Errorf("%s", errorSummary)
}
//JobScore represents the data item with score in the redis db.
type JobScore struct {
JobBytes []byte
Score int64
}
//GetZsetByScore get the items from the zset filtered by the specified score scope.
func GetZsetByScore(pool *redis.Pool, key string, scores []int64) ([]JobScore, error) {
if pool == nil || utils.IsEmptyStr(key) || len(scores) < 2 {
return nil, errors.New("bad arguments")
}
conn := pool.Get()
defer conn.Close()
values, err := redis.Values(conn.Do("ZRANGEBYSCORE", key, scores[0], scores[1], "WITHSCORES"))
if err != nil {
return nil, err
}
var jobsWithScores []JobScore
if err := redis.ScanSlice(values, &jobsWithScores); err != nil {
return nil, err
}
return jobsWithScores, nil
}

View File

@ -0,0 +1,151 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package pool
import (
"time"
"github.com/gocraft/work"
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/errs"
"github.com/vmware/harbor/src/jobservice_v2/job"
"github.com/vmware/harbor/src/jobservice_v2/opm"
)
//RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool.
type RedisJob struct {
job interface{} // the real job implementation
context *env.Context //context
statsManager opm.JobStatsManager //job stats manager
}
//NewRedisJob is constructor of RedisJob
func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManager) *RedisJob {
return &RedisJob{
job: j,
context: ctx,
statsManager: statsManager,
}
}
//Run the job
func (rj *RedisJob) Run(j *work.Job) error {
var cancelled = false
execContext, err := rj.buildContext(j)
if err != nil {
return err
}
runningJob := Wrap(rj.job)
defer func() {
if rj.shouldDisableRetry(runningJob, j, cancelled) {
j.Fails = 10000000000 //Make it big enough to avoid retrying
now := time.Now().Unix()
go func() {
timer := time.NewTimer(2 * time.Second) //make sure the failed job is already put into the dead queue
defer timer.Stop()
<-timer.C
rj.statsManager.DieAt(j.ID, now)
}()
}
}()
//Start to run
rj.jobRunning(j.ID)
//Inject data
err = runningJob.Run(execContext, j.Args)
//update the proper status
if err == nil {
rj.jobSucceed(j.ID)
return nil
}
if errs.IsJobStoppedError(err) {
rj.jobStopped(j.ID)
return nil // no need to put it into the dead queue for resume
}
if errs.IsJobCancelledError(err) {
rj.jobCancelled(j.ID)
cancelled = true
return err //need to resume
}
rj.jobFailed(j.ID)
return err
}
func (rj *RedisJob) jobRunning(jobID string) {
rj.statsManager.SetJobStatus(jobID, job.JobStatusRunning)
}
func (rj *RedisJob) jobFailed(jobID string) {
rj.statsManager.SetJobStatus(jobID, job.JobStatusError)
}
func (rj *RedisJob) jobStopped(jobID string) {
rj.statsManager.SetJobStatus(jobID, job.JobStatusStopped)
}
func (rj *RedisJob) jobCancelled(jobID string) {
rj.statsManager.SetJobStatus(jobID, job.JobStatusCancelled)
}
func (rj *RedisJob) jobSucceed(jobID string) {
rj.statsManager.SetJobStatus(jobID, job.JobStatusSuccess)
}
func (rj *RedisJob) buildContext(j *work.Job) (env.JobContext, error) {
//Build job execution context
jData := env.JobData{
ID: j.ID,
Name: j.Name,
Args: j.Args,
ExtraData: make(map[string]interface{}),
}
checkOPCmdFuncFactory := func(jobID string) job.CheckOPCmdFunc {
return func() (string, bool) {
cmd, err := rj.statsManager.CtlCommand(jobID)
if err != nil {
return "", false
}
return cmd, true
}
}
jData.ExtraData["opCommandFunc"] = checkOPCmdFuncFactory(j.ID)
checkInFuncFactory := func(jobID string) job.CheckInFunc {
return func(message string) {
rj.statsManager.CheckIn(jobID, message)
}
}
jData.ExtraData["checkInFunc"] = checkInFuncFactory(j.ID)
return rj.context.JobContext.Build(jData)
}
func (rj *RedisJob) shouldDisableRetry(j job.Interface, wj *work.Job, cancelled bool) bool {
maxFails := j.MaxFails()
if maxFails == 0 {
maxFails = 4 //Consistent with backend worker pool
}
fails := wj.Fails
fails++ //as the fail is not returned to backend pool yet
if cancelled && fails < int64(maxFails) {
return true
}
if !cancelled && fails < int64(maxFails) && !j.ShouldRetry() {
return true
}
return false
}

View File

@ -172,24 +172,10 @@ func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error {
return errors.New("job must implement the job.Interface")
}
//Use redis job wrapper pointer to keep the data required by the job.Interface.
statusChangeCallback := func(jobID string, status string) {
gcwp.statsManager.SetJobStatus(jobID, status)
}
//Define the concrete factory method for creating 'job.CheckOPCmdFunc'.
checkOPCmdFuncFactory := func(jobID string) job.CheckOPCmdFunc {
return func() (string, bool) {
cmd, err := gcwp.statsManager.CtlCommand(jobID)
if err != nil {
return "", false
}
return cmd, true
}
}
redisJob := job.NewRedisJob(j, gcwp.context, statusChangeCallback, checkOPCmdFuncFactory)
redisJob := NewRedisJob(j, gcwp.context, gcwp.statsManager)
//Get more info from j
theJ := job.Wrap(j)
theJ := Wrap(j)
gcwp.pool.JobWithOptions(name,
work.JobOptions{MaxFails: theJ.MaxFails()},
@ -350,7 +336,7 @@ func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error {
//RetryJob retry the job
func (gcwp *GoCraftWorkPool) RetryJob(jobID string) error {
return nil
return gcwp.statsManager.Retry(jobID)
}
//IsKnownJob ...
@ -365,13 +351,13 @@ func (gcwp *GoCraftWorkPool) ValidateJobParameters(jobType interface{}, params m
return errors.New("nil job type")
}
theJ := job.Wrap(jobType)
theJ := Wrap(jobType)
return theJ.Validate(params)
}
//log the job
func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error {
log.Infof("Job incoming: %s:%s", job.ID, job.Name)
log.Infof("Job incoming: %s:%s", job.Name, job.ID)
return next()
}

View File

@ -0,0 +1,22 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package pool
import (
"reflect"
"github.com/vmware/harbor/src/jobservice_v2/job"
)
//Wrap returns a new job.Interface based on the wrapped job handler reference.
func Wrap(j interface{}) job.Interface {
theType := reflect.TypeOf(j)
if theType.Kind() == reflect.Ptr {
theType = theType.Elem()
}
//Crate new
v := reflect.New(theType).Elem()
return v.Addr().Interface().(job.Interface)
}

View File

@ -43,6 +43,11 @@ func RedisKeyLastPeriodicEnqueue(namespace string) string {
return RedisNamespacePrefix(namespace) + "last_periodic_enqueue"
}
//RedisKeyDead returns key of the dead jobs.
func RedisKeyDead(namespace string) string {
return RedisNamespacePrefix(namespace) + "dead"
}
var nowMock int64
//NowEpochSeconds ...

View File

@ -4,8 +4,11 @@
package utils
import (
"errors"
"os"
"strings"
"github.com/garyburd/redigo/redis"
)
//IsEmptyStr check if the specified str is empty (len ==0) after triming prefix and suffix spaces.
@ -33,3 +36,32 @@ func FileExists(file string) bool {
func IsValidPort(port uint) bool {
return port != 0 && port < 65536
}
//JobScore represents the data item with score in the redis db.
type JobScore struct {
JobBytes []byte
Score int64
}
//GetZsetByScore get the items from the zset filtered by the specified score scope.
func GetZsetByScore(pool *redis.Pool, key string, scores []int64) ([]JobScore, error) {
if pool == nil || IsEmptyStr(key) || len(scores) < 2 {
return nil, errors.New("bad arguments")
}
conn := pool.Get()
defer conn.Close()
values, err := redis.Values(conn.Do("ZRANGEBYSCORE", key, scores[0], scores[1], "WITHSCORES"))
if err != nil {
return nil, err
}
var jobsWithScores []JobScore
if err := redis.ScanSlice(values, &jobsWithScores); err != nil {
return nil, err
}
return jobsWithScores, nil
}