From 28d02931d2b56b3ece7383f79252ff4a5b4eff3a Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Thu, 15 Mar 2018 09:56:07 +0800 Subject: [PATCH] Support saving job stats to the baclend for the redis workerpool --- src/jobservice_v2/models/models.go | 22 +++--- src/jobservice_v2/period/redis_scheduler.go | 12 ++++ src/jobservice_v2/period/sweeper.go | 1 + src/jobservice_v2/pool/redis_pool.go | 75 ++++++++++++++++++--- src/jobservice_v2/utils/keys.go | 5 ++ 5 files changed, 94 insertions(+), 21 deletions(-) diff --git a/src/jobservice_v2/models/models.go b/src/jobservice_v2/models/models.go index b88a52fe9..ce6d4f733 100644 --- a/src/jobservice_v2/models/models.go +++ b/src/jobservice_v2/models/models.go @@ -2,10 +2,6 @@ package models -import ( - "time" -) - //Parameters for job execution. type Parameters map[string]interface{} @@ -36,13 +32,17 @@ type JobStats struct { //JobStatData keeps the stats of job type JobStatData struct { - JobID string `json:"id"` - Status string `json:"status"` - JobName string `json:"name"` - RefLink string `json:"ref_link,omitempty"` - EnqueueTime time.Time `json:"enqueue_time"` - UpdateTime time.Time `json:"update_time"` - RunAt time.Time `json:"run_at,omitempty"` + JobID string `json:"id"` + Status string `json:"status"` + JobName string `json:"name"` + JobKind string `json:"kind"` + IsUnique bool `json:"unique"` + RefLink string `json:"ref_link,omitempty"` + EnqueueTime int64 `json:"enqueue_time"` + UpdateTime int64 `json:"update_time"` + RunAt int64 `json:"run_at,omitempty"` + CheckIn string `json:"check_in,omitempty"` + CheckInAt int64 `json:"check_in_at,omitempty"` } //JobPoolStats represent the healthy and status of the job service. diff --git a/src/jobservice_v2/period/redis_scheduler.go b/src/jobservice_v2/period/redis_scheduler.go index 04c25e124..9d053b2c8 100644 --- a/src/jobservice_v2/period/redis_scheduler.go +++ b/src/jobservice_v2/period/redis_scheduler.go @@ -57,6 +57,8 @@ func (rps *RedisPeriodicScheduler) Start() error { //As we get one connection from the pool, don't try to close it. conn := rps.redisPool.Get() + defer conn.Close() + psc := redis.PubSubConn{ Conn: conn, } @@ -170,6 +172,8 @@ func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parame //Save to redis db and publish notification via redis transaction conn := rps.redisPool.Get() + defer conn.Close() + conn.Send("MULTI") conn.Send("ZADD", utils.KeyPeriodicPolicy(rps.namespace), score, rawJSON) conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON2) @@ -205,6 +209,8 @@ func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error { //REM from redis db conn := rps.redisPool.Get() + defer conn.Close() + conn.Send("MULTI") conn.Send("ZREMRANGEBYSCORE", utils.KeyPeriodicPolicy(rps.namespace), score, score) //Accurately remove the item with the specified score conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON) @@ -216,6 +222,8 @@ func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error { //Load data from zset func (rps *RedisPeriodicScheduler) Load() error { conn := rps.redisPool.Get() + defer conn.Close() + bytes, err := redis.MultiBulk(conn.Do("ZRANGE", utils.KeyPeriodicPolicy(rps.namespace), 0, -1, "WITHSCORES")) if err != nil { return err @@ -258,6 +266,8 @@ func (rps *RedisPeriodicScheduler) Load() error { //Clear is implementation of the same method in period.Interface func (rps *RedisPeriodicScheduler) Clear() error { conn := rps.redisPool.Get() + defer conn.Close() + _, err := conn.Do("ZREMRANGEBYRANK", utils.KeyPeriodicPolicy(rps.namespace), 0, -1) return err @@ -269,6 +279,8 @@ func (rps *RedisPeriodicScheduler) exists(rawPolicy string) (int64, bool) { } conn := rps.redisPool.Get() + defer conn.Close() + count, err := redis.Int64(conn.Do("ZSCORE", utils.KeyPeriodicPolicy(rps.namespace), rawPolicy)) return count, err == nil } diff --git a/src/jobservice_v2/period/sweeper.go b/src/jobservice_v2/period/sweeper.go index eb1c5294c..fb8dbf9f6 100644 --- a/src/jobservice_v2/period/sweeper.go +++ b/src/jobservice_v2/period/sweeper.go @@ -34,6 +34,7 @@ func NewSweeper(namespace string, pool *redis.Pool, client *work.Client) *Sweepe func (s *Sweeper) ClearOutdatedScheduledJobs() error { //Check if other workpool has done the action conn := s.redisPool.Get() + defer conn.Close() //Lock r, err := conn.Do("SET", utils.KeyPeriodicLock(s.namespace), time.Now().Unix(), "EX", 30, "NX") diff --git a/src/jobservice_v2/pool/redis_pool.go b/src/jobservice_v2/pool/redis_pool.go index 8e85300cd..4849ff11f 100644 --- a/src/jobservice_v2/pool/redis_pool.go +++ b/src/jobservice_v2/pool/redis_pool.go @@ -33,6 +33,7 @@ const ( //GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis. type GoCraftWorkPool struct { + namespace string redisPool *redis.Pool pool *work.WorkerPool enqueuer *work.Enqueuer @@ -81,6 +82,7 @@ func NewGoCraftWorkPool(ctx *env.Context, cfg RedisPoolConfig) *GoCraftWorkPool scheduler := period.NewRedisPeriodicScheduler(ctx.SystemContext, cfg.Namespace, redisPool) sweeper := period.NewSweeper(cfg.Namespace, redisPool, client) return &GoCraftWorkPool{ + namespace: cfg.Namespace, redisPool: redisPool, pool: pool, enqueuer: enqueuer, @@ -210,7 +212,13 @@ func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, i return models.JobStats{}, err } - return generateResult(j), nil + res := generateResult(j, job.JobKindGeneric, isUnique) + if err := gcwp.saveStats(res); err != nil { + //Once running job, let it fly away + //The client method may help if the job is still in progress when get stats of this job + log.Errorf("Failed to save stats of job %s with error: %s\n", res.Stats.JobID, err) + } + return res, nil } //Schedule job @@ -231,8 +239,14 @@ func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters, return models.JobStats{}, err } - res := generateResult(j.Job) - res.Stats.RunAt = time.Unix(j.RunAt, 0) + res := generateResult(j.Job, job.JobKindScheduled, isUnique) + res.Stats.RunAt = j.RunAt + + if err := gcwp.saveStats(res); err != nil { + //As job is already scheduled, we should not block this call + //Use client method to help get the status of this fly-away job + log.Errorf("Failed to save stats of job %s with error: %s\n", res.Stats.JobID, err) + } return res, nil } @@ -245,13 +259,14 @@ func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.P } //TODO: Need more data + //TODO: EnqueueTime should be got from cron spec return models.JobStats{ Stats: &models.JobStatData{ JobID: id, JobName: jobName, Status: job.JobStatusPending, - EnqueueTime: time.Unix(time.Now().Unix(), 0), - UpdateTime: time.Unix(time.Now().Unix(), 0), + EnqueueTime: time.Now().Unix(), + UpdateTime: time.Now().Unix(), RefLink: fmt.Sprintf("/api/v1/jobs/%s", id), }, }, nil @@ -295,9 +310,47 @@ func (gcwp *GoCraftWorkPool) IsKnownJob(name string) (bool, bool) { return ok, v } -//Clear the invalid data on redis db, such as outdated scheduled jobs etc. -func (gcwp *GoCraftWorkPool) clearDirtyData() { +func (gcwp *GoCraftWorkPool) saveStats(stats models.JobStats) error { + conn := gcwp.redisPool.Get() + defer conn.Close() + key := utils.KeyJobStats(gcwp.namespace, stats.Stats.JobID) + args := make([]interface{}, 0) + args = append(args, key) + args = append(args, + "id", stats.Stats.JobID, + "name", stats.Stats.JobName, + "kind", stats.Stats.JobKind, + "unique", stats.Stats.IsUnique, + "status", stats.Stats.Status, + "ref_link", stats.Stats.RefLink, + "enqueue_time", stats.Stats.EnqueueTime, + "update_time", stats.Stats.UpdateTime, + "run_at", stats.Stats.RunAt, + ) + if stats.Stats.CheckInAt > 0 && !utils.IsEmptyStr(stats.Stats.CheckIn) { + args = append(args, + "check_in", stats.Stats.CheckIn, + "check_in_at", stats.Stats.CheckInAt, + ) + } + + conn.Send("HMSET", args...) + //If job kind is periodic job, expire time should be set + //If job kind is scheduled job, expire time should be runAt+1day + if stats.Stats.JobKind != job.JobKindPeriodic { + var expireTime int64 = 60 * 60 * 24 + if stats.Stats.JobKind == job.JobKindScheduled { + nowTime := time.Now().Unix() + future := stats.Stats.RunAt - nowTime + if future > 0 { + expireTime += future + } + } + conn.Send("EXPIRE", key, expireTime) + } + + return conn.Flush() } //log the job @@ -308,7 +361,7 @@ func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc) } //generate the job stats data -func generateResult(j *work.Job) models.JobStats { +func generateResult(j *work.Job, jobKind string, isUnique bool) models.JobStats { if j == nil { return models.JobStats{} } @@ -317,9 +370,11 @@ func generateResult(j *work.Job) models.JobStats { Stats: &models.JobStatData{ JobID: j.ID, JobName: j.Name, + JobKind: jobKind, + IsUnique: isUnique, Status: job.JobStatusPending, - EnqueueTime: time.Unix(j.EnqueuedAt, 0), - UpdateTime: time.Unix(time.Now().Unix(), 0), + EnqueueTime: j.EnqueuedAt, + UpdateTime: time.Now().Unix(), RefLink: fmt.Sprintf("/api/v1/jobs/%s", j.ID), }, } diff --git a/src/jobservice_v2/utils/keys.go b/src/jobservice_v2/utils/keys.go index 23a1563e4..91d035d54 100644 --- a/src/jobservice_v2/utils/keys.go +++ b/src/jobservice_v2/utils/keys.go @@ -81,3 +81,8 @@ func KeyPeriodicNotification(namespace string) string { func KeyPeriodicLock(namespace string) string { return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "lock") } + +//KeyJobStats returns the key of job stats +func KeyJobStats(namespace string, jobID string) string { + return fmt.Sprintf("%s%s:%s", KeyNamespacePrefix(namespace), "job_stats", jobID) +}