Merge pull request #8794 from ywk253100/190822_retry_status

Add status revision to retention task to handle retrying
This commit is contained in:
Wang Yan 2019-08-23 10:54:35 +08:00 committed by GitHub
commit 35e786e54c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 135 additions and 113 deletions

View File

@ -133,6 +133,7 @@ create table retention_task
job_id varchar(64), job_id varchar(64),
status varchar(32), status varchar(32),
status_code integer, status_code integer,
status_revision integer,
start_time timestamp default CURRENT_TIMESTAMP, start_time timestamp default CURRENT_TIMESTAMP,
end_time timestamp default CURRENT_TIMESTAMP, end_time timestamp default CURRENT_TIMESTAMP,
total integer, total integer,

View File

@ -20,7 +20,6 @@ import (
"github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/job" "github.com/goharbor/harbor/src/common/job"
jobmodels "github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/api" "github.com/goharbor/harbor/src/core/api"
@ -48,6 +47,7 @@ type Handler struct {
status string status string
rawStatus string rawStatus string
checkIn string checkIn string
revision int64
} }
// Prepare ... // Prepare ...
@ -60,7 +60,7 @@ func (h *Handler) Prepare() {
return return
} }
h.id = id h.id = id
var data jobmodels.JobStatusChange var data jjob.StatusChange
err = json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data) err = json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data)
if err != nil { if err != nil {
log.Errorf("Failed to decode job status change, job ID: %d, error: %v", id, err) log.Errorf("Failed to decode job status change, job ID: %d, error: %v", id, err)
@ -76,6 +76,9 @@ func (h *Handler) Prepare() {
} }
h.status = status h.status = status
h.checkIn = data.CheckIn h.checkIn = data.CheckIn
if data.Metadata != nil {
h.revision = data.Metadata.Revision
}
} }
// HandleScan handles the webhook of scan job // HandleScan handles the webhook of scan job
@ -138,24 +141,11 @@ func (h *Handler) HandleRetentionTask() {
} }
// handle status updating // handle status updating
if err := mgr.UpdateTaskStatus(taskID, status); err != nil { if err := mgr.UpdateTaskStatus(taskID, status, h.revision); err != nil {
log.Errorf("failed to update the status of retention task %d: %v", taskID, err) log.Errorf("failed to update the status of retention task %d: %v", taskID, err)
h.SendInternalServerError(err) h.SendInternalServerError(err)
return return
} }
// if the status is the final status, update the end time
if status == jjob.StoppedStatus.String() || status == jjob.SuccessStatus.String() ||
status == jjob.ErrorStatus.String() {
task := &retention.Task{
ID: taskID,
EndTime: time.Now(),
}
if err := mgr.UpdateTask(task, "EndTime"); err != nil {
log.Errorf("failed to update of retention task %d: %v", taskID, err)
h.SendInternalServerError(err)
return
}
}
} }
// HandleNotificationJob handles the hook of notification job // HandleNotificationJob handles the hook of notification job

View File

@ -49,14 +49,15 @@ type RetentionExecution struct {
// RetentionTask ... // RetentionTask ...
type RetentionTask struct { type RetentionTask struct {
ID int64 `orm:"pk;auto;column(id)"` ID int64 `orm:"pk;auto;column(id)"`
ExecutionID int64 `orm:"column(execution_id)"` ExecutionID int64 `orm:"column(execution_id)"`
Repository string `orm:"column(repository)"` Repository string `orm:"column(repository)"`
JobID string `orm:"column(job_id)"` JobID string `orm:"column(job_id)"`
Status string `orm:"column(status)"` Status string `orm:"column(status)"`
StatusCode int `orm:"column(status_code)"` StatusCode int `orm:"column(status_code)"` // For order the different statuses
StartTime time.Time `orm:"column(start_time)"` StatusRevision int64 `orm:"column(status_revision)"` // For differentiating the each retry of the same job
EndTime time.Time `orm:"column(end_time)"` StartTime time.Time `orm:"column(start_time)"`
Total int `orm:"column(total)"` EndTime time.Time `orm:"column(end_time)"`
Retained int `orm:"column(retained)"` Total int `orm:"column(total)"`
Retained int `orm:"column(retained)"`
} }

View File

@ -4,10 +4,11 @@ import (
"errors" "errors"
"fmt" "fmt"
"strconv" "strconv"
"time"
"github.com/astaxie/beego/orm" "github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/dao"
jobmodels "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/retention/dao/models" "github.com/goharbor/harbor/src/pkg/retention/dao/models"
"github.com/goharbor/harbor/src/pkg/retention/q" "github.com/goharbor/harbor/src/pkg/retention/q"
) )
@ -115,21 +116,17 @@ func fillStatus(exec *models.RetentionExecution) error {
} }
total += v total += v
switch k { switch k {
case jobmodels.JobScheduled: case job.ScheduledStatus.String():
running += v running += v
case jobmodels.JobPending: case job.PendingStatus.String():
running += v running += v
case jobmodels.JobRunning: case job.RunningStatus.String():
running += v running += v
case jobmodels.JobRetrying: case job.SuccessStatus.String():
running += v
case jobmodels.JobFinished:
succeed += v succeed += v
case jobmodels.JobCanceled: case job.StoppedStatus.String():
stopped += v stopped += v
case jobmodels.JobStopped: case job.ErrorStatus.String():
stopped += v
case jobmodels.JobError:
failed += v failed += v
} }
} }
@ -232,15 +229,26 @@ func UpdateTask(task *models.RetentionTask, cols ...string) error {
return err return err
} }
// UpdateTaskStatus updates the status of task whose status code is less than the statusCode provided // UpdateTaskStatus updates the status of task according to the status code and revision to avoid
func UpdateTaskStatus(taskID int64, status string, statusCode int) error { // override when running in concurrency
_, err := dao.GetOrmer().QueryTable(&models.RetentionTask{}). func UpdateTaskStatus(taskID int64, status string, statusCode int, statusRevision int64) error {
Filter("ID", taskID). params := []interface{}{}
Filter("StatusCode__lt", statusCode). // use raw sql rather than the ORM as the sql generated by ORM isn't a "single" statement
Update(orm.Params{ // which means the operation isn't atomic
"Status": status, sql := `update retention_task set status = ?, status_code = ?, status_revision = ?, end_time = ? `
"StatusCode": statusCode, params = append(params, status, statusCode, statusRevision)
}) var t time.Time
// when the task is in final status, update the endtime
// when the task re-runs again, the endtime should be cleared
// so set the endtime to null if the task isn't in final status
if IsFinalStatus(status) {
t = time.Now()
}
params = append(params, t)
sql += `where id = ? and
(status_revision = ? and status_code < ? or status_revision < ?) `
params = append(params, taskID, statusRevision, statusCode, statusRevision)
_, err := dao.GetOrmer().Raw(sql, params).Exec()
return err return err
} }
@ -292,3 +300,12 @@ func GetTotalOfTasks(executionID int64) (int64, error) {
qs = qs.Filter("ExecutionID", executionID) qs = qs.Filter("ExecutionID", executionID)
return qs.Count() return qs.Count()
} }
// IsFinalStatus checks whether the status is a final status
func IsFinalStatus(status string) bool {
if status == job.StoppedStatus.String() || status == job.SuccessStatus.String() ||
status == job.ErrorStatus.String() {
return true
}
return false
}

View File

@ -199,7 +199,7 @@ func TestTask(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
// update status // update status
err = UpdateTaskStatus(id, "running", 1) err = UpdateTaskStatus(id, "running", 1, 1)
require.Nil(t, err) require.Nil(t, err)
// list // list
@ -213,6 +213,7 @@ func TestTask(t *testing.T) {
assert.Equal(t, int64(1), tasks[0].ExecutionID) assert.Equal(t, int64(1), tasks[0].ExecutionID)
assert.Equal(t, "running", tasks[0].Status) assert.Equal(t, "running", tasks[0].Status)
assert.Equal(t, 1, tasks[0].StatusCode) assert.Equal(t, 1, tasks[0].StatusCode)
assert.Equal(t, int64(1), tasks[0].StatusRevision)
// delete // delete
err = DeleteTask(id) err = DeleteTask(id)

View File

@ -126,7 +126,7 @@ func (f *fakeRetentionManager) CreateTask(task *Task) (int64, error) {
func (f *fakeRetentionManager) UpdateTask(task *Task, cols ...string) error { func (f *fakeRetentionManager) UpdateTask(task *Task, cols ...string) error {
return nil return nil
} }
func (f *fakeRetentionManager) UpdateTaskStatus(int64, string) error { func (f *fakeRetentionManager) UpdateTaskStatus(int64, string, int64) error {
return nil return nil
} }
func (f *fakeRetentionManager) GetTaskLog(taskID int64) ([]byte, error) { func (f *fakeRetentionManager) GetTaskLog(taskID int64) ([]byte, error) {

View File

@ -60,11 +60,9 @@ type Manager interface {
// Update the specified task // Update the specified task
UpdateTask(task *Task, cols ...string) error UpdateTask(task *Task, cols ...string) error
// Update the status of the specified task // Update the status of the specified task
// The status is updated only when it is behind the one stored // The status is updated only when (the statusRevision > the current revision)
// in the database. // or (the the statusRevision = the current revision and status > the current status)
// e.g. if the status is running but the status stored UpdateTaskStatus(taskID int64, status string, statusRevision int64) error
// in database is failed, the updating doesn't take effect
UpdateTaskStatus(taskID int64, status string) error
// Get the task specified by the task ID // Get the task specified by the task ID
GetTask(taskID int64) (*Task, error) GetTask(taskID int64) (*Task, error)
// Get the log of the specified task // Get the log of the specified task
@ -195,14 +193,16 @@ func (d *DefaultManager) CreateTask(task *Task) (int64, error) {
return 0, errors.New("nil task") return 0, errors.New("nil task")
} }
t := &models.RetentionTask{ t := &models.RetentionTask{
ExecutionID: task.ExecutionID, ExecutionID: task.ExecutionID,
Repository: task.Repository, Repository: task.Repository,
JobID: task.JobID, JobID: task.JobID,
Status: task.Status, Status: task.Status,
StartTime: task.StartTime, StatusCode: task.StatusCode,
EndTime: task.EndTime, StatusRevision: task.StatusRevision,
Total: task.Total, StartTime: task.StartTime,
Retained: task.Retained, EndTime: task.EndTime,
Total: task.Total,
Retained: task.Retained,
} }
return dao.CreateTask(t) return dao.CreateTask(t)
} }
@ -219,16 +219,17 @@ func (d *DefaultManager) ListTasks(query ...*q.TaskQuery) ([]*Task, error) {
tasks := make([]*Task, 0) tasks := make([]*Task, 0)
for _, t := range ts { for _, t := range ts {
tasks = append(tasks, &Task{ tasks = append(tasks, &Task{
ID: t.ID, ID: t.ID,
ExecutionID: t.ExecutionID, ExecutionID: t.ExecutionID,
Repository: t.Repository, Repository: t.Repository,
JobID: t.JobID, JobID: t.JobID,
Status: t.Status, Status: t.Status,
StatusCode: t.StatusCode, StatusCode: t.StatusCode,
StartTime: t.StartTime, StatusRevision: t.StatusRevision,
EndTime: t.EndTime, StartTime: t.StartTime,
Total: t.Total, EndTime: t.EndTime,
Retained: t.Retained, Total: t.Total,
Retained: t.Retained,
}) })
} }
return tasks, nil return tasks, nil
@ -248,25 +249,27 @@ func (d *DefaultManager) UpdateTask(task *Task, cols ...string) error {
return fmt.Errorf("invalid task ID: %d", task.ID) return fmt.Errorf("invalid task ID: %d", task.ID)
} }
return dao.UpdateTask(&models.RetentionTask{ return dao.UpdateTask(&models.RetentionTask{
ID: task.ID, ID: task.ID,
ExecutionID: task.ExecutionID, ExecutionID: task.ExecutionID,
Repository: task.Repository, Repository: task.Repository,
JobID: task.JobID, JobID: task.JobID,
Status: task.Status, Status: task.Status,
StartTime: task.StartTime, StatusCode: task.StatusCode,
EndTime: task.EndTime, StatusRevision: task.StatusRevision,
Total: task.Total, StartTime: task.StartTime,
Retained: task.Retained, EndTime: task.EndTime,
Total: task.Total,
Retained: task.Retained,
}, cols...) }, cols...)
} }
// UpdateTaskStatus updates the status of the specified task // UpdateTaskStatus updates the status of the specified task
func (d *DefaultManager) UpdateTaskStatus(taskID int64, status string) error { func (d *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusRevision int64) error {
if taskID <= 0 { if taskID <= 0 {
return fmt.Errorf("invalid task ID: %d", taskID) return fmt.Errorf("invalid task ID: %d", taskID)
} }
st := job.Status(status) st := job.Status(status)
return dao.UpdateTaskStatus(taskID, status, st.Code()) return dao.UpdateTaskStatus(taskID, status, st.Code(), statusRevision)
} }
// GetTask returns the task specified by task ID // GetTask returns the task specified by task ID
@ -279,16 +282,17 @@ func (d *DefaultManager) GetTask(taskID int64) (*Task, error) {
return nil, err return nil, err
} }
return &Task{ return &Task{
ID: task.ID, ID: task.ID,
ExecutionID: task.ExecutionID, ExecutionID: task.ExecutionID,
Repository: task.Repository, Repository: task.Repository,
JobID: task.JobID, JobID: task.JobID,
Status: task.Status, Status: task.Status,
StatusCode: task.StatusCode, StatusCode: task.StatusCode,
StartTime: task.StartTime, StatusRevision: task.StatusRevision,
EndTime: task.EndTime, StartTime: task.StartTime,
Total: task.Total, EndTime: task.EndTime,
Retained: task.Retained, Total: task.Total,
Retained: task.Retained,
}, nil }, nil
} }

View File

@ -171,12 +171,13 @@ func TestTask(t *testing.T) {
err := m.DeleteExecution(1000) err := m.DeleteExecution(1000)
require.Nil(t, err) require.Nil(t, err)
task := &Task{ task := &Task{
ExecutionID: 1000, ExecutionID: 1000,
JobID: "1", JobID: "1",
Status: jjob.PendingStatus.String(), Status: jjob.PendingStatus.String(),
StatusCode: jjob.PendingStatus.Code(), StatusCode: jjob.PendingStatus.Code(),
Total: 0, StatusRevision: 1,
StartTime: time.Now(), Total: 0,
StartTime: time.Now(),
} }
// create // create
id, err := m.CreateTask(task) id, err := m.CreateTask(task)
@ -194,12 +195,17 @@ func TestTask(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
// update status to success which is a final status // update status to success which is a final status
err = m.UpdateTaskStatus(id, jjob.SuccessStatus.String()) err = m.UpdateTaskStatus(id, jjob.SuccessStatus.String(), 1)
require.Nil(t, err) require.Nil(t, err)
// try to update status to running, as the status has already // try to update status to running, as the status has already
// been updated to a final status, this updating shouldn't take effect // been updated to a final status and the stautus revision doesn't change,
err = m.UpdateTaskStatus(id, jjob.RunningStatus.String()) // this updating shouldn't take effect
err = m.UpdateTaskStatus(id, jjob.RunningStatus.String(), 1)
require.Nil(t, err)
// update the revision and try to update status to running again
err = m.UpdateTaskStatus(id, jjob.RunningStatus.String(), 2)
require.Nil(t, err) require.Nil(t, err)
// list // list
@ -210,8 +216,9 @@ func TestTask(t *testing.T) {
require.Equal(t, 1, len(tasks)) require.Equal(t, 1, len(tasks))
assert.Equal(t, int64(1000), tasks[0].ExecutionID) assert.Equal(t, int64(1000), tasks[0].ExecutionID)
assert.Equal(t, 1, tasks[0].Total) assert.Equal(t, 1, tasks[0].Total)
assert.Equal(t, jjob.SuccessStatus.String(), tasks[0].Status) assert.Equal(t, jjob.RunningStatus.String(), tasks[0].Status)
assert.Equal(t, jjob.SuccessStatus.Code(), tasks[0].StatusCode) assert.Equal(t, jjob.RunningStatus.Code(), tasks[0].StatusCode)
assert.Equal(t, int64(2), tasks[0].StatusRevision)
// get task log // get task log
job.GlobalClient = &tjob.MockJobClient{ job.GlobalClient = &tjob.MockJobClient{

View File

@ -43,16 +43,17 @@ type Execution struct {
// Task of retention // Task of retention
type Task struct { type Task struct {
ID int64 `json:"id"` ID int64 `json:"id"`
ExecutionID int64 `json:"execution_id"` ExecutionID int64 `json:"execution_id"`
Repository string `json:"repository"` Repository string `json:"repository"`
JobID string `json:"job_id"` JobID string `json:"job_id"`
Status string `json:"status"` Status string `json:"status"`
StatusCode int `json:"status_code"` StatusCode int `json:"status_code"`
StartTime time.Time `json:"start_time"` StatusRevision int64 `json:"status_revision"`
EndTime time.Time `json:"end_time"` StartTime time.Time `json:"start_time"`
Total int `json:"total"` EndTime time.Time `json:"end_time"`
Retained int `json:"retained"` Total int `json:"total"`
Retained int `json:"retained"`
} }
// History of retention // History of retention