Handle the retention task status updating in concurrency

Compare the status code when updating retention task status to avoid the concurrent issue

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-08-15 09:50:53 +08:00
parent 032d57d8b2
commit 48b067f596
9 changed files with 107 additions and 51 deletions

View File

@ -131,6 +131,7 @@ create table retention_task
repository varchar(255),
job_id varchar(64),
status varchar(32),
status_code integer,
start_time timestamp default CURRENT_TIMESTAMP,
end_time timestamp default CURRENT_TIMESTAMP,
total integer,

View File

@ -24,6 +24,7 @@ import (
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/api"
jjob "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/pkg/retention"
"github.com/goharbor/harbor/src/replication"
@ -109,42 +110,52 @@ func (h *Handler) HandleReplicationTask() {
// HandleRetentionTask handles the webhook of retention task
func (h *Handler) HandleRetentionTask() {
log.Debugf("received retention task status update event: task-%d, status-%s", h.id, h.status)
taskID := h.id
status := h.rawStatus
log.Debugf("received retention task status update event: task-%d, status-%s", taskID, status)
mgr := &retention.DefaultManager{}
props := []string{"Status"}
task := &retention.Task{
ID: h.id,
Status: h.status,
}
if h.status == models.JobFinished || h.status == models.JobError ||
h.status == models.JobStopped {
task.EndTime = time.Now()
props = append(props, "EndTime")
} else if h.status == models.JobRunning {
// handle checkin
if h.checkIn != "" {
var retainObj struct {
Total int `json:"total"`
Retained int `json:"retained"`
}
if err := json.Unmarshal([]byte(h.checkIn), &retainObj); err != nil {
log.Errorf("failed to resolve checkin of retention task %d: %v", h.id, err)
} else {
if retainObj.Total > 0 {
task.Total = retainObj.Total
props = append(props, "Total")
log.Errorf("failed to resolve checkin of retention task %d: %v", taskID, err)
return
}
if retainObj.Retained > 0 {
task.Retained = retainObj.Retained
props = append(props, "Retained")
task := &retention.Task{
ID: taskID,
Total: retainObj.Total,
Retained: retainObj.Retained,
}
}
}
}
if err := mgr.UpdateTask(task, props...); err != nil {
log.Errorf("failed to update the status of retention task %d: %v", h.id, err)
if err := mgr.UpdateTask(task, "Total", "Retained"); err != nil {
log.Errorf("failed to update of retention task %d: %v", taskID, err)
h.SendInternalServerError(err)
return
}
return
}
// handle status updating
if err := mgr.UpdateTaskStatus(taskID, status); err != nil {
log.Errorf("failed to update the status of retention task %d: %v", taskID, err)
h.SendInternalServerError(err)
return
}
// if the status is the final status, update the end time
if status == jjob.StoppedStatus.String() || status == jjob.SuccessStatus.String() ||
status == jjob.ErrorStatus.String() {
task := &retention.Task{
ID: taskID,
EndTime: time.Now(),
}
if err := mgr.UpdateTask(task, "EndTime"); err != nil {
log.Errorf("failed to update of retention task %d: %v", taskID, err)
h.SendInternalServerError(err)
return
}
}
}
// HandleNotificationJob handles the hook of notification job

View File

@ -54,6 +54,7 @@ type RetentionTask struct {
Repository string `orm:"column(repository)"`
JobID string `orm:"column(job_id)"`
Status string `orm:"column(status)"`
StatusCode int `orm:"column(status_code)"`
StartTime time.Time `orm:"column(start_time)"`
EndTime time.Time `orm:"column(end_time)"`
Total int `orm:"column(total)"`

View File

@ -3,12 +3,13 @@ package dao
import (
"errors"
"fmt"
"strconv"
"github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/common/dao"
jobmodels "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/pkg/retention/dao/models"
"github.com/goharbor/harbor/src/pkg/retention/q"
"strconv"
)
// CreatePolicy Create Policy
@ -228,6 +229,18 @@ func UpdateTask(task *models.RetentionTask, cols ...string) error {
return err
}
// UpdateTaskStatus updates the status of task whose status code is less than the statusCode provided
func UpdateTaskStatus(taskID int64, status string, statusCode int) error {
_, err := dao.GetOrmer().QueryTable(&models.RetentionTask{}).
Filter("ID", taskID).
Filter("StatusCode__lt", statusCode).
Update(orm.Params{
"Status": status,
"StatusCode": statusCode,
})
return err
}
// DeleteTask deletes the task record specified by ID in database
func DeleteTask(id int64) error {
_, err := dao.GetOrmer().Delete(&models.RetentionTask{

View File

@ -194,8 +194,12 @@ func TestTask(t *testing.T) {
// update
task.ID = id
task.Status = "running"
err = UpdateTask(task, "Status")
task.Total = 1
err = UpdateTask(task, "Total")
require.Nil(t, err)
// update status
err = UpdateTaskStatus(id, "running", 1)
require.Nil(t, err)
// list
@ -205,8 +209,10 @@ func TestTask(t *testing.T) {
})
require.Nil(t, err)
require.Equal(t, 1, len(tasks))
assert.Equal(t, 1, tasks[0].Total)
assert.Equal(t, int64(1), tasks[0].ExecutionID)
assert.Equal(t, "running", tasks[0].Status)
assert.Equal(t, 1, tasks[0].StatusCode)
// delete
err = DeleteTask(id)

View File

@ -126,6 +126,9 @@ func (f *fakeRetentionManager) CreateTask(task *Task) (int64, error) {
func (f *fakeRetentionManager) UpdateTask(task *Task, cols ...string) error {
return nil
}
func (f *fakeRetentionManager) UpdateTaskStatus(int64, string) error {
return nil
}
func (f *fakeRetentionManager) GetTaskLog(taskID int64) ([]byte, error) {
return nil, nil
}

View File

@ -21,7 +21,8 @@ import (
"time"
"github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/common/job"
cjob "github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/retention/dao"
"github.com/goharbor/harbor/src/pkg/retention/dao/models"
"github.com/goharbor/harbor/src/pkg/retention/policy"
@ -58,6 +59,12 @@ type Manager interface {
CreateTask(task *Task) (int64, error)
// Update the specified task
UpdateTask(task *Task, cols ...string) error
// Update the status of the specified task
// The status is updated only when it is behind the one stored
// in the database.
// e.g. if the status is running but the status stored
// in database is failed, the updating doesn't take effect
UpdateTaskStatus(taskID int64, status string) error
// Get the task specified by the task ID
GetTask(taskID int64) (*Task, error)
// Get the log of the specified task
@ -217,6 +224,7 @@ func (d *DefaultManager) ListTasks(query ...*q.TaskQuery) ([]*Task, error) {
Repository: t.Repository,
JobID: t.JobID,
Status: t.Status,
StatusCode: t.StatusCode,
StartTime: t.StartTime,
EndTime: t.EndTime,
Total: t.Total,
@ -252,6 +260,15 @@ func (d *DefaultManager) UpdateTask(task *Task, cols ...string) error {
}, cols...)
}
// UpdateTaskStatus updates the status of the specified task
func (d *DefaultManager) UpdateTaskStatus(taskID int64, status string) error {
if taskID <= 0 {
return fmt.Errorf("invalid task ID: %d", taskID)
}
st := job.Status(status)
return dao.UpdateTaskStatus(taskID, status, st.Code())
}
// GetTask returns the task specified by task ID
func (d *DefaultManager) GetTask(taskID int64) (*Task, error) {
if taskID <= 0 {
@ -267,6 +284,7 @@ func (d *DefaultManager) GetTask(taskID int64) (*Task, error) {
Repository: task.Repository,
JobID: task.JobID,
Status: task.Status,
StatusCode: task.StatusCode,
StartTime: task.StartTime,
EndTime: task.EndTime,
Total: task.Total,
@ -283,7 +301,7 @@ func (d *DefaultManager) GetTaskLog(taskID int64) ([]byte, error) {
if task == nil {
return nil, fmt.Errorf("task %d not found", taskID)
}
return job.GlobalClient.GetJobLog(task.JobID)
return cjob.GlobalClient.GetJobLog(task.JobID)
}
// NewManager ...

View File

@ -6,8 +6,8 @@ import (
"time"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/job"
jjob "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
"github.com/goharbor/harbor/src/pkg/retention/q"
@ -171,7 +171,9 @@ func TestTask(t *testing.T) {
task := &Task{
ExecutionID: 1,
JobID: "1",
Status: TaskStatusPending,
Status: jjob.PendingStatus.String(),
StatusCode: jjob.PendingStatus.Code(),
Total: 0,
StartTime: time.Now(),
}
// create
@ -185,23 +187,29 @@ func TestTask(t *testing.T) {
// update
task.ID = id
task.Status = TaskStatusInProgress
err = m.UpdateTask(task, "Status")
task.Total = 1
err = m.UpdateTask(task, "Total")
require.Nil(t, err)
// update status to success which is a final status
err = m.UpdateTaskStatus(id, jjob.SuccessStatus.String())
require.Nil(t, err)
// try to update status to running, as the status has already
// been updated to a final status, this updating shouldn't take effect
err = m.UpdateTaskStatus(id, jjob.RunningStatus.String())
require.Nil(t, err)
// list
tasks, err := m.ListTasks(&q.TaskQuery{
ExecutionID: 1,
Status: TaskStatusInProgress,
})
require.Nil(t, err)
require.Equal(t, 1, len(tasks))
assert.Equal(t, int64(1), tasks[0].ExecutionID)
assert.Equal(t, TaskStatusInProgress, tasks[0].Status)
task.Status = TaskStatusFailed
err = m.UpdateTask(task, "Status")
require.Nil(t, err)
assert.Equal(t, 1, tasks[0].Total)
assert.Equal(t, jjob.SuccessStatus.String(), tasks[0].Status)
assert.Equal(t, jjob.SuccessStatus.Code(), tasks[0].StatusCode)
// get task log
job.GlobalClient = &tjob.MockJobClient{

View File

@ -23,12 +23,6 @@ const (
ExecutionStatusFailed string = "Failed"
ExecutionStatusStopped string = "Stopped"
TaskStatusPending string = "Pending"
TaskStatusInProgress string = "InProgress"
TaskStatusSucceed string = "Succeed"
TaskStatusFailed string = "Failed"
TaskStatusStopped string = "Stopped"
CandidateKindImage string = "image"
CandidateKindChart string = "chart"
@ -54,6 +48,7 @@ type Task struct {
Repository string `json:"repository"`
JobID string `json:"job_id"`
Status string `json:"status"`
StatusCode int `json:"status_code"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Total int `json:"total"`