// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package replication

import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
"github.com/vmware/harbor/src/common/models"
comutils "github.com/vmware/harbor/src/common/utils"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/common/utils/registry"
"github.com/vmware/harbor/src/common/utils/registry/auth"
"github.com/vmware/harbor/src/jobservice/config"
"github.com/vmware/harbor/src/jobservice/utils"
)
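
// A replication job runs as a state machine. A normal pass goes through the
// states below in order: initialize -> check -> pull_manifest -> transfer_blob
// -> push_manifest, then back to pull_manifest for the next tag until no tag
// is left and the job finishes.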
const (
// StateInitialize ...
StateInitialize = "initialize"
// StateCheck ...
StateCheck = "check"
// StatePullManifest ...
StatePullManifest = "pull_manifest"
// StateTransferBlob ...
StateTransferBlob = "transfer_blob"
// StatePushManifest ...
StatePushManifest = "push_manifest"
)
var (
// ErrConflict represents http 409 error
ErrConflict = errors.New("conflict")
)
// BaseHandler holds information shared by the other state handlers
type BaseHandler struct {
	project        string // project_name
	repository     string // project_name/repo_name
	tags           []string
	srcURL         string // URL of the source registry
	srcSecret      string
	dstURL         string // URL of the destination registry
	dstUsr         string // username for the destination registry
	dstPwd         string // password for the destination registry
	insecure       bool   // whether to skip certificate verification when using HTTPS
	srcClient      *registry.Repository
	dstClient      *registry.Repository
	manifest       distribution.Manifest // manifest of tags[0]
	digest         string                // digest of tags[0]'s manifest
	blobs          []string              // blobs that need to be transferred for tags[0]
	blobsExistence map[string]bool       // key: digest of blob, value: existence
	logger         *log.Logger
}
// InitBaseHandler initializes a BaseHandler.
func InitBaseHandler(repository, srcURL, srcSecret,
dstURL, dstUsr, dstPwd string, insecure bool, tags []string, logger *log.Logger) *BaseHandler {
base := &BaseHandler{
repository: repository,
tags: tags,
srcURL: srcURL,
srcSecret: srcSecret,
dstURL: dstURL,
dstUsr: dstUsr,
dstPwd: dstPwd,
insecure: insecure,
blobsExistence: make(map[string]bool, 10),
logger: logger,
}
base.project, _ = comutils.ParseRepository(base.repository)
return base
}
// Exit ...
func (b *BaseHandler) Exit() error {
return nil
}
// Initializer creates clients for the source and destination registries,
// and lists all tags of the repository if the tags parameter is empty.
type Initializer struct {
*BaseHandler
}
// Enter ...
func (i *Initializer) Enter() (string, error) {
i.logger.Infof("initializing: repository: %s, tags: %v, source URL: %s, destination URL: %s, insecure: %v, destination user: %s",
i.repository, i.tags, i.srcURL, i.dstURL, i.insecure, i.dstUsr)
state, err := i.enter()
if err != nil && retry(err) {
i.logger.Info("waiting for retrying...")
return models.JobRetrying, nil
}
return state, err
}
func (i *Initializer) enter() (string, error) {
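	// the source registry is the local Harbor registry: authenticate with the
	// UI secret cookie against the internal token service endpoint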
c := &http.Cookie{Name: models.UISecretCookie, Value: i.srcSecret}
srcCred := auth.NewCookieCredential(c)
srcClient, err := utils.NewRepositoryClient(i.srcURL, i.insecure, srcCred,
config.InternalTokenServiceEndpoint(), i.repository, "pull", "push", "*")
if err != nil {
i.logger.Errorf("an error occurred while creating source repository client: %v", err)
return "", err
}
i.srcClient = srcClient
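	// the destination registry is accessed with basic auth using the provided
	// destination credentials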
dstCred := auth.NewBasicAuthCredential(i.dstUsr, i.dstPwd)
dstClient, err := utils.NewRepositoryClient(i.dstURL, i.insecure, dstCred,
"", i.repository, "pull", "push", "*")
if err != nil {
i.logger.Errorf("an error occurred while creating destination repository client: %v", err)
return "", err
}
i.dstClient = dstClient
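	// if no tags are specified, replicate all tags of the repository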
if len(i.tags) == 0 {
tags, err := i.srcClient.ListTag()
if err != nil {
i.logger.Errorf("an error occurred while listing tags for source repository: %v", err)
return "", err
}
i.tags = tags
}
i.logger.Infof("initialization completed: project: %s, repository: %s, tags: %v, source URL: %s, destination URL: %s, insecure: %v, destination user: %s",
i.project, i.repository, i.tags, i.srcURL, i.dstURL, i.insecure, i.dstUsr)
return StateCheck, nil
}
// Checker checks the existence of the project and the user's privilege to it
type Checker struct {
*BaseHandler
}
// Enter checks the existence of the project on the destination registry
// and creates it if it does not exist.
func (c *Checker) Enter() (string, error) {
state, err := c.enter()
if err != nil && retry(err) {
c.logger.Info("waiting for retrying...")
return models.JobRetrying, nil
}
return state, err
}
func (c *Checker) enter() (string, error) {
	project, err := getProject(c.project)
	if err != nil {
		c.logger.Errorf("an error occurred while getting project %s: %v", c.project, err)
		return "", err
	}
err = c.createProject(project)
if err == nil {
c.logger.Infof("project %s is created on %s with user %s", c.project, c.dstURL, c.dstUsr)
return StatePullManifest, nil
}
	// another job may be creating the same project at the same time, so if the
	// response code is 409, continue to the next step
if err == ErrConflict {
c.logger.Warningf("the status code is 409 when creating project %s on %s with user %s, try to do next step", c.project, c.dstURL, c.dstUsr)
return StatePullManifest, nil
}
c.logger.Errorf("an error occurred while creating project %s on %s with user %s : %v", c.project, c.dstURL, c.dstUsr, err)
return "", err
}
func getProject(name string) (*models.Project, error) {
req, err := http.NewRequest(http.MethodGet, buildProjectURL(), nil)
if err != nil {
return nil, err
}
req.URL.Query().Set("name", name)
req.URL.Query().Encode()
req.AddCookie(&http.Cookie{
Name: models.UISecretCookie,
Value: config.JobserviceSecret(),
})
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
list := []*models.Project{}
	if err = json.Unmarshal(data, &list); err != nil {
return nil, err
}
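	// the project list API may return more than one project matching the name
	// filter, so pick the exact match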
var project *models.Project
for _, p := range list {
if p.Name == name {
project = p
break
}
}
if project == nil {
return nil, fmt.Errorf("project %s not found", name)
}
return project, nil
}
func (c *Checker) createProject(project *models.Project) error {
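	// copy the settings of the source project when creating it on the
	// destination registry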
pro := struct {
Name string `json:"project_name"`
Public int `json:"public"`
EnableContentTrust bool `json:"enable_content_trust"`
PreventVulnerableImagesFromRunning bool `json:"prevent_vulnerable_images_from_running"`
PreventVulnerableImagesFromRunningSeverity string `json:"prevent_vulnerable_images_from_running_severity"`
AutomaticallyScanImagesOnPush bool `json:"automatically_scan_images_on_push"`
}{
Name: project.Name,
Public: project.Public,
EnableContentTrust: project.EnableContentTrust,
PreventVulnerableImagesFromRunning: project.PreventVulnerableImagesFromRunning,
PreventVulnerableImagesFromRunningSeverity: project.PreventVulnerableImagesFromRunningSeverity,
AutomaticallyScanImagesOnPush: project.AutomaticallyScanImagesOnPush,
}
data, err := json.Marshal(pro)
if err != nil {
return err
}
url := strings.TrimRight(c.dstURL, "/") + "/api/projects/"
req, err := http.NewRequest("POST", url, bytes.NewReader(data))
if err != nil {
return err
}
req.SetBasicAuth(c.dstUsr, c.dstPwd)
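	// skip certificate verification if the destination registry is marked as insecure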
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: c.insecure,
},
},
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
	// version 0.1.1 responds with 200 instead of 201
if resp.StatusCode == http.StatusCreated ||
resp.StatusCode == http.StatusOK {
return nil
}
if resp.StatusCode == http.StatusConflict {
return ErrConflict
}
message, err := ioutil.ReadAll(resp.Body)
if err != nil {
c.logger.Errorf("an error occurred while reading message from response: %v", err)
}
return fmt.Errorf("failed to create project %s on %s with user %s: %d %s",
c.project, c.dstURL, c.dstUsr, resp.StatusCode, string(message))
}
func buildProjectURL() string {
return strings.TrimRight(config.LocalUIURL(), "/") + "/api/projects/"
}
// ManifestPuller pulls the manifest of a tag. If no tag needs to be pulled,
// the next state of the state machine is "finished".
type ManifestPuller struct {
*BaseHandler
}
// Enter pulls the manifest of a tag and checks which of its blobs already exist in the destination registry
func (m *ManifestPuller) Enter() (string, error) {
state, err := m.enter()
if err != nil && retry(err) {
m.logger.Info("waiting for retrying...")
return models.JobRetrying, nil
}
return state, err
}
func (m *ManifestPuller) enter() (string, error) {
if len(m.tags) == 0 {
m.logger.Infof("no tag needs to be replicated, next state is \"finished\"")
return models.JobFinished, nil
}
name := m.repository
tag := m.tags[0]
acceptMediaTypes := []string{schema1.MediaTypeManifest, schema2.MediaTypeManifest}
digest, mediaType, payload, err := m.srcClient.PullManifest(tag, acceptMediaTypes)
if err != nil {
m.logger.Errorf("an error occurred while pulling manifest of %s:%s from %s: %v", name, tag, m.srcURL, err)
return "", err
}
m.digest = digest
m.logger.Infof("manifest of %s:%s pulled successfully from %s: %s", name, tag, m.srcURL, digest)
if strings.Contains(mediaType, "application/json") {
mediaType = schema1.MediaTypeManifest
}
manifest, _, err := registry.UnMarshal(mediaType, payload)
if err != nil {
m.logger.Errorf("an error occurred while parsing manifest of %s:%s from %s: %v", name, tag, m.srcURL, err)
return "", err
}
m.manifest = manifest
	// all blobs (layers and config)
	var blobs []string
	for _, descriptor := range manifest.References() {
		blobs = append(blobs, descriptor.Digest.String())
	}
m.logger.Infof("all blobs of %s:%s from %s: %v", name, tag, m.srcURL, blobs)
for _, blob := range blobs {
exist, ok := m.blobsExistence[blob]
if !ok {
exist, err = m.dstClient.BlobExist(blob)
if err != nil {
m.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on %s: %v", blob, name, tag, m.dstURL, err)
return "", err
}
m.blobsExistence[blob] = exist
}
if !exist {
m.blobs = append(m.blobs, blob)
} else {
m.logger.Infof("blob %s of %s:%s already exists in %s", blob, name, tag, m.dstURL)
}
}
m.logger.Infof("blobs of %s:%s need to be transferred to %s: %v", name, tag, m.dstURL, m.blobs)
return StateTransferBlob, nil
}
// BlobTransfer transfers blobs of a tag
type BlobTransfer struct {
*BaseHandler
}
// Enter pulls blobs from the source registry and pushes them to the destination registry.
func (b *BlobTransfer) Enter() (string, error) {
state, err := b.enter()
if err != nil && retry(err) {
b.logger.Info("waiting for retrying...")
return models.JobRetrying, nil
}
return state, err
}
func (b *BlobTransfer) enter() (string, error) {
name := b.repository
tag := b.tags[0]
for _, blob := range b.blobs {
b.logger.Infof("transferring blob %s of %s:%s to %s ...", blob, name, tag, b.dstURL)
size, data, err := b.srcClient.PullBlob(blob)
if err != nil {
b.logger.Errorf("an error occurred while pulling blob %s of %s:%s from %s: %v", blob, name, tag, b.srcURL, err)
return "", err
}
if data != nil {
defer data.Close()
}
if err = b.dstClient.PushBlob(blob, size, data); err != nil {
b.logger.Errorf("an error occurred while pushing blob %s of %s:%s to %s : %v", blob, name, tag, b.dstURL, err)
return "", err
}
b.logger.Infof("blob %s of %s:%s transferred to %s completed", blob, name, tag, b.dstURL)
}
return StatePushManifest, nil
}
// ManifestPusher pushes the manifest of a tag to the destination registry
type ManifestPusher struct {
*BaseHandler
}
// Enter checks the existence of the manifest in the source registry first, and
// if it exists, pushes it to the destination registry. The check avoids pushing
// a tag that has been deleted while its blobs were being transferred.
func (m *ManifestPusher) Enter() (string, error) {
state, err := m.enter()
if err != nil && retry(err) {
m.logger.Info("waiting for retrying...")
return models.JobRetrying, nil
}
return state, err
}
func (m *ManifestPusher) enter() (string, error) {
name := m.repository
tag := m.tags[0]
_, exist, err := m.srcClient.ManifestExist(tag)
if err != nil {
m.logger.Infof("an error occurred while checking the existence of manifest of %s:%s on %s: %v", name, tag, m.srcURL, err)
return "", err
}
if !exist {
m.logger.Infof("manifest of %s:%s does not exist on source registry %s, cancel manifest pushing", name, tag, m.srcURL)
} else {
m.logger.Infof("manifest of %s:%s exists on source registry %s, continue manifest pushing", name, tag, m.srcURL)
		digest, manifestExist, err := m.dstClient.ManifestExist(tag)
		if err != nil {
			m.logger.Errorf("an error occurred while checking the existence of manifest of %s:%s on %s: %v", name, tag, m.dstURL, err)
			return "", err
		}
		if manifestExist && digest == m.digest {
m.logger.Infof("manifest of %s:%s exists on destination registry %s, skip manifest pushing", name, tag, m.dstURL)
m.tags = m.tags[1:]
m.manifest = nil
m.digest = ""
m.blobs = nil
return StatePullManifest, nil
}
mediaType, data, err := m.manifest.Payload()
if err != nil {
m.logger.Errorf("an error occurred while getting payload of manifest for %s:%s : %v", name, tag, err)
return "", err
}
if _, err = m.dstClient.PushManifest(tag, mediaType, data); err != nil {
m.logger.Errorf("an error occurred while pushing manifest of %s:%s to %s : %v", name, tag, m.dstURL, err)
return "", err
}
m.logger.Infof("manifest of %s:%s has been pushed to %s", name, tag, m.dstURL)
}
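	// reset the per-tag state and move on to the next tag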
m.tags = m.tags[1:]
m.manifest = nil
m.digest = ""
m.blobs = nil
return StatePullManifest, nil
}