Merge pull request #8767 from steven-zou/fix/issue-#8525

fix #8525: periodic job retry issue: job stats are not found
This commit is contained in:
Wenkai Yin(尹文开) 2019-08-22 11:06:13 +08:00 committed by GitHub
commit e3d8fa845d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 31 deletions

View File

@ -15,11 +15,11 @@
package period package period
import ( import (
"context"
"fmt" "fmt"
"math/rand" "math/rand"
"time" "time"
"context"
"github.com/gocraft/work" "github.com/gocraft/work"
"github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/common/utils"
@ -175,23 +175,14 @@ func (e *enqueuer) scheduleNextJobs(p *Policy, conn redis.Conn) {
e.lastEnqueueErr = err e.lastEnqueueErr = err
logger.Errorf("Invalid corn spec in periodic policy %s %s: %s", p.JobName, p.ID, err) logger.Errorf("Invalid corn spec in periodic policy %s %s: %s", p.JobName, p.ID, err)
} else { } else {
if p.JobParameters == nil {
p.JobParameters = make(job.Parameters)
}
// Clone job parameters
wJobParams := make(job.Parameters)
if p.JobParameters != nil && len(p.JobParameters) > 0 {
for k, v := range p.JobParameters {
wJobParams[k] = v
}
}
// Add extra argument for job running
// Notes: Only for system using
wJobParams[PeriodicExecutionMark] = true
for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) { for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
epoch := t.Unix() epoch := t.Unix()
// Clone parameters
// Add extra argument for job running too.
// Notes: Only for system using
wJobParams := cloneParameters(p.JobParameters, epoch)
// Create an execution (job) based on the periodic job template (policy) // Create an execution (job) based on the periodic job template (policy)
j := &work.Job{ j := &work.Job{
Name: p.JobName, Name: p.JobName,
@ -316,3 +307,16 @@ func (e *enqueuer) shouldEnqueue() bool {
return false return false
} }
func cloneParameters(params job.Parameters, epoch int64) job.Parameters {
p := make(job.Parameters)
// Clone parameters to a new param map
for k, v := range params {
p[k] = v
}
p[PeriodicExecutionMark] = fmt.Sprintf("%d", epoch)
return p
}

View File

@ -15,18 +15,17 @@
package runner package runner
import ( import (
"runtime"
"github.com/goharbor/harbor/src/jobservice/job/impl"
"fmt" "fmt"
"runtime"
"time" "time"
"github.com/gocraft/work" "github.com/gocraft/work"
"github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/job/impl"
"github.com/goharbor/harbor/src/jobservice/lcm" "github.com/goharbor/harbor/src/jobservice/lcm"
"github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/period"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -69,8 +68,10 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
// Track the running job now // Track the running job now
jID := j.ID jID := j.ID
if isPeriodicJobExecution(j) {
jID = fmt.Sprintf("%s@%d", j.ID, j.EnqueuedAt) // Check if the job is a periodic one as periodic job has its own ID format
if eID, yes := isPeriodicJobExecution(j); yes {
jID = eID
} }
if tracker, err = rj.ctl.Track(jID); err != nil { if tracker, err = rj.ctl.Track(jID); err != nil {
@ -191,7 +192,7 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
// Handle retry // Handle retry
rj.retry(runningJob, j) rj.retry(runningJob, j)
// Handle periodic job execution // Handle periodic job execution
if isPeriodicJobExecution(j) { if _, yes := isPeriodicJobExecution(j); yes {
if er := tracker.PeriodicExecutionDone(); er != nil { if er := tracker.PeriodicExecutionDone(); er != nil {
// Just log it // Just log it
logger.Error(er) logger.Error(er)
@ -210,14 +211,9 @@ func (rj *RedisJob) retry(j job.Interface, wj *work.Job) {
} }
} }
func isPeriodicJobExecution(j *work.Job) bool { func isPeriodicJobExecution(j *work.Job) (string, bool) {
if isPeriodic, ok := j.Args["_job_kind_periodic_"]; ok { epoch, ok := j.Args[period.PeriodicExecutionMark]
if isPeriodicV, yes := isPeriodic.(bool); yes && isPeriodicV { return fmt.Sprintf("%s@%s", j.ID, epoch), ok
return true
}
}
return false
} }
func bp(b bool) *bool { func bp(b bool) *bool {

View File

@ -405,7 +405,7 @@ func (w *basicWorker) registerJob(name string, j interface{}) (err error) {
w.pool.JobWithOptions( w.pool.JobWithOptions(
name, name,
work.JobOptions{ work.JobOptions{
MaxFails: theJ.MaxFails() + 1, MaxFails: theJ.MaxFails(),
}, },
// Use generic handler to handle as we do not accept context with this way. // Use generic handler to handle as we do not accept context with this way.
func(job *work.Job) error { func(job *work.Job) error {