From 7924f37d8699e246cf422d8046a8bef24136950e Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Mon, 26 Aug 2019 10:30:23 +0800 Subject: [PATCH] Add status revision to handle retrying in replication task Add status revision to handle retrying in replication task Signed-off-by: Wenkai Yin --- .../postgresql/0010_1.9.0_schema.up.sql | 2 + src/core/api/replication_execution_test.go | 2 +- src/core/api/replication_policy.go | 29 +------- .../service/notifications/jobs/handler.go | 2 +- src/replication/dao/execution.go | 30 +++----- src/replication/dao/execution_test.go | 73 ++++++++++--------- src/replication/dao/models/execution.go | 21 +++--- src/replication/event/handler_test.go | 2 +- src/replication/operation/controller.go | 32 +++++++- src/replication/operation/controller_test.go | 4 +- .../operation/execution/execution.go | 6 +- .../operation/execution/execution_test.go | 17 +++-- src/replication/operation/flow/stage.go | 10 ++- src/replication/operation/flow/stage_test.go | 2 +- src/replication/operation/hook/task.go | 4 +- src/replication/operation/hook/task_test.go | 4 +- 16 files changed, 121 insertions(+), 119 deletions(-) diff --git a/make/migrations/postgresql/0010_1.9.0_schema.up.sql b/make/migrations/postgresql/0010_1.9.0_schema.up.sql index 80725fbe4..052b5de6b 100644 --- a/make/migrations/postgresql/0010_1.9.0_schema.up.sql +++ b/make/migrations/postgresql/0010_1.9.0_schema.up.sql @@ -182,3 +182,5 @@ create table notification_policy ( update_time timestamp default CURRENT_TIMESTAMP, PRIMARY KEY (id) ); + +ALTER TABLE replication_task ADD COLUMN status_revision int DEFAULT 0; \ No newline at end of file diff --git a/src/core/api/replication_execution_test.go b/src/core/api/replication_execution_test.go index c6ed3fdfb..a4d8409cd 100644 --- a/src/core/api/replication_execution_test.go +++ b/src/core/api/replication_execution_test.go @@ -66,7 +66,7 @@ func (f *fakedOperationController) GetTask(id int64) (*models.Task, error) { } return nil, nil } -func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error { +func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error { return nil } func (f *fakedOperationController) GetTaskLog(int64) ([]byte, error) { diff --git a/src/core/api/replication_policy.go b/src/core/api/replication_policy.go index feb23397e..fe50b5d59 100644 --- a/src/core/api/replication_policy.go +++ b/src/core/api/replication_policy.go @@ -239,9 +239,6 @@ func (r *ReplicationPolicyAPI) Delete() { } } -// the execution's status will not be updated if it is not queried -// so need to check the status of tasks to determine the status of -// the execution func hasRunningExecutions(policyID int64) (bool, error) { _, executions, err := replication.OperationCtl.ListExecutions(&models.ExecutionQuery{ PolicyID: policyID, @@ -253,35 +250,11 @@ func hasRunningExecutions(policyID int64) (bool, error) { if execution.Status != models.ExecutionStatusInProgress { continue } - _, tasks, err := replication.OperationCtl.ListTasks(&models.TaskQuery{ - ExecutionID: execution.ID, - }) - if err != nil { - return false, err - } - for _, task := range tasks { - if isTaskRunning(task) { - return true, nil - } - } + return true, nil } return false, nil } -// return true if the status of the task is running or pending -func isTaskRunning(task *models.Task) bool { - if task == nil { - return false - } - switch task.Status { - case models.TaskStatusSucceed, - models.TaskStatusStopped, - models.TaskStatusFailed: - return false - } - return true -} - // ignore the credential for the registries func populateRegistries(registryMgr registry.Manager, policy *model.Policy) error { if err := event.PopulateRegistries(registryMgr, policy); err != nil { diff --git a/src/core/service/notifications/jobs/handler.go b/src/core/service/notifications/jobs/handler.go index bfd79c889..cd6092469 100755 --- a/src/core/service/notifications/jobs/handler.go +++ b/src/core/service/notifications/jobs/handler.go @@ -104,7 +104,7 @@ func (h *Handler) HandleReplicationScheduleJob() { // HandleReplicationTask handles the webhook of replication task func (h *Handler) HandleReplicationTask() { log.Debugf("received replication task status update event: task-%d, status-%s", h.id, h.status) - if err := hook.UpdateTask(replication.OperationCtl, h.id, h.rawStatus); err != nil { + if err := hook.UpdateTask(replication.OperationCtl, h.id, h.rawStatus, h.revision); err != nil { log.Errorf("failed to update the status of the replication task %d: %v", h.id, err) h.SendInternalServerError(err) return diff --git a/src/replication/dao/execution.go b/src/replication/dao/execution.go index 1fbd54ba2..1833d7e3e 100644 --- a/src/replication/dao/execution.go +++ b/src/replication/dao/execution.go @@ -131,12 +131,6 @@ func fillExecution(execution *models.Execution) error { } resetExecutionStatus(execution) - // if execution status changed to a final status, store to DB - if executionFinished(execution.Status) { - UpdateExecution(execution, models.ExecutionPropsName.Status, models.ExecutionPropsName.InProgress, - models.ExecutionPropsName.Succeed, models.ExecutionPropsName.Failed, models.ExecutionPropsName.Stopped, - models.ExecutionPropsName.EndTime, models.ExecutionPropsName.Total) - } return nil } @@ -329,23 +323,21 @@ func UpdateTask(task *models.Task, props ...string) (int64, error) { // WHERE "id" IN ( SELECT T0."id" FROM "replication_task" T0 WHERE T0."id" = $3 // AND T0."status" IN ($4, $5, $6))]` // which is not a "single" sql statement, this will cause issues when running in concurrency -func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64, error) { +func UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) (int64, error) { params := []interface{}{} - sql := `update replication_task set status = ? ` - params = append(params, status) - + sql := `update replication_task set status = ?, status_revision = ?, end_time = ? ` + params = append(params, status, statusRevision) + var t time.Time + // when the task is in final status, update the endtime + // when the task re-runs again, the endtime should be cleared + // so set the endtime to null if the task isn't in final status if taskFinished(status) { - // should update endTime - sql += `, end_time = ? ` - params = append(params, time.Now()) + t = time.Now() } + params = append(params, t) - sql += `where id = ? ` - params = append(params, id) - if len(statusCondition) > 0 { - sql += fmt.Sprintf(`and status in (%s) `, dao.ParamPlaceholderForIn(len(statusCondition))) - params = append(params, statusCondition) - } + sql += fmt.Sprintf(`where id = ? and (status_revision < ? or status_revision = ? and status in (%s)) `, dao.ParamPlaceholderForIn(len(statusCondition))) + params = append(params, id, statusRevision, statusRevision, statusCondition) result, err := dao.GetOrmer().Raw(sql, params...).Exec() if err != nil { diff --git a/src/replication/dao/execution_test.go b/src/replication/dao/execution_test.go index faa16e1cc..5d41efb9e 100644 --- a/src/replication/dao/execution_test.go +++ b/src/replication/dao/execution_test.go @@ -93,23 +93,25 @@ func TestMethodOfExecution(t *testing.T) { func TestMethodOfTask(t *testing.T) { now := time.Now() task1 := &models.Task{ - ExecutionID: 112200, - ResourceType: "resourceType1", - SrcResource: "srcResource1", - DstResource: "dstResource1", - JobID: "jobID1", - Status: "Initialized", - StartTime: &now, + ExecutionID: 112200, + ResourceType: "resourceType1", + SrcResource: "srcResource1", + DstResource: "dstResource1", + JobID: "jobID1", + Status: "Initialized", + StatusRevision: 1, + StartTime: &now, } task2 := &models.Task{ - ExecutionID: 112200, - ResourceType: "resourceType2", - SrcResource: "srcResource2", - DstResource: "dstResource2", - JobID: "jobID2", - Status: "Stopped", - StartTime: &now, - EndTime: &now, + ExecutionID: 112200, + ResourceType: "resourceType2", + SrcResource: "srcResource2", + DstResource: "dstResource2", + JobID: "jobID2", + Status: "Stopped", + StatusRevision: 1, + StartTime: &now, + EndTime: &now, } // test add @@ -151,11 +153,12 @@ func TestMethodOfTask(t *testing.T) { assert.Equal(t, int64(1), n) // test update status - n, err = UpdateTaskStatus(id1, "Succeed") + n, err = UpdateTaskStatus(id1, "Succeed", 2, "Initialized") require.Nil(t, err) assert.Equal(t, int64(1), n) task, _ = GetTask(id1) assert.Equal(t, "Succeed", task.Status) + assert.Equal(t, int64(2), task.StatusRevision) // test delete require.Nil(t, DeleteTask(id1)) @@ -237,25 +240,27 @@ func TestExecutionFill2(t *testing.T) { } executionID, _ := AddExecution(execution) task1 := &models.Task{ - ID: 20191, - ExecutionID: executionID, - ResourceType: "resourceType1", - SrcResource: "srcResource1", - DstResource: "dstResource1", - JobID: "jobID1", - Status: models.TaskStatusInProgress, - StartTime: &now, + ID: 20191, + ExecutionID: executionID, + ResourceType: "resourceType1", + SrcResource: "srcResource1", + DstResource: "dstResource1", + JobID: "jobID1", + Status: models.TaskStatusInProgress, + StatusRevision: 1, + StartTime: &now, } task2 := &models.Task{ - ID: 20192, - ExecutionID: executionID, - ResourceType: "resourceType2", - SrcResource: "srcResource2", - DstResource: "dstResource2", - JobID: "jobID2", - Status: "Stopped", - StartTime: &now, - EndTime: &now, + ID: 20192, + ExecutionID: executionID, + ResourceType: "resourceType2", + SrcResource: "srcResource2", + DstResource: "dstResource2", + JobID: "jobID2", + Status: "Stopped", + StatusRevision: 1, + StartTime: &now, + EndTime: &now, } taskID1, _ := AddTask(task1) AddTask(task2) @@ -275,7 +280,7 @@ func TestExecutionFill2(t *testing.T) { assert.Equal(t, 0, exe.Succeed) // update task status and query and fill - UpdateTaskStatus(taskID1, models.TaskStatusFailed) + UpdateTaskStatus(taskID1, models.TaskStatusFailed, 2, models.TaskStatusInProgress) exes, err := GetExecutions(&models.ExecutionQuery{ PolicyID: 11209, }) diff --git a/src/replication/dao/models/execution.go b/src/replication/dao/models/execution.go index eb9a06057..95c5386ba 100644 --- a/src/replication/dao/models/execution.go +++ b/src/replication/dao/models/execution.go @@ -109,16 +109,17 @@ type TaskFieldsName struct { // Task represent the tasks in one execution. type Task struct { - ID int64 `orm:"pk;auto;column(id)" json:"id"` - ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"` - ResourceType string `orm:"column(resource_type)" json:"resource_type"` - SrcResource string `orm:"column(src_resource)" json:"src_resource"` - DstResource string `orm:"column(dst_resource)" json:"dst_resource"` - Operation string `orm:"column(operation)" json:"operation"` - JobID string `orm:"column(job_id)" json:"job_id"` - Status string `orm:"column(status)" json:"status"` - StartTime *time.Time `orm:"column(start_time)" json:"start_time"` - EndTime *time.Time `orm:"column(end_time)" json:"end_time,omitempty"` + ID int64 `orm:"pk;auto;column(id)" json:"id"` + ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"` + ResourceType string `orm:"column(resource_type)" json:"resource_type"` + SrcResource string `orm:"column(src_resource)" json:"src_resource"` + DstResource string `orm:"column(dst_resource)" json:"dst_resource"` + Operation string `orm:"column(operation)" json:"operation"` + JobID string `orm:"column(job_id)" json:"job_id"` + Status string `orm:"column(status)" json:"status"` + StatusRevision int64 `orm:"column(status_revision)"` + StartTime *time.Time `orm:"column(start_time)" json:"start_time"` + EndTime *time.Time `orm:"column(end_time)" json:"end_time,omitempty"` } // TableName is required by by beego orm to map Execution to table replication_execution diff --git a/src/replication/event/handler_test.go b/src/replication/event/handler_test.go index 372a198eb..1edabac11 100644 --- a/src/replication/event/handler_test.go +++ b/src/replication/event/handler_test.go @@ -44,7 +44,7 @@ func (f *fakedOperationController) ListTasks(...*models.TaskQuery) (int64, []*mo func (f *fakedOperationController) GetTask(id int64) (*models.Task, error) { return nil, nil } -func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error { +func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error { return nil } func (f *fakedOperationController) GetTaskLog(int64) ([]byte, error) { diff --git a/src/replication/operation/controller.go b/src/replication/operation/controller.go index 35d6e84fe..ab419f581 100644 --- a/src/replication/operation/controller.go +++ b/src/replication/operation/controller.go @@ -17,6 +17,7 @@ package operation import ( "fmt" "regexp" + "strings" "time" "github.com/goharbor/harbor/src/common/job" @@ -39,7 +40,7 @@ type Controller interface { GetExecution(int64) (*models.Execution, error) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) GetTask(int64) (*models.Task, error) - UpdateTaskStatus(id int64, status string, statusCondition ...string) error + UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error GetTaskLog(int64) ([]byte, error) } @@ -50,6 +51,7 @@ const ( var ( statusBehindErrorPattern = "mismatch job status for stopping job: .*, job status (.*) is behind Running" statusBehindErrorReg = regexp.MustCompile(statusBehindErrorPattern) + jobNotFoundErrorMsg = "object is not found" ) // NewController returns a controller implementation @@ -169,7 +171,10 @@ func (c *controller) StopReplication(executionID int64) error { case hjob.SuccessStatus: status = models.TaskStatusSucceed } - e := c.executionMgr.UpdateTaskStatus(task.ID, status) + e := c.executionMgr.UpdateTask(&models.Task{ + ID: task.ID, + Status: status, + }, "Status") if e != nil { log.Errorf("failed to update the status the task %d(job ID: %s): %v", task.ID, task.JobID, e) } else { @@ -177,6 +182,18 @@ func (c *controller) StopReplication(executionID int64) error { } continue } + if isJobNotFoundError(err) { + e := c.executionMgr.UpdateTask(&models.Task{ + ID: task.ID, + Status: models.ExecutionStatusStopped, + }, "Status") + if e != nil { + log.Errorf("failed to update the status the task %d(job ID: %s): %v", task.ID, task.JobID, e) + } else { + log.Debugf("got job not found error for task %d, update it's status to %s directly", task.ID, models.ExecutionStatusStopped) + } + continue + } log.Errorf("failed to stop the task %d(job ID: %s): %v", task.ID, task.JobID, err) continue } @@ -209,6 +226,13 @@ func isStatusBehindError(err error) (string, bool) { return strs[1], true } +func isJobNotFoundError(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), jobNotFoundErrorMsg) +} + func (c *controller) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) { return c.executionMgr.List(query...) } @@ -221,8 +245,8 @@ func (c *controller) ListTasks(query ...*models.TaskQuery) (int64, []*models.Tas func (c *controller) GetTask(id int64) (*models.Task, error) { return c.executionMgr.GetTask(id) } -func (c *controller) UpdateTaskStatus(id int64, status string, statusCondition ...string) error { - return c.executionMgr.UpdateTaskStatus(id, status, statusCondition...) +func (c *controller) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error { + return c.executionMgr.UpdateTaskStatus(id, status, statusRevision, statusCondition...) } func (c *controller) GetTaskLog(taskID int64) ([]byte, error) { return c.executionMgr.GetTaskLog(taskID) diff --git a/src/replication/operation/controller_test.go b/src/replication/operation/controller_test.go index b6f4f5d77..778e9f5ea 100644 --- a/src/replication/operation/controller_test.go +++ b/src/replication/operation/controller_test.go @@ -75,7 +75,7 @@ func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) { func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error { return nil } -func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error { +func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, int64, ...string) error { return nil } func (f *fakedExecutionManager) RemoveTask(int64) error { @@ -333,7 +333,7 @@ func TestGetTask(t *testing.T) { } func TestUpdateTaskStatus(t *testing.T) { - err := ctl.UpdateTaskStatus(1, "running") + err := ctl.UpdateTaskStatus(1, "running", 1) require.Nil(t, err) } diff --git a/src/replication/operation/execution/execution.go b/src/replication/operation/execution/execution.go index 0d4db946c..f91f82c8d 100644 --- a/src/replication/operation/execution/execution.go +++ b/src/replication/operation/execution/execution.go @@ -50,7 +50,7 @@ type Manager interface { // UpdateTaskStatus only updates the task status. If "statusCondition" // presents, only the tasks whose status equal to "statusCondition" // will be updated - UpdateTaskStatus(taskID int64, status string, statusCondition ...string) error + UpdateTaskStatus(taskID int64, status string, statusRevision int64, statusCondition ...string) error // Remove one task specified by task ID RemoveTask(int64) error // Remove all tasks of one execution specified by the execution ID @@ -151,8 +151,8 @@ func (dm *DefaultManager) UpdateTask(task *models.Task, props ...string) error { } // UpdateTaskStatus ... -func (dm *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusCondition ...string) error { - if _, err := dao.UpdateTaskStatus(taskID, status, statusCondition...); err != nil { +func (dm *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusRevision int64, statusCondition ...string) error { + if _, err := dao.UpdateTaskStatus(taskID, status, statusRevision, statusCondition...); err != nil { return err } return nil diff --git a/src/replication/operation/execution/execution_test.go b/src/replication/operation/execution/execution_test.go index 65e35fd07..7262186da 100644 --- a/src/replication/operation/execution/execution_test.go +++ b/src/replication/operation/execution/execution_test.go @@ -76,13 +76,14 @@ func TestMethodOfExecutionManager(t *testing.T) { func TestMethodOfTaskManager(t *testing.T) { now := time.Now() task := &models.Task{ - ExecutionID: 112200, - ResourceType: "resourceType1", - SrcResource: "srcResource1", - DstResource: "dstResource1", - JobID: "jobID1", - Status: "Initialized", - StartTime: &now, + ExecutionID: 112200, + ResourceType: "resourceType1", + SrcResource: "srcResource1", + DstResource: "dstResource1", + JobID: "jobID1", + Status: "Initialized", + StatusRevision: 1, + StartTime: &now, } defer func() { @@ -121,7 +122,7 @@ func TestMethodOfTaskManager(t *testing.T) { assert.Equal(t, taskNew.SrcResource, taskUpdate.SrcResource) // UpdateTaskStatus - err = executionManager.UpdateTaskStatus(id, models.TaskStatusSucceed) + err = executionManager.UpdateTaskStatus(id, models.TaskStatusSucceed, 1, models.TaskStatusInitialized) require.Nil(t, err) taskUpdate, _ = executionManager.GetTask(id) assert.Equal(t, models.TaskStatusSucceed, taskUpdate.Status) diff --git a/src/replication/operation/flow/stage.go b/src/replication/operation/flow/stage.go index 27110f001..5048662e1 100644 --- a/src/replication/operation/flow/stage.go +++ b/src/replication/operation/flow/stage.go @@ -279,19 +279,23 @@ func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, ite for _, result := range results { // if the task is failed to be submitted, update the status of the // task as failure + now := time.Now() if result.Error != nil { log.Errorf("failed to schedule the task %d: %v", result.TaskID, result.Error) - if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusFailed); err != nil { + if err = executionMgr.UpdateTask(&models.Task{ + ID: result.TaskID, + Status: models.TaskStatusFailed, + EndTime: &now, + }, "Status", "EndTime"); err != nil { log.Errorf("failed to update the task status %d: %v", result.TaskID, err) } continue } allFailed = false // if the task is submitted successfully, update the status, job ID and start time - if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending, models.TaskStatusInitialized); err != nil { + if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending, 0, models.TaskStatusInitialized); err != nil { log.Errorf("failed to update the task status %d: %v", result.TaskID, err) } - now := time.Now() if err = executionMgr.UpdateTask(&models.Task{ ID: result.TaskID, JobID: result.JobID, diff --git a/src/replication/operation/flow/stage_test.go b/src/replication/operation/flow/stage_test.go index 4863383df..b26857488 100644 --- a/src/replication/operation/flow/stage_test.go +++ b/src/replication/operation/flow/stage_test.go @@ -176,7 +176,7 @@ func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) { func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error { return nil } -func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error { +func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, int64, ...string) error { return nil } func (f *fakedExecutionManager) RemoveTask(int64) error { diff --git a/src/replication/operation/hook/task.go b/src/replication/operation/hook/task.go index 220763dbd..af512f7f2 100644 --- a/src/replication/operation/hook/task.go +++ b/src/replication/operation/hook/task.go @@ -21,7 +21,7 @@ import ( ) // UpdateTask update the status of the task -func UpdateTask(ctl operation.Controller, id int64, status string) error { +func UpdateTask(ctl operation.Controller, id int64, status string, statusRevision int64) error { jobStatus := job.Status(status) // convert the job status to task status s := "" @@ -43,5 +43,5 @@ func UpdateTask(ctl operation.Controller, id int64, status string) error { s = models.TaskStatusSucceed preStatus = append(preStatus, models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress) } - return ctl.UpdateTaskStatus(id, s, preStatus...) + return ctl.UpdateTaskStatus(id, s, statusRevision, preStatus...) } diff --git a/src/replication/operation/hook/task_test.go b/src/replication/operation/hook/task_test.go index 238eafb36..a06c44961 100644 --- a/src/replication/operation/hook/task_test.go +++ b/src/replication/operation/hook/task_test.go @@ -46,7 +46,7 @@ func (f *fakedOperationController) ListTasks(...*models.TaskQuery) (int64, []*mo func (f *fakedOperationController) GetTask(int64) (*models.Task, error) { return nil, nil } -func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error { +func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error { f.status = status return nil } @@ -87,7 +87,7 @@ func TestUpdateTask(t *testing.T) { } for _, c := range cases { - err := UpdateTask(mgr, 1, c.inputStatus) + err := UpdateTask(mgr, 1, c.inputStatus, 1) require.Nil(t, err) assert.Equal(t, c.expectedStatus, mgr.status) }