From e63d5a1c06eac30e3a12b2c91734ed0334217ced Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Fri, 23 Mar 2018 23:53:15 +0800 Subject: [PATCH] Create a job to call UI's replication API to do the period replication job --- src/common/job/const.go | 12 +- .../job/impl/replication/delete.go | 7 +- .../job/impl/replication/delete_test.go | 25 ++ .../job/impl/replication/replicate.go | 327 ++-------------- .../job/impl/replication/replicate_test.go | 12 +- .../job/impl/replication/transfer.go | 351 ++++++++++++++++++ .../job/impl/replication/transfer_test.go | 25 ++ src/jobservice_v2/runtime/bootstrap.go | 7 +- src/ui/api/replication.go | 2 +- 9 files changed, 453 insertions(+), 315 deletions(-) create mode 100644 src/jobservice_v2/job/impl/replication/delete_test.go create mode 100644 src/jobservice_v2/job/impl/replication/transfer.go create mode 100644 src/jobservice_v2/job/impl/replication/transfer_test.go diff --git a/src/common/job/const.go b/src/common/job/const.go index f485964ea..5ae5863be 100644 --- a/src/common/job/const.go +++ b/src/common/job/const.go @@ -3,10 +3,10 @@ package job const ( //ImageScanJob is name of scan job it will be used as key to register to job service. ImageScanJob = "IMAGE_SCAN" - // ImageReplicationTransfer : the name of replication transfer job in job service - ImageReplicationTransfer = "IMAGE_REPLICATION_TRANSFER" - // ImageReplicationDelete : the name of replication delete job in job service - ImageReplicationDelete = "IMAGE_REPLICATION_DELETE" - // ImagePeriodReplication : the name of period replication job in job service - ImagePeriodReplication = "IMAGE_PERIOD_REPLICATION" + // ImageTransfer : the name of image transfer job in job service + ImageTransfer = "IMAGE_TRANSFER" + // ImageDelete : the name of image delete job in job service + ImageDelete = "IMAGE_DELETE" + // ImageReplicate : the name of image replicate job in job service + ImageReplicate = "IMAGE_REPLICATE" ) diff --git a/src/jobservice_v2/job/impl/replication/delete.go b/src/jobservice_v2/job/impl/replication/delete.go index e77e702af..9ce86fa62 100644 --- a/src/jobservice_v2/job/impl/replication/delete.go +++ b/src/jobservice_v2/job/impl/replication/delete.go @@ -4,9 +4,9 @@ import ( "net/http" common_http "github.com/vmware/harbor/src/common/http" - "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/common/utils/registry/auth" "github.com/vmware/harbor/src/jobservice_v2/env" + "github.com/vmware/harbor/src/jobservice_v2/logger" ) // Deleter deletes repository or images on the destination registry @@ -14,7 +14,7 @@ type Deleter struct { ctx env.JobContext repository *repository dstRegistry *registry - logger *log.Logger + logger logger.Interface retry bool } @@ -49,8 +49,7 @@ func (d *Deleter) run(ctx env.JobContext, params map[string]interface{}) error { } func (d *Deleter) init(ctx env.JobContext, params map[string]interface{}) error { - // TODO - d.logger = log.DefaultLogger() + d.logger = ctx.GetLogger() d.ctx = ctx if canceled(d.ctx) { diff --git a/src/jobservice_v2/job/impl/replication/delete_test.go b/src/jobservice_v2/job/impl/replication/delete_test.go new file mode 100644 index 000000000..f6a27fd98 --- /dev/null +++ b/src/jobservice_v2/job/impl/replication/delete_test.go @@ -0,0 +1,25 @@ +package replication + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMaxFailsOfDeleter(t *testing.T) { + d := &Deleter{} + assert.Equal(t, uint(3), d.MaxFails()) +} + +func TestValidateOfDeleter(t *testing.T) { + d := &Deleter{} + require.Nil(t, d.Validate(nil)) +} + +func TestShouldRetryOfDeleter(t *testing.T) { + d := &Deleter{} + assert.False(t, d.ShouldRetry()) + d.retry = true + assert.True(t, d.ShouldRetry()) +} diff --git a/src/jobservice_v2/job/impl/replication/replicate.go b/src/jobservice_v2/job/impl/replication/replicate.go index 6cdfc940c..d455e2688 100644 --- a/src/jobservice_v2/job/impl/replication/replicate.go +++ b/src/jobservice_v2/job/impl/replication/replicate.go @@ -1,49 +1,36 @@ package replication import ( - "errors" "fmt" - "net" "net/http" - "os" - "strings" - "github.com/docker/distribution" - "github.com/docker/distribution/manifest/schema1" - "github.com/docker/distribution/manifest/schema2" common_http "github.com/vmware/harbor/src/common/http" - "github.com/vmware/harbor/src/common/http/modifier" "github.com/vmware/harbor/src/common/models" - "github.com/vmware/harbor/src/common/utils" - "github.com/vmware/harbor/src/common/utils/log" reg "github.com/vmware/harbor/src/common/utils/registry" "github.com/vmware/harbor/src/common/utils/registry/auth" "github.com/vmware/harbor/src/jobservice_v2/env" - job_utils "github.com/vmware/harbor/src/jobservice_v2/job/impl/utils" + "github.com/vmware/harbor/src/jobservice_v2/logger" ) -var ( - errCanceled = errors.New("the job is canceled") -) - -// Replicator replicates images from source registry to the destination one +// Replicator call UI's API to start a repliation according to the policy ID +// passed in parameters type Replicator struct { - ctx env.JobContext - repository *repository - srcRegistry *registry - dstRegistry *registry - logger *log.Logger - retry bool + ctx env.JobContext + url string // the URL of UI service + insecure bool + policyID int64 + client *common_http.Client + logger logger.Interface } -// ShouldRetry : retry if the error is network error +// ShouldRetry ... func (r *Replicator) ShouldRetry() bool { - return r.retry + return false } // MaxFails ... func (r *Replicator) MaxFails() uint { - return 3 + return 0 } // Validate .... @@ -53,296 +40,48 @@ func (r *Replicator) Validate(params map[string]interface{}) error { // Run ... func (r *Replicator) Run(ctx env.JobContext, params map[string]interface{}) error { - err := r.run(ctx, params) - r.retry = retry(err) - return err -} - -func (r *Replicator) run(ctx env.JobContext, params map[string]interface{}) error { - // initialize if err := r.init(ctx, params); err != nil { return err } - // try to create project on destination registry - if err := r.createProject(); err != nil { - return err - } - // replicate the images - for _, tag := range r.repository.tags { - digest, manifest, err := r.pullManifest(tag) - if err != nil { - return err - } - if err := r.transferLayers(tag, manifest.References()); err != nil { - return err - } - if err := r.pushManifest(tag, digest, manifest); err != nil { - return err - } - } - - return nil + return r.replicate() } func (r *Replicator) init(ctx env.JobContext, params map[string]interface{}) error { - // TODO - r.logger = log.DefaultLogger() + r.logger = ctx.GetLogger() r.ctx = ctx - if canceled(r.ctx) { r.logger.Warning(errCanceled.Error()) return errCanceled } - // init images that need to be replicated - r.repository = &repository{ - name: params["repository"].(string), - } - if tags, ok := params["tags"]; ok { - tgs := tags.([]interface{}) - for _, tg := range tgs { - r.repository.tags = append(r.repository.tags, tg.(string)) - } - } - - var err error - // init source registry client - srcURL := params["src_registry_url"].(string) - srcInsecure := params["src_registry_insecure"].(bool) - srcCred := auth.NewCookieCredential(&http.Cookie{ + r.policyID = (int64)(params["policy_id"].(float64)) + r.url = params["url"].(string) + r.insecure = params["insecure"].(bool) + cred := auth.NewCookieCredential(&http.Cookie{ Name: models.UISecretCookie, - Value: os.Getenv("JOBSERVICE_SECRET"), + Value: secret(), }) - srcTokenServiceURL := "" - if stsu, ok := params["src_token_service_url"]; ok { - srcTokenServiceURL = stsu.(string) - } - if len(srcTokenServiceURL) > 0 { - r.srcRegistry, err = initRegistry(srcURL, srcInsecure, srcCred, r.repository.name, srcTokenServiceURL) - } else { - r.srcRegistry, err = initRegistry(srcURL, srcInsecure, srcCred, r.repository.name) - } - if err != nil { - r.logger.Errorf("failed to create client for source registry: %v", err) - return err - } + r.client = common_http.NewClient(&http.Client{ + Transport: reg.GetHTTPTransport(r.insecure), + }, cred) - // init destination registry client - dstURL := params["dst_registry_url"].(string) - dstInsecure := params["dst_registry_insecure"].(bool) - dstCred := auth.NewBasicAuthCredential( - params["dst_registry_username"].(string), - params["dst_registry_password"].(string)) - r.dstRegistry, err = initRegistry(dstURL, dstInsecure, dstCred, r.repository.name) - if err != nil { - r.logger.Errorf("failed to create client for destination registry: %v", err) - return err - } - - // get the tag list first if it is null - if len(r.repository.tags) == 0 { - tags, err := r.srcRegistry.ListTag() - if err != nil { - r.logger.Errorf("an error occurred while listing tags for the source repository: %v", err) - return err - } - - if len(tags) == 0 { - err = fmt.Errorf("empty tag list for repository %s", r.repository.name) - r.logger.Error(err) - return err - } - r.repository.tags = tags - } - - r.logger.Infof("initialization completed: repository: %s, tags: %v, source registry: URL-%s insecure-%v, destination registry: URL-%s insecure-%v", - r.repository.name, r.repository.tags, r.srcRegistry.url, r.srcRegistry.insecure, r.dstRegistry.url, r.dstRegistry.insecure) + r.logger.Infof("initialization completed: policy ID: %d, URL: %s, insecure: %v", + r.policyID, r.url, r.insecure) return nil } -func initRegistry(url string, insecure bool, credential modifier.Modifier, - repository string, tokenServiceURL ...string) (*registry, error) { - registry := ®istry{ - url: url, - insecure: insecure, - } - - // use the same transport for clients connecting to docker registry and Harbor UI - transport := reg.GetHTTPTransport(insecure) - - authorizer := auth.NewStandardTokenAuthorizer(&http.Client{ - Transport: transport, - }, credential, tokenServiceURL...) - uam := &job_utils.UserAgentModifier{ - UserAgent: "harbor-registry-client", - } - repositoryClient, err := reg.NewRepository(repository, url, - &http.Client{ - Transport: reg.NewTransport(transport, authorizer, uam), - }) - if err != nil { - return nil, err - } - registry.Repository = *repositoryClient - - registry.client = common_http.NewClient( - &http.Client{ - Transport: transport, - }, credential) - return registry, nil -} - -func (r *Replicator) createProject() error { - if canceled(r.ctx) { - r.logger.Warning(errCanceled.Error()) - return errCanceled - } - p, _ := utils.ParseRepository(r.repository.name) - project, err := r.srcRegistry.GetProject(p) - if err != nil { - r.logger.Errorf("failed to get project %s from source registry: %v", p, err) +func (r *Replicator) replicate() error { + if err := r.client.Post(fmt.Sprintf("%s/api/replications", r.url), struct { + PolicyID int64 `json:"policy_id"` + }{ + PolicyID: r.policyID, + }); err != nil { + r.logger.Errorf("failed to send the replication request to %s: %v", r.url, err) return err } - - if err = r.dstRegistry.CreateProject(project); err != nil { - // other jobs may be also doing the same thing when the current job - // is creating project or the project has already exist, so when the - // response code is 409, continue to do next step - if e, ok := err.(*common_http.Error); ok && e.Code == http.StatusConflict { - r.logger.Warningf("the status code is 409 when creating project %s on destination registry, try to do next step", p) - return nil - } - - r.logger.Errorf("an error occurred while creating project %s on destination registry: %v", p, err) - return err - } - r.logger.Infof("project %s is created on destination registry", p) + r.logger.Infof("the replication request has been sent to %s successfully", r.url) return nil -} - -func (r *Replicator) pullManifest(tag string) (string, distribution.Manifest, error) { - if canceled(r.ctx) { - r.logger.Warning(errCanceled.Error()) - return "", nil, errCanceled - } - - acceptMediaTypes := []string{schema1.MediaTypeManifest, schema2.MediaTypeManifest} - digest, mediaType, payload, err := r.srcRegistry.PullManifest(tag, acceptMediaTypes) - if err != nil { - r.logger.Errorf("an error occurred while pulling manifest of %s:%s from source registry: %v", - r.repository.name, tag, err) - return "", nil, err - } - r.logger.Infof("manifest of %s:%s pulled successfully from source registry: %s", - r.repository.name, tag, digest) - - if strings.Contains(mediaType, "application/json") { - mediaType = schema1.MediaTypeManifest - } - - manifest, _, err := reg.UnMarshal(mediaType, payload) - if err != nil { - r.logger.Errorf("an error occurred while parsing manifest: %v", err) - return "", nil, err - } - - return digest, manifest, nil -} - -func (r *Replicator) transferLayers(tag string, blobs []distribution.Descriptor) error { - repository := r.repository.name - - // all blobs(layers and config) - for _, blob := range blobs { - if canceled(r.ctx) { - r.logger.Warning(errCanceled.Error()) - return errCanceled - } - - digest := blob.Digest.String() - exist, err := r.dstRegistry.BlobExist(digest) - if err != nil { - r.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on destination registry: %v", - digest, repository, tag, err) - return err - } - if exist { - r.logger.Infof("blob %s of %s:%s already exists on the destination registry, skip", - digest, repository, tag) - continue - } - - r.logger.Infof("transferring blob %s of %s:%s to the destination registry ...", - digest, repository, tag) - size, data, err := r.srcRegistry.PullBlob(digest) - if err != nil { - r.logger.Errorf("an error occurred while pulling blob %s of %s:%s from the source registry: %v", - digest, repository, tag, err) - return err - } - if data != nil { - defer data.Close() - } - if err = r.dstRegistry.PushBlob(digest, size, data); err != nil { - r.logger.Errorf("an error occurred while pushing blob %s of %s:%s to the distination registry: %v", - digest, repository, tag, err) - return err - } - r.logger.Infof("blob %s of %s:%s transferred to the destination registry completed", - digest, repository, tag) - } - - return nil -} - -func (r *Replicator) pushManifest(tag, digest string, manifest distribution.Manifest) error { - if canceled(r.ctx) { - r.logger.Warning(errCanceled.Error()) - return errCanceled - } - - repository := r.repository.name - _, exist, err := r.dstRegistry.ManifestExist(digest) - if err != nil { - r.logger.Warningf("an error occurred while checking the existence of manifest of %s:%s on the destination registry: %v, try to push manifest", - repository, tag, err) - } else { - if exist { - r.logger.Infof("manifest of %s:%s exists on the destination registry, skip manifest pushing", - repository, tag) - return nil - } - } - - mediaType, data, err := manifest.Payload() - if err != nil { - r.logger.Errorf("an error occurred while getting payload of manifest for %s:%s : %v", - repository, tag, err) - return err - } - - if _, err = r.dstRegistry.PushManifest(tag, mediaType, data); err != nil { - r.logger.Errorf("an error occurred while pushing manifest of %s:%s to the destination registry: %v", - repository, tag, err) - return err - } - r.logger.Infof("manifest of %s:%s has been pushed to the destination registry", - repository, tag) - - return nil -} - -func canceled(ctx env.JobContext) bool { - _, canceled := ctx.OPCommand() - return canceled -} - -func retry(err error) bool { - if err == nil { - return false - } - _, ok := err.(net.Error) - return ok + } diff --git a/src/jobservice_v2/job/impl/replication/replicate_test.go b/src/jobservice_v2/job/impl/replication/replicate_test.go index 263e29690..6a8618120 100644 --- a/src/jobservice_v2/job/impl/replication/replicate_test.go +++ b/src/jobservice_v2/job/impl/replication/replicate_test.go @@ -7,19 +7,17 @@ import ( "github.com/stretchr/testify/require" ) -func TestMaxFails(t *testing.T) { +func TestMaxFailsOfReplicator(t *testing.T) { r := &Replicator{} - assert.Equal(t, uint(3), r.MaxFails()) + assert.Equal(t, uint(0), r.MaxFails()) } -func TestValidate(t *testing.T) { +func TestValidateOfReplicator(t *testing.T) { r := &Replicator{} require.Nil(t, r.Validate(nil)) } -func TestShouldRetry(t *testing.T) { +func TestShouldRetryOfReplicator(t *testing.T) { r := &Replicator{} - assert.False(t, r.retry) - r.retry = true - assert.True(t, r.retry) + assert.False(t, r.ShouldRetry()) } diff --git a/src/jobservice_v2/job/impl/replication/transfer.go b/src/jobservice_v2/job/impl/replication/transfer.go new file mode 100644 index 000000000..3cda41c5c --- /dev/null +++ b/src/jobservice_v2/job/impl/replication/transfer.go @@ -0,0 +1,351 @@ +package replication + +import ( + "errors" + "fmt" + "net" + "net/http" + "os" + "strings" + + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/manifest/schema2" + common_http "github.com/vmware/harbor/src/common/http" + "github.com/vmware/harbor/src/common/http/modifier" + "github.com/vmware/harbor/src/common/models" + "github.com/vmware/harbor/src/common/utils" + reg "github.com/vmware/harbor/src/common/utils/registry" + "github.com/vmware/harbor/src/common/utils/registry/auth" + "github.com/vmware/harbor/src/jobservice_v2/env" + job_utils "github.com/vmware/harbor/src/jobservice_v2/job/impl/utils" + "github.com/vmware/harbor/src/jobservice_v2/logger" +) + +var ( + errCanceled = errors.New("the job is canceled") +) + +// Transfer images from source registry to the destination one +type Transfer struct { + ctx env.JobContext + repository *repository + srcRegistry *registry + dstRegistry *registry + logger logger.Interface + retry bool +} + +// ShouldRetry : retry if the error is network error +func (t *Transfer) ShouldRetry() bool { + return t.retry +} + +// MaxFails ... +func (t *Transfer) MaxFails() uint { + return 3 +} + +// Validate .... +func (t *Transfer) Validate(params map[string]interface{}) error { + return nil +} + +// Run ... +func (t *Transfer) Run(ctx env.JobContext, params map[string]interface{}) error { + err := t.run(ctx, params) + t.retry = retry(err) + return err +} + +func (t *Transfer) run(ctx env.JobContext, params map[string]interface{}) error { + // initialize + if err := t.init(ctx, params); err != nil { + return err + } + // try to create project on destination registry + if err := t.createProject(); err != nil { + return err + } + // replicate the images + for _, tag := range t.repository.tags { + digest, manifest, err := t.pullManifest(tag) + if err != nil { + return err + } + if err := t.transferLayers(tag, manifest.References()); err != nil { + return err + } + if err := t.pushManifest(tag, digest, manifest); err != nil { + return err + } + } + + return nil +} + +func (t *Transfer) init(ctx env.JobContext, params map[string]interface{}) error { + t.logger = ctx.GetLogger() + t.ctx = ctx + + if canceled(t.ctx) { + t.logger.Warning(errCanceled.Error()) + return errCanceled + } + + // init images that need to be replicated + t.repository = &repository{ + name: params["repository"].(string), + } + if tags, ok := params["tags"]; ok { + tgs := tags.([]interface{}) + for _, tg := range tgs { + t.repository.tags = append(t.repository.tags, tg.(string)) + } + } + + var err error + // init source registry client + srcURL := params["src_registry_url"].(string) + srcInsecure := params["src_registry_insecure"].(bool) + srcCred := auth.NewCookieCredential(&http.Cookie{ + Name: models.UISecretCookie, + Value: secret(), + }) + srcTokenServiceURL := "" + if stsu, ok := params["src_token_service_url"]; ok { + srcTokenServiceURL = stsu.(string) + } + + if len(srcTokenServiceURL) > 0 { + t.srcRegistry, err = initRegistry(srcURL, srcInsecure, srcCred, t.repository.name, srcTokenServiceURL) + } else { + t.srcRegistry, err = initRegistry(srcURL, srcInsecure, srcCred, t.repository.name) + } + if err != nil { + t.logger.Errorf("failed to create client for source registry: %v", err) + return err + } + + // init destination registry client + dstURL := params["dst_registry_url"].(string) + dstInsecure := params["dst_registry_insecure"].(bool) + dstCred := auth.NewBasicAuthCredential( + params["dst_registry_username"].(string), + params["dst_registry_password"].(string)) + t.dstRegistry, err = initRegistry(dstURL, dstInsecure, dstCred, t.repository.name) + if err != nil { + t.logger.Errorf("failed to create client for destination registry: %v", err) + return err + } + + // get the tag list first if it is null + if len(t.repository.tags) == 0 { + tags, err := t.srcRegistry.ListTag() + if err != nil { + t.logger.Errorf("an error occurred while listing tags for the source repository: %v", err) + return err + } + + if len(tags) == 0 { + err = fmt.Errorf("empty tag list for repository %s", t.repository.name) + t.logger.Error(err) + return err + } + t.repository.tags = tags + } + + t.logger.Infof("initialization completed: repository: %s, tags: %v, source registry: URL-%s insecure-%v, destination registry: URL-%s insecure-%v", + t.repository.name, t.repository.tags, t.srcRegistry.url, t.srcRegistry.insecure, t.dstRegistry.url, t.dstRegistry.insecure) + + return nil +} + +func initRegistry(url string, insecure bool, credential modifier.Modifier, + repository string, tokenServiceURL ...string) (*registry, error) { + registry := ®istry{ + url: url, + insecure: insecure, + } + + // use the same transport for clients connecting to docker registry and Harbor UI + transport := reg.GetHTTPTransport(insecure) + + authorizer := auth.NewStandardTokenAuthorizer(&http.Client{ + Transport: transport, + }, credential, tokenServiceURL...) + uam := &job_utils.UserAgentModifier{ + UserAgent: "harbor-registry-client", + } + repositoryClient, err := reg.NewRepository(repository, url, + &http.Client{ + Transport: reg.NewTransport(transport, authorizer, uam), + }) + if err != nil { + return nil, err + } + registry.Repository = *repositoryClient + + registry.client = common_http.NewClient( + &http.Client{ + Transport: transport, + }, credential) + return registry, nil +} + +func (t *Transfer) createProject() error { + if canceled(t.ctx) { + t.logger.Warning(errCanceled.Error()) + return errCanceled + } + p, _ := utils.ParseRepository(t.repository.name) + project, err := t.srcRegistry.GetProject(p) + if err != nil { + t.logger.Errorf("failed to get project %s from source registry: %v", p, err) + return err + } + + if err = t.dstRegistry.CreateProject(project); err != nil { + // other jobs may be also doing the same thing when the current job + // is creating project or the project has already exist, so when the + // response code is 409, continue to do next step + if e, ok := err.(*common_http.Error); ok && e.Code == http.StatusConflict { + t.logger.Warningf("the status code is 409 when creating project %s on destination registry, try to do next step", p) + return nil + } + + t.logger.Errorf("an error occurred while creating project %s on destination registry: %v", p, err) + return err + } + t.logger.Infof("project %s is created on destination registry", p) + return nil +} + +func (t *Transfer) pullManifest(tag string) (string, distribution.Manifest, error) { + if canceled(t.ctx) { + t.logger.Warning(errCanceled.Error()) + return "", nil, errCanceled + } + + acceptMediaTypes := []string{schema1.MediaTypeManifest, schema2.MediaTypeManifest} + digest, mediaType, payload, err := t.srcRegistry.PullManifest(tag, acceptMediaTypes) + if err != nil { + t.logger.Errorf("an error occurred while pulling manifest of %s:%s from source registry: %v", + t.repository.name, tag, err) + return "", nil, err + } + t.logger.Infof("manifest of %s:%s pulled successfully from source registry: %s", + t.repository.name, tag, digest) + + if strings.Contains(mediaType, "application/json") { + mediaType = schema1.MediaTypeManifest + } + + manifest, _, err := reg.UnMarshal(mediaType, payload) + if err != nil { + t.logger.Errorf("an error occurred while parsing manifest: %v", err) + return "", nil, err + } + + return digest, manifest, nil +} + +func (t *Transfer) transferLayers(tag string, blobs []distribution.Descriptor) error { + repository := t.repository.name + + // all blobs(layers and config) + for _, blob := range blobs { + if canceled(t.ctx) { + t.logger.Warning(errCanceled.Error()) + return errCanceled + } + + digest := blob.Digest.String() + exist, err := t.dstRegistry.BlobExist(digest) + if err != nil { + t.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on destination registry: %v", + digest, repository, tag, err) + return err + } + if exist { + t.logger.Infof("blob %s of %s:%s already exists on the destination registry, skip", + digest, repository, tag) + continue + } + + t.logger.Infof("transferring blob %s of %s:%s to the destination registry ...", + digest, repository, tag) + size, data, err := t.srcRegistry.PullBlob(digest) + if err != nil { + t.logger.Errorf("an error occurred while pulling blob %s of %s:%s from the source registry: %v", + digest, repository, tag, err) + return err + } + if data != nil { + defer data.Close() + } + if err = t.dstRegistry.PushBlob(digest, size, data); err != nil { + t.logger.Errorf("an error occurred while pushing blob %s of %s:%s to the distination registry: %v", + digest, repository, tag, err) + return err + } + t.logger.Infof("blob %s of %s:%s transferred to the destination registry completed", + digest, repository, tag) + } + + return nil +} + +func (t *Transfer) pushManifest(tag, digest string, manifest distribution.Manifest) error { + if canceled(t.ctx) { + t.logger.Warning(errCanceled.Error()) + return errCanceled + } + + repository := t.repository.name + _, exist, err := t.dstRegistry.ManifestExist(digest) + if err != nil { + t.logger.Warningf("an error occurred while checking the existence of manifest of %s:%s on the destination registry: %v, try to push manifest", + repository, tag, err) + } else { + if exist { + t.logger.Infof("manifest of %s:%s exists on the destination registry, skip manifest pushing", + repository, tag) + return nil + } + } + + mediaType, data, err := manifest.Payload() + if err != nil { + t.logger.Errorf("an error occurred while getting payload of manifest for %s:%s : %v", + repository, tag, err) + return err + } + + if _, err = t.dstRegistry.PushManifest(tag, mediaType, data); err != nil { + t.logger.Errorf("an error occurred while pushing manifest of %s:%s to the destination registry: %v", + repository, tag, err) + return err + } + t.logger.Infof("manifest of %s:%s has been pushed to the destination registry", + repository, tag) + + return nil +} + +func canceled(ctx env.JobContext) bool { + _, canceled := ctx.OPCommand() + return canceled +} + +func retry(err error) bool { + if err == nil { + return false + } + _, ok := err.(net.Error) + return ok +} + +func secret() string { + return os.Getenv("JOBSERVICE_SECRET") +} diff --git a/src/jobservice_v2/job/impl/replication/transfer_test.go b/src/jobservice_v2/job/impl/replication/transfer_test.go new file mode 100644 index 000000000..d1197eded --- /dev/null +++ b/src/jobservice_v2/job/impl/replication/transfer_test.go @@ -0,0 +1,25 @@ +package replication + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMaxFailsOfTransfer(t *testing.T) { + r := &Transfer{} + assert.Equal(t, uint(3), r.MaxFails()) +} + +func TestValidateOfTransfer(t *testing.T) { + r := &Transfer{} + require.Nil(t, r.Validate(nil)) +} + +func TestShouldRetryOfTransfer(t *testing.T) { + r := &Transfer{} + assert.False(t, r.ShouldRetry()) + r.retry = true + assert.True(t, r.ShouldRetry()) +} diff --git a/src/jobservice_v2/runtime/bootstrap.go b/src/jobservice_v2/runtime/bootstrap.go index 371239457..81e66d3f9 100644 --- a/src/jobservice_v2/runtime/bootstrap.go +++ b/src/jobservice_v2/runtime/bootstrap.go @@ -159,9 +159,10 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con } if err := redisWorkerPool.RegisterJobs( map[string]interface{}{ - job.ImageScanJob: (*scan.ClairJob)(nil), - job.ImageReplicationTransfer: (*replication.Replicator)(nil), - job.ImageReplicationDelete: (*replication.Deleter)(nil), + job.ImageScanJob: (*scan.ClairJob)(nil), + job.ImageTransfer: (*replication.Transfer)(nil), + job.ImageDelete: (*replication.Deleter)(nil), + job.ImageReplicate: (*replication.Replicator)(nil), }); err != nil { //exit ctx.ErrorChan <- err diff --git a/src/ui/api/replication.go b/src/ui/api/replication.go index 0290949bb..be89fcca1 100644 --- a/src/ui/api/replication.go +++ b/src/ui/api/replication.go @@ -41,7 +41,7 @@ func (r *ReplicationAPI) Prepare() { return } - if !r.SecurityCtx.IsSysAdmin() { + if !r.SecurityCtx.IsSysAdmin() && !r.SecurityCtx.IsSolutionUser() { r.HandleForbidden(r.SecurityCtx.GetUsername()) return }