Merge pull request #4565 from vmware/fix_stop_periodical_job_issue

Fix issue of stopping periodic job
This commit is contained in:
Steven Zou 2018-04-02 18:40:19 +08:00 committed by GitHub
commit 44d63fe935
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 309 additions and 57 deletions

View File

@ -55,6 +55,7 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
logger.Info("I'm finished, exit!")
fmt.Println("I'm finished, exit!")
}()
fmt.Println("I'm running")
logger.Info("=======Replication job running=======")
logger.Infof("params: %#v\n", params)
logger.Infof("context: %#v\n", ctx)
@ -81,11 +82,12 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
//HOLD ON FOR A WHILE
logger.Error("Holding for 20 sec")
<-time.After(10 * time.Second)
<-time.After(15 * time.Second)
//logger.Fatal("I'm back, check if I'm stopped/cancelled")
if cmd, ok := ctx.OPCommand(); ok {
logger.Infof("cmd=%s\n", cmd)
fmt.Printf("Receive OP command: %s\n", cmd)
if cmd == opm.CtlCommandCancel {
logger.Info("exit for receiving cancel signal")
return errs.JobCancelledError()
@ -95,7 +97,7 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
return errs.JobStoppedError()
}
fmt.Println("I'm here")
fmt.Println("I'm close to end")
return nil
}

View File

@ -0,0 +1,166 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package opm
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/garyburd/redigo/redis"
"github.com/vmware/harbor/src/jobservice/logger"
"github.com/vmware/harbor/src/jobservice/models"
"github.com/vmware/harbor/src/jobservice/utils"
)
const (
	//commandValidTime is how long a pushed command stays valid;
	//Pop only hands out commands that were pushed less than this long ago
	commandValidTime = 5 * time.Minute
	//commandSweepTickerTime is the interval of the ticker that triggers the command sweeping
	commandSweepTickerTime = 1 * time.Hour
	//EventFireCommand for firing command event
	EventFireCommand = "fire_command"
)
//oPCommand is one control command kept in the in-memory commands list
type oPCommand struct {
	//command is the control command value; Push only accepts the stop and cancel commands
	command string
	//fireTime is the Unix time (seconds) taken when the command was pushed
	fireTime int64
}
//oPCommands maintain commands list
//Commands are kept in memory, keyed by job ID; fired commands are published via redis.
type oPCommands struct {
	//lock guards the commands map
	lock *sync.RWMutex
	//commands maps a job ID to the latest command pushed for that job
	commands map[string]*oPCommand
	//context stops the sweeper loop once it is done
	context context.Context
	//redisPool provides the connection Fire publishes with
	redisPool *redis.Pool
	//namespace is used to build the notification channel key
	namespace string
	//stopChan is signalled by Stop to ask the sweeper loop to exit
	stopChan chan struct{}
	//doneChan is signalled by the sweeper loop when it exits
	doneChan chan struct{}
}
//newOPCommands creates the OP commands list bound to the given context,
//namespace and redis pool. The sweeper is not started here; call Start for that.
func newOPCommands(ctx context.Context, ns string, redisPool *redis.Pool) *oPCommands {
	opc := new(oPCommands)

	//in-memory state
	opc.lock = &sync.RWMutex{}
	opc.commands = map[string]*oPCommand{}

	//dependencies handed in by the caller
	opc.context = ctx
	opc.redisPool = redisPool
	opc.namespace = ns

	//one-slot buffers for the stop request and the exit notification
	opc.stopChan = make(chan struct{}, 1)
	opc.doneChan = make(chan struct{}, 1)

	return opc
}
//Start the command sweeper
//The sweeping loop runs in its own goroutine, so Start returns immediately.
func (opc *oPCommands) Start() {
	go opc.loop()
	logger.Info("OP commands sweeper is started")
}
//Stop the command sweeper
//It signals the loop through stopChan and then blocks until the loop
//reports its exit on doneChan.
func (opc *oPCommands) Stop() {
	opc.stopChan <- struct{}{}
	<-opc.doneChan
}
//Fire publishes the command for the specified job to the notification channel
//of the namespace, wrapped in an EventFireCommand message.
//An error is returned for an empty job ID, for a command other than stop/cancel,
//or when encoding/publishing the message fails.
func (opc *oPCommands) Fire(jobID string, command string) error {
	if utils.IsEmptyStr(jobID) {
		return errors.New("empty job ID")
	}

	supported := command == CtlCommandStop || command == CtlCommandCancel
	if !supported {
		return fmt.Errorf("Unsupported command %s", command)
	}

	//The message carries the pair [job ID, command]
	payload, err := json.Marshal(&models.Message{
		Event: EventFireCommand,
		Data:  []string{jobID, command},
	})
	if err != nil {
		return err
	}

	conn := opc.redisPool.Get()
	defer conn.Close()

	channel := utils.KeyPeriodicNotification(opc.namespace)
	if _, err = conn.Do("PUBLISH", channel, payload); err != nil {
		return err
	}

	return nil
}
//Push records the command for the specified job in the in-memory list,
//replacing any command previously recorded for that job and stamping it
//with the current time.
//An error is returned for an empty job ID or a command other than stop/cancel.
func (opc *oPCommands) Push(jobID string, command string) error {
	if utils.IsEmptyStr(jobID) {
		return errors.New("empty job ID")
	}

	switch command {
	case CtlCommandStop, CtlCommandCancel:
		//supported commands
	default:
		return fmt.Errorf("Unsupported command %s", command)
	}

	opc.lock.Lock()
	defer opc.lock.Unlock()

	entry := new(oPCommand)
	entry.command = command
	entry.fireTime = time.Now().Unix()
	opc.commands[jobID] = entry

	return nil
}
//Pop out the command if existing
//The command recorded for the job is removed from the list and returned together
//with true when it is still within commandValidTime of its fire time.
//("", false) is returned when the job ID is empty, no command is recorded,
//or the recorded command has already expired.
func (opc *oPCommands) Pop(jobID string) (string, bool) {
	if utils.IsEmptyStr(jobID) {
		return "", false
	}

	//Popping deletes from the map, so the write lock is required;
	//a read lock would allow concurrent Pop calls to mutate the map at the same time.
	opc.lock.Lock()
	defer opc.lock.Unlock()

	c, ok := opc.commands[jobID]
	if !ok {
		return "", false
	}

	//Drop the entry in any case: an expired command can never become valid again
	delete(opc.commands, jobID)

	if time.Unix(c.fireTime, 0).Add(commandValidTime).After(time.Now()) {
		return c.command, true
	}

	return "", false
}
//loop is the body of the sweeper goroutine.
//It sweeps the commands list on every tick of commandSweepTickerTime and exits
//when either the context is done or a stop signal arrives. On exit the ticker
//is stopped first, then the exit is logged and reported on doneChan.
func (opc *oPCommands) loop() {
	defer func() {
		logger.Info("OP commands is stopped")
		opc.doneChan <- struct{}{}
	}()

	sweepTicker := time.NewTicker(commandSweepTickerTime)
	defer sweepTicker.Stop()

	for {
		select {
		case <-opc.stopChan:
			//asked to stop explicitly
			return
		case <-opc.context.Done():
			//the owning context is finished
			return
		case <-sweepTicker.C:
			opc.sweepCommands()
		}
	}
}
//sweepCommands removes the expired commands from the list.
//A command is expired once commandValidTime has elapsed since its fire time;
//commands that are still valid are kept so that they can be popped later.
func (opc *oPCommands) sweepCommands() {
	opc.lock.Lock()
	defer opc.lock.Unlock()

	now := time.Now()
	for k, v := range opc.commands {
		expireAt := time.Unix(v.fireTime, 0).Add(commandValidTime)
		//Only the expired ones are swept; "expireAt after now" means still valid
		if !expireAt.After(now) {
			delete(opc.commands, k)
		}
	}
}

View File

@ -57,7 +57,8 @@ type RedisJobStatsManager struct {
doneChan chan struct{}
processChan chan *queueItem
isRunning *atomic.Value
hookStore *HookStore //cache the hook here to avoid requesting backend
hookStore *HookStore //cache the hook here to avoid requesting backend
opCommands *oPCommands //maintain the OP commands
}
//NewRedisJobStatsManager is constructor of RedisJobStatsManager
@ -74,6 +75,7 @@ func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *r
processChan: make(chan *queueItem, processBufferSize),
hookStore: NewHookStore(),
isRunning: isRunning,
opCommands: newOPCommands(ctx, namespace, redisPool),
}
}
@ -83,6 +85,7 @@ func (rjs *RedisJobStatsManager) Start() {
return
}
go rjs.loop()
rjs.opCommands.Start()
rjs.isRunning.Store(true)
logger.Info("Redis job stats manager is started")
@ -97,6 +100,8 @@ func (rjs *RedisJobStatsManager) Shutdown() {
if !(rjs.isRunning.Load().(bool)) {
return
}
rjs.opCommands.Stop()
rjs.stopChan <- struct{}{}
<-rjs.doneChan
}
@ -213,7 +218,12 @@ func (rjs *RedisJobStatsManager) SendCommand(jobID string, command string) error
return errors.New("unknown command")
}
return rjs.writeCtlCommand(jobID, command)
if err := rjs.opCommands.Fire(jobID, command); err != nil {
return err
}
//Directly add to op commands maintaining list
return rjs.opCommands.Push(jobID, command)
}
//CheckIn message
@ -239,7 +249,12 @@ func (rjs *RedisJobStatsManager) CtlCommand(jobID string) (string, error) {
return "", errors.New("empty job ID")
}
return rjs.getCrlCommand(jobID)
c, ok := rjs.opCommands.Pop(jobID)
if !ok {
return "", fmt.Errorf("no OP command fired to job %s", jobID)
}
return c, nil
}
//DieAt marks the failed jobs with the time they put into dead queue.
@ -262,7 +277,7 @@ func (rjs *RedisJobStatsManager) RegisterHook(jobID string, hookURL string, isCa
return errors.New("empty job ID")
}
if utils.IsEmptyStr(hookURL) {
if !utils.IsValidURL(hookURL) {
return errors.New("invalid hook url")
}
@ -302,7 +317,7 @@ func (rjs *RedisJobStatsManager) submitStatusReportingItem(jobID string, status,
if !ok {
//Retrieve from backend
hookURL, err = rjs.getHook(jobID)
if err != nil {
if err != nil || !utils.IsValidURL(hookURL) {
//logged and exit
logger.Warningf("no status hook found for job %s\n, abandon status reporting", jobID)
return
@ -328,45 +343,6 @@ func (rjs *RedisJobStatsManager) reportStatus(jobID string, hookURL, status, che
return DefaultHookClient.ReportStatus(hookURL, reportingStatus)
}
//getCrlCommand reads the "command" field of the job's control command hash from
//redis and then tries to delete the hash. A failed read is returned as error;
//a failed delete is only logged and the command is still returned.
func (rjs *RedisJobStatsManager) getCrlCommand(jobID string) (string, error) {
	conn := rjs.redisPool.Get()
	defer conn.Close()

	ctlKey := utils.KeyJobCtlCommands(rjs.namespace, jobID)

	reply, doErr := conn.Do("HGET", ctlKey, "command")
	command, err := redis.String(reply, doErr)
	if err != nil {
		return "", err
	}

	//Best effort clean up: the key is left behind as dirty data if DEL fails
	if _, delErr := conn.Do("DEL", ctlKey); delErr != nil {
		logger.Errorf("del key %s failed with error: %s\n", ctlKey, delErr)
	}

	return command, nil
}
//writeCtlCommand stores the command and its fire time in the job's control
//command hash and sets an expire time of one day plus a random 0-9 seconds on it.
//Both redis commands are buffered with Send and written by one Flush.
func (rjs *RedisJobStatsManager) writeCtlCommand(jobID string, command string) error {
	conn := rjs.redisPool.Get()
	defer conn.Close()

	ctlKey := utils.KeyJobCtlCommands(rjs.namespace, jobID)

	firedAt := time.Now().Unix()
	err := conn.Send("HMSET", ctlKey, "command", command, "fire_time", firedAt)
	if err != nil {
		return err
	}

	ttl := 24*60*60 + rand.Int63n(10)
	err = conn.Send("EXPIRE", ctlKey, ttl)
	if err != nil {
		return err
	}

	return conn.Flush()
}
func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) error {
conn := rjs.redisPool.Get()
defer conn.Close()

View File

@ -87,11 +87,6 @@ func TestCommand(t *testing.T) {
t.Fatalf("expect '%s' but got '%s'", CtlCommandStop, cmd)
}
}
key := utils.KeyJobCtlCommands(testingNamespace, "fake_job_ID")
if err := clear(key, redisPool.Get()); err != nil {
t.Fatal(err)
}
}
func TestDieAt(t *testing.T) {

View File

@ -9,6 +9,8 @@ import (
"sync"
"time"
"github.com/vmware/harbor/src/jobservice/errs"
"github.com/robfig/cron"
"github.com/garyburd/redigo/redis"
@ -156,6 +158,10 @@ func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error {
}
score, err := rps.getScoreByID(cronJobPolicyID)
if err == redis.ErrNil {
return errs.NoObjectFoundError(err.Error())
}
if err != nil {
return err
}

View File

@ -89,6 +89,9 @@ func (ms *MessageServer) Start() error {
dt, _ := json.Marshal(m.Data)
json.Unmarshal(dt, hookObject)
converted = hookObject
case opm.EventFireCommand:
//no need to convert []string
converted = m.Data
}
res := callback.Call([]reflect.Value{reflect.ValueOf(converted)})
e := res[0].Interface()

View File

@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"testing"
"time"
@ -141,6 +142,56 @@ func TestPublishHook(t *testing.T) {
ms.Start()
}
//TestPublishCommands publishes a fire-command notification and verifies that the
//subscriber of opm.EventFireCommand receives the [job ID, command] pair.
func TestPublishCommands(t *testing.T) {
	ms, cancel := createMessageServer()

	//Buffered so the callback never blocks, whichever goroutine invokes it
	fired := make(chan struct{}, 1)

	err := ms.Subscribe(opm.EventFireCommand, func(data interface{}) error {
		//Accept both a typed string slice and the generic slice a JSON decoded
		//array is turned into
		var cmds []string
		switch items := data.(type) {
		case []string:
			cmds = items
		case []interface{}:
			for _, item := range items {
				s, ok := item.(string)
				if !ok {
					t.Error("expect fired command but got other thing")
					return errors.New("expect fired command but got other thing")
				}
				cmds = append(cmds, s)
			}
		default:
			t.Error("expect fired command but got other thing")
			return errors.New("expect fired command but got other thing")
		}

		if len(cmds) != 2 {
			t.Errorf("expect a array with 2 items but only got '%d' items", len(cmds))
			return fmt.Errorf("expect a array with 2 items but only got '%d' items", len(cmds))
		}
		if cmds[1] != "stop" {
			t.Errorf("expect command 'stop' but got '%s'", cmds[1])
			return fmt.Errorf("expect command 'stop' but got '%s'", cmds[1])
		}

		select {
		case fired <- struct{}{}:
		default:
		}

		return nil
	})
	if err != nil {
		t.Fatal(err)
	}

	go func() {
		defer cancel()
		<-time.After(200 * time.Millisecond)

		//Publish the event the subscriber is registered for
		notification := &models.Message{
			Event: opm.EventFireCommand,
			Data:  []string{"fake_job_ID", "stop"},
		}

		//t.Fatal must only be called from the test goroutine, so report with
		//t.Error and leave this goroutine instead
		rawJSON, err := json.Marshal(notification)
		if err != nil {
			t.Error(err)
			return
		}

		conn := redisPool.Get()
		defer conn.Close()
		//Do writes the command and reads the reply; Send alone only buffers it
		if _, err = conn.Do("PUBLISH", utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), rawJSON); err != nil {
			t.Error(err)
			return
		}

		//hold for a while
		<-time.After(200 * time.Millisecond)
	}()

	ms.Start()

	select {
	case <-fired:
	default:
		t.Error("fired command was not delivered to the subscriber")
	}
}
func createMessageServer() (*MessageServer, context.CancelFunc) {
ns := tests.GiveMeTestNamespace()
ctx, cancel := context.WithCancel(context.Background())

View File

@ -123,6 +123,12 @@ func (gcwp *GoCraftWorkPool) Start() {
}); err != nil {
return
}
if err = gcwp.messageServer.Subscribe(opm.EventFireCommand,
func(data interface{}) error {
return gcwp.handleOPCommandFiring(data)
}); err != nil {
return
}
//Start message server
if err = gcwp.messageServer.Start(); err != nil {
@ -323,8 +329,6 @@ func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) {
return models.JobPoolStats{}, err
}
fmt.Printf("hbs=%+#v\n", hbs[0])
//Find the heartbeat of this pool via pid
stats := make([]*models.JobPoolStatsData, 0)
for _, hb := range hbs {
@ -367,9 +371,14 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error {
return err
}
needSetStopStatus := false
switch theJob.Stats.JobKind {
case job.JobKindGeneric:
//nothing need to do
//Only running job can be stopped
if theJob.Stats.Status != job.JobStatusRunning {
return fmt.Errorf("job '%s' is not a running job", jobID)
}
case job.JobKindScheduled:
//we need to delete the scheduled job in the queue if it is not running yet
//otherwise, nothing need to do
@ -377,6 +386,7 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error {
if err := gcwp.client.DeleteScheduledJob(theJob.Stats.RunAt, jobID); err != nil {
return err
}
needSetStopStatus = true
}
case job.JobKindPeriodic:
//firstly delete the periodic job policy
@ -390,6 +400,8 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error {
//only logged
logger.Errorf("Expire the stats of job %s failed with error: %s\n", theJob.Stats.JobID, err)
}
needSetStopStatus = true
default:
break
}
@ -400,6 +412,13 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error {
if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandStop); err != nil {
return err
}
//The job running instance will set the status to 'stopped'
needSetStopStatus = false
}
//If needed, update the job status to 'stopped'
if needSetStopStatus {
gcwp.statsManager.SetJobStatus(jobID, job.JobStatusStopped)
}
return nil
@ -475,8 +494,8 @@ func (gcwp *GoCraftWorkPool) RegisterHook(jobID string, hookURL string) error {
return errors.New("empty job ID")
}
if utils.IsEmptyStr(hookURL) {
return errors.New("empty hook url")
if !utils.IsValidURL(hookURL) {
return errors.New("invalid hook url")
}
return gcwp.statsManager.RegisterHook(jobID, hookURL, false)
@ -550,6 +569,24 @@ func (gcwp *GoCraftWorkPool) handleRegisterStatusHook(data interface{}) error {
return gcwp.statsManager.RegisterHook(hook.JobID, hook.HookURL, true)
}
//handleOPCommandFiring handles the notification of firing an OP command.
//data is expected to be a two-item list holding the job ID and the command,
//both strings; anything else is rejected with an error. The command is then
//handed over to the stats manager.
func (gcwp *GoCraftWorkPool) handleOPCommandFiring(data interface{}) error {
	if data == nil {
		return errors.New("nil data interface")
	}

	commands, ok := data.([]interface{})
	if !ok || len(commands) != 2 {
		return errors.New("malformed op commands object")
	}

	//Check each assertion on its own, otherwise the result of the first one
	//is overwritten by the second and a non-string job ID slips through as ""
	jobID, ok := commands[0].(string)
	if !ok {
		return errors.New("malformed op command info")
	}
	command, ok := commands[1].(string)
	if !ok {
		return errors.New("malformed op command info")
	}

	//NOTE(review): SendCommand appears to publish the command again before
	//caching it - confirm this cannot re-trigger this handler endlessly
	return gcwp.statsManager.SendCommand(jobID, command)
}
//log the job
func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error {
logger.Infof("Job incoming: %s:%s", job.Name, job.ID)

View File

@ -5,6 +5,7 @@ package utils
import (
"errors"
"net/url"
"os"
"strings"
@ -57,6 +58,19 @@ func IsValidPort(port uint) bool {
return port != 0 && port < 65536
}
//IsValidURL validates if the url is well-formed.
//An empty address is never valid; otherwise the address is valid exactly when
//url.Parse accepts it.
func IsValidURL(address string) bool {
	if IsEmptyStr(address) {
		return false
	}

	_, parseErr := url.Parse(address)
	return parseErr == nil
}
//JobScore represents the data item with score in the redis db.
type JobScore struct {
JobBytes []byte

View File

@ -15,6 +15,8 @@
package target
import (
"fmt"
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils"
@ -42,7 +44,7 @@ func (d *DefaultManager) GetTarget(id int64) (*models.RepTarget, error) {
}
if target == nil {
return nil, nil
return nil, fmt.Errorf("target '%d' does not exist", id)
}
// decrypt the password