Merge pull request #8714 from ywk253100/190816_scheduler

Fix bug found in scheduler
This commit is contained in:
Wenkai Yin(尹文开) 2019-08-20 14:11:04 +08:00 committed by GitHub
commit 29ab93ad9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 25 deletions

View File

@ -15,7 +15,9 @@
package api package api
import ( import (
"encoding/json"
"errors" "errors"
"fmt"
"github.com/goharbor/harbor/src/pkg/retention" "github.com/goharbor/harbor/src/pkg/retention"
"github.com/goharbor/harbor/src/pkg/scheduler" "github.com/goharbor/harbor/src/pkg/scheduler"
@ -121,12 +123,16 @@ func Init() error {
retentionController = retention.NewAPIController(retentionMgr, projectMgr, repositoryMgr, retentionScheduler, retentionLauncher) retentionController = retention.NewAPIController(retentionMgr, projectMgr, repositoryMgr, retentionScheduler, retentionLauncher)
callbackFun := func(p interface{}) error { callbackFun := func(p interface{}) error {
r, ok := p.(retention.TriggerParam) str, ok := p.(string)
if ok { if !ok {
_, err := retentionController.TriggerRetentionExec(r.PolicyID, r.Trigger, false) return fmt.Errorf("the type of param %v isn't string", p)
return err
} }
return errors.New("bad retention callback param") param := &retention.TriggerParam{}
if err := json.Unmarshal([]byte(str), param); err != nil {
return fmt.Errorf("failed to unmarshal the param: %v", err)
}
_, err := retentionController.TriggerRetentionExec(param.PolicyID, param.Trigger, false)
return err
} }
err := scheduler.Register(retention.SchedulerCallback, callbackFun) err := scheduler.Register(retention.SchedulerCallback, callbackFun)

View File

@ -21,6 +21,7 @@ import (
"github.com/goharbor/harbor/src/common/job/models" "github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/api" "github.com/goharbor/harbor/src/core/api"
"github.com/goharbor/harbor/src/pkg/scheduler"
"github.com/goharbor/harbor/src/pkg/scheduler/hook" "github.com/goharbor/harbor/src/pkg/scheduler/hook"
) )
@ -31,7 +32,13 @@ type Handler struct {
// Handle ... // Handle ...
func (h *Handler) Handle() { func (h *Handler) Handle() {
log.Debugf("received scheduler hook event for schedule %s", h.GetStringFromPath(":id"))
var data models.JobStatusChange var data models.JobStatusChange
if err := json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data); err != nil {
log.Errorf("failed to decode hook event: %v", err)
return
}
// status update // status update
if len(data.CheckIn) == 0 { if len(data.CheckIn) == 0 {
schedulerID, err := h.GetInt64FromPath(":id") schedulerID, err := h.GetInt64FromPath(":id")
@ -43,6 +50,7 @@ func (h *Handler) Handle() {
h.SendInternalServerError(fmt.Errorf("failed to update status of job %s: %v", data.JobID, err)) h.SendInternalServerError(fmt.Errorf("failed to update status of job %s: %v", data.JobID, err))
return return
} }
log.Debugf("handle status update hook event for schedule %s completed", h.GetStringFromPath(":id"))
return return
} }
@ -53,7 +61,7 @@ func (h *Handler) Handle() {
log.Errorf("failed to unmarshal parameters from check in message: %v", err) log.Errorf("failed to unmarshal parameters from check in message: %v", err)
return return
} }
callbackFuncNameParam, exist := params["callback_func_name"] callbackFuncNameParam, exist := params[scheduler.JobParamCallbackFunc]
if !exist { if !exist {
log.Error("cannot get the parameter \"callback_func_name\" from the check in message") log.Error("cannot get the parameter \"callback_func_name\" from the check in message")
return return
@ -63,8 +71,9 @@ func (h *Handler) Handle() {
log.Errorf("invalid \"callback_func_name\": %v", callbackFuncName) log.Errorf("invalid \"callback_func_name\": %v", callbackFuncName)
return return
} }
if err := hook.GlobalController.Run(callbackFuncName, params["params"]); err != nil { if err := hook.GlobalController.Run(callbackFuncName, params[scheduler.JobParamCallbackFuncParams]); err != nil {
log.Errorf("failed to run the callback function %s: %v", callbackFuncName, err) log.Errorf("failed to run the callback function %s: %v", callbackFuncName, err)
return return
} }
log.Debugf("callback function %s called for schedule %s", callbackFuncName, h.GetStringFromPath(":id"))
} }

View File

@ -15,6 +15,7 @@
package scheduler package scheduler
import ( import (
"encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"sync" "sync"
@ -29,9 +30,10 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
) )
// const definitions
const ( const (
jobParamCallbackFunc = "callback_func" JobParamCallbackFunc = "callback_func"
jobParamCallbackFuncParams = "params" JobParamCallbackFuncParams = "params"
) )
var ( var (
@ -46,6 +48,8 @@ type CallbackFunc func(interface{}) error
// Scheduler provides the capability to run a periodic task, a callback function // Scheduler provides the capability to run a periodic task, a callback function
// needs to be registered before using the scheduler // needs to be registered before using the scheduler
// The "params" is passed to the callback function specified by "callbackFuncName"
// as encoded json string, so the callback function must decode it before using
type Scheduler interface { type Scheduler interface {
Schedule(cron string, callbackFuncName string, params interface{}) (int64, error) Schedule(cron string, callbackFuncName string, params interface{}) (int64, error)
UnSchedule(id int64) error UnSchedule(id int64) error
@ -119,6 +123,15 @@ func (s *scheduler) Schedule(cron string, callbackFuncName string, params interf
if err != nil { if err != nil {
return 0, err return 0, err
} }
// if got error in the following steps, delete the schedule record in database
defer func() {
if err != nil {
e := s.manager.Delete(scheduleID)
if e != nil {
log.Errorf("failed to delete the schedule %d: %v", scheduleID, e)
}
}
}()
log.Debugf("the schedule record %d created", scheduleID) log.Debugf("the schedule record %d created", scheduleID)
// submit scheduler job to Jobservice // submit scheduler job to Jobservice
@ -126,8 +139,7 @@ func (s *scheduler) Schedule(cron string, callbackFuncName string, params interf
jd := &models.JobData{ jd := &models.JobData{
Name: JobNameScheduler, Name: JobNameScheduler,
Parameters: map[string]interface{}{ Parameters: map[string]interface{}{
jobParamCallbackFunc: callbackFuncName, JobParamCallbackFunc: callbackFuncName,
jobParamCallbackFuncParams: params,
}, },
Metadata: &models.JobMetadata{ Metadata: &models.JobMetadata{
JobKind: job.JobKindPeriodic, JobKind: job.JobKindPeriodic,
@ -135,15 +147,26 @@ func (s *scheduler) Schedule(cron string, callbackFuncName string, params interf
}, },
StatusHook: statusHookURL, StatusHook: statusHookURL,
} }
if params != nil {
var paramsData []byte
paramsData, err = json.Marshal(params)
if err != nil {
return 0, err
}
jd.Parameters[JobParamCallbackFuncParams] = string(paramsData)
}
jobID, err := s.jobserviceClient.SubmitJob(jd) jobID, err := s.jobserviceClient.SubmitJob(jd)
if err != nil { if err != nil {
// if failed to submit to Jobservice, delete the schedule record in database
e := s.manager.Delete(scheduleID)
if e != nil {
log.Errorf("failed to delete the schedule %d: %v", scheduleID, e)
}
return 0, err return 0, err
} }
// if got error in the following steps, stop the scheduler job
defer func() {
if err != nil {
if e := s.jobserviceClient.PostAction(jobID, job.JobActionStop); e != nil {
log.Errorf("failed to stop the scheduler job %s: %v", jobID, e)
}
}
}()
log.Debugf("the scheduler job submitted to Jobservice, job ID: %s", jobID) log.Debugf("the scheduler job submitted to Jobservice, job ID: %s", jobID)
// populate the job ID for the schedule // populate the job ID for the schedule
@ -152,14 +175,6 @@ func (s *scheduler) Schedule(cron string, callbackFuncName string, params interf
JobID: jobID, JobID: jobID,
}, "JobID") }, "JobID")
if err != nil { if err != nil {
// stop the scheduler job
if e := s.jobserviceClient.PostAction(jobID, job.JobActionStop); e != nil {
log.Errorf("failed to stop the scheduler job %s: %v", jobID, e)
}
// delete the schedule record
if e := s.manager.Delete(scheduleID); e != nil {
log.Errorf("failed to delete the schedule record %d: %v", scheduleID, e)
}
return 0, err return 0, err
} }
@ -172,7 +187,8 @@ func (s *scheduler) UnSchedule(id int64) error {
return err return err
} }
if schedule == nil { if schedule == nil {
return fmt.Errorf("the schedule record %d not found", id) log.Warningf("the schedule record %d not found", id)
return nil
} }
if err = s.jobserviceClient.PostAction(schedule.JobID, job.JobActionStop); err != nil { if err = s.jobserviceClient.PostAction(schedule.JobID, job.JobActionStop); err != nil {
herr, ok := err.(*chttp.Error) herr, ok := err.(*chttp.Error)