mirror of
https://github.com/goharbor/harbor
synced 2024-09-20 23:59:56 +00:00
Merge pull request #14451 from ywk253100/210316_sweep_2.2
Fix the consume too much CPU issue
This commit is contained in:
commit
63a45aa8ec
4
make/migrations/postgresql/0051_2.2.1_schema.up.sql
Normal file
4
make/migrations/postgresql/0051_2.2.1_schema.up.sql
Normal file
|
@ -0,0 +1,4 @@
|
|||
/*fixes #14358*/
|
||||
UPDATE execution SET status='Success' WHERE status='Succeed';
|
||||
|
||||
CREATE INDEX IF NOT EXISTS execution_id_idx ON task (execution_id);
|
|
@ -136,43 +136,68 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor
|
|||
}
|
||||
|
||||
func (e *executionManager) sweep(ctx context.Context, vendorType string, vendorID int64) error {
|
||||
count := executionSweeperCount[vendorType]
|
||||
if count == 0 {
|
||||
log.Debugf("the execution sweeper count doesn't set for %s, skip sweep", vendorType)
|
||||
size := int64(executionSweeperCount[vendorType])
|
||||
if size == 0 {
|
||||
log.Debugf("the execution sweeper size doesn't set for %s, skip sweep", vendorType)
|
||||
return nil
|
||||
}
|
||||
|
||||
for {
|
||||
// the function "List" of the execution manager returns the execution records
|
||||
// ordered by start time. After the sorting is supported in query, we should
|
||||
// specify the sorting explicitly
|
||||
// the execution records in second page are always the candidates should to be swept
|
||||
executions, err := e.List(ctx, &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"VendorType": vendorType,
|
||||
"VendorID": vendorID,
|
||||
},
|
||||
PageNumber: 2,
|
||||
PageSize: int64(count),
|
||||
})
|
||||
// the function "List" of the execution manager returns the execution records
|
||||
// ordered by start time. After the sorting is supported in query, we should
|
||||
// specify the sorting explicitly
|
||||
// get the #size execution record
|
||||
query := &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"VendorType": vendorType,
|
||||
"VendorID": vendorID,
|
||||
},
|
||||
PageSize: 1,
|
||||
PageNumber: size,
|
||||
}
|
||||
executions, err := e.executionDAO.List(ctx, query)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// list is null means that the execution count < size, return directly
|
||||
if len(executions) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
query.Keywords["StartTime"] = &q.Range{
|
||||
Max: executions[0].StartTime,
|
||||
}
|
||||
totalOfCandidate, err := e.executionDAO.Count(ctx, query)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// n is the page count of all candidates
|
||||
n := totalOfCandidate / 1000
|
||||
if totalOfCandidate%1000 > 0 {
|
||||
n = n + 1
|
||||
}
|
||||
query.PageSize = 1000
|
||||
for i := n; i >= 1; i-- {
|
||||
query.PageNumber = i
|
||||
executions, err := e.List(ctx, query)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// no execution records need to be swept, return directly
|
||||
if len(executions) == 0 {
|
||||
return nil
|
||||
}
|
||||
for _, execution := range executions {
|
||||
// if the status of the execution isn't final, skip
|
||||
if !job.Status(execution.Status).Final() {
|
||||
continue
|
||||
}
|
||||
if err = e.Delete(ctx, execution.ID); err != nil {
|
||||
// the execution may be deleted by the other sweep operation, ignore the not found error
|
||||
if errors.IsNotFoundErr(err) {
|
||||
continue
|
||||
}
|
||||
log.Errorf("failed to delete the execution %d: %v", execution.ID, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *executionManager) UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) error {
|
||||
|
|
Loading…
Reference in New Issue
Block a user