Merge pull request #16286 from chlins/fix/skip-replication-for-proxy-cache

fix: skip replication to proxy cache project
This commit is contained in:
Chenyu Zhang 2022-02-07 17:06:03 +08:00 committed by GitHub
commit 4ef2d65451
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 108 additions and 15 deletions

View File

@ -20,6 +20,7 @@ import (
repctlmodel "github.com/goharbor/harbor/src/controller/replication/model"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/pkg/reg/model"
"github.com/goharbor/harbor/src/pkg/task"
@ -103,12 +104,29 @@ func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) {
}
func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, speed int32) error {
for i, resource := range srcResources {
src, err := json.Marshal(resource)
var taskCnt int
defer func() {
// if no task be created, mark execution done.
if taskCnt == 0 {
if err := c.executionMgr.MarkDone(ctx, c.executionID, "no resources need to be replicated"); err != nil {
logger.Errorf("failed to mark done for the execution %d: %v", c.executionID, err)
}
}
}()
for i, srcResource := range srcResources {
dstResource := dstResources[i]
// if dest resource should be skipped, ignore replicate.
if dstResource.Skip {
log.Warningf("skip create replication task because of dest limitation, src: %s, dst: %s", srcResource.Metadata, dstResource.Metadata)
continue
}
src, err := json.Marshal(srcResource)
if err != nil {
return err
}
dest, err := json.Marshal(dstResources[i])
dest, err := json.Marshal(dstResource)
if err != nil {
return err
}
@ -127,11 +145,13 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [
if _, err = c.taskMgr.Create(ctx, c.executionID, job, map[string]interface{}{
"operation": "copy",
"resource_type": string(resource.Type),
"source_resource": getResourceName(resource),
"destination_resource": getResourceName(dstResources[i])}); err != nil {
"resource_type": string(srcResource.Type),
"source_resource": getResourceName(srcResource),
"destination_resource": getResourceName(dstResource)}); err != nil {
return err
}
taskCnt++
}
return nil
}

View File

@ -51,6 +51,17 @@ func (c *copyFlowTestSuite) TestRun() {
},
Override: false,
},
{
Type: model.ResourceTypeArtifact,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: "proxy/hello-world",
},
Vtags: []string{"latest"},
},
Override: false,
Skip: true,
},
}, nil)
adp.On("PrepareForPush", mock.Anything).Return(nil)
@ -60,7 +71,7 @@ func (c *copyFlowTestSuite) TestRun() {
}, nil)
taskMgr := &testingTask.Manager{}
taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil).Once()
policy := &repctlmodel.Policy{
SrcRegistry: &model.Registry{
Type: "TEST_FOR_COPY_FLOW",

View File

@ -16,10 +16,11 @@ package flow
import (
"fmt"
"github.com/goharbor/harbor/src/lib/errors"
"path"
"strings"
"github.com/goharbor/harbor/src/lib/errors"
repctlmodel "github.com/goharbor/harbor/src/controller/replication/model"
"github.com/goharbor/harbor/src/lib/log"
adp "github.com/goharbor/harbor/src/pkg/reg/adapter"
@ -138,6 +139,7 @@ func assembleDestinationResources(resources []*model.Resource,
Deleted: resource.Deleted,
IsDeleteTag: resource.IsDeleteTag,
Override: policy.Override,
Skip: resource.Skip,
}
res.Metadata = &model.ResourceMetadata{
Repository: &model.Repository{

View File

@ -182,15 +182,19 @@ func (a *Adapter) PrepareForPush(resources []*model.Resource) error {
return errors.Wrapf(err, "list projects with query %s", q)
}
existProjects := make(map[string]*Project)
proxyCacheProjects := make(map[string]bool)
existProjects := make(map[string]bool)
for _, p := range queryProjects {
existProjects[p.Name] = p
existProjects[p.Name] = true
// if project with registry_id, that means this is a proxy cache project.
if p.RegistryID > 0 {
proxyCacheProjects[p.Name] = true
}
}
var notExistProjects []*Project
for _, p := range projects {
_, exist := existProjects[p.Name]
if !exist {
if !existProjects[p.Name] {
notExistProjects = append(notExistProjects, p)
}
}
@ -205,6 +209,17 @@ func (a *Adapter) PrepareForPush(resources []*model.Resource) error {
}
log.Debugf("project %s created", project.Name)
}
// do filter for proxy cache projects.
for _, res := range resources {
paths := strings.Split(res.Metadata.Repository.Name, "/")
projectName := paths[0]
if proxyCacheProjects[projectName] {
// set resource skip flag to true if it's a proxy cache project.
res.Skip = true
}
}
return nil
}
@ -295,9 +310,10 @@ func parsePublic(metadata map[string]interface{}) bool {
// Project model
type Project struct {
ID int64 `json:"project_id"`
Name string `json:"name"`
Metadata map[string]interface{} `json:"metadata"`
ID int64 `json:"project_id"`
Name string `json:"name"`
Metadata map[string]interface{} `json:"metadata"`
RegistryID int64 `json:"registry_id"`
}
func isLocalHarbor(url string) bool {

View File

@ -170,6 +170,33 @@ func TestPrepareForPush(t *testing.T) {
},
})
require.Nil(t, err)
// project already exists and the type is proxy cache
server = test.NewServer(&test.RequestHandlerMapping{
Method: http.MethodGet,
Pattern: "/api/projects",
Handler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`[{"name": "library", "registry_id": 1}]`))
},
})
registry = &model.Registry{
URL: server.URL,
}
adapter, err = New(registry)
require.Nil(t, err)
resources := []*model.Resource{
{
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: "library/hello-world",
},
},
},
}
err = adapter.PrepareForPush(resources)
require.Nil(t, err)
require.True(t, resources[0].Skip)
}
func TestParsePublic(t *testing.T) {

View File

@ -14,6 +14,11 @@
package model
import (
"encoding/json"
"fmt"
)
// the resource type
const (
ResourceTypeArtifact = "artifact"
@ -33,6 +38,9 @@ type Resource struct {
IsDeleteTag bool `json:"is_delete_tag"`
// indicate whether the resource can be overridden
Override bool `json:"override"`
// Skip is a flag for resource which satisfies replication rules but should
// be skipped because of other limits like when dest project's type is proxy cache.
Skip bool `json:"-"`
}
// ResourceMetadata of resource
@ -55,3 +63,12 @@ type Artifact struct {
Labels []string `json:"labels"`
Tags []string `json:"tags"`
}
func (r *ResourceMetadata) String() string {
data, err := json.Marshal(r)
if err == nil {
return string(data)
}
return fmt.Sprintf("repository: %+v, artifacts: %+v, tags: %+v", r.Repository, r.Artifacts, r.Vtags)
}