Merge pull request #6093 from cd1989/replication-record-id

Add op uuid to image replication
This commit is contained in:
Steven Zou 2018-11-30 14:54:43 +08:00 committed by GitHub
commit ec2ad4d0b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 114 additions and 14 deletions

View File

@ -1506,6 +1506,11 @@ paths:
format: int
required: true
description: The ID of the policy that triggered this job.
- name: op_uuid
in: query
type: string
required: false
description: The UUID of one trigger of replication policy.
- name: num
in: query
type: integer
@ -2018,6 +2023,8 @@ paths:
responses:
'200':
description: Trigger the replication successfully.
schema:
$ref: '#/definitions/ReplicationResponse'
'401':
description: User need to log in first.
'404':
@ -4083,6 +4090,12 @@ definitions:
policy_id:
type: integer
description: The ID of replication policy
ReplicationResponse:
type: object
properties:
uuid:
type: string
description: UUID of the replication
RepositoryDescription:
type: object
properties:

View File

@ -0,0 +1 @@
ALTER TABLE replication_job ADD COLUMN op_uuid varchar(64);

View File

@ -356,6 +356,9 @@ func repJobQueryConditions(query ...*models.RepJobQuery) orm.QuerySeter {
if q.PolicyID != 0 {
qs = qs.Filter("PolicyID", q.PolicyID)
}
if len(q.OpUUID) > 0 {
qs = qs.Filter("OpUUID__exact", q.OpUUID)
}
if len(q.Repository) > 0 {
qs = qs.Filter("Repository__icontains", q.Repository)
}

View File

@ -58,11 +58,11 @@ type RepJob struct {
Status string `orm:"column(status)" json:"status"`
Repository string `orm:"column(repository)" json:"repository"`
PolicyID int64 `orm:"column(policy_id)" json:"policy_id"`
OpUUID string `orm:"column(op_uuid)" json:"op_uuid"`
Operation string `orm:"column(operation)" json:"operation"`
Tags string `orm:"column(tags)" json:"-"`
TagList []string `orm:"-" json:"tags"`
UUID string `orm:"column(job_uuid)" json:"-"`
// Policy RepPolicy `orm:"-" json:"policy"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}
@ -126,6 +126,7 @@ func (r *RepPolicy) TableName() string {
// RepJobQuery holds query conditions for replication job
type RepJobQuery struct {
PolicyID int64
OpUUID string
Repository string
Statuses []string
Operations []string

View File

@ -23,6 +23,11 @@ type Replication struct {
PolicyID int64 `json:"policy_id"`
}
// ReplicationResponse describes response of a replication request, it gives
type ReplicationResponse struct {
UUID string `json:"uuid"`
}
// Valid ...
func (r *Replication) Valid(v *validation.Validation) {
if r.PolicyID <= 0 {

View File

@ -17,6 +17,7 @@ package api
import (
"fmt"
"net/http"
"strings"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
@ -26,6 +27,8 @@ import (
"github.com/goharbor/harbor/src/replication/core"
"github.com/goharbor/harbor/src/replication/event/notification"
"github.com/goharbor/harbor/src/replication/event/topic"
"github.com/docker/distribution/uuid"
)
// ReplicationAPI handles API calls for replication
@ -78,16 +81,27 @@ func (r *ReplicationAPI) Post() {
return
}
if err = startReplication(replication.PolicyID); err != nil {
opUUID, err := startReplication(replication.PolicyID)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to publish replication topic for policy %d: %v", replication.PolicyID, err))
return
}
log.Infof("replication signal for policy %d sent", replication.PolicyID)
r.Data["json"] = api_models.ReplicationResponse{
UUID: opUUID,
}
r.ServeJSON()
}
func startReplication(policyID int64) error {
return notifier.Publish(topic.StartReplicationTopic,
// startReplication triggers a replication and return the uuid of this replication.
func startReplication(policyID int64) (string, error) {
opUUID := strings.Replace(uuid.Generate().String(), "-", "", -1)
return opUUID, notifier.Publish(topic.StartReplicationTopic,
notification.StartReplicationNotification{
PolicyID: policyID,
Metadata: map[string]interface{}{
"op_uuid": opUUID,
},
})
}

View File

@ -94,6 +94,7 @@ func (ra *RepJobAPI) List() {
query.Repository = ra.GetString("repository")
query.Statuses = ra.GetStrings("status")
query.OpUUID = ra.GetString("op_uuid")
startTimeStr := ra.GetString("start_time")
if len(startTimeStr) != 0 {

View File

@ -192,7 +192,7 @@ func (pa *RepPolicyAPI) Post() {
if policy.ReplicateExistingImageNow {
go func() {
if err = startReplication(id); err != nil {
if _, err = startReplication(id); err != nil {
log.Errorf("failed to send replication signal for policy %d: %v", id, err)
return
}
@ -304,7 +304,7 @@ func (pa *RepPolicyAPI) Put() {
if policy.ReplicateExistingImageNow {
go func() {
if err = startReplication(id); err != nil {
if _, err = startReplication(id); err != nil {
log.Errorf("failed to send replication signal for policy %d: %v", id, err)
return
}

View File

@ -16,6 +16,8 @@ package core
import (
"fmt"
"reflect"
"strings"
common_models "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/utils/log"
@ -27,6 +29,8 @@ import (
"github.com/goharbor/harbor/src/replication/source"
"github.com/goharbor/harbor/src/replication/target"
"github.com/goharbor/harbor/src/replication/trigger"
"github.com/docker/distribution/uuid"
)
// Controller defines the methods that a replicatoin controllter should implement
@ -220,9 +224,16 @@ func (ctl *DefaultController) Replicate(policyID int64, metadata ...map[string]i
targets = append(targets, target)
}
// Get operation uuid from metadata, if none provided, generate one.
opUUID, err := getOpUUID(metadata...)
if err != nil {
return err
}
// submit the replication
return ctl.replicator.Replicate(&replicator.Replication{
PolicyID: policyID,
OpUUID: opUUID,
Candidates: candidates,
Targets: targets,
})
@ -290,3 +301,26 @@ func buildFilterChain(policy *models.ReplicationPolicy, sourcer *source.Sourcer)
return source.NewDefaultFilterChain(filters)
}
// getOpUUID get operation uuid from metadata or generate one if none found.
func getOpUUID(metadata ...map[string]interface{}) (string, error) {
if len(metadata) <= 0 {
return strings.Replace(uuid.Generate().String(), "-", "", -1), nil
}
opUUID, ok := metadata[0]["op_uuid"]
if !ok {
return strings.Replace(uuid.Generate().String(), "-", "", -1), nil
}
id, ok := opUUID.(string)
if !ok {
return "", fmt.Errorf("operation uuid should have type 'string', but got '%s'", reflect.TypeOf(opUUID).Name())
}
if id == "" {
return "", fmt.Errorf("provided operation uuid is empty")
}
return id, nil
}

View File

@ -156,3 +156,26 @@ func TestBuildFilterChain(t *testing.T) {
chain := buildFilterChain(policy, sourcer)
assert.Equal(t, 3, len(chain.Filters()))
}
func TestGetOpUUID(t *testing.T) {
uuid, err := getOpUUID()
assert.Nil(t, err)
assert.NotEmpty(t, uuid)
uuid, err = getOpUUID(map[string]interface{}{
"name": "test",
})
assert.Nil(t, err)
assert.NotEmpty(t, uuid)
uuid, err = getOpUUID(map[string]interface{}{
"op_uuid": 0,
})
assert.NotNil(t, err)
uuid, err = getOpUUID(map[string]interface{}{
"op_uuid": "0",
})
assert.Nil(t, err)
assert.Equal(t, uuid, "0")
}

View File

@ -30,6 +30,7 @@ import (
// Replication holds information for a replication
type Replication struct {
PolicyID int64
OpUUID string
Candidates []models.FilterItem
Targets []*common_models.RepTarget
Operation string
@ -60,6 +61,9 @@ func (d *DefaultReplicator) Replicate(replication *Replication) error {
operation := ""
for _, candidate := range replication.Candidates {
strs := strings.SplitN(candidate.Value, ":", 2)
if len(strs) != 2 {
return fmt.Errorf("malforld image '%s'", candidate.Value)
}
repositories[strs[0]] = append(repositories[strs[0]], strs[1])
operation = candidate.Operation
}
@ -69,6 +73,7 @@ func (d *DefaultReplicator) Replicate(replication *Replication) error {
// create job in database
id, err := dao.AddRepJob(common_models.RepJob{
PolicyID: replication.PolicyID,
OpUUID: replication.OpUUID,
Repository: repository,
TagList: tags,
Operation: operation,