Add quota sync api toi to sync quota data with backend storage

Signed-off-by: wang yan <wangyan@vmware.com>
This commit is contained in:
Wang Yan 2019-08-09 17:02:21 +08:00 committed by wang yan
parent 022d4e6ae8
commit 7a41d89ac8
8 changed files with 822 additions and 8 deletions

View File

@ -58,6 +58,7 @@ func UpdateArtifactPullTime(af *models.Artifact) error {
// DeleteArtifact ...
func DeleteArtifact(id int64) error {
_, err := GetOrmer().QueryTable(&models.Artifact{}).Filter("ID", id).Delete()
return err
}

View File

@ -19,12 +19,15 @@ import (
"github.com/goharbor/harbor/src/common"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/quota"
common_quota "github.com/goharbor/harbor/src/common/quota"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/pkg/errors"
"strconv"
quota "github.com/goharbor/harbor/src/core/api/quota"
)
// InternalAPI handles request of harbor admin...
@ -132,14 +135,14 @@ func (ia *InternalAPI) ensureQuota() error {
pCount = pCount + int64(count)
}
quotaMgr, err := quota.NewManager("project", strconv.FormatInt(project.ProjectID, 10))
quotaMgr, err := common_quota.NewManager("project", strconv.FormatInt(project.ProjectID, 10))
if err != nil {
logger.Errorf("Error occurred when to new quota manager %v, just skip it.", err)
continue
}
used := quota.ResourceList{
quota.ResourceStorage: pSize,
quota.ResourceCount: pCount,
used := common_quota.ResourceList{
common_quota.ResourceStorage: pSize,
common_quota.ResourceCount: pCount,
}
if err := quotaMgr.EnsureQuota(used); err != nil {
logger.Errorf("cannot ensure quota for the project: %d, err: %v, just skip it.", project.ProjectID, err)
@ -148,3 +151,12 @@ func (ia *InternalAPI) ensureQuota() error {
}
return nil
}
// SyncQuota ...
func (ia *InternalAPI) SyncQuota() {
err := quota.Sync(ia.ProjectMgr, false)
if err != nil {
ia.SendInternalServerError(err)
return
}
}

View File

@ -0,0 +1,217 @@
// Copyright 2018 Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chart
import (
"fmt"
"github.com/goharbor/harbor/src/chartserver"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
common_quota "github.com/goharbor/harbor/src/common/quota"
"github.com/goharbor/harbor/src/common/utils/log"
quota "github.com/goharbor/harbor/src/core/api/quota"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/core/promgr"
"github.com/pkg/errors"
"net/url"
"strings"
"sync"
)
// Migrator ...
type Migrator struct {
pm promgr.ProjectManager
}
// NewChartMigrator returns a new RegistryMigrator.
func NewChartMigrator(pm promgr.ProjectManager) quota.QuotaMigrator {
migrator := Migrator{
pm: pm,
}
return &migrator
}
var (
controller *chartserver.Controller
controllerErr error
controllerOnce sync.Once
)
// Dump ...
// Depends on DB to dump chart data, as chart cannot get all of namespaces.
func (rm *Migrator) Dump() ([]quota.ProjectInfo, error) {
var (
projects []quota.ProjectInfo
wg sync.WaitGroup
err error
)
all, err := dao.GetProjects(nil)
if err != nil {
return nil, err
}
wg.Add(len(all))
errChan := make(chan error, 1)
infoChan := make(chan interface{})
done := make(chan bool, 1)
go func() {
defer func() {
done <- true
}()
for {
select {
case result := <-infoChan:
if result == nil {
return
}
project, ok := result.(quota.ProjectInfo)
if ok {
projects = append(projects, project)
}
case e := <-errChan:
if err == nil {
err = errors.Wrap(e, "quota sync error on getting info of project")
} else {
err = errors.Wrap(e, err.Error())
}
}
}
}()
for _, project := range all {
go func(project *models.Project) {
defer wg.Done()
var repos []quota.RepoData
var afs []*models.Artifact
ctr, err := chartController()
if err != nil {
log.Error(err)
}
chartInfo, err := ctr.ListCharts(project.Name)
if err != nil {
log.Error(err)
}
// repo
for _, chart := range chartInfo {
chartVersions, err := ctr.GetChart(project.Name, chart.Name)
if err != nil {
log.Error(err)
}
for _, chart := range chartVersions {
af := &models.Artifact{
PID: project.ProjectID,
Repo: chart.Name,
Tag: chart.Version,
Digest: chart.Digest,
Kind: "Chart",
}
afs = append(afs, af)
}
repoData := quota.RepoData{
Name: project.Name,
Afs: afs,
}
repos = append(repos, repoData)
}
projectInfo := quota.ProjectInfo{
Name: project.Name,
Repos: repos,
}
infoChan <- projectInfo
}(project)
}
wg.Done()
close(infoChan)
<-done
if err != nil {
return nil, err
}
return projects, nil
}
// Usage ...
// Chart will not cover size.
func (rm *Migrator) Usage(projects []quota.ProjectInfo) ([]quota.ProjectUsage, error) {
var pros []quota.ProjectUsage
for _, project := range projects {
var count int64
// usage count
for _, repo := range project.Repos {
count = count + int64(len(repo.Afs))
}
proUsage := quota.ProjectUsage{
Project: project.Name,
Used: common_quota.ResourceList{
common_quota.ResourceCount: count,
},
}
pros = append(pros, proUsage)
}
return pros, nil
}
// Persist ...
// Chart will not persist data into db.
func (rm *Migrator) Persist(projects []quota.ProjectInfo) error {
return nil
}
func chartController() (*chartserver.Controller, error) {
controllerOnce.Do(func() {
addr, err := config.GetChartMuseumEndpoint()
if err != nil {
controllerErr = fmt.Errorf("failed to get the endpoint URL of chart storage server: %s", err.Error())
return
}
addr = strings.TrimSuffix(addr, "/")
url, err := url.Parse(addr)
if err != nil {
controllerErr = errors.New("endpoint URL of chart storage server is malformed")
return
}
ctr, err := chartserver.NewController(url)
if err != nil {
controllerErr = errors.New("failed to initialize chart API controller")
}
controller = ctr
log.Debugf("Chart storage server is set to %s", url.String())
log.Info("API controller for chart repository server is successfully initialized")
})
return controller, controllerErr
}
func init() {
quota.Register("chart", NewChartMigrator)
}

View File

@ -0,0 +1,128 @@
// Copyright 2018 Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package models
import (
"fmt"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/quota"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/core/promgr"
"strconv"
)
// QuotaMigrator ...
type QuotaMigrator interface {
// Dump exports all data from backend service, registry, chartmuseum
Dump() ([]ProjectInfo, error)
// Usage computes the quota usage of all the projects
Usage([]ProjectInfo) ([]ProjectUsage, error)
// Persist record the data to DB, artifact, artifact_blob and blob tabel.
Persist([]ProjectInfo) error
}
// ProjectInfo ...
type ProjectInfo struct {
Name string
Repos []RepoData
}
// RepoData ...
type RepoData struct {
Name string
Afs []*models.Artifact
Afnbs []*models.ArtifactAndBlob
Blobs []*models.Blob
}
// ProjectUsage ...
type ProjectUsage struct {
Project string
Used quota.ResourceList
}
// Instance ...
type Instance func(promgr.ProjectManager) QuotaMigrator
var adapters = make(map[string]Instance)
// Register ...
func Register(name string, adapter Instance) {
if adapter == nil {
panic("quota: Register adapter is nil")
}
if _, ok := adapters[name]; ok {
panic("quota: Register called twice for adapter " + name)
}
adapters[name] = adapter
}
// Sync ...
func Sync(pm promgr.ProjectManager, populate bool) error {
for name := range adapters {
if !config.WithChartMuseum() {
if name == "chart" {
continue
}
}
instanceFunc, ok := adapters[name]
if !ok {
err := fmt.Errorf("quota migtator: unknown adapter name %q", name)
return err
}
adapter := instanceFunc(pm)
data, err := adapter.Dump()
if err != nil {
return err
}
usage, err := adapter.Usage(data)
if err := ensureQuota(usage); err != nil {
return err
}
if populate {
if err := adapter.Persist(data); err != nil {
return err
}
}
}
return nil
}
// ensureQuota updates the quota and quota usage in the data base.
func ensureQuota(usages []ProjectUsage) error {
var pid int64
for _, usage := range usages {
project, err := dao.GetProjectByName(usage.Project)
if err != nil {
log.Error(err)
return err
}
pid = project.ProjectID
quotaMgr, err := quota.NewManager("project", strconv.FormatInt(pid, 10))
if err != nil {
log.Errorf("Error occurred when to new quota manager %v", err)
return err
}
if err := quotaMgr.EnsureQuota(usage.Used); err != nil {
log.Errorf("cannot ensure quota for the project: %d, err: %v", pid, err)
return err
}
}
return nil
}

View File

@ -0,0 +1,420 @@
// Copyright 2018 Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package registry
import (
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
common_quota "github.com/goharbor/harbor/src/common/quota"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/common/utils/registry"
"github.com/goharbor/harbor/src/core/api"
quota "github.com/goharbor/harbor/src/core/api/quota"
"github.com/goharbor/harbor/src/core/promgr"
coreutils "github.com/goharbor/harbor/src/core/utils"
"github.com/pkg/errors"
"strings"
"sync"
"time"
)
// Migrator ...
type Migrator struct {
pm promgr.ProjectManager
}
// NewRegistryMigrator returns a new Migrator.
func NewRegistryMigrator(pm promgr.ProjectManager) quota.QuotaMigrator {
migrator := Migrator{
pm: pm,
}
return &migrator
}
// Dump ...
func (rm *Migrator) Dump() ([]quota.ProjectInfo, error) {
var (
projects []quota.ProjectInfo
wg sync.WaitGroup
err error
)
reposInRegistry, err := api.Catalog()
if err != nil {
return nil, err
}
// repoMap : map[project_name : []repo list]
repoMap := make(map[string][]string)
for _, item := range reposInRegistry {
projectName := strings.Split(item, "/")[0]
pro, err := rm.pm.Get(projectName)
if err != nil {
log.Errorf("failed to get project %s: %v", projectName, err)
continue
}
_, exist := repoMap[pro.Name]
if !exist {
repoMap[pro.Name] = []string{item}
} else {
repos := repoMap[pro.Name]
repos = append(repos, item)
repoMap[pro.Name] = repos
}
}
wg.Add(len(repoMap))
errChan := make(chan error, 1)
infoChan := make(chan interface{})
done := make(chan bool, 1)
go func() {
defer func() {
done <- true
}()
for {
select {
case result := <-infoChan:
if result == nil {
return
}
project, ok := result.(quota.ProjectInfo)
if ok {
projects = append(projects, project)
}
case e := <-errChan:
if err == nil {
err = errors.Wrap(e, "quota sync error on getting info of project")
} else {
err = errors.Wrap(e, err.Error())
}
}
}
}()
for project, repos := range repoMap {
go func(project string, repos []string) {
defer wg.Done()
info, err := infoOfProject(project, repos)
if err != nil {
errChan <- err
return
}
infoChan <- info
}(project, repos)
}
wg.Wait()
close(infoChan)
// wait for all of project info
<-done
if err != nil {
return nil, err
}
return projects, nil
}
// Usage ...
// registry needs to merge the shard blobs of different repositories.
func (rm *Migrator) Usage(projects []quota.ProjectInfo) ([]quota.ProjectUsage, error) {
var pros []quota.ProjectUsage
for _, project := range projects {
var size, count int64
var blobs = make(map[string]int64)
// usage count
for _, repo := range project.Repos {
count = count + int64(len(repo.Afs))
// Because that there are some shared blobs between repositories, it needs to remove the duplicate items.
for _, blob := range repo.Blobs {
_, exist := blobs[blob.Digest]
if !exist {
blobs[blob.Digest] = blob.Size
}
}
}
// size
for _, item := range blobs {
size = size + item
}
proUsage := quota.ProjectUsage{
Project: project.Name,
Used: common_quota.ResourceList{
common_quota.ResourceCount: count,
common_quota.ResourceStorage: size,
},
}
pros = append(pros, proUsage)
}
return pros, nil
}
// Persist ...
func (rm *Migrator) Persist(projects []quota.ProjectInfo) error {
for _, project := range projects {
for _, repo := range project.Repos {
if err := persistAf(repo.Afs); err != nil {
return err
}
if err := persistAfnbs(repo.Afnbs); err != nil {
return err
}
if err := persistBlob(repo.Blobs); err != nil {
return err
}
}
}
if err := persistPB(projects); err != nil {
return err
}
return nil
}
func persistAf(afs []*models.Artifact) error {
if len(afs) != 0 {
for _, af := range afs {
_, err := dao.AddArtifact(af)
if err != nil {
if err == dao.ErrDupRows {
continue
}
log.Error(err)
return err
}
}
}
return nil
}
func persistAfnbs(afnbs []*models.ArtifactAndBlob) error {
if len(afnbs) != 0 {
for _, afnb := range afnbs {
_, err := dao.AddArtifactNBlob(afnb)
if err != nil {
if err == dao.ErrDupRows {
continue
}
log.Error(err)
return err
}
}
}
return nil
}
func persistBlob(blobs []*models.Blob) error {
if len(blobs) != 0 {
for _, blob := range blobs {
_, err := dao.AddBlob(blob)
if err != nil {
if err == dao.ErrDupRows {
continue
}
log.Error(err)
return err
}
}
}
return nil
}
func persistPB(projects []quota.ProjectInfo) error {
for _, project := range projects {
var blobs = make(map[string]int64)
var blobsOfPro []*models.Blob
for _, repo := range project.Repos {
for _, blob := range repo.Blobs {
_, exist := blobs[blob.Digest]
if exist {
continue
}
blobs[blob.Digest] = blob.Size
blobInDB, err := dao.GetBlob(blob.Digest)
if err != nil {
log.Error(err)
return err
}
if blobInDB != nil {
blobsOfPro = append(blobsOfPro, blobInDB)
}
}
}
pro, err := dao.GetProjectByName(project.Name)
if err != nil {
log.Error(err)
return err
}
_, err = dao.AddBlobsToProject(pro.ProjectID, blobsOfPro...)
if err != nil {
log.Error(err)
return err
}
}
return nil
}
func infoOfProject(project string, repoList []string) (quota.ProjectInfo, error) {
var (
repos []quota.RepoData
wg sync.WaitGroup
err error
)
wg.Add(len(repoList))
errChan := make(chan error, 1)
infoChan := make(chan interface{})
done := make(chan bool, 1)
pro, err := dao.GetProjectByName(project)
if err != nil {
log.Error(err)
return quota.ProjectInfo{}, err
}
go func() {
defer func() {
done <- true
}()
for {
select {
case result := <-infoChan:
if result == nil {
return
}
repoData, ok := result.(quota.RepoData)
if ok {
repos = append(repos, repoData)
}
case e := <-errChan:
if err == nil {
err = errors.Wrap(e, "quota sync error on getting info of repo")
} else {
err = errors.Wrap(e, err.Error())
}
}
}
}()
for _, repo := range repoList {
go func(pid int64, repo string) {
defer func() {
wg.Done()
}()
info, err := infoOfRepo(pid, repo)
if err != nil {
errChan <- err
return
}
infoChan <- info
}(pro.ProjectID, repo)
}
wg.Wait()
close(infoChan)
<-done
if err != nil {
return quota.ProjectInfo{}, err
}
return quota.ProjectInfo{
Name: project,
Repos: repos,
}, nil
}
func infoOfRepo(pid int64, repo string) (quota.RepoData, error) {
repoClient, err := coreutils.NewRepositoryClientForUI("harbor-core", repo)
if err != nil {
return quota.RepoData{}, err
}
tags, err := repoClient.ListTag()
if err != nil {
return quota.RepoData{}, err
}
var afnbs []*models.ArtifactAndBlob
var afs []*models.Artifact
var blobs []*models.Blob
for _, tag := range tags {
_, mediaType, payload, err := repoClient.PullManifest(tag, []string{
schema1.MediaTypeManifest,
schema1.MediaTypeSignedManifest,
schema2.MediaTypeManifest,
})
if err != nil {
log.Error(err)
return quota.RepoData{}, err
}
manifest, desc, err := registry.UnMarshal(mediaType, payload)
if err != nil {
log.Error(err)
return quota.RepoData{}, err
}
// self
afnb := &models.ArtifactAndBlob{
DigestAF: desc.Digest.String(),
DigestBlob: desc.Digest.String(),
}
afnbs = append(afnbs, afnb)
for _, layer := range manifest.References() {
afnb := &models.ArtifactAndBlob{
DigestAF: desc.Digest.String(),
DigestBlob: layer.Digest.String(),
}
afnbs = append(afnbs, afnb)
blob := &models.Blob{
Digest: layer.Digest.String(),
ContentType: layer.MediaType,
Size: layer.Size,
CreationTime: time.Now(),
}
blobs = append(blobs, blob)
}
af := &models.Artifact{
PID: pid,
Repo: strings.Split(repo, "/")[1],
Tag: tag,
Digest: desc.Digest.String(),
Kind: "Docker-Image",
CreationTime: time.Now(),
}
afs = append(afs, af)
}
return quota.RepoData{
Name: repo,
Afs: afs,
Afnbs: afnbs,
Blobs: blobs,
}, nil
}
func init() {
quota.Register("registry", NewRegistryMigrator)
}

View File

@ -38,7 +38,7 @@ func SyncRegistry(pm promgr.ProjectManager) error {
log.Infof("Start syncing repositories from registry to DB... ")
reposInRegistry, err := catalog()
reposInRegistry, err := Catalog()
if err != nil {
log.Error(err)
return err
@ -105,7 +105,8 @@ func SyncRegistry(pm promgr.ProjectManager) error {
return nil
}
func catalog() ([]string, error) {
// Catalog ...
func Catalog() ([]string, error) {
repositories := []string{}
rc, err := initRegistryClient()

View File

@ -34,6 +34,11 @@ import (
_ "github.com/goharbor/harbor/src/core/auth/db"
_ "github.com/goharbor/harbor/src/core/auth/ldap"
_ "github.com/goharbor/harbor/src/core/auth/uaa"
quota "github.com/goharbor/harbor/src/core/api/quota"
_ "github.com/goharbor/harbor/src/core/api/quota/chart"
_ "github.com/goharbor/harbor/src/core/api/quota/registry"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/core/filter"
"github.com/goharbor/harbor/src/core/middlewares"
@ -74,6 +79,32 @@ func updateInitPassword(userID int, password string) error {
return nil
}
// Quota migration
func quotaSync() error {
usages, err := dao.ListQuotaUsages()
if err != nil {
log.Errorf("list quota usage error, %v", err)
return err
}
projects, err := dao.GetProjects(nil)
if err != nil {
log.Errorf("list project error, %v", err)
return err
}
// upgrade from old version
if len(projects) > 1 && len(usages) == 1 {
log.Info("Start to sync quota data .....")
if err := quota.Sync(config.GlobalProjectMgr, true); err != nil {
log.Errorf("Error happened when syncing quota usage data, %v", err)
return err
}
log.Info("Success to sync quota data .....")
}
return nil
}
func gracefulShutdown(closing chan struct{}) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
@ -174,6 +205,9 @@ func main() {
log.Fatalf("init proxy error, %v", err)
}
// go proxy.StartProxy()
if err := quotaSync(); err != nil {
log.Fatalf("quota migration error, %v", err)
}
beego.Run()
}

View File

@ -135,6 +135,7 @@ func initRouters() {
beego.Router("/api/internal/syncregistry", &api.InternalAPI{}, "post:SyncRegistry")
beego.Router("/api/internal/renameadmin", &api.InternalAPI{}, "post:RenameAdmin")
beego.Router("/api/internal/switchquota", &api.InternalAPI{}, "put:SwitchQuota")
beego.Router("/api/internal/syncquota", &api.InternalAPI{}, "post:SyncQuota")
// external service that hosted on harbor process:
beego.Router("/service/notifications", &registry.NotificationHandler{})