diff --git a/src/pkg/task/dao/execution.go b/src/pkg/task/dao/execution.go new file mode 100644 index 000000000..1aa538291 --- /dev/null +++ b/src/pkg/task/dao/execution.go @@ -0,0 +1,134 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "context" + + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/orm" + "github.com/goharbor/harbor/src/lib/q" +) + +// ExecutionDAO is the data access object interface for execution +type ExecutionDAO interface { + // Count returns the total count of executions according to the query + Count(ctx context.Context, query *q.Query) (count int64, err error) + // List the executions according to the query + List(ctx context.Context, query *q.Query) (executions []*Execution, err error) + // Get the specified execution + Get(ctx context.Context, id int64) (execution *Execution, err error) + // Create an execution + Create(ctx context.Context, execution *Execution) (id int64, err error) + // Update the specified execution. Only the properties specified by "props" will be updated if it is set + Update(ctx context.Context, execution *Execution, props ...string) (err error) + // Delete the specified execution + Delete(ctx context.Context, id int64) (err error) +} + +// NewExecutionDAO returns an instance of ExecutionDAO +func NewExecutionDAO() ExecutionDAO { + return &executionDAO{} +} + +type executionDAO struct{} + +func (e *executionDAO) Count(ctx context.Context, query *q.Query) (int64, error) { + if query != nil { + // ignore the page number and size + query = &q.Query{ + Keywords: query.Keywords, + } + } + qs, err := orm.QuerySetter(ctx, &Execution{}, query) + if err != nil { + return 0, err + } + return qs.Count() +} + +func (e *executionDAO) List(ctx context.Context, query *q.Query) ([]*Execution, error) { + executions := []*Execution{} + qs, err := orm.QuerySetter(ctx, &Execution{}, query) + if err != nil { + return nil, err + } + qs = qs.OrderBy("-StartTime") + if _, err = qs.All(&executions); err != nil { + return nil, err + } + return executions, nil +} + +func (e *executionDAO) Get(ctx context.Context, id int64) (*Execution, error) { + execution := &Execution{ + ID: id, + } + ormer, err := orm.FromContext(ctx) + if err != nil { + return nil, err + } + if err := ormer.Read(execution); err != nil { + if e := orm.AsNotFoundError(err, "execution %d not found", id); e != nil { + err = e + } + return nil, err + } + return execution, nil +} + +func (e *executionDAO) Create(ctx context.Context, execution *Execution) (int64, error) { + ormer, err := orm.FromContext(ctx) + if err != nil { + return 0, err + } + return ormer.Insert(execution) +} + +func (e *executionDAO) Update(ctx context.Context, execution *Execution, props ...string) error { + ormer, err := orm.FromContext(ctx) + if err != nil { + return err + } + n, err := ormer.Update(execution, props...) + if err != nil { + return err + } + if n == 0 { + return errors.NotFoundError(nil).WithMessage("execution %d not found", execution.ID) + } + return nil +} + +func (e *executionDAO) Delete(ctx context.Context, id int64) error { + ormer, err := orm.FromContext(ctx) + if err != nil { + return err + } + n, err := ormer.Delete(&Execution{ + ID: id, + }) + if err != nil { + if e := orm.AsForeignKeyError(err, + "the execution %d is referenced by other tasks", id); e != nil { + err = e + } + return err + } + if n == 0 { + return errors.NotFoundError(nil).WithMessage("execution %d not found", id) + } + return nil +} diff --git a/src/pkg/task/dao/execution_test.go b/src/pkg/task/dao/execution_test.go new file mode 100644 index 000000000..4c3ce727b --- /dev/null +++ b/src/pkg/task/dao/execution_test.go @@ -0,0 +1,121 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "context" + "testing" + + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/orm" + "github.com/goharbor/harbor/src/lib/q" + "github.com/stretchr/testify/suite" +) + +type executionDAOTestSuite struct { + suite.Suite + ctx context.Context + executionDAO *executionDAO + executionID int64 +} + +func (e *executionDAOTestSuite) SetupSuite() { + dao.PrepareTestForPostgresSQL() + e.ctx = orm.Context() + e.executionDAO = &executionDAO{} +} + +func (e *executionDAOTestSuite) SetupTest() { + id, err := e.executionDAO.Create(e.ctx, &Execution{ + VendorType: "test", + Trigger: "test", + ExtraAttrs: "{}", + }) + e.Require().Nil(err) + e.executionID = id +} + +func (e *executionDAOTestSuite) TearDownTest() { + err := e.executionDAO.Delete(e.ctx, e.executionID) + e.Nil(err) +} + +func (e *executionDAOTestSuite) TestCount() { + count, err := e.executionDAO.Count(e.ctx, &q.Query{ + Keywords: map[string]interface{}{ + "VendorType": "test", + }, + }) + e.Require().Nil(err) + e.Equal(int64(1), count) +} + +func (e *executionDAOTestSuite) TestList() { + executions, err := e.executionDAO.List(e.ctx, &q.Query{ + Keywords: map[string]interface{}{ + "VendorType": "test", + }, + }) + e.Require().Nil(err) + e.Require().Len(executions, 1) + e.Equal(e.executionID, executions[0].ID) +} + +func (e *executionDAOTestSuite) TestGet() { + // not exist + _, err := e.executionDAO.Get(e.ctx, 10000) + e.Require().NotNil(err) + e.True(errors.IsNotFoundErr(err)) + + // exist + execution, err := e.executionDAO.Get(e.ctx, e.executionID) + e.Require().Nil(err) + e.NotNil(execution) +} + +func (e *executionDAOTestSuite) TestCreate() { + // happy pass is covered by SetupTest +} + +func (e *executionDAOTestSuite) TestUpdate() { + // not exist + err := e.executionDAO.Update(e.ctx, &Execution{ID: 10000}, "Status") + e.Require().NotNil(err) + e.True(errors.IsNotFoundErr(err)) + + // exist + err = e.executionDAO.Update(e.ctx, &Execution{ + ID: e.executionID, + Status: "failed", + }, "Status") + e.Require().Nil(err) + execution, err := e.executionDAO.Get(e.ctx, e.executionID) + e.Require().Nil(err) + e.Equal("failed", execution.Status) +} + +func (e *executionDAOTestSuite) TestDelete() { + // not exist + err := e.executionDAO.Delete(e.ctx, 10000) + e.Require().NotNil(err) + e.True(errors.IsNotFoundErr(err)) + + // happy pass is covered by TearDownTest +} + +func TestExecutionDAOSuite(t *testing.T) { + suite.Run(t, &executionDAOTestSuite{}) +} diff --git a/src/pkg/task/dao/model.go b/src/pkg/task/dao/model.go index 78bea57e8..c3bf8f408 100644 --- a/src/pkg/task/dao/model.go +++ b/src/pkg/task/dao/model.go @@ -57,3 +57,9 @@ type Task struct { UpdateTime time.Time `orm:"column(update_time)"` EndTime time.Time `orm:"column(end_time)"` } + +// StatusCount model +type StatusCount struct { + Status string `orm:"column(status)"` + Count int64 `orm:"column(count)"` +} diff --git a/src/pkg/task/dao/task.go b/src/pkg/task/dao/task.go new file mode 100644 index 000000000..0a6bc0a7e --- /dev/null +++ b/src/pkg/task/dao/task.go @@ -0,0 +1,204 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "context" + "time" + + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/orm" + "github.com/goharbor/harbor/src/lib/q" +) + +// TaskDAO is the data access object interface for task +type TaskDAO interface { + // Count returns the total count of tasks according to the query + Count(ctx context.Context, query *q.Query) (count int64, err error) + // List the tasks according to the query + List(ctx context.Context, query *q.Query) (tasks []*Task, err error) + // Get the specified task + Get(ctx context.Context, id int64) (task *Task, err error) + // Create a task + Create(ctx context.Context, task *Task) (id int64, err error) + // Update the specified task. Only the properties specified by "props" will be updated if it is set + Update(ctx context.Context, task *Task, props ...string) (err error) + // UpdateStatus updates the status of task + UpdateStatus(ctx context.Context, id int64, status string, statusRevision int64) (err error) + // Delete the specified task + Delete(ctx context.Context, id int64) (err error) + // ListStatusCount lists the status count for the tasks reference the specified execution + ListStatusCount(ctx context.Context, executionID int64) (statusCounts []*StatusCount, err error) + // GetMaxEndTime gets the max end time for the tasks references the specified execution + GetMaxEndTime(ctx context.Context, executionID int64) (endTime time.Time, err error) +} + +// NewTaskDAO returns an instance of TaskDAO +func NewTaskDAO() TaskDAO { + return &taskDAO{} +} + +type taskDAO struct{} + +func (t *taskDAO) Count(ctx context.Context, query *q.Query) (int64, error) { + if query != nil { + // ignore the page number and size + query = &q.Query{ + Keywords: query.Keywords, + } + } + qs, err := orm.QuerySetter(ctx, &Task{}, query) + if err != nil { + return 0, err + } + return qs.Count() +} + +func (t *taskDAO) List(ctx context.Context, query *q.Query) ([]*Task, error) { + tasks := []*Task{} + qs, err := orm.QuerySetter(ctx, &Task{}, query) + if err != nil { + return nil, err + } + qs = qs.OrderBy("-StartTime") + if _, err = qs.All(&tasks); err != nil { + return nil, err + } + return tasks, nil +} + +func (t *taskDAO) Get(ctx context.Context, id int64) (*Task, error) { + task := &Task{ + ID: id, + } + ormer, err := orm.FromContext(ctx) + if err != nil { + return nil, err + } + if err := ormer.Read(task); err != nil { + if e := orm.AsNotFoundError(err, "task %d not found", id); e != nil { + err = e + } + return nil, err + } + return task, nil +} + +func (t *taskDAO) Create(ctx context.Context, task *Task) (int64, error) { + ormer, err := orm.FromContext(ctx) + if err != nil { + return 0, err + } + id, err := ormer.Insert(task) + if err != nil { + if e := orm.AsForeignKeyError(err, + "the task tries to reference a non existing execution %d", task.ExecutionID); e != nil { + err = e + } + return 0, err + } + return id, nil +} + +func (t *taskDAO) Update(ctx context.Context, task *Task, props ...string) error { + ormer, err := orm.FromContext(ctx) + if err != nil { + return err + } + n, err := ormer.Update(task, props...) + if err != nil { + return err + } + if n == 0 { + return errors.NotFoundError(nil).WithMessage("task %d not found", task.ID) + } + return nil +} + +func (t *taskDAO) UpdateStatus(ctx context.Context, id int64, status string, statusRevision int64) error { + ormer, err := orm.FromContext(ctx) + if err != nil { + return err + } + + // status revision is the unix timestamp of job starting time, it's changing means a retrying of the job + startTime := time.Unix(statusRevision, 0) + // update run count and start time when status revision changes + sql := `update task set run_count = run_count +1, start_time = ? + where id = ? and status_revision < ?` + if _, err = ormer.Raw(sql, startTime, id, statusRevision).Exec(); err != nil { + return err + } + + jobStatus := job.Status(status) + statusCode := jobStatus.Code() + var endTime time.Time + // when the task is in final status, update the end time + // when the task re-runs again, the end time should be cleared, so set the end time + // to null if the task isn't in final status + if jobStatus.Final() { + endTime = time.Now() + } + // use raw sql rather than the ORM as the sql generated by ORM isn't a "single" statement + // which means the operation isn't atomic, this will cause issues when running in concurrency + sql = `update task set status = ?, status_code = ?, status_revision = ?, end_time = ? + where id = ? and (status_revision = ? and status_code < ? or status_revision < ?) ` + _, err = ormer.Raw(sql, status, statusCode, statusRevision, endTime, + id, statusRevision, statusCode, statusRevision).Exec() + return err +} + +func (t *taskDAO) Delete(ctx context.Context, id int64) error { + ormer, err := orm.FromContext(ctx) + if err != nil { + return err + } + n, err := ormer.Delete(&Task{ + ID: id, + }) + if err != nil { + return err + } + if n == 0 { + return errors.NotFoundError(nil).WithMessage("task %d not found", id) + } + return nil +} + +func (t *taskDAO) ListStatusCount(ctx context.Context, executionID int64) ([]*StatusCount, error) { + ormer, err := orm.FromContext(ctx) + if err != nil { + return nil, err + } + statusCounts := []*StatusCount{} + _, err = ormer.Raw("select status, count(*) as count from task where execution_id=? group by status", executionID). + QueryRows(&statusCounts) + if err != nil { + return nil, err + } + return statusCounts, nil +} + +func (t *taskDAO) GetMaxEndTime(ctx context.Context, executionID int64) (time.Time, error) { + ormer, err := orm.FromContext(ctx) + if err != nil { + return time.Time{}, err + } + var endTime time.Time + err = ormer.Raw("select max(end_time) from task where execution_id = ?", executionID). + QueryRow(&endTime) + return endTime, nil +} diff --git a/src/pkg/task/dao/task_test.go b/src/pkg/task/dao/task_test.go new file mode 100644 index 000000000..9a2184fc3 --- /dev/null +++ b/src/pkg/task/dao/task_test.go @@ -0,0 +1,211 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "context" + "testing" + "time" + + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/orm" + "github.com/goharbor/harbor/src/lib/q" + "github.com/stretchr/testify/suite" +) + +type taskDAOTestSuite struct { + suite.Suite + ctx context.Context + taskDAO *taskDAO + executionDAO *executionDAO + executionID int64 + taskID int64 +} + +func (t *taskDAOTestSuite) SetupSuite() { + t.ctx = orm.Context() + t.taskDAO = &taskDAO{} + t.executionDAO = &executionDAO{} +} + +func (t *taskDAOTestSuite) SetupTest() { + id, err := t.executionDAO.Create(t.ctx, &Execution{ + VendorType: "test", + Trigger: "test", + ExtraAttrs: "{}", + }) + t.Require().Nil(err) + t.executionID = id + id, err = t.taskDAO.Create(t.ctx, &Task{ + ExecutionID: t.executionID, + Status: "success", + StatusCode: 1, + ExtraAttrs: "{}", + }) + t.Require().Nil(err) + t.taskID = id +} + +func (t *taskDAOTestSuite) TearDownTest() { + err := t.taskDAO.Delete(t.ctx, t.taskID) + t.Nil(err) + + err = t.executionDAO.Delete(t.ctx, t.executionID) + t.Nil(err) +} + +func (t *taskDAOTestSuite) TestCount() { + count, err := t.taskDAO.Count(t.ctx, &q.Query{ + Keywords: map[string]interface{}{ + "ExecutionID": t.executionID, + }, + }) + t.Require().Nil(err) + t.Equal(int64(1), count) +} + +func (t *taskDAOTestSuite) TestList() { + tasks, err := t.taskDAO.List(t.ctx, &q.Query{ + Keywords: map[string]interface{}{ + "ExecutionID": t.executionID, + }, + }) + t.Require().Nil(err) + t.Require().Len(tasks, 1) + t.Equal(t.taskID, tasks[0].ID) +} + +func (t *taskDAOTestSuite) TestGet() { + // not exist + _, err := t.taskDAO.Get(t.ctx, 10000) + t.Require().NotNil(err) + t.True(errors.IsNotFoundErr(err)) + + // exist + task, err := t.taskDAO.Get(t.ctx, t.taskID) + t.Require().Nil(err) + t.NotNil(task) +} + +func (t *taskDAOTestSuite) TestCreate() { + // reference the non-existing execution + _, err := t.taskDAO.Create(t.ctx, &Task{ + ExecutionID: 10000, + Status: "success", + StatusCode: 1, + ExtraAttrs: "{}", + }) + t.Require().NotNil(err) + t.True(errors.IsErr(err, errors.ViolateForeignKeyConstraintCode)) + + // reference the existing execution is covered by SetupTest +} + +func (t *taskDAOTestSuite) TestUpdate() { + // not exist + err := t.taskDAO.Update(t.ctx, &Task{ID: 10000}, "Status") + t.Require().NotNil(err) + t.True(errors.IsNotFoundErr(err)) + + // exist + err = t.taskDAO.Update(t.ctx, &Task{ + ID: t.taskID, + Status: "failed", + }, "Status") + t.Require().Nil(err) + task, err := t.taskDAO.Get(t.ctx, t.taskID) + t.Require().Nil(err) + t.Equal("failed", task.Status) +} + +func (t *taskDAOTestSuite) TestUpdateStatus() { + // update status to running + status := job.RunningStatus.String() + statusRevision := time.Now().Unix() + err := t.taskDAO.UpdateStatus(t.ctx, t.taskID, status, statusRevision) + t.Require().Nil(err) + + task, err := t.taskDAO.Get(t.ctx, t.taskID) + t.Require().Nil(err) + t.Equal(1, task.RunCount) + t.True(time.Unix(statusRevision, 0).Equal(task.StartTime)) + t.Equal(status, task.Status) + t.Equal(job.RunningStatus.Code(), task.StatusCode) + t.Equal(statusRevision, task.StatusRevision) + t.Equal(time.Time{}, task.EndTime) + + // update status to success + status = job.SuccessStatus.String() + err = t.taskDAO.UpdateStatus(t.ctx, t.taskID, status, statusRevision) + t.Require().Nil(err) + + task, err = t.taskDAO.Get(t.ctx, t.taskID) + t.Require().Nil(err) + t.Equal(1, task.RunCount) + t.True(time.Unix(statusRevision, 0).Equal(task.StartTime)) + t.Equal(status, task.Status) + t.Equal(job.SuccessStatus.Code(), task.StatusCode) + t.Equal(statusRevision, task.StatusRevision) + t.NotEqual(time.Time{}, task.EndTime) + + // update status to running again with different revision + status = job.RunningStatus.String() + statusRevision = time.Now().Add(1 * time.Second).Unix() + err = t.taskDAO.UpdateStatus(t.ctx, t.taskID, status, statusRevision) + t.Require().Nil(err) + + task, err = t.taskDAO.Get(t.ctx, t.taskID) + t.Require().Nil(err) + t.Equal(2, task.RunCount) + t.True(time.Unix(statusRevision, 0).Equal(task.StartTime)) + t.Equal(status, task.Status) + t.Equal(job.RunningStatus.Code(), task.StatusCode) + t.Equal(statusRevision, task.StatusRevision) + t.Equal(time.Time{}, task.EndTime) +} + +func (t *taskDAOTestSuite) TestDelete() { + // not exist + err := t.taskDAO.Delete(t.ctx, 10000) + t.Require().NotNil(err) + t.True(errors.IsNotFoundErr(err)) + + // happy pass is covered by TearDownTest +} + +func (t *taskDAOTestSuite) TestListStatusCount() { + scs, err := t.taskDAO.ListStatusCount(t.ctx, t.executionID) + t.Require().Nil(err) + t.Require().Len(scs, 1) + t.Equal("success", scs[0].Status) + t.Equal(int64(1), scs[0].Count) +} + +func (t *taskDAOTestSuite) TestGetMaxEndTime() { + now := time.Now() + err := t.taskDAO.Update(t.ctx, &Task{ + ID: t.taskID, + EndTime: now, + }, "EndTime") + t.Require().Nil(err) + endTime, err := t.taskDAO.GetMaxEndTime(t.ctx, t.executionID) + t.Require().Nil(err) + t.Equal(now.Unix(), endTime.Unix()) +} + +func TestTaskDAOSuite(t *testing.T) { + suite.Run(t, &taskDAOTestSuite{}) +}