update GC job

Remove the artifact trash record after manifest delete success.

Don't use the flush method is because that when all of records are removed, only GC job knows these informations, and they are in the memory. Once the jobservice is crashed and restarted at GC job execution phase, the trash records are lost, then these manifest are become orphan manifests, cannot be removed any more.
Signed-off-by: wang yan <wangyan@vmware.com>
This commit is contained in:
wang yan 2020-08-03 11:59:36 +08:00
parent 3d3fabbd20
commit e7c7c03b8c
2 changed files with 23 additions and 32 deletions

View File

@ -62,7 +62,7 @@ type GarbageCollector struct {
// holds all of trashed artifacts' digest and repositories.
// The source data of trashedArts is the table ArtifactTrash and it's only used as a dictionary by sweep when to delete a manifest.
// As table blob has no repositories data, and the repositories are required when to delete a manifest, so use the table ArtifactTrash to capture them.
trashedArts map[string][]string
trashedArts map[string][]model.ArtifactTrash
// hold all of GC candidates(non-referenced blobs), it's captured by mark and consumed by sweep.
deleteSet []*blob_models.Blob
timeWindowHours int64
@ -92,7 +92,7 @@ func (gc *GarbageCollector) init(ctx job.Context, params job.Parameters) error {
regCtlInit()
gc.logger = ctx.GetLogger()
gc.deleteSet = make([]*blob_models.Blob, 0)
gc.trashedArts = make(map[string][]string, 0)
gc.trashedArts = make(map[string][]model.ArtifactTrash, 0)
opCmd, flag := ctx.OPCommand()
if flag && opCmd.IsStop() {
gc.logger.Info("received the stop signal, quit GC job.")
@ -254,30 +254,37 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
// remove tags and revisions of a manifest
if _, exist := gc.trashedArts[blob.Digest]; exist && blob.IsManifest() {
for _, repo := range gc.trashedArts[blob.Digest] {
for _, art := range gc.trashedArts[blob.Digest] {
// Harbor cannot know the existing tags in the backend from its database, so let the v2 DELETE manifest to remove all of them.
gc.logger.Infof("delete the manifest with registry v2 API: %s, %s, %s",
repo, blob.ContentType, blob.Digest)
if err := v2DeleteManifest(repo, blob.Digest); err != nil {
gc.logger.Errorf("failed to delete manifest with v2 API, %s, %s, %v", repo, blob.Digest, err)
art.RepositoryName, blob.ContentType, blob.Digest)
if err := v2DeleteManifest(art.RepositoryName, blob.Digest); err != nil {
gc.logger.Errorf("failed to delete manifest with v2 API, %s, %s, %v", art.RepositoryName, blob.Digest, err)
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
return err
}
return errors.Wrapf(err, "failed to delete manifest with v2 API: %s, %s", repo, blob.Digest)
return errors.Wrapf(err, "failed to delete manifest with v2 API: %s, %s", art.RepositoryName, blob.Digest)
}
// for manifest, it has to delete the revisions folder of each repository
gc.logger.Infof("delete manifest from storage: %s", blob.Digest)
if err := ignoreNotFound(func() error {
return gc.registryCtlClient.DeleteManifest(repo, blob.Digest)
return gc.registryCtlClient.DeleteManifest(art.RepositoryName, blob.Digest)
}); err != nil {
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
return err
}
return errors.Wrapf(err, "failed to remove manifest from storage: %s, %s", repo, blob.Digest)
return errors.Wrapf(err, "failed to remove manifest from storage: %s, %s", art.RepositoryName, blob.Digest)
}
gc.logger.Infof("delete artifact trash record from database: %d, %s, %s", art.ID, art.RepositoryName, art.Digest)
if err := ignoreNotFound(func() error {
return gc.artrashMgr.Delete(ctx.SystemContext(), art.ID)
}); err != nil {
return err
}
}
}
@ -295,7 +302,7 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
return errors.Wrapf(err, "failed to delete blob from storage: %s, %s", blob.Digest, blob.Status)
}
// remove the blob record
gc.logger.Infof("delete blob record from database: %d, %s", blob.ID, blob.Digest)
if err := ignoreNotFound(func() error {
return gc.blobMgr.Delete(ctx.SystemContext(), blob.ID)
}); err != nil {
@ -350,24 +357,14 @@ func (gc *GarbageCollector) cleanCache() error {
// deletedArt contains the two parts of artifact
// 1, required part, the artifacts were removed from Harbor.
// 2, optional part, the untagged artifacts.
func (gc *GarbageCollector) deletedArt(ctx job.Context) (map[string][]string, error) {
func (gc *GarbageCollector) deletedArt(ctx job.Context) (map[string][]model.ArtifactTrash, error) {
if os.Getenv("UTTEST") == "true" {
gc.logger = ctx.GetLogger()
}
// default is not to clean trash
flushTrash := false
defer func() {
if flushTrash {
gc.logger.Info("flush artifact trash")
if err := gc.artrashMgr.Flush(ctx.SystemContext(), gc.timeWindowHours); err != nil {
gc.logger.Errorf("failed to flush artifact trash: %v", err)
}
}
}()
arts := make([]model.ArtifactTrash, 0)
// artMap : map[digest : []repo list]
artMap := make(map[string][]string)
// artMap : map[digest : []ArtifactTrash list]
artMap := make(map[string][]model.ArtifactTrash)
// handle the optional ones, and the artifact controller will move them into trash.
if gc.deleteUntagged {
untagged, err := gc.artCtl.List(ctx.SystemContext(), &q.Query{
@ -397,22 +394,19 @@ func (gc *GarbageCollector) deletedArt(ctx job.Context) (map[string][]string, er
return artMap, err
}
// the repositories of blob is needed when to delete as a manifest.
// group the deleted artifact by digest. The repositories of blob is needed when to delete as a manifest.
for _, art := range arts {
_, exist := artMap[art.Digest]
if !exist {
artMap[art.Digest] = []string{art.RepositoryName}
artMap[art.Digest] = []model.ArtifactTrash{art}
} else {
repos := artMap[art.Digest]
repos = append(repos, art.RepositoryName)
repos = append(repos, art)
artMap[art.Digest] = repos
}
}
gc.logger.Info("required candidate: %+v", arts)
if !gc.dryRun {
flushTrash = true
}
return artMap, nil
}

View File

@ -62,7 +62,6 @@ func (suite *gcTestSuite) TestDeletedArt() {
logger := &mockjobservice.MockJobLogger{}
ctx.On("GetLogger").Return(logger)
suite.artrashMgr.On("Flush").Return(nil)
suite.artifactCtl.On("List").Return([]*artifact.Artifact{
{
ID: 1,
@ -194,7 +193,6 @@ func (suite *gcTestSuite) TestRun() {
ctx.On("OPCommand").Return(job.NilCommand, true)
mock.OnAnything(ctx, "Get").Return("core url", true)
suite.artrashMgr.On("Flush").Return(nil)
suite.artifactCtl.On("List").Return([]*artifact.Artifact{
{
ID: 1,
@ -265,7 +263,6 @@ func (suite *gcTestSuite) TestMark() {
logger := &mockjobservice.MockJobLogger{}
ctx.On("GetLogger").Return(logger)
suite.artrashMgr.On("Flush").Return(nil)
suite.artifactCtl.On("List").Return([]*artifact.Artifact{
{
ID: 1,