From 1f75b7aaef21a1d87da8dbbff20022f58d82f9d4 Mon Sep 17 00:00:00 2001 From: Shengwen YU Date: Thu, 8 Aug 2024 19:02:54 +0800 Subject: [PATCH] feat: implement bandwidth limit for proxy-cache (#20812) Signed-off-by: Shengwen Yu --- api/v2.0/swagger.yaml | 4 ++ src/controller/proxy/controller.go | 2 +- src/controller/proxy/options.go | 37 +++++++++++++++++++ src/controller/proxy/options_test.go | 33 +++++++++++++++++ src/controller/proxy/remote.go | 17 +++++++-- .../replication/transfer/image/transfer.go | 5 ++- .../transfer => lib}/iothrottler.go | 2 +- src/pkg/project/models/pro_meta.go | 1 + src/pkg/project/models/project.go | 13 +++++++ src/server/middleware/repoproxy/proxy.go | 10 ++--- src/server/middleware/repoproxy/tag.go | 4 +- src/server/v2.0/handler/project.go | 17 +++++++++ src/server/v2.0/handler/project_metadata.go | 6 +++ 13 files changed, 137 insertions(+), 14 deletions(-) create mode 100644 src/controller/proxy/options.go create mode 100644 src/controller/proxy/options_test.go rename src/{controller/replication/transfer => lib}/iothrottler.go (98%) diff --git a/api/v2.0/swagger.yaml b/api/v2.0/swagger.yaml index b219902e3..681d0e354 100644 --- a/api/v2.0/swagger.yaml +++ b/api/v2.0/swagger.yaml @@ -7340,6 +7340,10 @@ definitions: type: string description: 'The ID of the tag retention policy for the project' x-nullable: true + proxy_speed_kb: + type: string + description: 'The bandwidth limit of proxy cache, in Kbps (kilobits per second). It limits the communication between Harbor and the upstream registry, not the client and the Harbor.' + x-nullable: true ProjectSummary: type: object properties: diff --git a/src/controller/proxy/controller.go b/src/controller/proxy/controller.go index 38d55397c..7138b6f0b 100644 --- a/src/controller/proxy/controller.go +++ b/src/controller/proxy/controller.go @@ -264,7 +264,7 @@ func (c *controller) HeadManifest(_ context.Context, art lib.ArtifactInfo, remot func (c *controller) ProxyBlob(ctx context.Context, p *proModels.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error) { remoteRepo := getRemoteRepo(art) log.Debugf("The blob doesn't exist, proxy the request to the target server, url:%v", remoteRepo) - rHelper, err := NewRemoteHelper(ctx, p.RegistryID) + rHelper, err := NewRemoteHelper(ctx, p.RegistryID, WithSpeed(p.ProxyCacheSpeed())) if err != nil { return 0, nil, err } diff --git a/src/controller/proxy/options.go b/src/controller/proxy/options.go new file mode 100644 index 000000000..2f81bfc3f --- /dev/null +++ b/src/controller/proxy/options.go @@ -0,0 +1,37 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +type Option func(*Options) + +type Options struct { + // Speed is the data transfer speed for proxy cache from Harbor to upstream registry, no limit by default. + Speed int32 +} + +func NewOptions(opts ...Option) *Options { + o := &Options{} + for _, opt := range opts { + opt(o) + } + + return o +} + +func WithSpeed(speed int32) Option { + return func(o *Options) { + o.Speed = speed + } +} diff --git a/src/controller/proxy/options_test.go b/src/controller/proxy/options_test.go new file mode 100644 index 000000000..2b0a4ef80 --- /dev/null +++ b/src/controller/proxy/options_test.go @@ -0,0 +1,33 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewOptions(t *testing.T) { + // test default options + o := NewOptions() + assert.Equal(t, int32(0), o.Speed) + + // test with options + // with speed + withSpeed := WithSpeed(1024) + o = NewOptions(withSpeed) + assert.Equal(t, int32(1024), o.Speed) +} diff --git a/src/controller/proxy/remote.go b/src/controller/proxy/remote.go index ac7f23f28..4143b8170 100644 --- a/src/controller/proxy/remote.go +++ b/src/controller/proxy/remote.go @@ -21,6 +21,7 @@ import ( "github.com/docker/distribution" + "github.com/goharbor/harbor/src/lib" "github.com/goharbor/harbor/src/pkg/reg" "github.com/goharbor/harbor/src/pkg/reg/adapter" "github.com/goharbor/harbor/src/pkg/reg/model" @@ -43,13 +44,16 @@ type remoteHelper struct { regID int64 registry adapter.ArtifactRegistry registryMgr reg.Manager + opts *Options } // NewRemoteHelper create a remote interface -func NewRemoteHelper(ctx context.Context, regID int64) (RemoteInterface, error) { +func NewRemoteHelper(ctx context.Context, regID int64, opts ...Option) (RemoteInterface, error) { r := &remoteHelper{ regID: regID, - registryMgr: reg.Mgr} + registryMgr: reg.Mgr, + opts: NewOptions(opts...), + } if err := r.init(ctx); err != nil { return nil, err } @@ -83,7 +87,14 @@ func (r *remoteHelper) init(ctx context.Context) error { } func (r *remoteHelper) BlobReader(repo, dig string) (int64, io.ReadCloser, error) { - return r.registry.PullBlob(repo, dig) + sz, bReader, err := r.registry.PullBlob(repo, dig) + if err != nil { + return 0, nil, err + } + if r.opts != nil && r.opts.Speed > 0 { + bReader = lib.NewReader(bReader, r.opts.Speed) + } + return sz, bReader, err } func (r *remoteHelper) Manifest(repo string, ref string) (distribution.Manifest, string, error) { diff --git a/src/controller/replication/transfer/image/transfer.go b/src/controller/replication/transfer/image/transfer.go index 55bda9646..09c1bea27 100644 --- a/src/controller/replication/transfer/image/transfer.go +++ b/src/controller/replication/transfer/image/transfer.go @@ -30,6 +30,7 @@ import ( common_http "github.com/goharbor/harbor/src/common/http" trans "github.com/goharbor/harbor/src/controller/replication/transfer" + "github.com/goharbor/harbor/src/lib" "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/pkg/reg/adapter" "github.com/goharbor/harbor/src/pkg/reg/model" @@ -380,7 +381,7 @@ func (t *transfer) copyBlobByMonolithic(srcRepo, dstRepo, digest string, sizeFro return err } if speed > 0 { - data = trans.NewReader(data, speed) + data = lib.NewReader(data, speed) } defer data.Close() // get size 0 from PullBlob, use size from distribution.Descriptor instead. @@ -435,7 +436,7 @@ func (t *transfer) copyBlobByChunk(srcRepo, dstRepo, digest string, sizeFromDesc } if speed > 0 { - data = trans.NewReader(data, speed) + data = lib.NewReader(data, speed) } // failureEnd will only be used for adjusting content range when issue happened during push the chunk. var failureEnd int64 diff --git a/src/controller/replication/transfer/iothrottler.go b/src/lib/iothrottler.go similarity index 98% rename from src/controller/replication/transfer/iothrottler.go rename to src/lib/iothrottler.go index 828c44035..b0853e0e5 100644 --- a/src/controller/replication/transfer/iothrottler.go +++ b/src/lib/iothrottler.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package transfer +package lib import ( "fmt" diff --git a/src/pkg/project/models/pro_meta.go b/src/pkg/project/models/pro_meta.go index a8a0659fe..25f7e41be 100644 --- a/src/pkg/project/models/pro_meta.go +++ b/src/pkg/project/models/pro_meta.go @@ -24,4 +24,5 @@ const ( ProMetaAutoScan = "auto_scan" ProMetaReuseSysCVEAllowlist = "reuse_sys_cve_allowlist" ProMetaAutoSBOMGen = "auto_sbom_generation" + ProMetaProxySpeed = "proxy_speed_kb" ) diff --git a/src/pkg/project/models/project.go b/src/pkg/project/models/project.go index e533bd911..80318aa21 100644 --- a/src/pkg/project/models/project.go +++ b/src/pkg/project/models/project.go @@ -156,6 +156,19 @@ func (p *Project) AutoSBOMGen() bool { return isTrue(auto) } +// ProxyCacheSpeed ... +func (p *Project) ProxyCacheSpeed() int32 { + speed, exist := p.GetMetadata(ProMetaProxySpeed) + if !exist { + return 0 + } + speedInt, err := strconv.ParseInt(speed, 10, 32) + if err != nil { + return 0 + } + return int32(speedInt) +} + // FilterByPublic returns orm.QuerySeter with public filter func (p *Project) FilterByPublic(_ context.Context, qs orm.QuerySeter, _ string, value interface{}) orm.QuerySeter { subQuery := `SELECT project_id FROM project_metadata WHERE name = 'public' AND value = '%s'` diff --git a/src/server/middleware/repoproxy/proxy.go b/src/server/middleware/repoproxy/proxy.go index ddb201784..641638766 100644 --- a/src/server/middleware/repoproxy/proxy.go +++ b/src/server/middleware/repoproxy/proxy.go @@ -60,7 +60,7 @@ func BlobGetMiddleware() func(http.Handler) http.Handler { func handleBlob(w http.ResponseWriter, r *http.Request, next http.Handler) error { ctx := r.Context() - art, p, proxyCtl, err := preCheck(ctx) + art, p, proxyCtl, err := preCheck(ctx, true) if err != nil { return err } @@ -96,14 +96,14 @@ func handleBlob(w http.ResponseWriter, r *http.Request, next http.Handler) error return nil } -func preCheck(ctx context.Context) (art lib.ArtifactInfo, p *proModels.Project, ctl proxy.Controller, err error) { +func preCheck(ctx context.Context, withProjectMetadata bool) (art lib.ArtifactInfo, p *proModels.Project, ctl proxy.Controller, err error) { none := lib.ArtifactInfo{} art = lib.GetArtifactInfo(ctx) if art == none { return none, nil, nil, errors.New("artifactinfo is not found").WithCode(errors.NotFoundCode) } ctl = proxy.ControllerInstance() - p, err = project.Ctl.GetByName(ctx, art.ProjectName, project.Metadata(false)) + p, err = project.Ctl.GetByName(ctx, art.ProjectName, project.Metadata(withProjectMetadata)) return } @@ -155,7 +155,7 @@ func defaultBlobURL(projectName string, name string, digest string) string { func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) error { ctx := r.Context() - art, p, proxyCtl, err := preCheck(ctx) + art, p, proxyCtl, err := preCheck(ctx, true) if err != nil { return err } @@ -174,7 +174,7 @@ func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) e next.ServeHTTP(w, r) return nil } - remote, err := proxy.NewRemoteHelper(r.Context(), p.RegistryID) + remote, err := proxy.NewRemoteHelper(r.Context(), p.RegistryID, proxy.WithSpeed(p.ProxyCacheSpeed())) if err != nil { return err } diff --git a/src/server/middleware/repoproxy/tag.go b/src/server/middleware/repoproxy/tag.go index 6e0d3070d..005a5f777 100644 --- a/src/server/middleware/repoproxy/tag.go +++ b/src/server/middleware/repoproxy/tag.go @@ -35,7 +35,7 @@ func TagsListMiddleware() func(http.Handler) http.Handler { return middleware.New(func(w http.ResponseWriter, r *http.Request, next http.Handler) { ctx := r.Context() - art, p, _, err := preCheck(ctx) + art, p, _, err := preCheck(ctx, false) if err != nil { libhttp.SendError(w, err) return @@ -69,7 +69,7 @@ func TagsListMiddleware() func(http.Handler) http.Handler { util.SendListTagsResponse(w, r, tags) }() - remote, err := proxy.NewRemoteHelper(ctx, p.RegistryID) + remote, err := proxy.NewRemoteHelper(ctx, p.RegistryID, proxy.WithSpeed(p.ProxyCacheSpeed())) if err != nil { logger.Warningf("failed to get remote interface, error: %v, fallback to local tags", err) return diff --git a/src/server/v2.0/handler/project.go b/src/server/v2.0/handler/project.go index 6a133fb30..047992954 100644 --- a/src/server/v2.0/handler/project.go +++ b/src/server/v2.0/handler/project.go @@ -159,6 +159,11 @@ func (a *projectAPI) CreateProject(ctx context.Context, params operation.CreateP } } + // ignore metadata.proxy_speed_kb for non-proxy-cache project + if req.RegistryID == nil { + req.Metadata.ProxySpeedKb = nil + } + // ignore enable_content_trust metadata for proxy cache project // see https://github.com/goharbor/harbor/issues/12940 to get more info if req.RegistryID != nil { @@ -551,6 +556,11 @@ func (a *projectAPI) UpdateProject(ctx context.Context, params operation.UpdateP } } + // ignore metadata.proxy_speed_kb for non-proxy-cache project + if params.Project.Metadata != nil && !p.IsProxy() { + params.Project.Metadata.ProxySpeedKb = nil + } + // ignore enable_content_trust metadata for proxy cache project // see https://github.com/goharbor/harbor/issues/12940 to get more info if params.Project.Metadata != nil && p.IsProxy() { @@ -792,6 +802,13 @@ func (a *projectAPI) validateProjectReq(ctx context.Context, req *models.Project if !permitted { return errors.BadRequestError(fmt.Errorf("unsupported registry type %s", string(registry.Type))) } + + // validate metadata.proxy_speed_kb. It should be an int32 + if ps := req.Metadata.ProxySpeedKb; ps != nil { + if _, err := strconv.ParseInt(*ps, 10, 32); err != nil { + return errors.BadRequestError(nil).WithMessage(fmt.Sprintf("metadata.proxy_speed_kb should by an int32, but got: '%s', err: %s", *ps, err)) + } + } } if req.StorageLimit != nil { diff --git a/src/server/v2.0/handler/project_metadata.go b/src/server/v2.0/handler/project_metadata.go index a5bde2e45..7fa297b72 100644 --- a/src/server/v2.0/handler/project_metadata.go +++ b/src/server/v2.0/handler/project_metadata.go @@ -155,6 +155,12 @@ func (p *projectMetadataAPI) validate(metas map[string]string) (map[string]strin return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid value: %s", value) } metas[proModels.ProMetaSeverity] = strings.ToLower(severity.String()) + case proModels.ProMetaProxySpeed: + v, err := strconv.ParseInt(value, 10, 32) + if err != nil { + return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid value: %s", value) + } + metas[proModels.ProMetaProxySpeed] = strconv.FormatInt(v, 10) default: return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid key: %s", key) }