From ccceacfa73db3cb26e2dd3ef8ffa8f706eef3030 Mon Sep 17 00:00:00 2001 From: Chlins Zhang Date: Mon, 12 Aug 2024 20:24:16 +0800 Subject: [PATCH] fix: fix the GC job index data race (#20830) Signed-off-by: chlins --- .../job/impl/gc/garbage_collection.go | 46 +++++++++---------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/src/jobservice/job/impl/gc/garbage_collection.go b/src/jobservice/job/impl/gc/garbage_collection.go index 7ae9a0b68..7968a1020 100644 --- a/src/jobservice/job/impl/gc/garbage_collection.go +++ b/src/jobservice/job/impl/gc/garbage_collection.go @@ -318,18 +318,16 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error { return errGcStop } - atomic.AddInt64(&index, 1) - index := atomic.LoadInt64(&index) - + localIndex := atomic.AddInt64(&index, 1) // set the status firstly, if the blob is updated by any HEAD/PUT request, it should be fail and skip. blob.Status = blobModels.StatusDeleting count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob) if err != nil { - gc.logger.Errorf("[%s][%d/%d] failed to mark gc candidate deleting, skip: %s, %s", uid, index, total, blob.Digest, blob.Status) + gc.logger.Errorf("[%s][%d/%d] failed to mark gc candidate deleting, skip: %s, %s", uid, localIndex, total, blob.Digest, blob.Status) continue } if count == 0 { - gc.logger.Warningf("[%s][%d/%d] no blob found to mark gc candidate deleting, ID:%d, digest:%s", uid, index, total, blob.ID, blob.Digest) + gc.logger.Warningf("[%s][%d/%d] no blob found to mark gc candidate deleting, ID:%d, digest:%s", uid, localIndex, total, blob.ID, blob.Digest) continue } @@ -339,7 +337,7 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error { for _, art := range gc.trashedArts[blob.Digest] { // Harbor cannot know the existing tags in the backend from its database, so let the v2 DELETE manifest to remove all of them. gc.logger.Infof("[%s][%d/%d] delete the manifest with registry v2 API: %s, %s, %s", - uid, index, total, art.RepositoryName, blob.ContentType, blob.Digest) + uid, localIndex, total, art.RepositoryName, blob.ContentType, blob.Digest) if err := retry.Retry(func() error { return ignoreNotFound(func() error { err := v2DeleteManifest(art.RepositoryName, blob.Digest) @@ -350,13 +348,13 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error { return err }) }, retry.Callback(func(err error, sleep time.Duration) { - gc.logger.Infof("[%s][%d/%d] failed to exec v2DeleteManifest, error: %v, will retry again after: %s", uid, index, total, err, sleep) + gc.logger.Infof("[%s][%d/%d] failed to exec v2DeleteManifest, error: %v, will retry again after: %s", uid, localIndex, total, err, sleep) })); err != nil { - gc.logger.Errorf("[%s][%d/%d] failed to delete manifest with v2 API, %s, %s, %v", uid, index, total, art.RepositoryName, blob.Digest, err) + gc.logger.Errorf("[%s][%d/%d] failed to delete manifest with v2 API, %s, %s, %v", uid, localIndex, total, art.RepositoryName, blob.Digest, err) if err := ignoreNotFound(func() error { return gc.markDeleteFailed(ctx, blob) }); err != nil { - gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after v2DeleteManifest() error out: %s, %v", uid, index, total, blob.Digest, err) + gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after v2DeleteManifest() error out: %s, %v", uid, localIndex, total, blob.Digest, err) return err } // if the system is set to read-only mode, return directly @@ -367,7 +365,7 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error { continue } // for manifest, it has to delete the revisions folder of each repository - gc.logger.Infof("[%s][%d/%d] delete manifest from storage: %s", uid, index, total, blob.Digest) + gc.logger.Infof("[%s][%d/%d] delete manifest from storage: %s", uid, localIndex, total, blob.Digest) if err := retry.Retry(func() error { return ignoreNotFound(func() error { err := gc.registryCtlClient.DeleteManifest(art.RepositoryName, blob.Digest) @@ -378,13 +376,13 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error { return err }) }, retry.Callback(func(err error, sleep time.Duration) { - gc.logger.Infof("[%s][%d/%d] failed to exec DeleteManifest, error: %v, will retry again after: %s", uid, index, total, err, sleep) + gc.logger.Infof("[%s][%d/%d] failed to exec DeleteManifest, error: %v, will retry again after: %s", uid, localIndex, total, err, sleep) })); err != nil { - gc.logger.Errorf("[%s][%d/%d] failed to remove manifest from storage: %s, %s, errMsg=%v", uid, index, total, art.RepositoryName, blob.Digest, err) + gc.logger.Errorf("[%s][%d/%d] failed to remove manifest from storage: %s, %s, errMsg=%v", uid, localIndex, total, art.RepositoryName, blob.Digest, err) if err := ignoreNotFound(func() error { return gc.markDeleteFailed(ctx, blob) }); err != nil { - gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteManifest() error out: %s, %s, %v", uid, index, total, art.RepositoryName, blob.Digest, err) + gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteManifest() error out: %s, %s, %v", uid, localIndex, total, art.RepositoryName, blob.Digest, err) return err } // if the system is set to read-only mode, return directly @@ -395,19 +393,19 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error { continue } - gc.logger.Infof("[%s][%d/%d] delete artifact blob record from database: %d, %s, %s", uid, index, total, art.ID, art.RepositoryName, art.Digest) + gc.logger.Infof("[%s][%d/%d] delete artifact blob record from database: %d, %s, %s", uid, localIndex, total, art.ID, art.RepositoryName, art.Digest) if err := ignoreNotFound(func() error { return gc.blobMgr.CleanupAssociationsForArtifact(ctx.SystemContext(), art.Digest) }); err != nil { - gc.logger.Errorf("[%s][%d/%d] failed to call gc.blobMgr.CleanupAssociationsForArtifact(): %v, errMsg=%v", uid, index, total, art.Digest, err) + gc.logger.Errorf("[%s][%d/%d] failed to call gc.blobMgr.CleanupAssociationsForArtifact(): %v, errMsg=%v", uid, localIndex, total, art.Digest, err) return err } - gc.logger.Infof("[%s][%d/%d] delete artifact trash record from database: %d, %s, %s", uid, index, total, art.ID, art.RepositoryName, art.Digest) + gc.logger.Infof("[%s][%d/%d] delete artifact trash record from database: %d, %s, %s", uid, localIndex, total, art.ID, art.RepositoryName, art.Digest) if err := ignoreNotFound(func() error { return gc.artrashMgr.Delete(ctx.SystemContext(), art.ID) }); err != nil { - gc.logger.Errorf("[%s][%d/%d] failed to call gc.artrashMgr.Delete(): %v, errMsg=%v", uid, index, total, art.ID, err) + gc.logger.Errorf("[%s][%d/%d] failed to call gc.artrashMgr.Delete(): %v, errMsg=%v", uid, localIndex, total, art.ID, err) return err } } @@ -421,7 +419,7 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error { // delete all the blobs, which include config, layer and manifest // for the foreign layer, as it's not stored in the storage, no need to call the delete api and count size, but still have to delete the DB record. if !blob.IsForeignLayer() { - gc.logger.Infof("[%s][%d/%d] delete blob from storage: %s", uid, index, total, blob.Digest) + gc.logger.Infof("[%s][%d/%d] delete blob from storage: %s", uid, localIndex, total, blob.Digest) if err := retry.Retry(func() error { return ignoreNotFound(func() error { err := gc.registryCtlClient.DeleteBlob(blob.Digest) @@ -432,13 +430,13 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error { return err }) }, retry.Callback(func(err error, sleep time.Duration) { - gc.logger.Infof("[%s][%d/%d] failed to exec DeleteBlob, error: %v, will retry again after: %s", uid, index, total, err, sleep) + gc.logger.Infof("[%s][%d/%d] failed to exec DeleteBlob, error: %v, will retry again after: %s", uid, localIndex, total, err, sleep) })); err != nil { - gc.logger.Errorf("[%s][%d/%d] failed to delete blob from storage: %s, %s, errMsg=%v", uid, index, total, blob.Digest, blob.Status, err) + gc.logger.Errorf("[%s][%d/%d] failed to delete blob from storage: %s, %s, errMsg=%v", uid, localIndex, total, blob.Digest, blob.Status, err) if err := ignoreNotFound(func() error { return gc.markDeleteFailed(ctx, blob) }); err != nil { - gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteBlob() error out: %s, %v", uid, index, total, blob.Digest, err) + gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteBlob() error out: %s, %v", uid, localIndex, total, blob.Digest, err) return err } // if the system is set to read-only mode, return directly @@ -450,15 +448,15 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error { atomic.AddInt64(&sweepSize, blob.Size) } - gc.logger.Infof("[%s][%d/%d] delete blob record from database: %d, %s", uid, index, total, blob.ID, blob.Digest) + gc.logger.Infof("[%s][%d/%d] delete blob record from database: %d, %s", uid, localIndex, total, blob.ID, blob.Digest) if err := ignoreNotFound(func() error { return gc.blobMgr.Delete(ctx.SystemContext(), blob.ID) }); err != nil { - gc.logger.Errorf("[%s][%d/%d] failed to delete blob from database: %s, %s, errMsg=%v", uid, index, total, blob.Digest, blob.Status, err) + gc.logger.Errorf("[%s][%d/%d] failed to delete blob from database: %s, %s, errMsg=%v", uid, localIndex, total, blob.Digest, blob.Status, err) if err := ignoreNotFound(func() error { return gc.markDeleteFailed(ctx, blob) }); err != nil { - gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.blobMgr.Delete() error out, %d, %s %v", uid, index, total, blob.ID, blob.Digest, err) + gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.blobMgr.Delete() error out, %d, %s %v", uid, localIndex, total, blob.ID, blob.Digest, err) return err } return err