add debugging env for GC time window (#12528)

* add debugging env for GC time window

For debugging, testers/users want to run GC to delete removed artifacts immediately instead of waiting for two hours; add the env (GC_TIME_WINDOW_HOURS) to meet this need.

Signed-off-by: wang yan <wangyan@vmware.com>
This commit is contained in:
Wang Yan 2020-07-22 11:09:01 +08:00 committed by GitHub
parent 46fa64462a
commit eeb8fca255
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 67 additions and 25 deletions

View File

@ -154,6 +154,6 @@ const (
CountPerProject = "count_per_project"
StoragePerProject = "storage_per_project"
// ForeignLayer
ForeignLayer = "application/vnd.docker.image.rootfs.foreign.diff.tar.gzip"
// DefaultGCTimeWindowHours is the reserve blob time window used by GC, default is 2 hours
DefaultGCTimeWindowHours = int64(2)
)

View File

@ -16,6 +16,7 @@ package api
import (
"errors"
"github.com/goharbor/harbor/src/core/config"
"net/http"
"os"
"strconv"
@ -74,6 +75,7 @@ func (gc *GCAPI) Post() {
}
ajr.Name = common_job.ImageGC
ajr.Parameters["redis_url_reg"] = os.Getenv("_REDIS_URL_REG")
ajr.Parameters["time_window"] = config.GetGCTimeWindow()
gc.submit(&ajr)
gc.Redirect(http.StatusCreated, strconv.FormatInt(ajr.ID, 10))
}
@ -101,6 +103,7 @@ func (gc *GCAPI) Put() {
}
ajr.Name = common_job.ImageGC
ajr.Parameters["redis_url_reg"] = os.Getenv("_REDIS_URL_REG")
ajr.Parameters["time_window"] = config.GetGCTimeWindow()
gc.updateSchedule(ajr)
}

View File

@ -20,6 +20,7 @@ package config
import (
"errors"
"os"
"strconv"
"strings"
"github.com/goharbor/harbor/src/common"
@ -478,3 +479,15 @@ func GetPermittedRegistryTypesForProxyCache() []string {
}
return strings.Split(types, ",")
}
// GetGCTimeWindow returns the reserve time window of blobs, in hours.
//
// The value is read from the GC_TIME_WINDOW_HOURS environment variable, which
// exists for testing/debugging only. For production, do NOT set it.
// When the variable is unset, is not a valid base-10 integer, or is negative,
// common.DefaultGCTimeWindowHours is returned instead.
func GetGCTimeWindow() int64 {
	env, exist := os.LookupEnv("GC_TIME_WINDOW_HOURS")
	if !exist {
		return common.DefaultGCTimeWindowHours
	}
	timeWindow, err := strconv.ParseInt(env, 10, 64)
	// The result is consumed as a number of hours; a negative window has no
	// sensible meaning, so treat it like an unparsable value and fall back.
	if err != nil || timeWindow < 0 {
		return common.DefaultGCTimeWindowHours
	}
	return timeWindow
}

View File

@ -59,6 +59,7 @@ func TestConfig(t *testing.T) {
defer os.Setenv("TOKEN_PRIVATE_KEY_PATH", oriKeyPath)
os.Setenv("JOBSERVICE_URL", "http://myjob:8888")
os.Setenv("GC_TIME_WINDOW_HOURS", "0")
Init()
@ -188,6 +189,8 @@ func TestConfig(t *testing.T) {
assert.Equal("http://127.0.0.1:8080", localCoreURL)
assert.True(NotificationEnable())
assert.Equal(int64(0), GetGCTimeWindow())
}
func currPath() string {

View File

@ -16,7 +16,6 @@ package gc
import (
"os"
"strconv"
"time"
"github.com/goharbor/harbor/src/lib/errors"
@ -118,6 +117,7 @@ func (gc *GarbageCollector) init(ctx job.Context, params job.Parameters) error {
// parseParams set the parameters according to the GC API call.
func (gc *GarbageCollector) parseParams(params job.Parameters) {
// redis url
gc.logger.Info(params)
gc.redisURL = params["redis_url_reg"].(string)
// delete untagged: default is to delete the untagged artifact
@ -133,13 +133,8 @@ func (gc *GarbageCollector) parseParams(params job.Parameters) {
gc.timeWindowHours = 2
timeWindow, exist := params["time_window"]
if exist {
if timeWindowHours, ok := timeWindow.(string); ok {
str, err := strconv.ParseInt(timeWindowHours, 10, 64)
if err != nil {
gc.logger.Warningf("wrong type of time windows, set the default value. %v", err)
} else {
gc.timeWindowHours = str
}
if timeWindow, ok := timeWindow.(float64); ok {
gc.timeWindowHours = int64(timeWindow)
}
}
@ -215,14 +210,19 @@ func (gc *GarbageCollector) mark(ctx job.Context) error {
// update delete status for the candidates.
blobCt := 0
mfCt := 0
makeSize := int64(0)
for _, blob := range blobs {
if !gc.dryRun {
blob.Status = blob_models.StatusDelete
_, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
if err != nil {
gc.logger.Warningf("failed to mark gc candidate, skip it.: %s, error: %v", blob.Digest, err)
continue
}
if count == 0 {
gc.logger.Warningf("no blob found to mark gc candidate, skip it. ID:%d, digest:%s", blob.ID, blob.Digest)
continue
}
}
gc.logger.Infof("blob eligible for deletion: %s", blob.Digest)
gc.deleteSet = append(gc.deleteSet, blob)
@ -232,21 +232,28 @@ func (gc *GarbageCollector) mark(ctx job.Context) error {
} else {
blobCt++
}
makeSize = makeSize + blob.Size
}
gc.logger.Infof("%d blobs and %d manifests eligible for deletion", blobCt, mfCt)
gc.logger.Infof("The GC could free up %d MB space, the size is a rough estimate.", makeSize/1024/1024)
return nil
}
func (gc *GarbageCollector) sweep(ctx job.Context) error {
gc.logger = ctx.GetLogger()
sweepSize := int64(0)
for _, blob := range gc.deleteSet {
// set the status first; if the blob has been updated by any HEAD/PUT request in the meantime, the update should fail and the blob is skipped.
blob.Status = blob_models.StatusDeleting
_, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
if err != nil {
gc.logger.Errorf("failed to mark gc candidate deleting, skip: %s, %s", blob.Digest, blob.Status)
continue
}
if count == 0 {
gc.logger.Warningf("no blob found to mark gc candidate deleting, ID:%d, digest:%s", blob.ID, blob.Digest)
continue
}
// remove tags and revisions of a manifest
if _, exist := gc.trashedArts[blob.Digest]; exist && blob.IsManifest() {
@ -256,7 +263,9 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
repo, blob.ContentType, blob.Digest)
if err := v2DeleteManifest(repo, blob.Digest); err != nil {
gc.logger.Errorf("failed to delete manifest with v2 API, %s, %s, %v", repo, blob.Digest, err)
if err := gc.markDeleteFailed(ctx, blob); err != nil {
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
return err
}
return errors.Wrapf(err, "failed to delete manifest with v2 API: %s, %s", repo, blob.Digest)
@ -266,7 +275,9 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
if err := ignoreNotFound(func() error {
return gc.registryCtlClient.DeleteManifest(repo, blob.Digest)
}); err != nil {
if err := gc.markDeleteFailed(ctx, blob); err != nil {
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
return err
}
return errors.Wrapf(err, "failed to remove manifest from storage: %s, %s", repo, blob.Digest)
@ -279,7 +290,9 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
if err := ignoreNotFound(func() error {
return gc.registryCtlClient.DeleteBlob(blob.Digest)
}); err != nil {
if err := gc.markDeleteFailed(ctx, blob); err != nil {
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
return err
}
return errors.Wrapf(err, "failed to delete blob from storage: %s, %s", blob.Digest, blob.Status)
@ -290,12 +303,16 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
if err := ignoreNotFound(func() error {
return gc.blobMgr.Delete(ctx.SystemContext(), blob.ID)
}); err != nil {
if err := gc.markDeleteFailed(ctx, blob); err != nil {
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
return err
}
return errors.Wrapf(err, "failed to delete blob from database: %s, %s", blob.Digest, blob.Status)
}
sweepSize = sweepSize + blob.Size
}
gc.logger.Infof("The GC job actual frees up %d MB space.", sweepSize/1024/1024)
return nil
}
@ -458,10 +475,13 @@ func (gc *GarbageCollector) removeUntaggedBlobs(ctx job.Context) {
// markDeleteFailed sets the blob status to StatusDeleteFailed and persists it
// through the blob manager.
//
// It returns a wrapped error when the status update itself fails, and an error
// carrying errors.NotFoundCode when the update reports a count of zero.
func (gc *GarbageCollector) markDeleteFailed(ctx job.Context, blob *blob_models.Blob) error {
blob.Status = blob_models.StatusDeleteFailed
_, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
if err != nil {
gc.logger.Errorf("failed to mark gc candidate delete failed: %s, %s", blob.Digest, blob.Status)
return errors.Wrapf(err, "failed to mark gc candidate delete failed: %s, %s", blob.Digest, blob.Status)
}
// A zero count presumably means no blob record matched the update; report it
// with a not-found code so callers can tell it apart from a real failure.
if count == 0 {
return errors.New(nil).WithMessage("no blob found to mark gc candidate, ID:%d, digest:%s", blob.ID, blob.Digest).WithCode(errors.NotFoundCode)
}
return nil
}

View File

@ -391,7 +391,7 @@ func (d *dao) GetBlobsNotRefedByProjectBlob(ctx context.Context, timeWindowHours
return noneRefed, err
}
sql := fmt.Sprintf(`SELECT b.id, b.digest, b.content_type, b.status FROM blob AS b LEFT JOIN project_blob pb ON b.id = pb.blob_id WHERE pb.id IS NULL AND b.update_time <= now() - interval '%d hours';`, timeWindowHours)
sql := fmt.Sprintf(`SELECT b.id, b.digest, b.content_type, b.status, b.version, b.size FROM blob AS b LEFT JOIN project_blob pb ON b.id = pb.blob_id WHERE pb.id IS NULL AND b.update_time <= now() - interval '%d hours';`, timeWindowHours)
_, err = ormer.Raw(sql).QueryRows(&noneRefed)
if err != nil {
return noneRefed, err

View File

@ -3,6 +3,7 @@ package blob
import (
"fmt"
"github.com/goharbor/harbor/src/controller/blob"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/lib/errors"
lib_http "github.com/goharbor/harbor/src/lib/http"
@ -48,7 +49,7 @@ func handleHead(req *http.Request) error {
case blob_models.StatusDeleting:
now := time.Now().UTC()
// if the blob has been in deleting status longer than the GC time window (2 hours by default), mark the blob as StatusDeleteFailed and give a 404, so the client can push it again
if now.Sub(bb.UpdateTime) > time.Duration(BlobDeleteingTimeWindow)*time.Hour {
if now.Sub(bb.UpdateTime) > time.Duration(config.GetGCTimeWindow())*time.Hour {
if err := blob.Ctl.Fail(req.Context(), bb); err != nil {
log.Errorf("failed to update blob: %s status to StatusDeleteFailed, error:%v", blobInfo.Digest, err)
return errors.Wrapf(err, fmt.Sprintf("the request id is: %s", req.Header.Get(requestid.HeaderXRequestID)))

View File

@ -3,6 +3,7 @@ package blob
import (
"fmt"
"github.com/goharbor/harbor/src/controller/blob"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/pkg/blob/models"
@ -11,9 +12,6 @@ import (
"time"
)
// BlobDeleteingTimeWindow is the time window used in GC to reserve blobs
const BlobDeleteingTimeWindow = 2
// probeBlob handles config/layer and manifest status in the PUT Blob & Manifest middleware, and update the status before it passed into proxy(distribution).
func probeBlob(r *http.Request, digest string) error {
logger := log.G(r.Context())
@ -36,7 +34,7 @@ func probeBlob(r *http.Request, digest string) error {
case models.StatusDeleting:
now := time.Now().UTC()
// if the blob has been in deleting status longer than the GC time window (2 hours by default), mark the blob as StatusDeleteFailed
if now.Sub(bb.UpdateTime) > time.Duration(BlobDeleteingTimeWindow)*time.Hour {
if now.Sub(bb.UpdateTime) > time.Duration(config.GetGCTimeWindow())*time.Hour {
if err := blob.Ctl.Fail(r.Context(), bb); err != nil {
log.Errorf("failed to update blob: %s status to StatusDeleteFailed, error:%v", bb.Digest, err)
return errors.Wrapf(err, fmt.Sprintf("the request id is: %s", r.Header.Get(requestid.HeaderXRequestID)))

View File

@ -93,7 +93,7 @@ class System(base.Base):
def create_gc_schedule(self, schedule_type, cron = None, expect_status_code = 201, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs)
gc_parameters = {'delete_untagged':True, 'time_window':'0'}
gc_parameters = {'delete_untagged':True}
gc_schedule = swagger_client.AdminJobScheduleObj()
gc_schedule.type = schedule_type

View File

@ -30,7 +30,11 @@ if [ $GITHUB_TOKEN ];
then
sed "s/# github_token: xxx/github_token: $GITHUB_TOKEN/" -i make/harbor.yml
fi
sudo make install COMPILETAG=compile_golangimage GOBUILDTAGS="include_oss include_gcs" NOTARYFLAG=true CLAIRFLAG=true TRIVYFLAG=true CHARTFLAG=true GEN_TLS=true
sudo make compile build prepare COMPILETAG=compile_golangimage GOBUILDTAGS="include_oss include_gcs" NOTARYFLAG=true CLAIRFLAG=true TRIVYFLAG=true CHARTFLAG=true GEN_TLS=true
# set the debugging env
echo "GC_TIME_WINDOW_HOURS=0" | sudo tee -a ./make/common/config/core/env
sudo make start
# waiting 5 minutes to start
for((i=1;i<=30;i++)); do