From aa8b3a13437646ca846980959de543ced8849d50 Mon Sep 17 00:00:00 2001
From: stonezdj <stonezdj@gmail.com>
Date: Thu, 16 Jul 2020 18:52:45 +0800
Subject: [PATCH] Fix #12487: Proxy cache create duplicated operation log

Change method UseLocalManifest to avoid pulling the manifest frequently

Signed-off-by: stonezdj <stonezdj@gmail.com>
---
 src/controller/proxy/controller.go       | 43 ++++++++++-----
 src/controller/proxy/controller_test.go  | 67 ++++++++++++++++++------
 src/controller/proxy/local.go            | 60 +++++++++++++++------
 src/controller/proxy/local_test.go       |  5 +-
 src/controller/proxy/remote.go           | 26 +++++----
 src/server/middleware/repoproxy/proxy.go | 10 +++-
 6 files changed, 154 insertions(+), 57 deletions(-)

diff --git a/src/controller/proxy/controller.go b/src/controller/proxy/controller.go
index a2a3a5752..f2276a786 100644
--- a/src/controller/proxy/controller.go
+++ b/src/controller/proxy/controller.go
@@ -30,8 +30,7 @@ import (
 	"github.com/goharbor/harbor/src/lib"
 	"github.com/goharbor/harbor/src/lib/errors"
 	"github.com/goharbor/harbor/src/lib/log"
-	"github.com/goharbor/harbor/src/replication/registry"
-	v1 "github.com/opencontainers/image-spec/specs-go/v1"
+	"github.com/opencontainers/image-spec/specs-go/v1"
 )
 
 const (
@@ -52,7 +51,7 @@ type Controller interface {
 	// UseLocalBlob check if the blob should use local copy
 	UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool
 	// UseLocalManifest check manifest should use local copy
-	UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) bool
+	UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) (bool, error)
 	// ProxyBlob proxy the blob request to the remote server, p is the proxy project
 	// art is the ArtifactInfo which includes the digest of the blob
 	ProxyBlob(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error)
@@ -62,7 +61,6 @@ type Controller interface {
 }
 type controller struct {
 	blobCtl     blob.Controller
-	registryMgr registry.Manager
 	artifactCtl artifact.Controller
 	local       localInterface
 }
@@ -74,7 +72,6 @@ func ControllerInstance() Controller {
 	once.Do(func() {
 		ctl = &controller{
 			blobCtl:     blob.Ctl,
-			registryMgr: registry.NewDefaultManager(),
 			artifactCtl: artifact.Ctl,
 			local:       newLocalHelper(),
 		}
@@ -94,22 +91,23 @@ func (c *controller) UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) boo
 	return exist
 }
 
-func (c *controller) UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) bool {
+func (c *controller) UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) (bool, error) {
 	if len(art.Digest) == 0 {
-		return false
+		return false, nil
 	}
-	return c.local.ManifestExist(ctx, art)
+	a, err := c.local.GetManifest(ctx, art)
+	return a != nil, err
 }
 
 func (c *controller) ProxyManifest(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (distribution.Manifest, error) {
 	var man distribution.Manifest
 	remoteRepo := getRemoteRepo(art)
-	r, err := newRemoteHelper(p.RegistryID)
+	ref := getReference(art)
+	remote, err := newRemoteHelper(p.RegistryID)
 	if err != nil {
 		return man, err
 	}
-	ref := getReference(art)
-	man, err = r.Manifest(remoteRepo, ref)
+	man, dig, err := remote.Manifest(remoteRepo, ref)
 	if err != nil {
 		if errors.IsNotFoundErr(err) {
 			go func() {
@@ -124,7 +122,26 @@ func (c *controller) ProxyManifest(ctx context.Context, p *models.Project, art l
 	}
 	// Push manifest in background
 	go func() {
-		c.waitAndPushManifest(ctx, remoteRepo, man, art, ct, r)
+		a, err := c.local.GetManifest(ctx, art)
+		if err != nil {
+			log.Errorf("failed to get manifest, error %v", err)
+		}
+		// Push the manifest to local when pulling by digest, when the artifact is not found locally, or when the digests mismatch
+		if len(art.Tag) == 0 || a == nil || a.Digest != dig {
+			// push the pulled manifest to the local repo
+			c.waitAndPushManifest(ctx, remoteRepo, man, art, ct, remote)
+		}
+
+		// Query artifact after push
+		if a == nil {
+			a, err = c.local.GetManifest(ctx, art)
+			if err != nil {
+				log.Errorf("failed to get manifest, error %v", err)
+			}
+		}
+		if a != nil {
+			SendPullEvent(ctx, a, art.Tag)
+		}
 	}()
 
 	return man, nil
@@ -184,7 +201,7 @@ func (c *controller) waitAndPushManifest(ctx context.Context, remoteRepo string,
 	}
 	if len(waitBlobs) > 0 {
 		// docker client will skip to pull layers exist in local
-		// these blobs is not exist in the proxy server
+		// these blobs do not exist in the proxy server
 		// it will cause the manifest dependency check always fail
 		// need to push these blobs before push manifest to avoid failure
 		log.Debug("Waiting blobs not empty, push it to local repo directly")
diff --git a/src/controller/proxy/controller_test.go b/src/controller/proxy/controller_test.go
index 8f66d9c0c..77ce351c2 100644
--- a/src/controller/proxy/controller_test.go
+++ b/src/controller/proxy/controller_test.go
@@ -17,30 +17,60 @@ package proxy
 import (
 	"context"
 	"github.com/docker/distribution"
+	"github.com/goharbor/harbor/src/common/models"
 	"github.com/goharbor/harbor/src/controller/artifact"
 	"github.com/goharbor/harbor/src/controller/blob"
 	"github.com/goharbor/harbor/src/lib"
-	"github.com/goharbor/harbor/src/replication/registry"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 	"io"
 	"testing"
 )
 
+type remoteInterfaceMock struct {
+	mock.Mock
+}
+
+func (r *remoteInterfaceMock) BlobReader(repo, dig string) (int64, io.ReadCloser, error) {
+	panic("implement me")
+}
+
+func (r *remoteInterfaceMock) Manifest(repo string, ref string) (distribution.Manifest, string, error) {
+	panic("implement me")
+}
+
+func (r *remoteInterfaceMock) ManifestExist(repo, ref string) (bool, string, error) {
+	args := r.Called(repo, ref)
+	return args.Bool(0), args.String(1), args.Error(2)
+}
+
 type localInterfaceMock struct {
 	mock.Mock
 }
 
+func (l *localInterfaceMock) SendPullEvent(ctx context.Context, repo, tag string) {
+	panic("implement me")
+}
+
+func (l *localInterfaceMock) GetManifest(ctx context.Context, art lib.ArtifactInfo) (*artifact.Artifact, error) {
+	args := l.Called(ctx, art)
+
+	var a *artifact.Artifact
+	if args.Get(0) != nil {
+		a = args.Get(0).(*artifact.Artifact)
+	}
+	return a, args.Error(1)
+}
+
+func (l *localInterfaceMock) SameArtifact(ctx context.Context, repo, tag, dig string) (bool, error) {
+	panic("implement me")
+}
+
 func (l *localInterfaceMock) BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error) {
 	args := l.Called(ctx, art)
 	return args.Bool(0), args.Error(1)
 }
 
-func (l *localInterfaceMock) ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool {
-	args := l.Called(ctx, art)
-	return args.Bool(0)
-}
-
 func (l *localInterfaceMock) PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error {
 	panic("implement me")
 }
@@ -63,15 +93,18 @@ func (l *localInterfaceMock) DeleteManifest(repo, ref string) {
 
 type proxyControllerTestSuite struct {
 	suite.Suite
-	local *localInterfaceMock
-	ctr   Controller
+	local  *localInterfaceMock
+	remote *remoteInterfaceMock
+	ctr    Controller
+	proj   *models.Project
 }
 
 func (p *proxyControllerTestSuite) SetupTest() {
 	p.local = &localInterfaceMock{}
+	p.remote = &remoteInterfaceMock{}
+	p.proj = &models.Project{RegistryID: 1}
 	p.ctr = &controller{
 		blobCtl:     blob.Ctl,
-		registryMgr: registry.NewDefaultManager(),
 		artifactCtl: artifact.Ctl,
 		local:       p.local,
 	}
@@ -81,8 +114,10 @@ func (p *proxyControllerTestSuite) TestUseLocalManifest_True() {
 	ctx := context.Background()
 	dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
 	art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
-	p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(true, nil)
-	result := p.ctr.UseLocalManifest(ctx, art)
+	p.local.On("GetManifest", mock.Anything, mock.Anything).Return(&artifact.Artifact{}, nil)
+
+	result, err := p.ctr.UseLocalManifest(ctx, art)
+	p.Assert().Nil(err)
 	p.Assert().True(result)
 }
 
@@ -90,16 +125,18 @@ func (p *proxyControllerTestSuite) TestUseLocalManifest_False() {
 	ctx := context.Background()
 	dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
 	art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
-	p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(false, nil)
-	result := p.ctr.UseLocalManifest(ctx, art)
+	p.local.On("GetManifest", mock.Anything, mock.Anything).Return(nil, nil)
+	result, err := p.ctr.UseLocalManifest(ctx, art)
+	p.Assert().Nil(err)
 	p.Assert().False(result)
 }
 
 func (p *proxyControllerTestSuite) TestUseLocalManifestWithTag_False() {
 	ctx := context.Background()
 	art := lib.ArtifactInfo{Repository: "library/hello-world", Tag: "latest"}
-	p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(true, nil)
-	result := p.ctr.UseLocalManifest(ctx, art)
+	p.local.On("GetManifest", mock.Anything, mock.Anything).Return(&artifact.Artifact{}, nil)
+	result, err := p.ctr.UseLocalManifest(ctx, art)
+	p.Assert().Nil(err)
 	p.Assert().False(result)
 }
 
diff --git a/src/controller/proxy/local.go b/src/controller/proxy/local.go
index 76660e75e..55cc5ecf8 100644
--- a/src/controller/proxy/local.go
+++ b/src/controller/proxy/local.go
@@ -16,26 +16,30 @@ package proxy
 
 import (
 	"context"
-	"errors"
 	"fmt"
+	"io"
+	"time"
+
 	"github.com/docker/distribution"
 	"github.com/docker/distribution/manifest/manifestlist"
 	"github.com/goharbor/harbor/src/controller/artifact"
+	"github.com/goharbor/harbor/src/controller/event/metadata"
 	"github.com/goharbor/harbor/src/core/config"
 	"github.com/goharbor/harbor/src/lib"
+	"github.com/goharbor/harbor/src/lib/errors"
 	"github.com/goharbor/harbor/src/lib/log"
+	"github.com/goharbor/harbor/src/pkg/notifier/event"
 	"github.com/goharbor/harbor/src/pkg/proxy/secret"
 	"github.com/goharbor/harbor/src/pkg/registry"
-	"io"
-	"time"
+	"github.com/opencontainers/go-digest"
 )
 
 // localInterface defines operations related to local repo under proxy mode
 type localInterface interface {
 	// BlobExist check if the blob exist in local repo
 	BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error)
-	// Manifest check if the manifest exist in local repo
-	ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool
+	// GetManifest gets the artifact of the manifest from local repo, returns nil if it is not found
+	GetManifest(ctx context.Context, art lib.ArtifactInfo) (*artifact.Artifact, error)
 	// PushBlob push blob to local repo
 	PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error
 	// PushManifest push manifest to local repo, ref can be digest or tag
@@ -48,13 +52,16 @@ type localInterface interface {
 	DeleteManifest(repo, ref string)
 }
 
-func (l *localHelper) ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool {
-	a, err := l.artifactCtl.GetByReference(ctx, art.Repository, art.Digest, nil)
+func (l *localHelper) GetManifest(ctx context.Context, art lib.ArtifactInfo) (*artifact.Artifact, error) {
+	ref := getReference(art)
+	a, err := l.artifactCtl.GetByReference(ctx, art.Repository, ref, nil)
 	if err != nil {
-		log.Errorf("check manifest exist failed, error %v", err)
-		return false
+		if errors.IsNotFoundErr(err) {
+			return nil, nil
+		}
+		return nil, err
 	}
-	return a != nil
+	return a, nil
 }
 
 // localHelper defines operations related to local repo under proxy mode
@@ -134,7 +141,11 @@ func (l *localHelper) updateManifestList(ctx context.Context, repo string, manif
 		existMans := make([]manifestlist.ManifestDescriptor, 0)
 		for _, m := range v.Manifests {
 			art := lib.ArtifactInfo{Repository: repo, Digest: string(m.Digest)}
-			if l.ManifestExist(ctx, art) {
+			a, err := l.GetManifest(ctx, art)
+			if err != nil {
+				return nil, err
+			}
+			if a != nil {
 				existMans = append(existMans, m)
 			}
 		}
@@ -143,14 +154,14 @@ func (l *localHelper) updateManifestList(ctx context.Context, repo string, manif
 	return nil, fmt.Errorf("current manifest list type is unknown, manifest type[%T], content [%+v]", manifest, manifest)
 }
 
-func (l *localHelper) PushManifestList(ctx context.Context, repo string, ref string, man distribution.Manifest) error {
+func (l *localHelper) PushManifestList(ctx context.Context, repo string, tag string, man distribution.Manifest) error {
 	// For manifest list, it might include some different manifest
 	// it will wait and check for 30 mins, if all depend manifests exist then push it
 	// if time exceed, only push the new updated manifest list which contains existing manifest
 	var newMan distribution.Manifest
 	var err error
 	for n := 0; n < maxManifestListWait; n++ {
-		log.Debugf("waiting for the manifest ready, repo %v, ref:%v", repo, ref)
+		log.Debugf("waiting for the manifest ready, repo %v, tag:%v", repo, tag)
 		time.Sleep(sleepIntervalSec * time.Second)
 		newMan, err = l.updateManifestList(ctx, repo, man)
 		if err != nil {
@@ -160,7 +171,6 @@ func (l *localHelper) PushManifestList(ctx context.Context, repo string, ref str
 			break
 		}
 	}
-
 	if len(newMan.References()) == 0 {
 		return errors.New("manifest list doesn't contain any pushed manifest")
 	}
@@ -170,7 +180,17 @@ func (l *localHelper) PushManifestList(ctx context.Context, repo string, ref str
 		return err
 	}
 	log.Debugf("The manifest list payload: %v", string(pl))
-	return l.PushManifest(repo, ref, newMan)
+	dig := digest.FromBytes(pl)
+	// Because the manifest list may have been updated, recheck whether it already exists in the local repo
+	art := lib.ArtifactInfo{Repository: repo, Tag: tag}
+	a, err := l.GetManifest(ctx, art)
+	if err != nil {
+		return err
+	}
+	if a != nil && a.Digest == string(dig) {
+		return nil
+	}
+	return l.PushManifest(repo, tag, newMan)
 }
 
 func (l *localHelper) CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor {
@@ -188,3 +208,13 @@ func (l *localHelper) CheckDependencies(ctx context.Context, repo string, man di
 	log.Debugf("Check dependency result %v", waitDesc)
 	return waitDesc
 }
+
+// SendPullEvent sends a pull artifact event for the given artifact and tag
+func SendPullEvent(ctx context.Context, a *artifact.Artifact, tag string) {
+	e := &metadata.PullArtifactEventMetadata{
+		Ctx:      ctx,
+		Artifact: &a.Artifact,
+		Tag:      tag,
+	}
+	event.BuildAndPublish(e)
+}
diff --git a/src/controller/proxy/local_test.go b/src/controller/proxy/local_test.go
index e3a1ceeba..b74736e02 100644
--- a/src/controller/proxy/local_test.go
+++ b/src/controller/proxy/local_test.go
@@ -191,8 +191,9 @@ func (lh *localHelperTestSuite) TestManifestExist() {
 	var opt *artifact.Option
 	lh.artCtl.On("GetByReference", ctx, "library/hello-world", dig, opt).Return(ar, nil)
 	art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
-	exist := lh.local.ManifestExist(ctx, art)
-	lh.Assert().True(exist)
+	a, err := lh.local.GetManifest(ctx, art)
+	lh.Assert().Nil(err)
+	lh.Assert().NotNil(a)
 }
 
 func TestLocalHelperTestSuite(t *testing.T) {
diff --git a/src/controller/proxy/remote.go b/src/controller/proxy/remote.go
index 30056658e..db3a8d273 100644
--- a/src/controller/proxy/remote.go
+++ b/src/controller/proxy/remote.go
@@ -17,8 +17,8 @@ package proxy
 import (
 	"fmt"
 	"github.com/docker/distribution"
-	"github.com/goharbor/harbor/src/lib/log"
 	"github.com/goharbor/harbor/src/replication/adapter"
+	"github.com/goharbor/harbor/src/replication/model"
 	"github.com/goharbor/harbor/src/replication/registry"
 	"io"
 )
@@ -28,20 +28,22 @@ type remoteInterface interface {
 	// BlobReader create a reader for remote blob
 	BlobReader(repo, dig string) (int64, io.ReadCloser, error)
 	// Manifest get manifest by reference
-	Manifest(repo string, ref string) (distribution.Manifest, error)
+	Manifest(repo string, ref string) (distribution.Manifest, string, error)
 }
 
 // remoteHelper defines operations related to remote repository under proxy
 type remoteHelper struct {
-	regID    int64
-	registry adapter.ArtifactRegistry
+	regID       int64
+	registry    adapter.ArtifactRegistry
+	registryMgr registry.Manager
 }
 
 // newRemoteHelper create a remoteHelper interface
-func newRemoteHelper(regID int64) (remoteInterface, error) {
-	r := &remoteHelper{regID: regID}
+func newRemoteHelper(regID int64) (*remoteHelper, error) {
+	r := &remoteHelper{
+		regID:       regID,
+		registryMgr: registry.NewDefaultManager()}
 	if err := r.init(); err != nil {
-		log.Errorf("failed to create remoteHelper error %v", err)
 		return nil, err
 	}
 	return r, nil
@@ -52,13 +54,16 @@ func (r *remoteHelper) init() error {
 	if r.registry != nil {
 		return nil
 	}
-	reg, err := registry.NewDefaultManager().Get(r.regID)
+	reg, err := r.registryMgr.Get(r.regID)
 	if err != nil {
 		return err
 	}
 	if reg == nil {
 		return fmt.Errorf("failed to get registry, registryID: %v", r.regID)
 	}
+	if reg.Status != model.Healthy {
+		return fmt.Errorf("current registry is unhealthy, regID:%v, Name:%v, Status: %v", reg.ID, reg.Name, reg.Status)
+	}
 	factory, err := adapter.GetFactory(reg.Type)
 	if err != nil {
 		return err
@@ -75,7 +80,6 @@ func (r *remoteHelper) BlobReader(repo, dig string) (int64, io.ReadCloser, error
 	return r.registry.PullBlob(repo, dig)
 }
 
-func (r *remoteHelper) Manifest(repo string, ref string) (distribution.Manifest, error) {
-	man, _, err := r.registry.PullManifest(repo, ref)
-	return man, err
+func (r *remoteHelper) Manifest(repo string, ref string) (distribution.Manifest, string, error) {
+	return r.registry.PullManifest(repo, ref)
 }
diff --git a/src/server/middleware/repoproxy/proxy.go b/src/server/middleware/repoproxy/proxy.go
index 1c8fe9a78..fdef48ba8 100644
--- a/src/server/middleware/repoproxy/proxy.go
+++ b/src/server/middleware/repoproxy/proxy.go
@@ -98,7 +98,15 @@ func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) e
 	if err != nil {
 		return err
 	}
-	if !canProxy(p) || proxyCtl.UseLocalManifest(ctx, art) {
+	if !canProxy(p) {
+		next.ServeHTTP(w, r)
+		return nil
+	}
+	useLocal, err := proxyCtl.UseLocalManifest(ctx, art)
+	if err != nil {
+		return err
+	}
+	if useLocal {
 		next.ServeHTTP(w, r)
 		return nil
 	}