fix: fix the GC job index data race (#20830)

Signed-off-by: chlins <chlins.zhang@gmail.com>
This commit is contained in:
Chlins Zhang 2024-08-12 20:24:16 +08:00 committed by GitHub
parent cadd3825aa
commit ccceacfa73
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -318,18 +318,16 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
return errGcStop
}
atomic.AddInt64(&index, 1)
index := atomic.LoadInt64(&index)
localIndex := atomic.AddInt64(&index, 1)
// set the status firstly, if the blob is updated by any HEAD/PUT request, it should be fail and skip.
blob.Status = blobModels.StatusDeleting
count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
if err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to mark gc candidate deleting, skip: %s, %s", uid, index, total, blob.Digest, blob.Status)
gc.logger.Errorf("[%s][%d/%d] failed to mark gc candidate deleting, skip: %s, %s", uid, localIndex, total, blob.Digest, blob.Status)
continue
}
if count == 0 {
gc.logger.Warningf("[%s][%d/%d] no blob found to mark gc candidate deleting, ID:%d, digest:%s", uid, index, total, blob.ID, blob.Digest)
gc.logger.Warningf("[%s][%d/%d] no blob found to mark gc candidate deleting, ID:%d, digest:%s", uid, localIndex, total, blob.ID, blob.Digest)
continue
}
@ -339,7 +337,7 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
for _, art := range gc.trashedArts[blob.Digest] {
// Harbor cannot know the existing tags in the backend from its database, so let the v2 DELETE manifest to remove all of them.
gc.logger.Infof("[%s][%d/%d] delete the manifest with registry v2 API: %s, %s, %s",
uid, index, total, art.RepositoryName, blob.ContentType, blob.Digest)
uid, localIndex, total, art.RepositoryName, blob.ContentType, blob.Digest)
if err := retry.Retry(func() error {
return ignoreNotFound(func() error {
err := v2DeleteManifest(art.RepositoryName, blob.Digest)
@ -350,13 +348,13 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
return err
})
}, retry.Callback(func(err error, sleep time.Duration) {
gc.logger.Infof("[%s][%d/%d] failed to exec v2DeleteManifest, error: %v, will retry again after: %s", uid, index, total, err, sleep)
gc.logger.Infof("[%s][%d/%d] failed to exec v2DeleteManifest, error: %v, will retry again after: %s", uid, localIndex, total, err, sleep)
})); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to delete manifest with v2 API, %s, %s, %v", uid, index, total, art.RepositoryName, blob.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to delete manifest with v2 API, %s, %s, %v", uid, localIndex, total, art.RepositoryName, blob.Digest, err)
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after v2DeleteManifest() error out: %s, %v", uid, index, total, blob.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after v2DeleteManifest() error out: %s, %v", uid, localIndex, total, blob.Digest, err)
return err
}
// if the system is set to read-only mode, return directly
@ -367,7 +365,7 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
continue
}
// for manifest, it has to delete the revisions folder of each repository
gc.logger.Infof("[%s][%d/%d] delete manifest from storage: %s", uid, index, total, blob.Digest)
gc.logger.Infof("[%s][%d/%d] delete manifest from storage: %s", uid, localIndex, total, blob.Digest)
if err := retry.Retry(func() error {
return ignoreNotFound(func() error {
err := gc.registryCtlClient.DeleteManifest(art.RepositoryName, blob.Digest)
@ -378,13 +376,13 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
return err
})
}, retry.Callback(func(err error, sleep time.Duration) {
gc.logger.Infof("[%s][%d/%d] failed to exec DeleteManifest, error: %v, will retry again after: %s", uid, index, total, err, sleep)
gc.logger.Infof("[%s][%d/%d] failed to exec DeleteManifest, error: %v, will retry again after: %s", uid, localIndex, total, err, sleep)
})); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to remove manifest from storage: %s, %s, errMsg=%v", uid, index, total, art.RepositoryName, blob.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to remove manifest from storage: %s, %s, errMsg=%v", uid, localIndex, total, art.RepositoryName, blob.Digest, err)
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteManifest() error out: %s, %s, %v", uid, index, total, art.RepositoryName, blob.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteManifest() error out: %s, %s, %v", uid, localIndex, total, art.RepositoryName, blob.Digest, err)
return err
}
// if the system is set to read-only mode, return directly
@ -395,19 +393,19 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
continue
}
gc.logger.Infof("[%s][%d/%d] delete artifact blob record from database: %d, %s, %s", uid, index, total, art.ID, art.RepositoryName, art.Digest)
gc.logger.Infof("[%s][%d/%d] delete artifact blob record from database: %d, %s, %s", uid, localIndex, total, art.ID, art.RepositoryName, art.Digest)
if err := ignoreNotFound(func() error {
return gc.blobMgr.CleanupAssociationsForArtifact(ctx.SystemContext(), art.Digest)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to call gc.blobMgr.CleanupAssociationsForArtifact(): %v, errMsg=%v", uid, index, total, art.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to call gc.blobMgr.CleanupAssociationsForArtifact(): %v, errMsg=%v", uid, localIndex, total, art.Digest, err)
return err
}
gc.logger.Infof("[%s][%d/%d] delete artifact trash record from database: %d, %s, %s", uid, index, total, art.ID, art.RepositoryName, art.Digest)
gc.logger.Infof("[%s][%d/%d] delete artifact trash record from database: %d, %s, %s", uid, localIndex, total, art.ID, art.RepositoryName, art.Digest)
if err := ignoreNotFound(func() error {
return gc.artrashMgr.Delete(ctx.SystemContext(), art.ID)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to call gc.artrashMgr.Delete(): %v, errMsg=%v", uid, index, total, art.ID, err)
gc.logger.Errorf("[%s][%d/%d] failed to call gc.artrashMgr.Delete(): %v, errMsg=%v", uid, localIndex, total, art.ID, err)
return err
}
}
@ -421,7 +419,7 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
// delete all the blobs, which include config, layer and manifest
// for the foreign layer, as it's not stored in the storage, no need to call the delete api and count size, but still have to delete the DB record.
if !blob.IsForeignLayer() {
gc.logger.Infof("[%s][%d/%d] delete blob from storage: %s", uid, index, total, blob.Digest)
gc.logger.Infof("[%s][%d/%d] delete blob from storage: %s", uid, localIndex, total, blob.Digest)
if err := retry.Retry(func() error {
return ignoreNotFound(func() error {
err := gc.registryCtlClient.DeleteBlob(blob.Digest)
@ -432,13 +430,13 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
return err
})
}, retry.Callback(func(err error, sleep time.Duration) {
gc.logger.Infof("[%s][%d/%d] failed to exec DeleteBlob, error: %v, will retry again after: %s", uid, index, total, err, sleep)
gc.logger.Infof("[%s][%d/%d] failed to exec DeleteBlob, error: %v, will retry again after: %s", uid, localIndex, total, err, sleep)
})); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to delete blob from storage: %s, %s, errMsg=%v", uid, index, total, blob.Digest, blob.Status, err)
gc.logger.Errorf("[%s][%d/%d] failed to delete blob from storage: %s, %s, errMsg=%v", uid, localIndex, total, blob.Digest, blob.Status, err)
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteBlob() error out: %s, %v", uid, index, total, blob.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteBlob() error out: %s, %v", uid, localIndex, total, blob.Digest, err)
return err
}
// if the system is set to read-only mode, return directly
@ -450,15 +448,15 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
atomic.AddInt64(&sweepSize, blob.Size)
}
gc.logger.Infof("[%s][%d/%d] delete blob record from database: %d, %s", uid, index, total, blob.ID, blob.Digest)
gc.logger.Infof("[%s][%d/%d] delete blob record from database: %d, %s", uid, localIndex, total, blob.ID, blob.Digest)
if err := ignoreNotFound(func() error {
return gc.blobMgr.Delete(ctx.SystemContext(), blob.ID)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to delete blob from database: %s, %s, errMsg=%v", uid, index, total, blob.Digest, blob.Status, err)
gc.logger.Errorf("[%s][%d/%d] failed to delete blob from database: %s, %s, errMsg=%v", uid, localIndex, total, blob.Digest, blob.Status, err)
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.blobMgr.Delete() error out, %d, %s %v", uid, index, total, blob.ID, blob.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.blobMgr.Delete() error out, %d, %s %v", uid, localIndex, total, blob.ID, blob.Digest, err)
return err
}
return err