From 926ac44a6740de8c8de4330774aad6abb6cdbde1 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Wed, 21 Aug 2019 16:26:30 +0800 Subject: [PATCH] fix #8525: periodic job retry issue: job stats is not found Signed-off-by: Steven Zou --- src/jobservice/period/enqueuer.go | 34 +++++++++++++---------- src/jobservice/runner/redis.go | 26 ++++++++--------- src/jobservice/worker/cworker/c_worker.go | 2 +- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/src/jobservice/period/enqueuer.go b/src/jobservice/period/enqueuer.go index 6303ac80e..7de9e54dd 100644 --- a/src/jobservice/period/enqueuer.go +++ b/src/jobservice/period/enqueuer.go @@ -15,11 +15,11 @@ package period import ( + "context" "fmt" "math/rand" "time" - "context" "github.com/gocraft/work" "github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/utils" @@ -175,23 +175,14 @@ func (e *enqueuer) scheduleNextJobs(p *Policy, conn redis.Conn) { e.lastEnqueueErr = err logger.Errorf("Invalid corn spec in periodic policy %s %s: %s", p.JobName, p.ID, err) } else { - if p.JobParameters == nil { - p.JobParameters = make(job.Parameters) - } - - // Clone job parameters - wJobParams := make(job.Parameters) - if p.JobParameters != nil && len(p.JobParameters) > 0 { - for k, v := range p.JobParameters { - wJobParams[k] = v - } - } - // Add extra argument for job running - // Notes: Only for system using - wJobParams[PeriodicExecutionMark] = true for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) { epoch := t.Unix() + // Clone parameters + // Add extra argument for job running too. + // Notes: Only for system using + wJobParams := cloneParameters(p.JobParameters, epoch) + // Create an execution (job) based on the periodic job template (policy) j := &work.Job{ Name: p.JobName, @@ -316,3 +307,16 @@ func (e *enqueuer) shouldEnqueue() bool { return false } + +func cloneParameters(params job.Parameters, epoch int64) job.Parameters { + p := make(job.Parameters) + + // Clone parameters to a new param map + for k, v := range params { + p[k] = v + } + + p[PeriodicExecutionMark] = fmt.Sprintf("%d", epoch) + + return p +} diff --git a/src/jobservice/runner/redis.go b/src/jobservice/runner/redis.go index 1fa6dde56..69cc94714 100644 --- a/src/jobservice/runner/redis.go +++ b/src/jobservice/runner/redis.go @@ -15,18 +15,17 @@ package runner import ( - "runtime" - - "github.com/goharbor/harbor/src/jobservice/job/impl" - "fmt" + "runtime" "time" "github.com/gocraft/work" "github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/job/impl" "github.com/goharbor/harbor/src/jobservice/lcm" "github.com/goharbor/harbor/src/jobservice/logger" + "github.com/goharbor/harbor/src/jobservice/period" "github.com/pkg/errors" ) @@ -69,8 +68,10 @@ func (rj *RedisJob) Run(j *work.Job) (err error) { // Track the running job now jID := j.ID - if isPeriodicJobExecution(j) { - jID = fmt.Sprintf("%s@%d", j.ID, j.EnqueuedAt) + + // Check if the job is a periodic one as periodic job has its own ID format + if eID, yes := isPeriodicJobExecution(j); yes { + jID = eID } if tracker, err = rj.ctl.Track(jID); err != nil { @@ -191,7 +192,7 @@ func (rj *RedisJob) Run(j *work.Job) (err error) { // Handle retry rj.retry(runningJob, j) // Handle periodic job execution - if isPeriodicJobExecution(j) { + if _, yes := isPeriodicJobExecution(j); yes { if er := tracker.PeriodicExecutionDone(); er != nil { // Just log it logger.Error(er) @@ -210,14 +211,9 @@ func (rj *RedisJob) retry(j job.Interface, wj *work.Job) { } } -func isPeriodicJobExecution(j *work.Job) bool { - if isPeriodic, ok := j.Args["_job_kind_periodic_"]; ok { - if isPeriodicV, yes := isPeriodic.(bool); yes && isPeriodicV { - return true - } - } - - return false +func isPeriodicJobExecution(j *work.Job) (string, bool) { + epoch, ok := j.Args[period.PeriodicExecutionMark] + return fmt.Sprintf("%s@%s", j.ID, epoch), ok } func bp(b bool) *bool { diff --git a/src/jobservice/worker/cworker/c_worker.go b/src/jobservice/worker/cworker/c_worker.go index b4c3b799f..6de36856f 100644 --- a/src/jobservice/worker/cworker/c_worker.go +++ b/src/jobservice/worker/cworker/c_worker.go @@ -405,7 +405,7 @@ func (w *basicWorker) registerJob(name string, j interface{}) (err error) { w.pool.JobWithOptions( name, work.JobOptions{ - MaxFails: theJ.MaxFails() + 1, + MaxFails: theJ.MaxFails(), }, // Use generic handler to handle as we do not accept context with this way. func(job *work.Job) error {