Merge pull request #4485 from ywk253100/180326_period_job

Create a job to call UI's replication API to do the period replication job
This commit is contained in:
Wenkai Yin 2018-03-26 17:31:10 +08:00 committed by GitHub
commit 38568a1d2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 453 additions and 315 deletions

View File

@ -3,10 +3,10 @@ package job
const ( const (
//ImageScanJob is name of scan job it will be used as key to register to job service. //ImageScanJob is name of scan job it will be used as key to register to job service.
ImageScanJob = "IMAGE_SCAN" ImageScanJob = "IMAGE_SCAN"
// ImageReplicationTransfer : the name of replication transfer job in job service // ImageTransfer : the name of image transfer job in job service
ImageReplicationTransfer = "IMAGE_REPLICATION_TRANSFER" ImageTransfer = "IMAGE_TRANSFER"
// ImageReplicationDelete : the name of replication delete job in job service // ImageDelete : the name of image delete job in job service
ImageReplicationDelete = "IMAGE_REPLICATION_DELETE" ImageDelete = "IMAGE_DELETE"
// ImagePeriodReplication : the name of period replication job in job service // ImageReplicate : the name of image replicate job in job service
ImagePeriodReplication = "IMAGE_PERIOD_REPLICATION" ImageReplicate = "IMAGE_REPLICATE"
) )

View File

@ -4,9 +4,9 @@ import (
"net/http" "net/http"
common_http "github.com/vmware/harbor/src/common/http" common_http "github.com/vmware/harbor/src/common/http"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/common/utils/registry/auth" "github.com/vmware/harbor/src/common/utils/registry/auth"
"github.com/vmware/harbor/src/jobservice_v2/env" "github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/logger"
) )
// Deleter deletes repository or images on the destination registry // Deleter deletes repository or images on the destination registry
@ -14,7 +14,7 @@ type Deleter struct {
ctx env.JobContext ctx env.JobContext
repository *repository repository *repository
dstRegistry *registry dstRegistry *registry
logger *log.Logger logger logger.Interface
retry bool retry bool
} }
@ -49,8 +49,7 @@ func (d *Deleter) run(ctx env.JobContext, params map[string]interface{}) error {
} }
func (d *Deleter) init(ctx env.JobContext, params map[string]interface{}) error { func (d *Deleter) init(ctx env.JobContext, params map[string]interface{}) error {
// TODO d.logger = ctx.GetLogger()
d.logger = log.DefaultLogger()
d.ctx = ctx d.ctx = ctx
if canceled(d.ctx) { if canceled(d.ctx) {

View File

@ -0,0 +1,25 @@
package replication
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMaxFailsOfDeleter(t *testing.T) {
d := &Deleter{}
assert.Equal(t, uint(3), d.MaxFails())
}
func TestValidateOfDeleter(t *testing.T) {
d := &Deleter{}
require.Nil(t, d.Validate(nil))
}
func TestShouldRetryOfDeleter(t *testing.T) {
d := &Deleter{}
assert.False(t, d.ShouldRetry())
d.retry = true
assert.True(t, d.ShouldRetry())
}

View File

@ -1,49 +1,36 @@
package replication package replication
import ( import (
"errors"
"fmt" "fmt"
"net"
"net/http" "net/http"
"os"
"strings"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
common_http "github.com/vmware/harbor/src/common/http" common_http "github.com/vmware/harbor/src/common/http"
"github.com/vmware/harbor/src/common/http/modifier"
"github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils"
"github.com/vmware/harbor/src/common/utils/log"
reg "github.com/vmware/harbor/src/common/utils/registry" reg "github.com/vmware/harbor/src/common/utils/registry"
"github.com/vmware/harbor/src/common/utils/registry/auth" "github.com/vmware/harbor/src/common/utils/registry/auth"
"github.com/vmware/harbor/src/jobservice_v2/env" "github.com/vmware/harbor/src/jobservice_v2/env"
job_utils "github.com/vmware/harbor/src/jobservice_v2/job/impl/utils" "github.com/vmware/harbor/src/jobservice_v2/logger"
) )
var ( // Replicator call UI's API to start a repliation according to the policy ID
errCanceled = errors.New("the job is canceled") // passed in parameters
)
// Replicator replicates images from source registry to the destination one
type Replicator struct { type Replicator struct {
ctx env.JobContext ctx env.JobContext
repository *repository url string // the URL of UI service
srcRegistry *registry insecure bool
dstRegistry *registry policyID int64
logger *log.Logger client *common_http.Client
retry bool logger logger.Interface
} }
// ShouldRetry : retry if the error is network error // ShouldRetry ...
func (r *Replicator) ShouldRetry() bool { func (r *Replicator) ShouldRetry() bool {
return r.retry return false
} }
// MaxFails ... // MaxFails ...
func (r *Replicator) MaxFails() uint { func (r *Replicator) MaxFails() uint {
return 3 return 0
} }
// Validate .... // Validate ....
@ -53,296 +40,48 @@ func (r *Replicator) Validate(params map[string]interface{}) error {
// Run ... // Run ...
func (r *Replicator) Run(ctx env.JobContext, params map[string]interface{}) error { func (r *Replicator) Run(ctx env.JobContext, params map[string]interface{}) error {
err := r.run(ctx, params)
r.retry = retry(err)
return err
}
func (r *Replicator) run(ctx env.JobContext, params map[string]interface{}) error {
// initialize
if err := r.init(ctx, params); err != nil { if err := r.init(ctx, params); err != nil {
return err return err
} }
// try to create project on destination registry return r.replicate()
if err := r.createProject(); err != nil {
return err
}
// replicate the images
for _, tag := range r.repository.tags {
digest, manifest, err := r.pullManifest(tag)
if err != nil {
return err
}
if err := r.transferLayers(tag, manifest.References()); err != nil {
return err
}
if err := r.pushManifest(tag, digest, manifest); err != nil {
return err
}
}
return nil
} }
func (r *Replicator) init(ctx env.JobContext, params map[string]interface{}) error { func (r *Replicator) init(ctx env.JobContext, params map[string]interface{}) error {
// TODO r.logger = ctx.GetLogger()
r.logger = log.DefaultLogger()
r.ctx = ctx r.ctx = ctx
if canceled(r.ctx) { if canceled(r.ctx) {
r.logger.Warning(errCanceled.Error()) r.logger.Warning(errCanceled.Error())
return errCanceled return errCanceled
} }
// init images that need to be replicated r.policyID = (int64)(params["policy_id"].(float64))
r.repository = &repository{ r.url = params["url"].(string)
name: params["repository"].(string), r.insecure = params["insecure"].(bool)
} cred := auth.NewCookieCredential(&http.Cookie{
if tags, ok := params["tags"]; ok {
tgs := tags.([]interface{})
for _, tg := range tgs {
r.repository.tags = append(r.repository.tags, tg.(string))
}
}
var err error
// init source registry client
srcURL := params["src_registry_url"].(string)
srcInsecure := params["src_registry_insecure"].(bool)
srcCred := auth.NewCookieCredential(&http.Cookie{
Name: models.UISecretCookie, Name: models.UISecretCookie,
Value: os.Getenv("JOBSERVICE_SECRET"), Value: secret(),
}) })
srcTokenServiceURL := ""
if stsu, ok := params["src_token_service_url"]; ok {
srcTokenServiceURL = stsu.(string)
}
if len(srcTokenServiceURL) > 0 { r.client = common_http.NewClient(&http.Client{
r.srcRegistry, err = initRegistry(srcURL, srcInsecure, srcCred, r.repository.name, srcTokenServiceURL) Transport: reg.GetHTTPTransport(r.insecure),
} else { }, cred)
r.srcRegistry, err = initRegistry(srcURL, srcInsecure, srcCred, r.repository.name)
}
if err != nil {
r.logger.Errorf("failed to create client for source registry: %v", err)
return err
}
// init destination registry client r.logger.Infof("initialization completed: policy ID: %d, URL: %s, insecure: %v",
dstURL := params["dst_registry_url"].(string) r.policyID, r.url, r.insecure)
dstInsecure := params["dst_registry_insecure"].(bool)
dstCred := auth.NewBasicAuthCredential(
params["dst_registry_username"].(string),
params["dst_registry_password"].(string))
r.dstRegistry, err = initRegistry(dstURL, dstInsecure, dstCred, r.repository.name)
if err != nil {
r.logger.Errorf("failed to create client for destination registry: %v", err)
return err
}
// get the tag list first if it is null
if len(r.repository.tags) == 0 {
tags, err := r.srcRegistry.ListTag()
if err != nil {
r.logger.Errorf("an error occurred while listing tags for the source repository: %v", err)
return err
}
if len(tags) == 0 {
err = fmt.Errorf("empty tag list for repository %s", r.repository.name)
r.logger.Error(err)
return err
}
r.repository.tags = tags
}
r.logger.Infof("initialization completed: repository: %s, tags: %v, source registry: URL-%s insecure-%v, destination registry: URL-%s insecure-%v",
r.repository.name, r.repository.tags, r.srcRegistry.url, r.srcRegistry.insecure, r.dstRegistry.url, r.dstRegistry.insecure)
return nil return nil
} }
func initRegistry(url string, insecure bool, credential modifier.Modifier, func (r *Replicator) replicate() error {
repository string, tokenServiceURL ...string) (*registry, error) { if err := r.client.Post(fmt.Sprintf("%s/api/replications", r.url), struct {
registry := &registry{ PolicyID int64 `json:"policy_id"`
url: url, }{
insecure: insecure, PolicyID: r.policyID,
} }); err != nil {
r.logger.Errorf("failed to send the replication request to %s: %v", r.url, err)
// use the same transport for clients connecting to docker registry and Harbor UI
transport := reg.GetHTTPTransport(insecure)
authorizer := auth.NewStandardTokenAuthorizer(&http.Client{
Transport: transport,
}, credential, tokenServiceURL...)
uam := &job_utils.UserAgentModifier{
UserAgent: "harbor-registry-client",
}
repositoryClient, err := reg.NewRepository(repository, url,
&http.Client{
Transport: reg.NewTransport(transport, authorizer, uam),
})
if err != nil {
return nil, err
}
registry.Repository = *repositoryClient
registry.client = common_http.NewClient(
&http.Client{
Transport: transport,
}, credential)
return registry, nil
}
func (r *Replicator) createProject() error {
if canceled(r.ctx) {
r.logger.Warning(errCanceled.Error())
return errCanceled
}
p, _ := utils.ParseRepository(r.repository.name)
project, err := r.srcRegistry.GetProject(p)
if err != nil {
r.logger.Errorf("failed to get project %s from source registry: %v", p, err)
return err return err
} }
r.logger.Infof("the replication request has been sent to %s successfully", r.url)
if err = r.dstRegistry.CreateProject(project); err != nil {
// other jobs may be also doing the same thing when the current job
// is creating project or the project has already exist, so when the
// response code is 409, continue to do next step
if e, ok := err.(*common_http.Error); ok && e.Code == http.StatusConflict {
r.logger.Warningf("the status code is 409 when creating project %s on destination registry, try to do next step", p)
return nil
}
r.logger.Errorf("an error occurred while creating project %s on destination registry: %v", p, err)
return err
}
r.logger.Infof("project %s is created on destination registry", p)
return nil return nil
}
func (r *Replicator) pullManifest(tag string) (string, distribution.Manifest, error) {
if canceled(r.ctx) {
r.logger.Warning(errCanceled.Error())
return "", nil, errCanceled
}
acceptMediaTypes := []string{schema1.MediaTypeManifest, schema2.MediaTypeManifest}
digest, mediaType, payload, err := r.srcRegistry.PullManifest(tag, acceptMediaTypes)
if err != nil {
r.logger.Errorf("an error occurred while pulling manifest of %s:%s from source registry: %v",
r.repository.name, tag, err)
return "", nil, err
}
r.logger.Infof("manifest of %s:%s pulled successfully from source registry: %s",
r.repository.name, tag, digest)
if strings.Contains(mediaType, "application/json") {
mediaType = schema1.MediaTypeManifest
}
manifest, _, err := reg.UnMarshal(mediaType, payload)
if err != nil {
r.logger.Errorf("an error occurred while parsing manifest: %v", err)
return "", nil, err
}
return digest, manifest, nil
}
func (r *Replicator) transferLayers(tag string, blobs []distribution.Descriptor) error {
repository := r.repository.name
// all blobs(layers and config)
for _, blob := range blobs {
if canceled(r.ctx) {
r.logger.Warning(errCanceled.Error())
return errCanceled
}
digest := blob.Digest.String()
exist, err := r.dstRegistry.BlobExist(digest)
if err != nil {
r.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on destination registry: %v",
digest, repository, tag, err)
return err
}
if exist {
r.logger.Infof("blob %s of %s:%s already exists on the destination registry, skip",
digest, repository, tag)
continue
}
r.logger.Infof("transferring blob %s of %s:%s to the destination registry ...",
digest, repository, tag)
size, data, err := r.srcRegistry.PullBlob(digest)
if err != nil {
r.logger.Errorf("an error occurred while pulling blob %s of %s:%s from the source registry: %v",
digest, repository, tag, err)
return err
}
if data != nil {
defer data.Close()
}
if err = r.dstRegistry.PushBlob(digest, size, data); err != nil {
r.logger.Errorf("an error occurred while pushing blob %s of %s:%s to the distination registry: %v",
digest, repository, tag, err)
return err
}
r.logger.Infof("blob %s of %s:%s transferred to the destination registry completed",
digest, repository, tag)
}
return nil
}
func (r *Replicator) pushManifest(tag, digest string, manifest distribution.Manifest) error {
if canceled(r.ctx) {
r.logger.Warning(errCanceled.Error())
return errCanceled
}
repository := r.repository.name
_, exist, err := r.dstRegistry.ManifestExist(digest)
if err != nil {
r.logger.Warningf("an error occurred while checking the existence of manifest of %s:%s on the destination registry: %v, try to push manifest",
repository, tag, err)
} else {
if exist {
r.logger.Infof("manifest of %s:%s exists on the destination registry, skip manifest pushing",
repository, tag)
return nil
}
}
mediaType, data, err := manifest.Payload()
if err != nil {
r.logger.Errorf("an error occurred while getting payload of manifest for %s:%s : %v",
repository, tag, err)
return err
}
if _, err = r.dstRegistry.PushManifest(tag, mediaType, data); err != nil {
r.logger.Errorf("an error occurred while pushing manifest of %s:%s to the destination registry: %v",
repository, tag, err)
return err
}
r.logger.Infof("manifest of %s:%s has been pushed to the destination registry",
repository, tag)
return nil
}
func canceled(ctx env.JobContext) bool {
_, canceled := ctx.OPCommand()
return canceled
}
func retry(err error) bool {
if err == nil {
return false
}
_, ok := err.(net.Error)
return ok
} }

View File

@ -7,19 +7,17 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestMaxFails(t *testing.T) { func TestMaxFailsOfReplicator(t *testing.T) {
r := &Replicator{} r := &Replicator{}
assert.Equal(t, uint(3), r.MaxFails()) assert.Equal(t, uint(0), r.MaxFails())
} }
func TestValidate(t *testing.T) { func TestValidateOfReplicator(t *testing.T) {
r := &Replicator{} r := &Replicator{}
require.Nil(t, r.Validate(nil)) require.Nil(t, r.Validate(nil))
} }
func TestShouldRetry(t *testing.T) { func TestShouldRetryOfReplicator(t *testing.T) {
r := &Replicator{} r := &Replicator{}
assert.False(t, r.retry) assert.False(t, r.ShouldRetry())
r.retry = true
assert.True(t, r.retry)
} }

View File

@ -0,0 +1,351 @@
package replication
import (
"errors"
"fmt"
"net"
"net/http"
"os"
"strings"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
common_http "github.com/vmware/harbor/src/common/http"
"github.com/vmware/harbor/src/common/http/modifier"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils"
reg "github.com/vmware/harbor/src/common/utils/registry"
"github.com/vmware/harbor/src/common/utils/registry/auth"
"github.com/vmware/harbor/src/jobservice_v2/env"
job_utils "github.com/vmware/harbor/src/jobservice_v2/job/impl/utils"
"github.com/vmware/harbor/src/jobservice_v2/logger"
)
var (
errCanceled = errors.New("the job is canceled")
)
// Transfer images from source registry to the destination one
type Transfer struct {
ctx env.JobContext
repository *repository
srcRegistry *registry
dstRegistry *registry
logger logger.Interface
retry bool
}
// ShouldRetry : retry if the error is network error
func (t *Transfer) ShouldRetry() bool {
return t.retry
}
// MaxFails ...
func (t *Transfer) MaxFails() uint {
return 3
}
// Validate ....
func (t *Transfer) Validate(params map[string]interface{}) error {
return nil
}
// Run ...
func (t *Transfer) Run(ctx env.JobContext, params map[string]interface{}) error {
err := t.run(ctx, params)
t.retry = retry(err)
return err
}
func (t *Transfer) run(ctx env.JobContext, params map[string]interface{}) error {
// initialize
if err := t.init(ctx, params); err != nil {
return err
}
// try to create project on destination registry
if err := t.createProject(); err != nil {
return err
}
// replicate the images
for _, tag := range t.repository.tags {
digest, manifest, err := t.pullManifest(tag)
if err != nil {
return err
}
if err := t.transferLayers(tag, manifest.References()); err != nil {
return err
}
if err := t.pushManifest(tag, digest, manifest); err != nil {
return err
}
}
return nil
}
func (t *Transfer) init(ctx env.JobContext, params map[string]interface{}) error {
t.logger = ctx.GetLogger()
t.ctx = ctx
if canceled(t.ctx) {
t.logger.Warning(errCanceled.Error())
return errCanceled
}
// init images that need to be replicated
t.repository = &repository{
name: params["repository"].(string),
}
if tags, ok := params["tags"]; ok {
tgs := tags.([]interface{})
for _, tg := range tgs {
t.repository.tags = append(t.repository.tags, tg.(string))
}
}
var err error
// init source registry client
srcURL := params["src_registry_url"].(string)
srcInsecure := params["src_registry_insecure"].(bool)
srcCred := auth.NewCookieCredential(&http.Cookie{
Name: models.UISecretCookie,
Value: secret(),
})
srcTokenServiceURL := ""
if stsu, ok := params["src_token_service_url"]; ok {
srcTokenServiceURL = stsu.(string)
}
if len(srcTokenServiceURL) > 0 {
t.srcRegistry, err = initRegistry(srcURL, srcInsecure, srcCred, t.repository.name, srcTokenServiceURL)
} else {
t.srcRegistry, err = initRegistry(srcURL, srcInsecure, srcCred, t.repository.name)
}
if err != nil {
t.logger.Errorf("failed to create client for source registry: %v", err)
return err
}
// init destination registry client
dstURL := params["dst_registry_url"].(string)
dstInsecure := params["dst_registry_insecure"].(bool)
dstCred := auth.NewBasicAuthCredential(
params["dst_registry_username"].(string),
params["dst_registry_password"].(string))
t.dstRegistry, err = initRegistry(dstURL, dstInsecure, dstCred, t.repository.name)
if err != nil {
t.logger.Errorf("failed to create client for destination registry: %v", err)
return err
}
// get the tag list first if it is null
if len(t.repository.tags) == 0 {
tags, err := t.srcRegistry.ListTag()
if err != nil {
t.logger.Errorf("an error occurred while listing tags for the source repository: %v", err)
return err
}
if len(tags) == 0 {
err = fmt.Errorf("empty tag list for repository %s", t.repository.name)
t.logger.Error(err)
return err
}
t.repository.tags = tags
}
t.logger.Infof("initialization completed: repository: %s, tags: %v, source registry: URL-%s insecure-%v, destination registry: URL-%s insecure-%v",
t.repository.name, t.repository.tags, t.srcRegistry.url, t.srcRegistry.insecure, t.dstRegistry.url, t.dstRegistry.insecure)
return nil
}
func initRegistry(url string, insecure bool, credential modifier.Modifier,
repository string, tokenServiceURL ...string) (*registry, error) {
registry := &registry{
url: url,
insecure: insecure,
}
// use the same transport for clients connecting to docker registry and Harbor UI
transport := reg.GetHTTPTransport(insecure)
authorizer := auth.NewStandardTokenAuthorizer(&http.Client{
Transport: transport,
}, credential, tokenServiceURL...)
uam := &job_utils.UserAgentModifier{
UserAgent: "harbor-registry-client",
}
repositoryClient, err := reg.NewRepository(repository, url,
&http.Client{
Transport: reg.NewTransport(transport, authorizer, uam),
})
if err != nil {
return nil, err
}
registry.Repository = *repositoryClient
registry.client = common_http.NewClient(
&http.Client{
Transport: transport,
}, credential)
return registry, nil
}
func (t *Transfer) createProject() error {
if canceled(t.ctx) {
t.logger.Warning(errCanceled.Error())
return errCanceled
}
p, _ := utils.ParseRepository(t.repository.name)
project, err := t.srcRegistry.GetProject(p)
if err != nil {
t.logger.Errorf("failed to get project %s from source registry: %v", p, err)
return err
}
if err = t.dstRegistry.CreateProject(project); err != nil {
// other jobs may be also doing the same thing when the current job
// is creating project or the project has already exist, so when the
// response code is 409, continue to do next step
if e, ok := err.(*common_http.Error); ok && e.Code == http.StatusConflict {
t.logger.Warningf("the status code is 409 when creating project %s on destination registry, try to do next step", p)
return nil
}
t.logger.Errorf("an error occurred while creating project %s on destination registry: %v", p, err)
return err
}
t.logger.Infof("project %s is created on destination registry", p)
return nil
}
func (t *Transfer) pullManifest(tag string) (string, distribution.Manifest, error) {
if canceled(t.ctx) {
t.logger.Warning(errCanceled.Error())
return "", nil, errCanceled
}
acceptMediaTypes := []string{schema1.MediaTypeManifest, schema2.MediaTypeManifest}
digest, mediaType, payload, err := t.srcRegistry.PullManifest(tag, acceptMediaTypes)
if err != nil {
t.logger.Errorf("an error occurred while pulling manifest of %s:%s from source registry: %v",
t.repository.name, tag, err)
return "", nil, err
}
t.logger.Infof("manifest of %s:%s pulled successfully from source registry: %s",
t.repository.name, tag, digest)
if strings.Contains(mediaType, "application/json") {
mediaType = schema1.MediaTypeManifest
}
manifest, _, err := reg.UnMarshal(mediaType, payload)
if err != nil {
t.logger.Errorf("an error occurred while parsing manifest: %v", err)
return "", nil, err
}
return digest, manifest, nil
}
func (t *Transfer) transferLayers(tag string, blobs []distribution.Descriptor) error {
repository := t.repository.name
// all blobs(layers and config)
for _, blob := range blobs {
if canceled(t.ctx) {
t.logger.Warning(errCanceled.Error())
return errCanceled
}
digest := blob.Digest.String()
exist, err := t.dstRegistry.BlobExist(digest)
if err != nil {
t.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on destination registry: %v",
digest, repository, tag, err)
return err
}
if exist {
t.logger.Infof("blob %s of %s:%s already exists on the destination registry, skip",
digest, repository, tag)
continue
}
t.logger.Infof("transferring blob %s of %s:%s to the destination registry ...",
digest, repository, tag)
size, data, err := t.srcRegistry.PullBlob(digest)
if err != nil {
t.logger.Errorf("an error occurred while pulling blob %s of %s:%s from the source registry: %v",
digest, repository, tag, err)
return err
}
if data != nil {
defer data.Close()
}
if err = t.dstRegistry.PushBlob(digest, size, data); err != nil {
t.logger.Errorf("an error occurred while pushing blob %s of %s:%s to the distination registry: %v",
digest, repository, tag, err)
return err
}
t.logger.Infof("blob %s of %s:%s transferred to the destination registry completed",
digest, repository, tag)
}
return nil
}
func (t *Transfer) pushManifest(tag, digest string, manifest distribution.Manifest) error {
if canceled(t.ctx) {
t.logger.Warning(errCanceled.Error())
return errCanceled
}
repository := t.repository.name
_, exist, err := t.dstRegistry.ManifestExist(digest)
if err != nil {
t.logger.Warningf("an error occurred while checking the existence of manifest of %s:%s on the destination registry: %v, try to push manifest",
repository, tag, err)
} else {
if exist {
t.logger.Infof("manifest of %s:%s exists on the destination registry, skip manifest pushing",
repository, tag)
return nil
}
}
mediaType, data, err := manifest.Payload()
if err != nil {
t.logger.Errorf("an error occurred while getting payload of manifest for %s:%s : %v",
repository, tag, err)
return err
}
if _, err = t.dstRegistry.PushManifest(tag, mediaType, data); err != nil {
t.logger.Errorf("an error occurred while pushing manifest of %s:%s to the destination registry: %v",
repository, tag, err)
return err
}
t.logger.Infof("manifest of %s:%s has been pushed to the destination registry",
repository, tag)
return nil
}
func canceled(ctx env.JobContext) bool {
_, canceled := ctx.OPCommand()
return canceled
}
func retry(err error) bool {
if err == nil {
return false
}
_, ok := err.(net.Error)
return ok
}
func secret() string {
return os.Getenv("JOBSERVICE_SECRET")
}

View File

@ -0,0 +1,25 @@
package replication
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMaxFailsOfTransfer(t *testing.T) {
r := &Transfer{}
assert.Equal(t, uint(3), r.MaxFails())
}
func TestValidateOfTransfer(t *testing.T) {
r := &Transfer{}
require.Nil(t, r.Validate(nil))
}
func TestShouldRetryOfTransfer(t *testing.T) {
r := &Transfer{}
assert.False(t, r.ShouldRetry())
r.retry = true
assert.True(t, r.ShouldRetry())
}

View File

@ -172,9 +172,10 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con
} }
if err := redisWorkerPool.RegisterJobs( if err := redisWorkerPool.RegisterJobs(
map[string]interface{}{ map[string]interface{}{
job.ImageScanJob: (*scan.ClairJob)(nil), job.ImageScanJob: (*scan.ClairJob)(nil),
job.ImageReplicationTransfer: (*replication.Replicator)(nil), job.ImageTransfer: (*replication.Transfer)(nil),
job.ImageReplicationDelete: (*replication.Deleter)(nil), job.ImageDelete: (*replication.Deleter)(nil),
job.ImageReplicate: (*replication.Replicator)(nil),
}); err != nil { }); err != nil {
//exit //exit
ctx.ErrorChan <- err ctx.ErrorChan <- err

View File

@ -41,7 +41,7 @@ func (r *ReplicationAPI) Prepare() {
return return
} }
if !r.SecurityCtx.IsSysAdmin() { if !r.SecurityCtx.IsSysAdmin() && !r.SecurityCtx.IsSolutionUser() {
r.HandleForbidden(r.SecurityCtx.GetUsername()) r.HandleForbidden(r.SecurityCtx.GetUsername())
return return
} }