From fb871dbbe8e763c006eb0ca168bc4865ceddd9bf Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Wed, 6 Jan 2021 16:41:46 +0800 Subject: [PATCH] Provide a mechanism to sweep the execution/task records in task manager Provide a mechanism to sweep the execution/task records in task manager Fixes #13888 Signed-off-by: Wenkai Yin --- src/controller/scan/base_controller.go | 5 ++ src/pkg/task/execution.go | 67 +++++++++++++++++++++++++- src/pkg/task/execution_test.go | 17 +++++-- 3 files changed, 83 insertions(+), 6 deletions(-) diff --git a/src/controller/scan/base_controller.go b/src/controller/scan/base_controller.go index 4fd990e60..e30b1422b 100644 --- a/src/controller/scan/base_controller.go +++ b/src/controller/scan/base_controller.go @@ -60,6 +60,11 @@ const ( robotIDKey = "robot_id" ) +func init() { + // keep only the latest created 5 scan all execution records + task.SetExecutionSweeperCount(job.ImageScanAllJob, 5) +} + // uuidGenerator is a func template which is for generating UUID. type uuidGenerator func() (string, error) diff --git a/src/pkg/task/execution.go b/src/pkg/task/execution.go index 900412134..93ad902de 100644 --- a/src/pkg/task/execution.go +++ b/src/pkg/task/execution.go @@ -24,13 +24,16 @@ import ( "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" + "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/task/dao" ) var ( // ExecMgr is a global execution manager instance - ExecMgr = NewExecutionManager() + ExecMgr = NewExecutionManager() + executionSweeperCount = map[string]uint8{} + defaultExecutionSweeperCount uint8 = 50 ) // ExecutionManager manages executions. @@ -75,6 +78,7 @@ func NewExecutionManager() ExecutionManager { executionDAO: dao.NewExecutionDAO(), taskMgr: Mgr, taskDAO: dao.NewTaskDAO(), + ormCreator: orm.Crt, } } @@ -82,6 +86,7 @@ type executionManager struct { executionDAO dao.ExecutionDAO taskMgr Manager taskDAO dao.TaskDAO + ormCreator orm.Creator } func (e *executionManager) Count(ctx context.Context, query *q.Query) (int64, error) { @@ -109,7 +114,58 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor StartTime: now, UpdateTime: now, } - return e.executionDAO.Create(ctx, execution) + id, err := e.executionDAO.Create(ctx, execution) + if err != nil { + return 0, err + } + + // sweep the execution records to avoid the execution/task records explosion + go func() { + ctx := orm.NewContext(context.Background(), e.ormCreator.Create()) + if err := e.sweep(ctx, vendorType); err != nil { + log.Errorf("failed to sweep the executions of %s: %v", vendorType, err) + return + } + }() + + return id, nil +} + +func (e *executionManager) sweep(ctx context.Context, vendorType string) error { + count := executionSweeperCount[vendorType] + if count == 0 { + count = defaultExecutionSweeperCount + } + for { + // the function "List" of the execution manager returns the execution records + // ordered by start time. After the sorting is supported in query, we should + // specify the sorting explicitly + // the execution records in second page are always the candidates should to be swept + executions, err := e.List(ctx, &q.Query{ + Keywords: map[string]interface{}{ + "VendorType": vendorType, + }, + PageNumber: 2, + PageSize: int64(count), + }) + if err != nil { + return err + } + // no execution records need to be swept, return directly + if len(executions) == 0 { + return nil + } + for _, execution := range executions { + // if the status of the execution isn't final, skip + if !job.Status(execution.Status).Final() { + continue + } + if err = e.Delete(ctx, execution.ID); err != nil { + log.Errorf("failed to delete the execution %d: %v", execution.ID, err) + continue + } + } + } } func (e *executionManager) MarkDone(ctx context.Context, id int64, message string) error { @@ -297,3 +353,10 @@ func (e *executionManager) populateExecution(ctx context.Context, execution *dao return exec } + +// SetExecutionSweeperCount sets the count of execution records retained by the sweeper +// If no count is set for the specified vendor, the default value will be used +// The sweeper retains the latest created #count execution records for the specified vendor +func SetExecutionSweeperCount(vendorType string, count uint8) { + executionSweeperCount[vendorType] = count +} diff --git a/src/pkg/task/execution_test.go b/src/pkg/task/execution_test.go index 8d497d7bc..f79b5ae4e 100644 --- a/src/pkg/task/execution_test.go +++ b/src/pkg/task/execution_test.go @@ -22,26 +22,30 @@ import ( "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/task/dao" + "github.com/goharbor/harbor/src/testing/lib/orm" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) type executionManagerTestSuite struct { suite.Suite - execMgr *executionManager - taskMgr *mockTaskManager - execDAO *mockExecutionDAO - taskDAO *mockTaskDAO + execMgr *executionManager + taskMgr *mockTaskManager + execDAO *mockExecutionDAO + taskDAO *mockTaskDAO + ormCreator *orm.Creator } func (e *executionManagerTestSuite) SetupTest() { e.taskMgr = &mockTaskManager{} e.execDAO = &mockExecutionDAO{} e.taskDAO = &mockTaskDAO{} + e.ormCreator = &orm.Creator{} e.execMgr = &executionManager{ executionDAO: e.execDAO, taskMgr: e.taskMgr, taskDAO: e.taskDAO, + ormCreator: e.ormCreator, } } @@ -55,11 +59,16 @@ func (e *executionManagerTestSuite) TestCount() { func (e *executionManagerTestSuite) TestCreate() { e.execDAO.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil) + e.ormCreator.On("Create").Return(&orm.FakeOrmer{}) + e.execDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil) id, err := e.execMgr.Create(nil, "vendor", 0, ExecutionTriggerManual, map[string]interface{}{"k": "v"}) e.Require().Nil(err) e.Equal(int64(1), id) + // sleep to make sure the function in the goroutine run + time.Sleep(1 * time.Second) e.execDAO.AssertExpectations(e.T()) + e.ormCreator.AssertExpectations(e.T()) } func (e *executionManagerTestSuite) TestMarkDone() {