Implement registries manager

Signed-off-by: cd1989 <chende@caicloud.io>
This commit is contained in:
cd1989 2019-01-28 16:39:07 +08:00
parent 681453720f
commit 6bdf3053a7
9 changed files with 479 additions and 90 deletions

View File

@ -76,6 +76,7 @@ type RepTarget struct {
Password string `orm:"column(password)" json:"password"`
Type int `orm:"column(target_type)" json:"type"`
Insecure bool `orm:"column(insecure)" json:"insecure"`
Health string `orm:"column(health)" json:"health"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}

208
src/core/api/registry.go Normal file
View File

@ -0,0 +1,208 @@
package api
import (
"fmt"
"net/http"
"strconv"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/registry"
)
// RegistryAPI handles requests to /api/registries/{}. It manages registries integrated to Harbor.
type RegistryAPI struct {
BaseController
manager registry.Manager
}
// Prepare validates the user
func (t *RegistryAPI) Prepare() {
t.BaseController.Prepare()
if !t.SecurityCtx.IsAuthenticated() {
t.HandleUnauthorized()
return
}
if !t.SecurityCtx.IsSysAdmin() {
t.HandleForbidden(t.SecurityCtx.GetUsername())
return
}
t.manager = registry.NewDefaultManager()
if t.manager == nil {
log.Error("failed to create registry manager")
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
}
// Get gets a registry by id.
func (t *RegistryAPI) Get() {
id := t.GetIDFromURL()
registry, err := dao.GetRepTarget(id)
if err != nil {
log.Errorf("failed to get registry %d: %v", id, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if registry == nil {
t.HandleNotFound(fmt.Sprintf("registry %d not found", id))
return
}
// Hide password
registry.Password = ""
t.Data["json"] = registry
t.ServeJSON()
}
// List lists all registries that match a given registry name.
func (t *RegistryAPI) List() {
name := t.GetString("name")
registries, err := dao.FilterRepTargets(name)
if err != nil {
log.Errorf("failed to filter registries %s: %v", name, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
// Hide passwords
for _, registry := range registries {
registry.Password = ""
}
t.Data["json"] = registries
t.ServeJSON()
return
}
// Post creates a registry
func (t *RegistryAPI) Post() {
registry := &models.RepTarget{}
t.DecodeJSONReqAndValidate(registry)
reg, err := dao.GetRepTargetByName(registry.Name)
if err != nil {
log.Errorf("failed to get registry %s: %v", registry.Name, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if reg != nil {
t.HandleConflict(fmt.Sprintf("name '%s' is already used"), registry.Name)
return
}
reg, err = dao.GetRepTargetByEndpoint(registry.URL)
if err != nil {
log.Errorf("failed to get registry by URL [ %s ]: %v", registry.URL, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if reg != nil {
t.HandleConflict(fmt.Sprintf("registry with endpoint '%s' already exists", registry.URL))
return
}
id, err := t.manager.AddRegistry(registry)
if err != nil {
log.Errorf("Add registry '%s' error: %v", registry.URL, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
t.Redirect(http.StatusCreated, strconv.FormatInt(id, 10))
}
// Put updates a registry
func (t *RegistryAPI) Put() {
id := t.GetIDFromURL()
registry, err := t.manager.GetRegistry(id)
if err != nil {
log.Errorf("Get registry by id %d error: %v", id, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
req := struct {
Name *string `json:"name"`
Endpoint *string `json:"endpoint"`
Username *string `json:"username"`
Password *string `json:"password"`
Insecure *bool `json:"insecure"`
}{}
t.DecodeJSONReq(&req)
originalName := registry.Name
originalURL := registry.URL
if req.Name != nil {
registry.Name = *req.Name
}
if req.Endpoint != nil {
registry.URL = *req.Endpoint
}
if req.Username != nil {
registry.Username = *req.Username
}
if req.Password != nil {
registry.Password = *req.Password
}
if req.Insecure != nil {
registry.Insecure = *req.Insecure
}
t.Validate(registry)
if registry.Name != originalName {
reg, err := dao.GetRepTargetByName(registry.Name)
if err != nil {
log.Errorf("Get registry by name '%s' error: %v", registry.Name, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if reg != nil {
t.HandleConflict("name is already used")
return
}
}
if registry.URL != originalURL {
reg, err := dao.GetRepTargetByEndpoint(registry.URL)
if err != nil {
log.Errorf("Get registry by URL '%s' error: %v", registry.URL, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if reg != nil {
t.HandleConflict(fmt.Sprintf("registry with endpoint '%s' already exists", registry.URL))
return
}
}
if err := t.manager.UpdateRegistry(registry); err != nil {
log.Errorf("Update registry %d error: %v", id, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
}
// Delete deletes a registry
func (t *RegistryAPI) Delete() {
id := t.GetIDFromURL()
registry, err := dao.GetRepTarget(id)
if err != nil {
log.Errorf("Get registry %d error: %v", id, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if registry == nil {
t.HandleNotFound(fmt.Sprintf("target %d not found", id))
return
}
if err := t.manager.DeleteRegistry(id); err != nil {
log.Errorf("Delete registry %d error: %v", id, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
}

View File

@ -39,6 +39,8 @@ import (
"github.com/goharbor/harbor/src/core/service/token"
"github.com/goharbor/harbor/src/replication/core"
_ "github.com/goharbor/harbor/src/replication/event"
"os/signal"
"syscall"
)
const (
@ -71,6 +73,13 @@ func updateInitPassword(userID int, password string) error {
return nil
}
func gracefulShutdown(closing chan struct{}) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
log.Infof("capture system signal %s, to close \"closing\" channel", <-signals)
close(closing)
}
func main() {
beego.BConfig.WebConfig.Session.SessionOn = true
beego.BConfig.WebConfig.Session.SessionName = "sid"
@ -128,7 +137,10 @@ func main() {
}
}
if err := core.Init(); err != nil {
closing := make(chan struct{})
go gracefulShutdown(closing)
if err := core.Init(closing); err != nil {
log.Errorf("failed to initialize the replication controller: %v", err)
}

View File

@ -127,6 +127,9 @@ func initRouters() {
beego.Router("/service/notifications/jobs/adminjob/:id([0-9]+)", &admin.Handler{}, "post:HandleAdminJob")
beego.Router("/service/token", &token.Handler{})
beego.Router("/api/registries", &api.RegistryAPI{}, "get:List;post:Post")
beego.Router("/api/registries/:id([0-9]+)", &api.RegistryAPI{}, "get:Get;put:Put;delete:Delete")
beego.Router("/v2/*", &controllers.RegistryProxy{}, "*:Handle")
// APIs for chart repository

View File

@ -18,6 +18,7 @@ import (
"fmt"
"reflect"
"strings"
"time"
common_models "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/utils/log"
@ -25,9 +26,9 @@ import (
"github.com/goharbor/harbor/src/replication"
"github.com/goharbor/harbor/src/replication/models"
"github.com/goharbor/harbor/src/replication/policy"
"github.com/goharbor/harbor/src/replication/registry"
"github.com/goharbor/harbor/src/replication/replicator"
"github.com/goharbor/harbor/src/replication/source"
"github.com/goharbor/harbor/src/replication/target"
"github.com/goharbor/harbor/src/replication/trigger"
"github.com/docker/distribution/uuid"
@ -36,7 +37,7 @@ import (
// Controller defines the methods that a replicatoin controllter should implement
type Controller interface {
policy.Manager
Init() error
Init(closing chan struct{}) error
Replicate(policyID int64, metadata ...map[string]interface{}) error
}
@ -49,8 +50,8 @@ type DefaultController struct {
// Manage the policies
policyManager policy.Manager
// Manage the targets
targetManager target.Manager
// Manage the registries
registryManager registry.Manager
// Handle the things related with source
sourcer *source.Sourcer
@ -77,10 +78,10 @@ type ControllerConfig struct {
func NewDefaultController(cfg ControllerConfig) *DefaultController {
// Controller refer the default instances
ctl := &DefaultController{
policyManager: policy.NewDefaultManager(),
targetManager: target.NewDefaultManager(),
sourcer: source.NewSourcer(),
triggerManager: trigger.NewManager(cfg.CacheCapacity),
policyManager: policy.NewDefaultManager(),
registryManager: registry.NewDefaultManager(),
sourcer: source.NewSourcer(),
triggerManager: trigger.NewManager(cfg.CacheCapacity),
}
ctl.replicator = replicator.NewDefaultReplicator(utils.GetJobServiceClient())
@ -89,13 +90,13 @@ func NewDefaultController(cfg ControllerConfig) *DefaultController {
}
// Init creates the GlobalController and inits it
func Init() error {
func Init(closing chan struct{}) error {
GlobalController = NewDefaultController(ControllerConfig{}) // Use default data
return GlobalController.Init()
return GlobalController.Init(closing)
}
// Init will initialize the controller and the sub components
func (ctl *DefaultController) Init() error {
func (ctl *DefaultController) Init(closing chan struct{}) error {
if ctl.initialized {
return nil
}
@ -105,6 +106,9 @@ func (ctl *DefaultController) Init() error {
ctl.initialized = true
// Start registry health checker to regularly check registries' health status
go registry.NewHealthChecker(time.Second*30, closing).Run()
return nil
}
@ -217,7 +221,7 @@ func (ctl *DefaultController) Replicate(policyID int64, metadata ...map[string]i
targets := []*common_models.RepTarget{}
for _, targetID := range policy.TargetIDs {
target, err := ctl.targetManager.GetTarget(targetID)
target, err := ctl.registryManager.GetRegistry(targetID)
if err != nil {
return err
}

View File

@ -0,0 +1,65 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package registry
import (
"time"
"github.com/goharbor/harbor/src/common/utils/log"
)
// MinInterval defines the minimum interval to check registries' health status.
const MinInterval = time.Second * 3
// HealthChecker is used to regularly check all registries' health status and update
// check result to database
type HealthChecker struct {
interval time.Duration
closing chan struct{}
manager Manager
}
// NewHealthChecker creates a new health checker
// - interval specifies the time interval to perform health check for registries
// - closing is a channel to stop the health checker
func NewHealthChecker(interval time.Duration, closing chan struct{}) *HealthChecker {
return &HealthChecker{
interval: interval,
manager: NewDefaultManager(),
closing: closing,
}
}
// Run performs health check for all registries regularly
func (c *HealthChecker) Run() {
interval := c.interval
if c.interval < MinInterval {
interval = MinInterval
}
ticker := time.NewTicker(interval)
for {
select {
case <-ticker.C:
if err := c.manager.HealthCheck(); err != nil {
log.Errorf("Health check error: %v", err)
continue
}
log.Debug("Health Check succeeded")
case <-c.closing:
log.Info("Stop health checker")
return
}
}
}

View File

@ -15,20 +15,179 @@
package registry
import (
"github.com/goharbor/harbor/src/replication/ng/model"
"errors"
"fmt"
"net/http"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/common/utils/registry"
"github.com/goharbor/harbor/src/common/utils/registry/auth"
"github.com/goharbor/harbor/src/core/config"
)
// Manager manages registries
// HealthStatus describes whether a target is healthy or not
type HealthStatus string
const (
// Healthy indicates target is healthy
Healthy = "healthy"
// Unhealthy indicates target is unhealthy
Unhealthy = "unhealthy"
// Unknown indicates health status of target is unknown
Unknown = "unknown"
)
// Manager defines the methods that a target manager should implement
type Manager interface {
// Add new registry
Add(*model.Registry) (int64, error)
// List registries, returns total count, registry list and error
List(...*model.RegistryQuery) (int64, []*model.Registry, error)
// Get the specified registry
Get(int64) (*model.Registry, error)
// Update the registry, the "props" are the properties of registry
// that need to be updated
Update(registry *model.Registry, props ...string) error
// Remove the registry with the specified ID
Remove(int64) error
GetRegistry(int64) (*models.RepTarget, error)
AddRegistry(*models.RepTarget) (int64, error)
UpdateRegistry(*models.RepTarget) error
DeleteRegistry(int64) error
HealthCheck() error
}
// DefaultManager implement the Manager interface
type DefaultManager struct{}
// NewDefaultManager returns an instance of DefaultManger
func NewDefaultManager() *DefaultManager {
return &DefaultManager{}
}
// GetRegistry gets a registry by id
func (m *DefaultManager) GetRegistry(id int64) (*models.RepTarget, error) {
target, err := dao.GetRepTarget(id)
if err != nil {
return nil, err
}
if target == nil {
return nil, fmt.Errorf("target '%d' does not exist", id)
}
// decrypt the password
if len(target.Password) > 0 {
key, err := config.SecretKey()
if err != nil {
return nil, err
}
pwd, err := utils.ReversibleDecrypt(target.Password, key)
if err != nil {
return nil, err
}
target.Password = pwd
}
return target, nil
}
// AddRegistry adds a new registry
func (m *DefaultManager) AddRegistry(registry *models.RepTarget) (int64, error) {
var err error
if len(registry.Password) != 0 {
key, err := config.SecretKey()
if err != nil {
return -1, err
}
registry.Password, err = utils.ReversibleEncrypt(registry.Password, key)
if err != nil {
log.Errorf("failed to encrypt password: %v", err)
return -1, err
}
}
id, err := dao.AddRepTarget(*registry)
if err != nil {
log.Errorf("failed to add registry: %v", err)
}
return id, nil
}
// UpdateRegistry updates a registry
func (m *DefaultManager) UpdateRegistry(registry *models.RepTarget) error {
// Encrypt the password if set
if len(registry.Password) > 0 {
key, err := config.SecretKey()
if err != nil {
return err
}
pwd, err := utils.ReversibleEncrypt(registry.Password, key)
if err != nil {
return err
}
registry.Password = pwd
}
return dao.UpdateRepTarget(*registry)
}
// DeleteRegistry deletes a registry
func (m *DefaultManager) DeleteRegistry(id int64) error {
policies, err := dao.GetRepPolicyByTarget(id)
if err != nil {
log.Errorf("Get policies related to registry %d error: %v", id, err)
return err
}
if len(policies) > 0 {
msg := fmt.Sprintf("Can't delete registry with replication policies, %d found", len(policies))
log.Error(msg)
return errors.New(msg)
}
if err = dao.DeleteRepTarget(id); err != nil {
log.Errorf("Delete registry %d error: %v", id, err)
return err
}
return nil
}
// HealthCheck checks health status of every registries and update their status. It will check whether a registry
// is reachable and the credential is valid
func (m *DefaultManager) HealthCheck() error {
registries, err := dao.FilterRepTargets("")
if err != nil {
return err
}
errCount := 0
for _, r := range registries {
status, _ := healthStatus(r)
r.Health = string(status)
err := m.UpdateRegistry(r)
if err != nil {
log.Warningf("Update health status for '%s' error: %v", r.URL, err)
errCount++
continue
}
}
if errCount > 0 {
return fmt.Errorf("%d out of %d registries failed to update health status", errCount, len(registries))
}
return nil
}
func healthStatus(r *models.RepTarget) (HealthStatus, error) {
transport := registry.GetHTTPTransport(r.Insecure)
credential := auth.NewBasicAuthCredential(r.Username, r.Password)
authorizer := auth.NewStandardTokenAuthorizer(&http.Client{
Transport: transport,
}, credential)
registry, err := registry.NewRegistry(r.URL, &http.Client{
Transport: registry.NewTransport(transport, authorizer),
})
if err != nil {
return Unknown, err
}
err = registry.Ping()
if err != nil {
return Unhealthy, err
}
return Healthy, nil
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package target
package registry
import (
"testing"

View File

@ -1,63 +0,0 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package target
import (
"fmt"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/core/config"
)
// Manager defines the methods that a target manager should implement
type Manager interface {
GetTarget(int64) (*models.RepTarget, error)
}
// DefaultManager implement the Manager interface
type DefaultManager struct{}
// NewDefaultManager returns an instance of DefaultManger
func NewDefaultManager() *DefaultManager {
return &DefaultManager{}
}
// GetTarget ...
func (d *DefaultManager) GetTarget(id int64) (*models.RepTarget, error) {
target, err := dao.GetRepTarget(id)
if err != nil {
return nil, err
}
if target == nil {
return nil, fmt.Errorf("target '%d' does not exist", id)
}
// decrypt the password
if len(target.Password) > 0 {
key, err := config.SecretKey()
if err != nil {
return nil, err
}
pwd, err := utils.ReversibleDecrypt(target.Password, key)
if err != nil {
return nil, err
}
target.Password = pwd
}
return target, nil
}