Merge pull request #7125 from ywk253100/190312_update_job_id

Update the job ID in flow controller
This commit is contained in:
Wenkai Yin 2019-03-12 17:26:43 +08:00 committed by GitHub
commit 5f8c19e5ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 5 additions and 3 deletions

View File

@ -246,12 +246,13 @@ func (f *flow) schedule() error {
continue continue
} }
allFailed = false allFailed = false
// if the task is submitted successfully, update the status and start time // if the task is submitted successfully, update the status, job ID and start time
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusPending); err != nil { if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusPending); err != nil {
log.Errorf("failed to update task status %d: %v", result.TaskID, err) log.Errorf("failed to update task status %d: %v", result.TaskID, err)
} }
if err = f.executionMgr.UpdateTask(&model.Task{ if err = f.executionMgr.UpdateTask(&model.Task{
ID: result.TaskID, ID: result.TaskID,
JobID: result.JobID,
StartTime: time.Now(), StartTime: time.Now(),
}); err != nil { }); err != nil {
log.Errorf("failed to update task %d: %v", result.TaskID, err) log.Errorf("failed to update task %d: %v", result.TaskID, err)

View File

@ -49,6 +49,7 @@ type ScheduleItem struct {
// ScheduleResult is the result of the schedule for one item // ScheduleResult is the result of the schedule for one item
type ScheduleResult struct { type ScheduleResult struct {
TaskID int64 TaskID int64
JobID string
Error error Error error
} }
@ -119,16 +120,16 @@ func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult,
"src_resource": string(src), "src_resource": string(src),
"dst_resource": string(dest), "dst_resource": string(dest),
} }
_, joberr := d.client.SubmitJob(job) id, joberr := d.client.SubmitJob(job)
if joberr != nil { if joberr != nil {
result.Error = joberr result.Error = joberr
results = append(results, result) results = append(results, result)
continue continue
} }
result.JobID = id
results = append(results, result) results = append(results, result)
} }
return results, nil return results, nil
} }
// Stop the transfer job // Stop the transfer job