From e55c7d05ffb0b5d1333b3c4e8330be076544ded3 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Sat, 16 Jan 2021 08:20:16 +0800 Subject: [PATCH] Don't ignore the NotFoundErr when handling the status hook of tasks to avoid the status out of sync Don't ignore the NotFoundErr when handling the status hook of tasks to avoid the status out of sync Fixes #14016 Signed-off-by: Wenkai Yin --- src/controller/replication/controller.go | 10 ++++++++++ src/controller/replication/controller_test.go | 2 ++ src/jobservice/common/rds/scripts.go | 14 ++++++-------- src/pkg/task/task.go | 4 ++++ src/server/handler/job_status_hook.go | 18 +++++++++++------- 5 files changed, 33 insertions(+), 15 deletions(-) diff --git a/src/controller/replication/controller.go b/src/controller/replication/controller.go index 44ecca11e..ea6464efc 100644 --- a/src/controller/replication/controller.go +++ b/src/controller/replication/controller.go @@ -95,6 +95,16 @@ func (c *controller) Start(ctx context.Context, policy *model.Policy, resource * // may be submitted already when the process starts, so create a new context // with orm populated ctxx := orm.NewContext(context.Background(), c.ormCreator.Create()) + + // as we start a new transaction in the goroutine, the execution record may not + // be inserted yet, so wait until it is ready before continuing + if err := lib.RetryUntil(func() error { + _, err := c.execMgr.Get(ctxx, id) + return err + }); err != nil { + logger.Errorf("failed to wait the execution record to be inserted: %v", err) + } + err := c.flowCtl.Start(ctxx, id, policy, resource) if err == nil { // no err, return directly diff --git a/src/controller/replication/controller_test.go b/src/controller/replication/controller_test.go index 2ddaf207b..1079c2b98 100644 --- a/src/controller/replication/controller_test.go +++ b/src/controller/replication/controller_test.go @@ -61,6 +61,7 @@ func (r *replicationTestSuite) TestStart() { // got error when running the replication flow r.execMgr.On("Create", 
mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) + r.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{}, nil) r.execMgr.On("StopAndWait", mock.Anything, mock.Anything, mock.Anything).Return(nil) r.execMgr.On("MarkError", mock.Anything, mock.Anything, mock.Anything).Return(nil) r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("error")) @@ -78,6 +79,7 @@ func (r *replicationTestSuite) TestStart() { // got no error when running the replication flow r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) + r.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{}, nil) r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) r.ormCreator.On("Create").Return(nil) id, err = r.ctl.Start(context.Background(), &model.Policy{Enabled: true}, nil, task.ExecutionTriggerManual) diff --git a/src/jobservice/common/rds/scripts.go b/src/jobservice/common/rds/scripts.go index fff8d3777..764a93e53 100644 --- a/src/jobservice/common/rds/scripts.go +++ b/src/jobservice/common/rds/scripts.go @@ -37,19 +37,17 @@ end // luaFuncCompareText is common lua script function var luaFuncCompareText = ` -local function compare(status, revision, checkInT) +local function compare(status, revision) local sCode = stCode(status) local aCode = stCode(ARGV[1]) local aRev = tonumber(ARGV[2]) or 0 local aCheckInT = tonumber(ARGV[3]) or 0 - if revision < aRev or - ( revision == aRev and sCode < aCode ) or - ( revision == aRev and sCode == aCode and (not checkInT or checkInT < aCheckInT)) + ( revision == aRev and sCode <= aCode ) or + ( revision == aRev and aCheckInT ~= 0 ) then return 'ok' end - return 'no' end ` @@ -129,7 +127,7 @@ if res then checkInAt = tonumber(res[3]) or 0 ack = res[4] - local reply = compare(st, rev, checkInAt) + local reply = compare(st, rev) if reply == 'ok' then if not ack then 
@@ -142,7 +140,7 @@ if res then rev = a['revision'] checkInAt = a['check_in_at'] - local reply2 = compare(st, rev, checkInAt) + local reply2 = compare(st, rev) if reply2 == 'ok' then return 'ok' end @@ -178,7 +176,7 @@ local function canSetAck(jk, nrev) if ackv then -- ack existing local ack = cjson.decode(ackv) - local cmp = compare(ack['status'], ack['revision'], ack['check_in_at']) + local cmp = compare(ack['status'], ack['revision']) if cmp == 'ok' then return 'ok' end diff --git a/src/pkg/task/task.go b/src/pkg/task/task.go index ada25850d..bca88eb93 100644 --- a/src/pkg/task/task.go +++ b/src/pkg/task/task.go @@ -88,6 +88,10 @@ func (m *manager) Create(ctx context.Context, executionID int64, jb *Job, extraA log.Debugf("the database record for task %d created", id) // submit job to jobservice + // As all database operations run in a transaction that is not committed until the API returns, + // the task record may not have been inserted yet when the job is submitted to the jobservice + // and starts running. In that case the status hook handler returns 404 and the jobservice + // re-sends the status hook jobID, err := m.submitJob(ctx, id, jb) if err != nil { // failed to submit job to jobservice, delete the task record diff --git a/src/server/handler/job_status_hook.go b/src/server/handler/job_status_hook.go index 1b259b12d..499ff7144 100644 --- a/src/server/handler/job_status_hook.go +++ b/src/server/handler/job_status_hook.go @@ -19,9 +19,7 @@ import ( "net/http" "github.com/goharbor/harbor/src/jobservice/job" - "github.com/goharbor/harbor/src/lib/errors" libhttp "github.com/goharbor/harbor/src/lib/http" - "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/pkg/task" ) @@ -45,11 +43,17 @@ func (j *jobStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } if err := j.handler.Handle(r.Context(), sc); err != nil { - // ignore the not found error to avoid the jobservice re-sending the hook - if errors.IsNotFoundErr(err) { - 
log.Warningf("got not found error: %v, ignore it to avoid subsequent retrying webhooks from jobservice", err) - return - } + // When the status hook arrives, the execution/task database record may not have been + // inserted yet because the transaction has not been committed. + // Do not ignore the NotFoundErr here, so that the jobservice resends the status hook + // and the status is not lost + /* + // ignore the not found error to avoid the jobservice re-sending the hook + if errors.IsNotFoundErr(err) { + log.Warningf("got not found error: %v, ignore it to avoid subsequent retrying webhooks from jobservice", err) + return + } + */ libhttp.SendError(w, err) return }