Merge pull request #7583 from ywk253100/190428_replication

Don't fail the process when one tag replication failed
This commit is contained in:
Wenkai Yin 2019-04-29 14:16:07 +08:00 committed by GitHub
commit da338806fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 100 additions and 105 deletions

View File

@ -66,21 +66,13 @@ func newAdapter(registry *model.Registry) (*adapter, error) {
modifiers = append(modifiers, authorizer)
}
// The registry URL and core service URL are different when the adapter
// is created for a local Harbor. If the "registry.CoreURL" is null, the
// registry URL will be used as the coreServiceURL instead
url := registry.URL
if len(registry.CoreURL) > 0 {
url = registry.CoreURL
}
reg, err := adp.NewDefaultImageRegistry(registry)
if err != nil {
return nil, err
}
return &adapter{
registry: registry,
coreServiceURL: url,
coreServiceURL: registry.URL,
client: common_http.NewClient(
&http.Client{
Transport: transport,

View File

@ -16,7 +16,6 @@ package adapter
import (
"errors"
"fmt"
"io"
"net/http"
"strings"
@ -72,15 +71,9 @@ func NewDefaultImageRegistry(registry *model.Registry) (*DefaultImageRegistry, e
registry.Credential.AccessKey,
registry.Credential.AccessSecret)
}
tokenServiceURL := ""
// the registry is a local Harbor instance if the core URL is specified,
// use the internal token service URL instead
if len(registry.CoreURL) > 0 {
tokenServiceURL = fmt.Sprintf("%s/service/token", registry.CoreURL)
}
authorizer = auth.NewStandardTokenAuthorizer(&http.Client{
Transport: util.GetHTTPTransport(registry.Insecure),
}, cred, tokenServiceURL)
}, cred, registry.TokenServiceURL)
}
return NewDefaultImageRegistryWithCustomizedAuthorizer(registry, authorizer)
}

View File

@ -22,7 +22,6 @@ var (
// Configuration holds the configuration information for replication
type Configuration struct {
CoreURL string
RegistryURL string
TokenServiceURL string
JobserviceURL string
SecretKey string

View File

@ -234,7 +234,7 @@ func UpdateExecution(execution *models.Execution, props ...string) (int64, error
func AddTask(task *models.Task) (int64, error) {
o := dao.GetOrmer()
now := time.Now()
task.StartTime = now
task.StartTime = &now
return o.Insert(task)
}

View File

@ -91,6 +91,7 @@ func TestMethodOfExecution(t *testing.T) {
}
func TestMethodOfTask(t *testing.T) {
now := time.Now()
task1 := &models.Task{
ExecutionID: 112200,
ResourceType: "resourceType1",
@ -98,7 +99,7 @@ func TestMethodOfTask(t *testing.T) {
DstResource: "dstResource1",
JobID: "jobID1",
Status: "Initialized",
StartTime: time.Now(),
StartTime: &now,
}
task2 := &models.Task{
ExecutionID: 112200,
@ -107,8 +108,8 @@ func TestMethodOfTask(t *testing.T) {
DstResource: "dstResource2",
JobID: "jobID2",
Status: "Stopped",
StartTime: time.Now(),
EndTime: time.Now(),
StartTime: &now,
EndTime: &now,
}
// test add
@ -143,7 +144,7 @@ func TestMethodOfTask(t *testing.T) {
taskNew := &models.Task{
ID: id1,
Status: "Failed",
EndTime: time.Now(),
EndTime: &now,
}
n, err := UpdateTask(taskNew, models.TaskPropsName.Status, models.TaskPropsName.EndTime)
require.Nil(t, err)
@ -171,6 +172,7 @@ func TestMethodOfTask(t *testing.T) {
}
func TestExecutionFill(t *testing.T) {
now := time.Now()
execution := &models.Execution{
PolicyID: 11209,
Status: "InProgress",
@ -190,8 +192,8 @@ func TestExecutionFill(t *testing.T) {
DstResource: "dstResource1",
JobID: "jobID1",
Status: "Succeed",
StartTime: time.Now(),
EndTime: et1,
StartTime: &now,
EndTime: &et1,
}
task2 := &models.Task{
ID: 20192,
@ -201,8 +203,8 @@ func TestExecutionFill(t *testing.T) {
DstResource: "dstResource2",
JobID: "jobID2",
Status: "Stopped",
StartTime: time.Now(),
EndTime: et2,
StartTime: &now,
EndTime: &et2,
}
AddTask(task1)
AddTask(task2)
@ -224,6 +226,7 @@ func TestExecutionFill(t *testing.T) {
}
func TestExecutionFill2(t *testing.T) {
now := time.Now()
execution := &models.Execution{
PolicyID: 11209,
Status: "InProgress",
@ -241,7 +244,7 @@ func TestExecutionFill2(t *testing.T) {
DstResource: "dstResource1",
JobID: "jobID1",
Status: models.TaskStatusInProgress,
StartTime: time.Now(),
StartTime: &now,
}
task2 := &models.Task{
ID: 20192,
@ -251,8 +254,8 @@ func TestExecutionFill2(t *testing.T) {
DstResource: "dstResource2",
JobID: "jobID2",
Status: "Stopped",
StartTime: time.Now(),
EndTime: time.Now(),
StartTime: &now,
EndTime: &now,
}
taskID1, _ := AddTask(task1)
AddTask(task2)

View File

@ -109,16 +109,16 @@ type TaskFieldsName struct {
// Task represent the tasks in one execution.
type Task struct {
ID int64 `orm:"pk;auto;column(id)" json:"id"`
ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"`
ResourceType string `orm:"column(resource_type)" json:"resource_type"`
SrcResource string `orm:"column(src_resource)" json:"src_resource"`
DstResource string `orm:"column(dst_resource)" json:"dst_resource"`
Operation string `orm:"column(operation)" json:"operation"`
JobID string `orm:"column(job_id)" json:"job_id"`
Status string `orm:"column(status)" json:"status"`
StartTime time.Time `orm:"column(start_time)" json:"start_time"`
EndTime time.Time `orm:"column(end_time)" json:"end_time"`
ID int64 `orm:"pk;auto;column(id)" json:"id"`
ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"`
ResourceType string `orm:"column(resource_type)" json:"resource_type"`
SrcResource string `orm:"column(src_resource)" json:"src_resource"`
DstResource string `orm:"column(dst_resource)" json:"dst_resource"`
Operation string `orm:"column(operation)" json:"operation"`
JobID string `orm:"column(job_id)" json:"job_id"`
Status string `orm:"column(status)" json:"status"`
StartTime *time.Time `orm:"column(start_time)" json:"start_time"`
EndTime *time.Time `orm:"column(end_time)" json:"end_time,omitempty"`
}
// TableName is required by by beego orm to map Execution to table replication_execution

View File

@ -181,11 +181,11 @@ func getRegistry(registryMgr registry.Manager, registry *model.Registry) (*model
// GetLocalRegistry returns the info of the local Harbor registry
func GetLocalRegistry() *model.Registry {
return &model.Registry{
Type: model.RegistryTypeHarbor,
Name: "Local",
URL: config.Config.RegistryURL,
CoreURL: config.Config.CoreURL,
Status: "healthy",
Type: model.RegistryTypeHarbor,
Name: "Local",
URL: config.Config.CoreURL,
TokenServiceURL: config.Config.TokenServiceURL,
Status: "healthy",
Credential: &model.Credential{
Type: model.CredentialTypeSecret,
// use secret to do the auth for the local Harbor

View File

@ -81,14 +81,14 @@ type Registry struct {
Description string `json:"description"`
Type RegistryType `json:"type"`
URL string `json:"url"`
// CoreURL is only used for local harbor instance to
// avoid the requests passing through the external proxy
CoreURL string `json:"core_url"`
Credential *Credential `json:"credential"`
Insecure bool `json:"insecure"`
Status string `json:"status"`
CreationTime time.Time `json:"creation_time"`
UpdateTime time.Time `json:"update_time"`
// TokenServiceURL is only used for local harbor instance to
// avoid the requests passing through the external proxy for now
TokenServiceURL string `json:"token_service_url"`
Credential *Credential `json:"credential"`
Insecure bool `json:"insecure"`
Status string `json:"status"`
CreationTime time.Time `json:"creation_time"`
UpdateTime time.Time `json:"update_time"`
}
// RegistryQuery defines the query conditions for listing registries

View File

@ -91,6 +91,7 @@ func TestMethodOfExecutionManager(t *testing.T) {
}
func TestMethodOfTaskManager(t *testing.T) {
now := time.Now()
task := &models.Task{
ExecutionID: 112200,
ResourceType: "resourceType1",
@ -98,7 +99,7 @@ func TestMethodOfTaskManager(t *testing.T) {
DstResource: "dstResource1",
JobID: "jobID1",
Status: "Initialized",
StartTime: time.Now(),
StartTime: &now,
}
defer func() {

View File

@ -292,10 +292,11 @@ func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, ite
if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending, models.TaskStatusInitialized); err != nil {
log.Errorf("failed to update the task status %d: %v", result.TaskID, err)
}
now := time.Now()
if err = executionMgr.UpdateTask(&models.Task{
ID: result.TaskID,
JobID: result.JobID,
StartTime: time.Now(),
StartTime: &now,
}, "JobID", "StartTime"); err != nil {
log.Errorf("failed to update the task %d: %v", result.TaskID, err)
}

View File

@ -192,7 +192,7 @@ func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
func TestMain(m *testing.M) {
url := "https://registry.harbor.local"
config.Config = &config.Configuration{
RegistryURL: url,
CoreURL: url,
}
if err := adapter.RegisterFactory(model.RegistryTypeHarbor, fakedAdapterFactory); err != nil {
os.Exit(1)

View File

@ -51,17 +51,12 @@ var (
// Init the global variables and configurations
func Init(closing chan struct{}) error {
// init config
registryURL, err := cfg.RegistryURL()
if err != nil {
return err
}
secretKey, err := cfg.SecretKey()
if err != nil {
return err
}
config.Config = &config.Configuration{
CoreURL: cfg.InternalCoreURL(),
RegistryURL: registryURL,
TokenServiceURL: cfg.InternalTokenServiceEndpoint(),
JobserviceURL: cfg.InternalJobServiceURL(),
SecretKey: secretKey,

View File

@ -134,58 +134,69 @@ func (t *transfer) copy(src *repository, dst *repository, override bool) error {
dstRepo := dst.repository
t.logger.Infof("copying %s:[%s](source registry) to %s:[%s](destination registry)...",
srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ","))
var err error
for i := range src.tags {
srcTag := src.tags[i]
dstTag := dst.tags[i]
t.logger.Infof("copying %s:%s(source registry) to %s:%s(destination registry)...",
srcRepo, srcTag, dstRepo, dstTag)
// pull the manifest from the source registry
manifest, digest, err := t.pullManifest(srcRepo, srcTag)
if err != nil {
return err
if e := t.copyTag(srcRepo, src.tags[i], dstRepo, dst.tags[i], override); e != nil {
t.logger.Errorf(e.Error())
err = e
}
// check the existence of the image on the destination registry
exist, digest2, err := t.exist(dstRepo, dstTag)
if err != nil {
return err
}
if exist {
// the same image already exists
if digest == digest2 {
t.logger.Infof("the image %s:%s already exists on the destination registry, skip",
dstRepo, dstTag)
continue
}
// the same name image exists, but not allowed to override
if !override {
t.logger.Warningf("the same name image %s:%s exists on the destination registry, but the \"override\" is set to false, skip",
dstRepo, dstTag)
continue
}
// the same name image exists, but allowed to override
t.logger.Warningf("the same name image %s:%s exists on the destination registry and the \"override\" is set to true, continue...",
dstRepo, dstTag)
}
// copy blobs between the source and destination registries
if err = t.copyBlobs(manifest.References(), srcRepo, dstRepo); err != nil {
return err
}
// push the manifest to the destination registry
if err := t.pushManifest(manifest, dstRepo, dstTag); err != nil {
return err
}
t.logger.Infof("copy %s:%s(source registry) to %s:%s(destination registry) completed",
srcRepo, srcTag, dstRepo, dstTag)
}
if err != nil {
return err
}
t.logger.Infof("copy %s:[%s](source registry) to %s:[%s](destination registry) completed",
srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ","))
return nil
}
func (t *transfer) copyTag(srcRepo, srcTag, dstRepo, dstTag string, override bool) error {
t.logger.Infof("copying %s:%s(source registry) to %s:%s(destination registry)...",
srcRepo, srcTag, dstRepo, dstTag)
// pull the manifest from the source registry
manifest, digest, err := t.pullManifest(srcRepo, srcTag)
if err != nil {
return err
}
// check the existence of the image on the destination registry
exist, digest2, err := t.exist(dstRepo, dstTag)
if err != nil {
return err
}
if exist {
// the same image already exists
if digest == digest2 {
t.logger.Infof("the image %s:%s already exists on the destination registry, skip",
dstRepo, dstTag)
return nil
}
// the same name image exists, but not allowed to override
if !override {
t.logger.Warningf("the same name image %s:%s exists on the destination registry, but the \"override\" is set to false, skip",
dstRepo, dstTag)
return nil
}
// the same name image exists, but allowed to override
t.logger.Warningf("the same name image %s:%s exists on the destination registry and the \"override\" is set to true, continue...",
dstRepo, dstTag)
}
// copy blobs between the source and destination registries
if err = t.copyBlobs(manifest.References(), srcRepo, dstRepo); err != nil {
return err
}
// push the manifest to the destination registry
if err := t.pushManifest(manifest, dstRepo, dstTag); err != nil {
return err
}
t.logger.Infof("copy %s:%s(source registry) to %s:%s(destination registry) completed",
srcRepo, srcTag, dstRepo, dstTag)
return nil
}
func (t *transfer) pullManifest(repository, tag string) (
distribution.Manifest, string, error) {
if t.shouldStop() {