diff --git a/src/replication/adapter/harbor/adapter.go b/src/replication/adapter/harbor/adapter.go index 7c604fa6b..6e8b54c79 100644 --- a/src/replication/adapter/harbor/adapter.go +++ b/src/replication/adapter/harbor/adapter.go @@ -66,21 +66,13 @@ func newAdapter(registry *model.Registry) (*adapter, error) { modifiers = append(modifiers, authorizer) } - // The registry URL and core service URL are different when the adapter - // is created for a local Harbor. If the "registry.CoreURL" is null, the - // registry URL will be used as the coreServiceURL instead - url := registry.URL - if len(registry.CoreURL) > 0 { - url = registry.CoreURL - } - reg, err := adp.NewDefaultImageRegistry(registry) if err != nil { return nil, err } return &adapter{ registry: registry, - coreServiceURL: url, + coreServiceURL: registry.URL, client: common_http.NewClient( &http.Client{ Transport: transport, diff --git a/src/replication/adapter/image_registry.go b/src/replication/adapter/image_registry.go index 4529cc9c0..fa4122fdc 100644 --- a/src/replication/adapter/image_registry.go +++ b/src/replication/adapter/image_registry.go @@ -16,7 +16,6 @@ package adapter import ( "errors" - "fmt" "io" "net/http" "strings" @@ -72,15 +71,9 @@ func NewDefaultImageRegistry(registry *model.Registry) (*DefaultImageRegistry, e registry.Credential.AccessKey, registry.Credential.AccessSecret) } - tokenServiceURL := "" - // the registry is a local Harbor instance if the core URL is specified, - // use the internal token service URL instead - if len(registry.CoreURL) > 0 { - tokenServiceURL = fmt.Sprintf("%s/service/token", registry.CoreURL) - } authorizer = auth.NewStandardTokenAuthorizer(&http.Client{ Transport: util.GetHTTPTransport(registry.Insecure), - }, cred, tokenServiceURL) + }, cred, registry.TokenServiceURL) } return NewDefaultImageRegistryWithCustomizedAuthorizer(registry, authorizer) } diff --git a/src/replication/config/config.go b/src/replication/config/config.go index 27169a812..4423a68e9 100644 --- a/src/replication/config/config.go +++ b/src/replication/config/config.go @@ -22,7 +22,6 @@ var ( // Configuration holds the configuration information for replication type Configuration struct { CoreURL string - RegistryURL string TokenServiceURL string JobserviceURL string SecretKey string diff --git a/src/replication/dao/execution.go b/src/replication/dao/execution.go index c9af37754..1825d6cb2 100644 --- a/src/replication/dao/execution.go +++ b/src/replication/dao/execution.go @@ -234,7 +234,7 @@ func UpdateExecution(execution *models.Execution, props ...string) (int64, error func AddTask(task *models.Task) (int64, error) { o := dao.GetOrmer() now := time.Now() - task.StartTime = now + task.StartTime = &now return o.Insert(task) } diff --git a/src/replication/dao/execution_test.go b/src/replication/dao/execution_test.go index df492d1d8..faa16e1cc 100644 --- a/src/replication/dao/execution_test.go +++ b/src/replication/dao/execution_test.go @@ -91,6 +91,7 @@ func TestMethodOfExecution(t *testing.T) { } func TestMethodOfTask(t *testing.T) { + now := time.Now() task1 := &models.Task{ ExecutionID: 112200, ResourceType: "resourceType1", @@ -98,7 +99,7 @@ func TestMethodOfTask(t *testing.T) { DstResource: "dstResource1", JobID: "jobID1", Status: "Initialized", - StartTime: time.Now(), + StartTime: &now, } task2 := &models.Task{ ExecutionID: 112200, @@ -107,8 +108,8 @@ func TestMethodOfTask(t *testing.T) { DstResource: "dstResource2", JobID: "jobID2", Status: "Stopped", - StartTime: time.Now(), - EndTime: time.Now(), + StartTime: &now, + EndTime: &now, } // test add @@ -143,7 +144,7 @@ func TestMethodOfTask(t *testing.T) { taskNew := &models.Task{ ID: id1, Status: "Failed", - EndTime: time.Now(), + EndTime: &now, } n, err := UpdateTask(taskNew, models.TaskPropsName.Status, models.TaskPropsName.EndTime) require.Nil(t, err) @@ -171,6 +172,7 @@ func TestMethodOfTask(t *testing.T) { } func TestExecutionFill(t *testing.T) { + now := time.Now() execution := &models.Execution{ PolicyID: 11209, Status: "InProgress", @@ -190,8 +192,8 @@ func TestExecutionFill(t *testing.T) { DstResource: "dstResource1", JobID: "jobID1", Status: "Succeed", - StartTime: time.Now(), - EndTime: et1, + StartTime: &now, + EndTime: &et1, } task2 := &models.Task{ ID: 20192, @@ -201,8 +203,8 @@ func TestExecutionFill(t *testing.T) { DstResource: "dstResource2", JobID: "jobID2", Status: "Stopped", - StartTime: time.Now(), - EndTime: et2, + StartTime: &now, + EndTime: &et2, } AddTask(task1) AddTask(task2) @@ -224,6 +226,7 @@ func TestExecutionFill(t *testing.T) { } func TestExecutionFill2(t *testing.T) { + now := time.Now() execution := &models.Execution{ PolicyID: 11209, Status: "InProgress", @@ -241,7 +244,7 @@ func TestExecutionFill2(t *testing.T) { DstResource: "dstResource1", JobID: "jobID1", Status: models.TaskStatusInProgress, - StartTime: time.Now(), + StartTime: &now, } task2 := &models.Task{ ID: 20192, @@ -251,8 +254,8 @@ func TestExecutionFill2(t *testing.T) { DstResource: "dstResource2", JobID: "jobID2", Status: "Stopped", - StartTime: time.Now(), - EndTime: time.Now(), + StartTime: &now, + EndTime: &now, } taskID1, _ := AddTask(task1) AddTask(task2) diff --git a/src/replication/dao/models/execution.go b/src/replication/dao/models/execution.go index a32ae49c9..eb9a06057 100644 --- a/src/replication/dao/models/execution.go +++ b/src/replication/dao/models/execution.go @@ -109,16 +109,16 @@ type TaskFieldsName struct { // Task represent the tasks in one execution. type Task struct { - ID int64 `orm:"pk;auto;column(id)" json:"id"` - ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"` - ResourceType string `orm:"column(resource_type)" json:"resource_type"` - SrcResource string `orm:"column(src_resource)" json:"src_resource"` - DstResource string `orm:"column(dst_resource)" json:"dst_resource"` - Operation string `orm:"column(operation)" json:"operation"` - JobID string `orm:"column(job_id)" json:"job_id"` - Status string `orm:"column(status)" json:"status"` - StartTime time.Time `orm:"column(start_time)" json:"start_time"` - EndTime time.Time `orm:"column(end_time)" json:"end_time"` + ID int64 `orm:"pk;auto;column(id)" json:"id"` + ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"` + ResourceType string `orm:"column(resource_type)" json:"resource_type"` + SrcResource string `orm:"column(src_resource)" json:"src_resource"` + DstResource string `orm:"column(dst_resource)" json:"dst_resource"` + Operation string `orm:"column(operation)" json:"operation"` + JobID string `orm:"column(job_id)" json:"job_id"` + Status string `orm:"column(status)" json:"status"` + StartTime *time.Time `orm:"column(start_time)" json:"start_time"` + EndTime *time.Time `orm:"column(end_time)" json:"end_time,omitempty"` } // TableName is required by by beego orm to map Execution to table replication_execution diff --git a/src/replication/event/handler.go b/src/replication/event/handler.go index 96a7dc509..94b0dfe01 100644 --- a/src/replication/event/handler.go +++ b/src/replication/event/handler.go @@ -181,11 +181,11 @@ func getRegistry(registryMgr registry.Manager, registry *model.Registry) (*model // GetLocalRegistry returns the info of the local Harbor registry func GetLocalRegistry() *model.Registry { return &model.Registry{ - Type: model.RegistryTypeHarbor, - Name: "Local", - URL: config.Config.RegistryURL, - CoreURL: config.Config.CoreURL, - Status: "healthy", + Type: model.RegistryTypeHarbor, + Name: "Local", + URL: config.Config.CoreURL, + TokenServiceURL: config.Config.TokenServiceURL, + Status: "healthy", Credential: &model.Credential{ Type: model.CredentialTypeSecret, // use secret to do the auth for the local Harbor diff --git a/src/replication/model/registry.go b/src/replication/model/registry.go index a32d2bfd7..9dabda09a 100644 --- a/src/replication/model/registry.go +++ b/src/replication/model/registry.go @@ -81,14 +81,14 @@ type Registry struct { Description string `json:"description"` Type RegistryType `json:"type"` URL string `json:"url"` - // CoreURL is only used for local harbor instance to - // avoid the requests passing through the external proxy - CoreURL string `json:"core_url"` - Credential *Credential `json:"credential"` - Insecure bool `json:"insecure"` - Status string `json:"status"` - CreationTime time.Time `json:"creation_time"` - UpdateTime time.Time `json:"update_time"` + // TokenServiceURL is only used for local harbor instance to + // avoid the requests passing through the external proxy for now + TokenServiceURL string `json:"token_service_url"` + Credential *Credential `json:"credential"` + Insecure bool `json:"insecure"` + Status string `json:"status"` + CreationTime time.Time `json:"creation_time"` + UpdateTime time.Time `json:"update_time"` } // RegistryQuery defines the query conditions for listing registries diff --git a/src/replication/operation/execution/execution_test.go b/src/replication/operation/execution/execution_test.go index c26512e7a..9805754e6 100644 --- a/src/replication/operation/execution/execution_test.go +++ b/src/replication/operation/execution/execution_test.go @@ -91,6 +91,7 @@ func TestMethodOfExecutionManager(t *testing.T) { } func TestMethodOfTaskManager(t *testing.T) { + now := time.Now() task := &models.Task{ ExecutionID: 112200, ResourceType: "resourceType1", @@ -98,7 +99,7 @@ func TestMethodOfTaskManager(t *testing.T) { DstResource: "dstResource1", JobID: "jobID1", Status: "Initialized", - StartTime: time.Now(), + StartTime: &now, } defer func() { diff --git a/src/replication/operation/flow/stage.go b/src/replication/operation/flow/stage.go index 0451961d1..f3cfef416 100644 --- a/src/replication/operation/flow/stage.go +++ b/src/replication/operation/flow/stage.go @@ -292,10 +292,11 @@ func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, ite if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending, models.TaskStatusInitialized); err != nil { log.Errorf("failed to update the task status %d: %v", result.TaskID, err) } + now := time.Now() if err = executionMgr.UpdateTask(&models.Task{ ID: result.TaskID, JobID: result.JobID, - StartTime: time.Now(), + StartTime: &now, }, "JobID", "StartTime"); err != nil { log.Errorf("failed to update the task %d: %v", result.TaskID, err) } diff --git a/src/replication/operation/flow/stage_test.go b/src/replication/operation/flow/stage_test.go index 40aa8fbea..303f6cc10 100644 --- a/src/replication/operation/flow/stage_test.go +++ b/src/replication/operation/flow/stage_test.go @@ -192,7 +192,7 @@ func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) { func TestMain(m *testing.M) { url := "https://registry.harbor.local" config.Config = &config.Configuration{ - RegistryURL: url, + CoreURL: url, } if err := adapter.RegisterFactory(model.RegistryTypeHarbor, fakedAdapterFactory); err != nil { os.Exit(1) diff --git a/src/replication/replication.go b/src/replication/replication.go index 48418ec9e..e15d2aeee 100644 --- a/src/replication/replication.go +++ b/src/replication/replication.go @@ -51,17 +51,12 @@ var ( // Init the global variables and configurations func Init(closing chan struct{}) error { // init config - registryURL, err := cfg.RegistryURL() - if err != nil { - return err - } secretKey, err := cfg.SecretKey() if err != nil { return err } config.Config = &config.Configuration{ CoreURL: cfg.InternalCoreURL(), - RegistryURL: registryURL, TokenServiceURL: cfg.InternalTokenServiceEndpoint(), JobserviceURL: cfg.InternalJobServiceURL(), SecretKey: secretKey, diff --git a/src/replication/transfer/image/transfer.go b/src/replication/transfer/image/transfer.go index e91f17296..b88ac73d5 100644 --- a/src/replication/transfer/image/transfer.go +++ b/src/replication/transfer/image/transfer.go @@ -134,58 +134,69 @@ func (t *transfer) copy(src *repository, dst *repository, override bool) error { dstRepo := dst.repository t.logger.Infof("copying %s:[%s](source registry) to %s:[%s](destination registry)...", srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ",")) + var err error for i := range src.tags { - srcTag := src.tags[i] - dstTag := dst.tags[i] - t.logger.Infof("copying %s:%s(source registry) to %s:%s(destination registry)...", - srcRepo, srcTag, dstRepo, dstTag) - // pull the manifest from the source registry - manifest, digest, err := t.pullManifest(srcRepo, srcTag) - if err != nil { - return err + if e := t.copyTag(srcRepo, src.tags[i], dstRepo, dst.tags[i], override); e != nil { + t.logger.Errorf(e.Error()) + err = e } - - // check the existence of the image on the destination registry - exist, digest2, err := t.exist(dstRepo, dstTag) - if err != nil { - return err - } - if exist { - // the same image already exists - if digest == digest2 { - t.logger.Infof("the image %s:%s already exists on the destination registry, skip", - dstRepo, dstTag) - continue - } - // the same name image exists, but not allowed to override - if !override { - t.logger.Warningf("the same name image %s:%s exists on the destination registry, but the \"override\" is set to false, skip", - dstRepo, dstTag) - continue - } - // the same name image exists, but allowed to override - t.logger.Warningf("the same name image %s:%s exists on the destination registry and the \"override\" is set to true, continue...", - dstRepo, dstTag) - } - - // copy blobs between the source and destination registries - if err = t.copyBlobs(manifest.References(), srcRepo, dstRepo); err != nil { - return err - } - - // push the manifest to the destination registry - if err := t.pushManifest(manifest, dstRepo, dstTag); err != nil { - return err - } - - t.logger.Infof("copy %s:%s(source registry) to %s:%s(destination registry) completed", - srcRepo, srcTag, dstRepo, dstTag) } + if err != nil { + return err + } + t.logger.Infof("copy %s:[%s](source registry) to %s:[%s](destination registry) completed", srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ",")) return nil } +func (t *transfer) copyTag(srcRepo, srcTag, dstRepo, dstTag string, override bool) error { + t.logger.Infof("copying %s:%s(source registry) to %s:%s(destination registry)...", + srcRepo, srcTag, dstRepo, dstTag) + // pull the manifest from the source registry + manifest, digest, err := t.pullManifest(srcRepo, srcTag) + if err != nil { + return err + } + + // check the existence of the image on the destination registry + exist, digest2, err := t.exist(dstRepo, dstTag) + if err != nil { + return err + } + if exist { + // the same image already exists + if digest == digest2 { + t.logger.Infof("the image %s:%s already exists on the destination registry, skip", + dstRepo, dstTag) + return nil + } + // the same name image exists, but not allowed to override + if !override { + t.logger.Warningf("the same name image %s:%s exists on the destination registry, but the \"override\" is set to false, skip", + dstRepo, dstTag) + return nil + } + // the same name image exists, but allowed to override + t.logger.Warningf("the same name image %s:%s exists on the destination registry and the \"override\" is set to true, continue...", + dstRepo, dstTag) + } + + // copy blobs between the source and destination registries + if err = t.copyBlobs(manifest.References(), srcRepo, dstRepo); err != nil { + return err + } + + // push the manifest to the destination registry + if err := t.pushManifest(manifest, dstRepo, dstTag); err != nil { + return err + } + + t.logger.Infof("copy %s:%s(source registry) to %s:%s(destination registry) completed", + srcRepo, srcTag, dstRepo, dstTag) + return nil +} + func (t *transfer) pullManifest(repository, tag string) ( distribution.Manifest, string, error) { if t.shouldStop() {