mirror of
https://github.com/goharbor/harbor
synced 2025-05-17 20:02:51 +00:00
Merge pull request #14027 from ywk253100/210116_status
Don't ignore the NotFoundErr when handling the status hook of tasks to avoid the status out of sync
This commit is contained in:
commit
63831dfd08
@ -95,6 +95,16 @@ func (c *controller) Start(ctx context.Context, policy *model.Policy, resource *
|
|||||||
// may be submitted already when the process starts, so create a new context
|
// may be submitted already when the process starts, so create a new context
|
||||||
// with orm populated
|
// with orm populated
|
||||||
ctxx := orm.NewContext(context.Background(), c.ormCreator.Create())
|
ctxx := orm.NewContext(context.Background(), c.ormCreator.Create())
|
||||||
|
|
||||||
|
// as we start a new transaction in the goroutine, the execution record may not
|
||||||
|
// be inserted yet, wait until it is ready before continue
|
||||||
|
if err := lib.RetryUntil(func() error {
|
||||||
|
_, err := c.execMgr.Get(ctxx, id)
|
||||||
|
return err
|
||||||
|
}); err != nil {
|
||||||
|
logger.Errorf("failed to wait the execution record to be inserted: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
err := c.flowCtl.Start(ctxx, id, policy, resource)
|
err := c.flowCtl.Start(ctxx, id, policy, resource)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// no err, return directly
|
// no err, return directly
|
||||||
|
@ -61,6 +61,7 @@ func (r *replicationTestSuite) TestStart() {
|
|||||||
|
|
||||||
// got error when running the replication flow
|
// got error when running the replication flow
|
||||||
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||||
|
r.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{}, nil)
|
||||||
r.execMgr.On("StopAndWait", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
r.execMgr.On("StopAndWait", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
r.execMgr.On("MarkError", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
r.execMgr.On("MarkError", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
|
r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
|
||||||
@ -78,6 +79,7 @@ func (r *replicationTestSuite) TestStart() {
|
|||||||
|
|
||||||
// got no error when running the replication flow
|
// got no error when running the replication flow
|
||||||
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||||
|
r.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{}, nil)
|
||||||
r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
r.ormCreator.On("Create").Return(nil)
|
r.ormCreator.On("Create").Return(nil)
|
||||||
id, err = r.ctl.Start(context.Background(), &model.Policy{Enabled: true}, nil, task.ExecutionTriggerManual)
|
id, err = r.ctl.Start(context.Background(), &model.Policy{Enabled: true}, nil, task.ExecutionTriggerManual)
|
||||||
|
@ -37,19 +37,17 @@ end
|
|||||||
|
|
||||||
// luaFuncCompareText is common lua script function
|
// luaFuncCompareText is common lua script function
|
||||||
var luaFuncCompareText = `
|
var luaFuncCompareText = `
|
||||||
local function compare(status, revision, checkInT)
|
local function compare(status, revision)
|
||||||
local sCode = stCode(status)
|
local sCode = stCode(status)
|
||||||
local aCode = stCode(ARGV[1])
|
local aCode = stCode(ARGV[1])
|
||||||
local aRev = tonumber(ARGV[2]) or 0
|
local aRev = tonumber(ARGV[2]) or 0
|
||||||
local aCheckInT = tonumber(ARGV[3]) or 0
|
local aCheckInT = tonumber(ARGV[3]) or 0
|
||||||
|
|
||||||
if revision < aRev or
|
if revision < aRev or
|
||||||
( revision == aRev and sCode < aCode ) or
|
( revision == aRev and sCode <= aCode ) or
|
||||||
( revision == aRev and sCode == aCode and (not checkInT or checkInT < aCheckInT))
|
( revision == aRev and aCheckInT ~= 0 )
|
||||||
then
|
then
|
||||||
return 'ok'
|
return 'ok'
|
||||||
end
|
end
|
||||||
|
|
||||||
return 'no'
|
return 'no'
|
||||||
end
|
end
|
||||||
`
|
`
|
||||||
@ -129,7 +127,7 @@ if res then
|
|||||||
checkInAt = tonumber(res[3]) or 0
|
checkInAt = tonumber(res[3]) or 0
|
||||||
ack = res[4]
|
ack = res[4]
|
||||||
|
|
||||||
local reply = compare(st, rev, checkInAt)
|
local reply = compare(st, rev)
|
||||||
|
|
||||||
if reply == 'ok' then
|
if reply == 'ok' then
|
||||||
if not ack then
|
if not ack then
|
||||||
@ -142,7 +140,7 @@ if res then
|
|||||||
rev = a['revision']
|
rev = a['revision']
|
||||||
checkInAt = a['check_in_at']
|
checkInAt = a['check_in_at']
|
||||||
|
|
||||||
local reply2 = compare(st, rev, checkInAt)
|
local reply2 = compare(st, rev)
|
||||||
if reply2 == 'ok' then
|
if reply2 == 'ok' then
|
||||||
return 'ok'
|
return 'ok'
|
||||||
end
|
end
|
||||||
@ -178,7 +176,7 @@ local function canSetAck(jk, nrev)
|
|||||||
if ackv then
|
if ackv then
|
||||||
-- ack existing
|
-- ack existing
|
||||||
local ack = cjson.decode(ackv)
|
local ack = cjson.decode(ackv)
|
||||||
local cmp = compare(ack['status'], ack['revision'], ack['check_in_at'])
|
local cmp = compare(ack['status'], ack['revision'])
|
||||||
if cmp == 'ok' then
|
if cmp == 'ok' then
|
||||||
return 'ok'
|
return 'ok'
|
||||||
end
|
end
|
||||||
|
@ -88,6 +88,10 @@ func (m *manager) Create(ctx context.Context, executionID int64, jb *Job, extraA
|
|||||||
log.Debugf("the database record for task %d created", id)
|
log.Debugf("the database record for task %d created", id)
|
||||||
|
|
||||||
// submit job to jobservice
|
// submit job to jobservice
|
||||||
|
// As all database operations are in a transaction which is committed until API returns,
|
||||||
|
// when the job is submitted to the jobservice and running, the task record may not
|
||||||
|
// insert yet, this will cause the status hook handler returning 404, and the jobservice
|
||||||
|
// will re-send the status hook again
|
||||||
jobID, err := m.submitJob(ctx, id, jb)
|
jobID, err := m.submitJob(ctx, id, jb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// failed to submit job to jobservice, delete the task record
|
// failed to submit job to jobservice, delete the task record
|
||||||
|
@ -19,9 +19,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/lib/errors"
|
|
||||||
libhttp "github.com/goharbor/harbor/src/lib/http"
|
libhttp "github.com/goharbor/harbor/src/lib/http"
|
||||||
"github.com/goharbor/harbor/src/lib/log"
|
|
||||||
"github.com/goharbor/harbor/src/pkg/task"
|
"github.com/goharbor/harbor/src/pkg/task"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -45,11 +43,17 @@ func (j *jobStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := j.handler.Handle(r.Context(), sc); err != nil {
|
if err := j.handler.Handle(r.Context(), sc); err != nil {
|
||||||
// ignore the not found error to avoid the jobservice re-sending the hook
|
// When the status hook comes, the execution/task database record may not insert yet
|
||||||
if errors.IsNotFoundErr(err) {
|
// because of that the transaction isn't committed
|
||||||
log.Warningf("got not found error: %v, ignore it to avoid subsequent retrying webhooks from jobservice", err)
|
// Do not ignore the NotFoundErr here to make jobservice resend the status hook
|
||||||
return
|
// again to avoid the status lost
|
||||||
}
|
/*
|
||||||
|
// ignore the not found error to avoid the jobservice re-sending the hook
|
||||||
|
if errors.IsNotFoundErr(err) {
|
||||||
|
log.Warningf("got not found error: %v, ignore it to avoid subsequent retrying webhooks from jobservice", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
*/
|
||||||
libhttp.SendError(w, err)
|
libhttp.SendError(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user