From 6e07a62fe1e10313c9737d7e2165d542f16f44aa Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Wed, 8 May 2019 19:11:18 +0800 Subject: [PATCH] Fix #7579: cannot stop the execution during the initialization stage (#7721) Fix #7579: cannot stop the execution during the initialization stage Signed-off-by: Wenkai Yin --- src/replication/operation/controller.go | 23 ++++++++++++++++++++ src/replication/operation/flow/copy.go | 9 ++++++++ src/replication/operation/flow/stage.go | 12 ++++++++++ src/replication/operation/flow/stage_test.go | 2 +- 4 files changed, 45 insertions(+), 1 deletion(-) diff --git a/src/replication/operation/controller.go b/src/replication/operation/controller.go index 4d233dac9..878325e7e 100644 --- a/src/replication/operation/controller.go +++ b/src/replication/operation/controller.go @@ -125,6 +125,29 @@ func (c *controller) StopReplication(executionID int64) error { if err != nil { return err } + // no tasks, just set its status to "stopped" + if len(tasks) == 0 { + execution, err := c.executionMgr.Get(executionID) + if err != nil { + return err + } + if execution == nil { + return fmt.Errorf("the execution %d not found", executionID) + } + if execution.Status != models.ExecutionStatusInProgress { + log.Debugf("the execution %d isn't in progress, no need to stop", executionID) + return nil + } + if err = c.executionMgr.Update(&models.Execution{ + ID: executionID, + Status: models.ExecutionStatusStopped, + EndTime: time.Now(), + }, models.ExecutionPropsName.Status, models.ExecutionPropsName.EndTime); err != nil { + return err + } + log.Debugf("the status of execution %d is set to stopped", executionID) + } + // got tasks, stopping the tasks one by one for _, task := range tasks { if !isTaskRunning(task) { log.Debugf("the task %d(job ID: %s) isn't running, its status is %s, skip", task.ID, task.JobID, task.Status) diff --git a/src/replication/operation/flow/copy.go b/src/replication/operation/flow/copy.go index e3687697c..00489c314 100644 --- a/src/replication/operation/flow/copy.go +++ b/src/replication/operation/flow/copy.go @@ -61,6 +61,15 @@ func (c *copyFlow) Run(interface{}) (int, error) { return 0, err } + isStopped, err := isExecutionStopped(c.executionMgr, c.executionID) + if err != nil { + return 0, err + } + if isStopped { + log.Debugf("the execution %d is stopped, stop the flow", c.executionID) + return 0, nil + } + if len(srcResources) == 0 { markExecutionSuccess(c.executionMgr, c.executionID, "no resources need to be replicated") log.Infof("no resources need to be replicated for the execution %d, skip", c.executionID) diff --git a/src/replication/operation/flow/stage.go b/src/replication/operation/flow/stage.go index f3cfef416..5476f3c13 100644 --- a/src/replication/operation/flow/stage.go +++ b/src/replication/operation/flow/stage.go @@ -309,6 +309,18 @@ func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, ite return n, nil } +// check whether the execution is stopped +func isExecutionStopped(mgr execution.Manager, id int64) (bool, error) { + execution, err := mgr.Get(id) + if err != nil { + return false, err + } + if execution == nil { + return false, fmt.Errorf("execution %d not found", id) + } + return execution.Status == models.ExecutionStatusStopped, nil +} + // return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]" // if the resource has vtags func getResourceName(res *model.Resource) string { diff --git a/src/replication/operation/flow/stage_test.go b/src/replication/operation/flow/stage_test.go index 303f6cc10..4863383df 100644 --- a/src/replication/operation/flow/stage_test.go +++ b/src/replication/operation/flow/stage_test.go @@ -151,7 +151,7 @@ func (f *fakedExecutionManager) List(...*models.ExecutionQuery) (int64, []*model return 0, nil, nil } func (f *fakedExecutionManager) Get(int64) (*models.Execution, error) { - return nil, nil + return &models.Execution{}, nil } func (f *fakedExecutionManager) Update(*models.Execution, ...string) error { return nil