From a2f0da0d7d164c7efa0702bc62d60275c612f9f6 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Fri, 26 Oct 2018 14:28:04 +0800 Subject: [PATCH] Fix the web hook mising issue for the periodic job executions Signed-off-by: Steven Zou --- src/jobservice/opm/job_stats_mgr.go | 9 +++++++ src/jobservice/opm/redis_job_stats_mgr.go | 32 ++++++++++++++--------- src/jobservice/period/enqueuer.go | 13 +++++++++ 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/src/jobservice/opm/job_stats_mgr.go b/src/jobservice/opm/job_stats_mgr.go index 8a4dc7bf7..f78e6b066 100644 --- a/src/jobservice/opm/job_stats_mgr.go +++ b/src/jobservice/opm/job_stats_mgr.go @@ -93,6 +93,15 @@ type JobStatsManager interface { // error if meet any problems RegisterHook(jobID string, hookURL string, isCached bool) error + // Get hook returns the web hook url for the specified job if it is registered + // + // jobID string : ID of job + // + // Returns: + // the web hook url if existing + // non-nil error if meet any problems + GetHook(jobID string) (string, error) + // Mark the periodic job stats expired // // jobID string : ID of job diff --git a/src/jobservice/opm/redis_job_stats_mgr.go b/src/jobservice/opm/redis_job_stats_mgr.go index 39cca3533..683317589 100644 --- a/src/jobservice/opm/redis_job_stats_mgr.go +++ b/src/jobservice/opm/redis_job_stats_mgr.go @@ -330,6 +330,20 @@ func (rjs *RedisJobStatsManager) RegisterHook(jobID string, hookURL string, isCa return nil } +// GetHook returns the status web hook url for the specified job if existing +func (rjs *RedisJobStatsManager) GetHook(jobID string) (string, error) { + if utils.IsEmptyStr(jobID) { + return "", errors.New("empty job ID") + } + + // First retrieve from the cache + if hookURL, ok := rjs.hookStore.Get(jobID); ok { + return hookURL, nil + } + + return rjs.getHook(jobID) +} + // ExpirePeriodicJobStats marks the periodic job stats expired func (rjs *RedisJobStatsManager) ExpirePeriodicJobStats(jobID string) error { conn := rjs.redisPool.Get() @@ -772,23 +786,15 @@ func (rjs *RedisJobStatsManager) getHook(jobID string) (string, error) { defer conn.Close() key := utils.KeyJobStats(rjs.namespace, jobID) - vals, err := redis.Strings(conn.Do("HGETALL", key)) + hookURL, err := redis.String(conn.Do("HMGET", key, "status_hook")) if err != nil { + if err == redis.ErrNil { + return "", fmt.Errorf("no registered web hook found for job '%s'", jobID) + } return "", err } - for i, l := 0, len(vals); i < l; i = i + 2 { - prop := vals[i] - value := vals[i+1] - switch prop { - case "status_hook": - return value, nil - default: - break - } - } - - return "", fmt.Errorf("no hook found for job '%s'", jobID) + return hookURL, nil } func backoff(seed uint) int { diff --git a/src/jobservice/period/enqueuer.go b/src/jobservice/period/enqueuer.go index cfddfd357..aa9f6ddc6 100644 --- a/src/jobservice/period/enqueuer.go +++ b/src/jobservice/period/enqueuer.go @@ -145,6 +145,19 @@ func (pe *periodicEnqueuer) enqueue() error { // Try to save the stats of new scheduled execution (job). pe.createExecution(pl.PolicyID, pl.JobName, scheduledExecutionID, epoch) + + // Get web hook from the periodic job (policy) + webHookURL, err := pe.statsManager.GetHook(pl.PolicyID) + if err == nil { + // Register hook for the execution + if err := pe.statsManager.RegisterHook(scheduledExecutionID, webHookURL, false); err != nil { + // Just logged + logger.Errorf("Failed to register web hook '%s' for periodic job (execution) '%s' with error: %s", webHookURL, scheduledExecutionID, err) + } + } else { + // Just a warning + logger.Warningf("Failed to retrieve web hook for periodic job (policy) %s: %s", pl.PolicyID, err) + } } // Link the upstream job (policy) with the created executions if len(executions) > 0 {