diff --git a/src/replication/ng/flow/flow.go b/src/replication/ng/flow/flow.go index 529763836..29a5cd121 100644 --- a/src/replication/ng/flow/flow.go +++ b/src/replication/ng/flow/flow.go @@ -246,12 +246,13 @@ func (f *flow) schedule() error { continue } allFailed = false - // if the task is submitted successfully, update the status and start time + // if the task is submitted successfully, update the status, job ID and start time if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusPending); err != nil { log.Errorf("failed to update task status %d: %v", result.TaskID, err) } if err = f.executionMgr.UpdateTask(&model.Task{ ID: result.TaskID, + JobID: result.JobID, StartTime: time.Now(), }); err != nil { log.Errorf("failed to update task %d: %v", result.TaskID, err) diff --git a/src/replication/ng/scheduler/scheduler.go b/src/replication/ng/scheduler/scheduler.go index d7e51eba7..17a2013af 100644 --- a/src/replication/ng/scheduler/scheduler.go +++ b/src/replication/ng/scheduler/scheduler.go @@ -49,6 +49,7 @@ type ScheduleItem struct { // ScheduleResult is the result of the schedule for one item type ScheduleResult struct { TaskID int64 + JobID string Error error } @@ -119,16 +120,16 @@ func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, "src_resource": string(src), "dst_resource": string(dest), } - _, joberr := d.client.SubmitJob(job) + id, joberr := d.client.SubmitJob(job) if joberr != nil { result.Error = joberr results = append(results, result) continue } + result.JobID = id results = append(results, result) } return results, nil - } // Stop the transfer job