From 5c286d799fa7e0c43d9ee838766236eee24e0966 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Fri, 16 Aug 2019 13:54:57 +0800 Subject: [PATCH] Fix bug found in scheduler The scheduler hook handler doesn't parse the job status struct when handling the hook. This commit fixes it. Signed-off-by: Wenkai Yin --- src/core/api/base.go | 16 ++++-- .../notifications/scheduler/handler.go | 13 ++++- src/pkg/scheduler/scheduler.go | 52 ++++++++++++------- 3 files changed, 56 insertions(+), 25 deletions(-) diff --git a/src/core/api/base.go b/src/core/api/base.go index 7c4dfddf4..7b4b4bade 100644 --- a/src/core/api/base.go +++ b/src/core/api/base.go @@ -15,7 +15,9 @@ package api import ( + "encoding/json" "errors" + "fmt" "github.com/goharbor/harbor/src/pkg/retention" "github.com/goharbor/harbor/src/pkg/scheduler" @@ -121,12 +123,16 @@ func Init() error { retentionController = retention.NewAPIController(retentionMgr, projectMgr, repositoryMgr, retentionScheduler, retentionLauncher) callbackFun := func(p interface{}) error { - r, ok := p.(retention.TriggerParam) - if ok { - _, err := retentionController.TriggerRetentionExec(r.PolicyID, r.Trigger, false) - return err + str, ok := p.(string) + if !ok { + return fmt.Errorf("the type of param %v isn't string", p) } - return errors.New("bad retention callback param") + param := &retention.TriggerParam{} + if err := json.Unmarshal([]byte(str), param); err != nil { + return fmt.Errorf("failed to unmarshal the param: %v", err) + } + _, err := retentionController.TriggerRetentionExec(param.PolicyID, param.Trigger, false) + return err } err := scheduler.Register(retention.SchedulerCallback, callbackFun) diff --git a/src/core/service/notifications/scheduler/handler.go b/src/core/service/notifications/scheduler/handler.go index a3592f072..b07cfd5b6 100644 --- a/src/core/service/notifications/scheduler/handler.go +++ b/src/core/service/notifications/scheduler/handler.go @@ -21,6 +21,7 @@ import ( "github.com/goharbor/harbor/src/common/job/models" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/api" + "github.com/goharbor/harbor/src/pkg/scheduler" "github.com/goharbor/harbor/src/pkg/scheduler/hook" ) @@ -31,7 +32,13 @@ type Handler struct { // Handle ... func (h *Handler) Handle() { + log.Debugf("received scheduler hook event for schedule %s", h.GetStringFromPath(":id")) + var data models.JobStatusChange + if err := json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data); err != nil { + log.Errorf("failed to decode hook event: %v", err) + return + } // status update if len(data.CheckIn) == 0 { schedulerID, err := h.GetInt64FromPath(":id") @@ -43,6 +50,7 @@ func (h *Handler) Handle() { h.SendInternalServerError(fmt.Errorf("failed to update status of job %s: %v", data.JobID, err)) return } + log.Debugf("handle status update hook event for schedule %s completed", h.GetStringFromPath(":id")) return } @@ -53,7 +61,7 @@ func (h *Handler) Handle() { log.Errorf("failed to unmarshal parameters from check in message: %v", err) return } - callbackFuncNameParam, exist := params["callback_func_name"] + callbackFuncNameParam, exist := params[scheduler.JobParamCallbackFunc] if !exist { log.Error("cannot get the parameter \"callback_func_name\" from the check in message") return @@ -63,8 +71,9 @@ func (h *Handler) Handle() { log.Errorf("invalid \"callback_func_name\": %v", callbackFuncName) return } - if err := hook.GlobalController.Run(callbackFuncName, params["params"]); err != nil { + if err := hook.GlobalController.Run(callbackFuncName, params[scheduler.JobParamCallbackFuncParams]); err != nil { log.Errorf("failed to run the callback function %s: %v", callbackFuncName, err) return } + log.Debugf("callback function %s called for schedule %s", callbackFuncName, h.GetStringFromPath(":id")) } diff --git a/src/pkg/scheduler/scheduler.go b/src/pkg/scheduler/scheduler.go index c0fa44d68..6fb7d7e87 100644 --- a/src/pkg/scheduler/scheduler.go +++ b/src/pkg/scheduler/scheduler.go @@ -15,6 +15,7 @@ package scheduler import ( + "encoding/json" "fmt" "net/http" "sync" @@ -29,9 +30,10 @@ import ( "github.com/pkg/errors" ) +// const definitions const ( - jobParamCallbackFunc = "callback_func" - jobParamCallbackFuncParams = "params" + JobParamCallbackFunc = "callback_func" + JobParamCallbackFuncParams = "params" ) var ( @@ -46,6 +48,8 @@ type CallbackFunc func(interface{}) error // Scheduler provides the capability to run a periodic task, a callback function // needs to be registered before using the scheduler +// The "params" is passed to the callback function specified by "callbackFuncName" +// as encoded json string, so the callback function must decode it before using type Scheduler interface { Schedule(cron string, callbackFuncName string, params interface{}) (int64, error) UnSchedule(id int64) error @@ -119,6 +123,15 @@ func (s *scheduler) Schedule(cron string, callbackFuncName string, params interf if err != nil { return 0, err } + // if got error in the following steps, delete the schedule record in database + defer func() { + if err != nil { + e := s.manager.Delete(scheduleID) + if e != nil { + log.Errorf("failed to delete the schedule %d: %v", scheduleID, e) + } + } + }() log.Debugf("the schedule record %d created", scheduleID) // submit scheduler job to Jobservice @@ -126,8 +139,7 @@ func (s *scheduler) Schedule(cron string, callbackFuncName string, params interf jd := &models.JobData{ Name: JobNameScheduler, Parameters: map[string]interface{}{ - jobParamCallbackFunc: callbackFuncName, - jobParamCallbackFuncParams: params, + JobParamCallbackFunc: callbackFuncName, }, Metadata: &models.JobMetadata{ JobKind: job.JobKindPeriodic, @@ -135,15 +147,26 @@ func (s *scheduler) Schedule(cron string, callbackFuncName string, params interf }, StatusHook: statusHookURL, } + if params != nil { + var paramsData []byte + paramsData, err = json.Marshal(params) + if err != nil { + return 0, err + } + jd.Parameters[JobParamCallbackFuncParams] = string(paramsData) + } jobID, err := s.jobserviceClient.SubmitJob(jd) if err != nil { - // if failed to submit to Jobservice, delete the schedule record in database - e := s.manager.Delete(scheduleID) - if e != nil { - log.Errorf("failed to delete the schedule %d: %v", scheduleID, e) - } return 0, err } + // if got error in the following steps, stop the scheduler job + defer func() { + if err != nil { + if e := s.jobserviceClient.PostAction(jobID, job.JobActionStop); e != nil { + log.Errorf("failed to stop the scheduler job %s: %v", jobID, e) + } + } + }() log.Debugf("the scheduler job submitted to Jobservice, job ID: %s", jobID) // populate the job ID for the schedule @@ -152,14 +175,6 @@ func (s *scheduler) Schedule(cron string, callbackFuncName string, params interf JobID: jobID, }, "JobID") if err != nil { - // stop the scheduler job - if e := s.jobserviceClient.PostAction(jobID, job.JobActionStop); e != nil { - log.Errorf("failed to stop the scheduler job %s: %v", jobID, e) - } - // delete the schedule record - if e := s.manager.Delete(scheduleID); e != nil { - log.Errorf("failed to delete the schedule record %d: %v", scheduleID, e) - } return 0, err } @@ -172,7 +187,8 @@ func (s *scheduler) UnSchedule(id int64) error { return err } if schedule == nil { - return fmt.Errorf("the schedule record %d not found", id) + log.Warningf("the schedule record %d not found", id) + return nil } if err = s.jobserviceClient.PostAction(schedule.JobID, job.JobActionStop); err != nil { herr, ok := err.(*chttp.Error)