Create replicator to submit replication job to jobservice

This commit is contained in:
Wenkai Yin 2017-12-07 19:07:51 +08:00
parent 8b4fdfc2cc
commit fe10c2e7f5
18 changed files with 476 additions and 107 deletions

View File

@ -23,8 +23,8 @@ import (
"net/http"
"strings"
"github.com/vmware/harbor/src/adminserver/client/auth"
"github.com/vmware/harbor/src/adminserver/systeminfo/imagestorage"
httpclient "github.com/vmware/harbor/src/common/http/client"
"github.com/vmware/harbor/src/common/utils"
)
@ -43,22 +43,20 @@ type Client interface {
}
// NewClient return an instance of Adminserver client
func NewClient(baseURL string, authorizer auth.Authorizer) Client {
func NewClient(baseURL string, c httpclient.Client) Client {
baseURL = strings.TrimRight(baseURL, "/")
if !strings.Contains(baseURL, "://") {
baseURL = "http://" + baseURL
}
return &client{
baseURL: baseURL,
client: &http.Client{},
authorizer: authorizer,
baseURL: baseURL,
client: c,
}
}
type client struct {
baseURL string
client *http.Client
authorizer auth.Authorizer
baseURL string
client httpclient.Client
}
// do creates request and authorizes it if authorizer is not nil
@ -69,11 +67,6 @@ func (c *client) do(method, relativePath string, body io.Reader) (*http.Response
return nil, err
}
if c.authorizer != nil {
if err := c.authorizer.Authorize(req); err != nil {
return nil, err
}
}
return c.client.Do(req)
}

View File

@ -16,6 +16,7 @@ package client
import (
"fmt"
"net/http"
"os"
"testing"
@ -34,7 +35,7 @@ func TestMain(m *testing.M) {
os.Exit(1)
}
c = NewClient(server.URL, nil)
c = NewClient(server.URL, &http.Client{})
os.Exit(m.Run())
}

View File

@ -0,0 +1,73 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package auth
import (
"errors"
"net/http"
)
const (
secretCookieName = "secret"
)
// Authorizer authorizes the requests
type Authorizer interface {
Authorize(*http.Request) error
}
// CookieAuthorizer authorizes the requests by adding cookie specified by name and value
type CookieAuthorizer struct {
name string
value string
}
// NewCookieAuthorizer returns an instance of CookieAuthorizer
func NewCookieAuthorizer(name, value string) *CookieAuthorizer {
return &CookieAuthorizer{
name: name,
value: value,
}
}
// Authorize the request with the cookie
func (c *CookieAuthorizer) Authorize(req *http.Request) error {
if req == nil {
return errors.New("the request is null")
}
req.AddCookie(&http.Cookie{
Name: c.name,
Value: c.value,
})
return nil
}
// SecretAuthorizer authorizes the requests with the specified secret
type SecretAuthorizer struct {
authorizer *CookieAuthorizer
}
// NewSecretAuthorizer returns an instance of SecretAuthorizer
func NewSecretAuthorizer(secret string) *SecretAuthorizer {
return &SecretAuthorizer{
authorizer: NewCookieAuthorizer(secretCookieName, secret),
}
}
// Authorize the request with the secret
func (s *SecretAuthorizer) Authorize(req *http.Request) error {
return s.authorizer.Authorize(req)
}

View File

@ -0,0 +1,56 @@
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package auth
import (
"net/http"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAuthorizeOfCookieAuthorizer(t *testing.T) {
name, value := "name", "value"
authorizer := NewCookieAuthorizer(name, value)
// nil request
require.NotNil(t, authorizer.Authorize(nil))
// valid request
req, err := http.NewRequest("", "", nil)
require.Nil(t, err)
require.Nil(t, authorizer.Authorize(req))
require.Equal(t, 1, len(req.Cookies()))
v, err := req.Cookie(name)
require.Nil(t, err)
assert.Equal(t, value, v.Value)
}
func TestAuthorizeOfSecretAuthorizer(t *testing.T) {
secret := "secret"
authorizer := NewSecretAuthorizer(secret)
// nil request
require.NotNil(t, authorizer.Authorize(nil))
// valid request
req, err := http.NewRequest("", "", nil)
require.Nil(t, err)
require.Nil(t, authorizer.Authorize(req))
require.Equal(t, 1, len(req.Cookies()))
v, err := req.Cookie(secretCookieName)
require.Nil(t, err)
assert.Equal(t, secret, v.Value)
}

View File

@ -0,0 +1,56 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"net/http"
"github.com/vmware/harbor/src/common/http/client/auth"
)
// Client defines the method that a HTTP client should implement
type Client interface {
Do(*http.Request) (*http.Response, error)
}
// AuthorizedClient authorizes the requests before sending them
type AuthorizedClient struct {
client *http.Client
authorizer auth.Authorizer
}
// NewAuthorizedClient returns an instance of the AuthorizedClient
func NewAuthorizedClient(authorizer auth.Authorizer, client ...*http.Client) *AuthorizedClient {
c := &AuthorizedClient{
authorizer: authorizer,
}
if len(client) > 0 {
c.client = client[0]
}
if c.client == nil {
c.client = &http.Client{}
}
return c
}
// Do authorizes the request before sending it
func (a *AuthorizedClient) Do(req *http.Request) (*http.Response, error) {
if a.authorizer != nil {
if err := a.authorizer.Authorize(req); err != nil {
return nil, err
}
}
return a.client.Do(req)
}

View File

@ -12,39 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package auth
package http
import (
"net/http"
"fmt"
)
// Authorizer authorizes request
type Authorizer interface {
Authorize(*http.Request) error
// Error wrap HTTP status code and message as an error
type Error struct {
Code int
Message string
}
// NewSecretAuthorizer returns an instance of secretAuthorizer
func NewSecretAuthorizer(cookieName, secret string) Authorizer {
return &secretAuthorizer{
cookieName: cookieName,
secret: secret,
}
}
type secretAuthorizer struct {
cookieName string
secret string
}
func (s *secretAuthorizer) Authorize(req *http.Request) error {
if req == nil {
return nil
}
req.AddCookie(&http.Cookie{
Name: s.cookieName,
Value: s.secret,
})
return nil
// Error ...
func (e *Error) Error() string {
return fmt.Sprintf("http error: code %d, message %s", e.Code, e.Message)
}

View File

@ -0,0 +1,88 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
commonhttp "github.com/vmware/harbor/src/common/http"
"github.com/vmware/harbor/src/common/http/client"
"github.com/vmware/harbor/src/jobservice/api"
)
// Client defines the methods that a jobservice client should implement
type Client interface {
SubmitReplicationJob(*api.ReplicationReq) error
}
// DefaultClient provides a default implement for the interface Client
type DefaultClient struct {
endpoint string
client client.Client
}
// NewDefaultClient returns an instance of DefaultClient
func NewDefaultClient(endpoint string, client ...client.Client) *DefaultClient {
c := &DefaultClient{
endpoint: endpoint,
}
if len(client) > 0 {
c.client = client[0]
}
if c.client == nil {
c.client = &http.Client{}
}
return c
}
// SubmitReplicationJob submits a replication job to the jobservice
func (d *DefaultClient) SubmitReplicationJob(replication *api.ReplicationReq) error {
url := d.endpoint + "/api/jobs/replication"
buffer := &bytes.Buffer{}
if err := json.NewEncoder(buffer).Encode(replication); err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, url, buffer)
if err != nil {
return err
}
resp, err := d.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
message, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return &commonhttp.Error{
Code: resp.StatusCode,
Message: string(message),
}
}
return nil
}

View File

@ -0,0 +1,55 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"encoding/json"
"net/http"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/common/utils/test"
"github.com/vmware/harbor/src/jobservice/api"
)
var url string
func TestMain(m *testing.M) {
requestMapping := []*test.RequestHandlerMapping{
&test.RequestHandlerMapping{
Method: http.MethodPost,
Pattern: "/api/jobs/replication",
Handler: func(w http.ResponseWriter, r *http.Request) {
replication := &api.ReplicationReq{}
if err := json.NewDecoder(r.Body).Decode(replication); err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
},
},
}
server := test.NewServer(requestMapping...)
defer server.Close()
url = server.URL
os.Exit(m.Run())
}
func TestSubmitReplicationJob(t *testing.T) {
client := NewDefaultClient(url)
err := client.SubmitReplicationJob(&api.ReplicationReq{})
assert.Nil(t, err)
}

View File

@ -20,9 +20,10 @@ import (
"strings"
"github.com/vmware/harbor/src/adminserver/client"
"github.com/vmware/harbor/src/adminserver/client/auth"
"github.com/vmware/harbor/src/common"
comcfg "github.com/vmware/harbor/src/common/config"
httpclient "github.com/vmware/harbor/src/common/http/client"
"github.com/vmware/harbor/src/common/http/client/auth"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
)
@ -50,8 +51,8 @@ func Init() error {
adminServerURL = "http://adminserver"
}
log.Infof("initializing client for adminserver %s ...", adminServerURL)
authorizer := auth.NewSecretAuthorizer(secretCookieName, UISecret())
AdminserverClient = client.NewClient(adminServerURL, authorizer)
authorizer := auth.NewSecretAuthorizer(UISecret())
AdminserverClient = client.NewClient(adminServerURL, httpclient.NewAuthorizedClient(authorizer))
if err := AdminserverClient.Ping(); err != nil {
return fmt.Errorf("failed to ping adminserver: %v", err)
}

View File

@ -23,11 +23,6 @@ const (
//TriggerScheduleWeekly : type of scheduling is 'weekly'
TriggerScheduleWeekly = "weekly"
//OperationPush : push operation
OperationPush = "push"
//OperationDelete : delete operation
OperationDelete = "delete"
// PatternMatchAll : the pattern that match all
PatternMatchAll = ".*"
)

View File

@ -16,15 +16,21 @@ package core
import (
"fmt"
"strings"
"github.com/vmware/harbor/src/common/http/client"
"github.com/vmware/harbor/src/common/http/client/auth"
common_models "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice/api"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/policy"
"github.com/vmware/harbor/src/replication/replicator"
"github.com/vmware/harbor/src/replication/source"
"github.com/vmware/harbor/src/replication/target"
"github.com/vmware/harbor/src/replication/trigger"
"github.com/vmware/harbor/src/ui/config"
)
// Controller defines the methods that a replicatoin controllter should implement
@ -51,6 +57,9 @@ type DefaultController struct {
//Manage the triggers of policies
triggerManager *trigger.Manager
//Handle the replication work
replicator replicator.Replicator
}
//Keep controller as singleton instance
@ -65,14 +74,20 @@ type ControllerConfig struct {
}
//NewDefaultController is the constructor of DefaultController.
func NewDefaultController(config ControllerConfig) *DefaultController {
func NewDefaultController(cfg ControllerConfig) *DefaultController {
//Controller refer the default instances
return &DefaultController{
ctl := &DefaultController{
policyManager: policy.NewDefaultManager(),
targetManager: target.NewDefaultManager(),
sourcer: source.NewSourcer(),
triggerManager: trigger.NewManager(config.CacheCapacity),
triggerManager: trigger.NewManager(cfg.CacheCapacity),
}
endpoint := config.InternalJobServiceURL()
client := client.NewAuthorizedClient(auth.NewSecretAuthorizer(config.UISecret()))
ctl.replicator = replicator.NewDefaultReplicator(endpoint, client)
return ctl
}
//Init will initialize the controller and the sub components
@ -82,16 +97,8 @@ func (ctl *DefaultController) Init() error {
}
//Build query parameters
triggerNames := []string{
replication.TriggerKindSchedule,
}
queryName := ""
for _, name := range triggerNames {
queryName = fmt.Sprintf("%s,%s", queryName, name)
}
//Enable the triggers
query := models.QueryParameter{
TriggerName: queryName,
TriggerType: replication.TriggerKindSchedule,
}
policies, err := ctl.policyManager.GetPolicies(query)
@ -233,19 +240,22 @@ func (ctl *DefaultController) Replicate(policyID int64, metadata ...map[string]i
// prepare candidates for replication
candidates = getCandidates(&policy, ctl.sourcer, candidates...)
targets := []*common_models.RepTarget{}
for _, targetID := range policy.TargetIDs {
target, err := ctl.targetManager.GetTarget(targetID)
if err != nil {
return err
// TODO
/*
targets := []*common_models.RepTarget{}
for _, targetID := range policy.TargetIDs {
target, err := ctl.targetManager.GetTarget(targetID)
if err != nil {
return err
}
targets = append(targets, target)
}
targets = append(targets, target)
}
*/
// TODO merge tags whose repository is same into one struct
// call job service to do the replication
return replicate(candidates, targets)
// submit the replication
return replicate(ctl.replicator, policyID, candidates)
}
func getCandidates(policy *models.ReplicationPolicy, sourcer *source.Sourcer, candidates ...models.FilterItem) []models.FilterItem {
@ -254,7 +264,7 @@ func getCandidates(policy *models.ReplicationPolicy, sourcer *source.Sourcer, ca
candidates = append(candidates, models.FilterItem{
Kind: replication.FilterItemKindProject,
Value: namespace,
Operation: replication.OperationPush,
Operation: common_models.RepOpTransfer,
})
}
}
@ -313,10 +323,28 @@ func buildFilterChain(policy *models.ReplicationPolicy, sourcer *source.Sourcer)
return source.NewDefaultFilterChain(filters)
}
func replicate(candidates []models.FilterItem, targets []*common_models.RepTarget) error {
// TODO
log.Infof("replicate candidates %v to targets %v", candidates, targets)
func replicate(replicator replicator.Replicator, policyID int64, candidates []models.FilterItem) error {
repositories := map[string][]string{}
// TODO the operation of all candidates are same for now. Update it after supporting
// replicate deletion
operation := ""
for _, candidate := range candidates {
strs := strings.SplitN(candidate.Value, ":", 2)
repositories[strs[0]] = append(repositories[strs[0]], strs[1])
operation = candidate.Operation
}
for repository, tags := range repositories {
replication := &api.ReplicationReq{
PolicyID: policyID,
Repo: repository,
Operation: operation,
TagList: tags,
}
log.Debugf("submiting replication job to jobservice: %v", replication)
if err := replicator.Replicate(replication); err != nil {
return err
}
}
return nil
}

View File

@ -19,7 +19,7 @@ import (
"fmt"
"reflect"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/replication/event/notification"
)
@ -38,7 +38,7 @@ func (oph *OnDeletionHandler) Handle(value interface{}) error {
}
notification := value.(notification.OnDeletionNotification)
return checkAndTriggerReplication(notification.Image, replication.OperationDelete)
return checkAndTriggerReplication(notification.Image, models.RepOpDelete)
}
//IsStateful implements the same method of notification handler interface

View File

@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
common_models "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/notifier"
"github.com/vmware/harbor/src/common/utils"
"github.com/vmware/harbor/src/common/utils/log"
@ -45,7 +46,7 @@ func (oph *OnPushHandler) Handle(value interface{}) error {
notification := value.(notification.OnPushNotification)
return checkAndTriggerReplication(notification.Image, replication.OperationPush)
return checkAndTriggerReplication(notification.Image, common_models.RepOpTransfer)
}
//IsStateful implements the same method of notification handler interface
@ -68,7 +69,7 @@ func checkAndTriggerReplication(image, operation string) error {
}
for _, watchItem := range watchItems {
item := &models.FilterItem{
item := models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: image,
Metadata: map[string]interface{}{
@ -79,7 +80,7 @@ func checkAndTriggerReplication(image, operation string) error {
if err := notifier.Publish(topic.StartReplicationTopic, notification.StartReplicationNotification{
PolicyID: watchItem.PolicyID,
Metadata: map[string]interface{}{
"": []*models.FilterItem{item},
"candidates": []models.FilterItem{item},
},
}); err != nil {
return fmt.Errorf("failed to publish replication topic for resource %s, operation %s, policy %d: %v",

View File

@ -27,8 +27,8 @@ type QueryParameter struct {
//Size of each page, couple with page
PageSize int64
//Query by the name of trigger
TriggerName string
//Query by the type of trigger
TriggerType string
//Query by project ID
ProjectID int64

View File

@ -55,6 +55,13 @@ func (m *DefaultManager) GetPolicies(query models.QueryParameter) ([]models.Repl
if err != nil {
return []models.ReplicationPolicy{}, err
}
if len(query.TriggerType) > 0 {
if ply.Trigger.Kind != query.TriggerType {
continue
}
}
result = append(result, ply)
}

View File

@ -0,0 +1,43 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package replicator
import (
"github.com/vmware/harbor/src/common/http/client"
"github.com/vmware/harbor/src/jobservice/api"
jobserviceclient "github.com/vmware/harbor/src/jobservice/client"
)
// Replicator submits the replication work to the jobservice
type Replicator interface {
Replicate(*api.ReplicationReq) error
}
// DefaultReplicator provides a default implement for Replicator
type DefaultReplicator struct {
client jobserviceclient.Client
}
// NewDefaultReplicator returns an instance of DefaultReplicator
func NewDefaultReplicator(endpoint string, client ...client.Client) *DefaultReplicator {
return &DefaultReplicator{
client: jobserviceclient.NewDefaultClient(endpoint, client...),
}
}
// Replicate ...
func (d *DefaultReplicator) Replicate(replication *api.ReplicationReq) error {
return d.client.SubmitReplicationJob(replication)
}

View File

@ -12,32 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package auth
package replicator
import (
"net/http"
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/jobservice/api"
)
func TestAuthorize(t *testing.T) {
cookieName := "secret"
secret := "secret"
authorizer := NewSecretAuthorizer(cookieName, secret)
req, err := http.NewRequest("", "", nil)
if !assert.Nil(t, err, "unexpected error") {
return
}
type fakeJobserviceClient struct{}
err = authorizer.Authorize(req)
if !assert.Nil(t, err, "unexpected error") {
return
}
cookie, err := req.Cookie(cookieName)
if !assert.Nil(t, err, "unexpected error") {
return
}
assert.Equal(t, secret, cookie.Value, "unexpected cookie")
func (f *fakeJobserviceClient) SubmitReplicationJob(replication *api.ReplicationReq) error {
return nil
}
func TestReplicate(t *testing.T) {
replicator := NewDefaultReplicator("http://jobservice")
replicator.client = &fakeJobserviceClient{}
assert.Nil(t, replicator.Replicate(&api.ReplicationReq{}))
}

View File

@ -23,9 +23,10 @@ import (
"strings"
"github.com/vmware/harbor/src/adminserver/client"
"github.com/vmware/harbor/src/adminserver/client/auth"
"github.com/vmware/harbor/src/common"
comcfg "github.com/vmware/harbor/src/common/config"
httpclient "github.com/vmware/harbor/src/common/http/client"
"github.com/vmware/harbor/src/common/http/client/auth"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/secret"
"github.com/vmware/harbor/src/common/utils/log"
@ -73,8 +74,8 @@ func Init() error {
// InitByURL Init configurations with given url
func InitByURL(adminServerURL string) error {
log.Infof("initializing client for adminserver %s ...", adminServerURL)
authorizer := auth.NewSecretAuthorizer(secretCookieName, UISecret())
AdminserverClient = client.NewClient(adminServerURL, authorizer)
authorizer := auth.NewSecretAuthorizer(UISecret())
AdminserverClient = client.NewClient(adminServerURL, httpclient.NewAuthorizedClient(authorizer))
if err := AdminserverClient.Ping(); err != nil {
return fmt.Errorf("failed to ping adminserver: %v", err)
}