Assign read-only privilege of replication policy to project admin and add stopping replication jobs API.

This commit is contained in:
Wenkai Yin 2017-12-16 21:52:29 +08:00
parent fa472823e8
commit 3be1d5a7fd
15 changed files with 349 additions and 50 deletions

View File

@ -1391,6 +1391,32 @@ paths:
description: User need to login first.
'500':
description: Unexpected internal errors.
put:
summary: Update status of jobs. Only stop is supported for now.
description: >
The endpoint is used to stop the replication jobs of a policy.
tags:
- Products
parameters:
- name: policyinfo
in: body
description: The policy ID and status.
required: true
schema:
$ref: '#/definitions/UpdateJobs'
responses:
'200':
description: Update the status successfully.
'400':
description: Bad request because of invalid parameters.
'401':
description: User need to login first.
'403':
description: User has no privilege for the operation.
'404':
description: Resource requested does not exist.
'500':
description: Unexpected internal errors.
/jobs/replication/{id}:
delete:
summary: Delete specific ID job.
@ -2414,9 +2440,22 @@ definitions:
kind:
type: string
description: The replication policy trigger kind. The valid values are manual, immediate and schedule.
param:
schedule_param:
$ref: '#/definitions/ScheduleParam'
ScheduleParam:
type: object
properties:
type:
type: string
description: The replication policy trigger parameters.
description: The schedule type. The valid values are daily and weekly.
weekday:
type: integer
format: int8
description: Optional, only used when the type is weedly. The valid values are 1-7.
offtime:
type: integer
format: int64
description: The time offset with the UTC 00:00 in seconds.
RepFilter:
type: object
properties:
@ -2927,5 +2966,12 @@ definitions:
description:
type: string
description: The description of the repository.
UpdateJobs:
type: object
properties:
policy_id:
type: integer
description: The ID of replication policy
status:
type: string
description: The status of jobs. The only valid value is stop for now.

View File

@ -17,12 +17,20 @@ package client
import (
"github.com/vmware/harbor/src/common/http"
"github.com/vmware/harbor/src/common/http/modifier/auth"
"github.com/vmware/harbor/src/jobservice/api"
)
// Replication holds information for submiting a replication job
type Replication struct {
PolicyID int64 `json:"policy_id"`
Repository string `json:"repository"`
Operation string `json:"operation"`
Tags []string `json:"tags"`
}
// Client defines the methods that a jobservice client should implement
type Client interface {
SubmitReplicationJob(*api.ReplicationReq) error
SubmitReplicationJob(*Replication) error
StopReplicationJobs(policyID int64) error
}
// DefaultClient provides a default implement for the interface Client
@ -50,7 +58,19 @@ func NewDefaultClient(endpoint string, cfg *Config) *DefaultClient {
}
// SubmitReplicationJob submits a replication job to the jobservice
func (d *DefaultClient) SubmitReplicationJob(replication *api.ReplicationReq) error {
func (d *DefaultClient) SubmitReplicationJob(replication *Replication) error {
url := d.endpoint + "/api/jobs/replication"
return d.client.Post(url, replication)
}
// StopReplicationJobs stop replication jobs of the policy specified by the policy ID
func (d *DefaultClient) StopReplicationJobs(policyID int64) error {
url := d.endpoint + "/api/jobs/replication/actions"
return d.client.Post(url, &struct {
PolicyID int64 `json:"policy_id"`
Action string `json:"action"`
}{
PolicyID: policyID,
Action: "stop",
})
}

View File

@ -22,18 +22,37 @@ import (
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/common/utils/test"
"github.com/vmware/harbor/src/jobservice/api"
)
var url string
func TestMain(m *testing.M) {
requestMapping := []*test.RequestHandlerMapping{
&test.RequestHandlerMapping{
Method: http.MethodPost,
Pattern: "/api/jobs/replication/actions",
Handler: func(w http.ResponseWriter, r *http.Request) {
action := &struct {
PolicyID int64 `json:"policy_id"`
Action string `json:"action"`
}{}
if err := json.NewDecoder(r.Body).Decode(action); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if action.PolicyID != 1 {
w.WriteHeader(http.StatusNotFound)
return
}
},
},
&test.RequestHandlerMapping{
Method: http.MethodPost,
Pattern: "/api/jobs/replication",
Handler: func(w http.ResponseWriter, r *http.Request) {
replication := &api.ReplicationReq{}
replication := &Replication{}
if err := json.NewDecoder(r.Body).Decode(replication); err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
@ -50,6 +69,18 @@ func TestMain(m *testing.M) {
func TestSubmitReplicationJob(t *testing.T) {
client := NewDefaultClient(url, &Config{})
err := client.SubmitReplicationJob(&api.ReplicationReq{})
err := client.SubmitReplicationJob(&Replication{})
assert.Nil(t, err)
}
func TestStopReplicationJobs(t *testing.T) {
client := NewDefaultClient(url, &Config{})
// 404
err := client.StopReplicationJobs(2)
assert.NotNil(t, err)
// 200
err = client.StopReplicationJobs(1)
assert.Nil(t, err)
}

View File

@ -20,7 +20,6 @@ import (
common_models "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice/api"
"github.com/vmware/harbor/src/jobservice/client"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
@ -63,7 +62,7 @@ type DefaultController struct {
//Keep controller as singleton instance
var (
GlobalController Controller = NewDefaultController(ControllerConfig{}) //Use default data
GlobalController Controller
)
//ControllerConfig includes related configurations required by the controller
@ -82,16 +81,17 @@ func NewDefaultController(cfg ControllerConfig) *DefaultController {
triggerManager: trigger.NewManager(cfg.CacheCapacity),
}
// TODO read from configuration
endpoint := "http://jobservice:8080"
ctl.replicator = replicator.NewDefaultReplicator(endpoint,
&client.Config{
Secret: config.UISecret(),
})
ctl.replicator = replicator.NewDefaultReplicator(config.GlobalJobserviceClient)
return ctl
}
// Init creates the GlobalController and inits it
func Init() error {
GlobalController = NewDefaultController(ControllerConfig{}) //Use default data
return GlobalController.Init()
}
//Init will initialize the controller and the sub components
func (ctl *DefaultController) Init() error {
if ctl.initialized {
@ -308,11 +308,11 @@ func replicate(replicator replicator.Replicator, policyID int64, candidates []mo
}
for repository, tags := range repositories {
replication := &api.ReplicationReq{
PolicyID: policyID,
Repo: repository,
Operation: operation,
TagList: tags,
replication := &client.Replication{
PolicyID: policyID,
Repository: repository,
Operation: operation,
Tags: tags,
}
log.Debugf("submiting replication job to jobservice: %v", replication)
if err := replicator.Replicate(replication); err != nil {

View File

@ -26,6 +26,7 @@ import (
)
func TestMain(m *testing.M) {
GlobalController = NewDefaultController(ControllerConfig{})
// set the policy manager used by GlobalController with a fake policy manager
controller := GlobalController.(*DefaultController)
controller.policyManager = &test.FakePolicyManager{}

View File

@ -15,13 +15,12 @@
package replicator
import (
"github.com/vmware/harbor/src/jobservice/api"
"github.com/vmware/harbor/src/jobservice/client"
)
// Replicator submits the replication work to the jobservice
type Replicator interface {
Replicate(*api.ReplicationReq) error
Replicate(*client.Replication) error
}
// DefaultReplicator provides a default implement for Replicator
@ -30,13 +29,13 @@ type DefaultReplicator struct {
}
// NewDefaultReplicator returns an instance of DefaultReplicator
func NewDefaultReplicator(endpoint string, cfg *client.Config) *DefaultReplicator {
func NewDefaultReplicator(client client.Client) *DefaultReplicator {
return &DefaultReplicator{
client: client.NewDefaultClient(endpoint, cfg),
client: client,
}
}
// Replicate ...
func (d *DefaultReplicator) Replicate(replication *api.ReplicationReq) error {
func (d *DefaultReplicator) Replicate(replication *client.Replication) error {
return d.client.SubmitReplicationJob(replication)
}

View File

@ -18,18 +18,20 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/jobservice/api"
"github.com/vmware/harbor/src/jobservice/client"
)
type fakeJobserviceClient struct{}
func (f *fakeJobserviceClient) SubmitReplicationJob(replication *api.ReplicationReq) error {
func (f *fakeJobserviceClient) SubmitReplicationJob(replication *client.Replication) error {
return nil
}
func (f *fakeJobserviceClient) StopReplicationJobs(policyID int64) error {
return nil
}
func TestReplicate(t *testing.T) {
replicator := NewDefaultReplicator("http://jobservice", &client.Config{})
replicator.client = &fakeJobserviceClient{}
assert.Nil(t, replicator.Replicate(&api.ReplicationReq{}))
replicator := NewDefaultReplicator(&fakeJobserviceClient{})
assert.Nil(t, replicator.Replicate(&client.Replication{}))
}

View File

@ -134,7 +134,7 @@ func init() {
_ = updateInitPassword(1, "Harbor12345")
if err := core.GlobalController.Init(); err != nil {
if err := core.Init(); err != nil {
log.Fatalf("failed to initialize GlobalController: %v", err)
}

View File

@ -0,0 +1,35 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package models
import (
"github.com/astaxie/beego/validation"
)
// StopJobsReq holds information needed to stop the jobs for a replication rule
type StopJobsReq struct {
PolicyID int64 `json:"policy_id"`
Status string `json:"status"`
}
// Valid ...
func (s *StopJobsReq) Valid(v *validation.Validation) {
if s.PolicyID <= 0 {
v.SetError("policy_id", "invalid value")
}
if s.Status != "stop" {
v.SetError("status", "invalid status, valid values: [stop]")
}
}

View File

@ -23,6 +23,9 @@ import (
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication/core"
api_models "github.com/vmware/harbor/src/ui/api/models"
"github.com/vmware/harbor/src/ui/config"
"github.com/vmware/harbor/src/ui/utils"
)
@ -40,7 +43,7 @@ func (ra *RepJobAPI) Prepare() {
return
}
if !ra.SecurityCtx.IsSysAdmin() {
if !(ra.Ctx.Request.Method == http.MethodGet || ra.SecurityCtx.IsSysAdmin()) {
ra.HandleForbidden(ra.SecurityCtx.GetUsername())
return
}
@ -63,16 +66,21 @@ func (ra *RepJobAPI) List() {
ra.CustomAbort(http.StatusBadRequest, "invalid policy_id")
}
policy, err := dao.GetRepPolicy(policyID)
policy, err := core.GlobalController.GetPolicy(policyID)
if err != nil {
log.Errorf("failed to get policy %d: %v", policyID, err)
ra.CustomAbort(http.StatusInternalServerError, "")
}
if policy == nil {
if policy.ID == 0 {
ra.CustomAbort(http.StatusNotFound, fmt.Sprintf("policy %d not found", policyID))
}
if !ra.SecurityCtx.HasAllPerm(policy.ProjectIDs[0]) {
ra.HandleForbidden(ra.SecurityCtx.GetUsername())
return
}
repository := ra.GetString("repository")
status := ra.GetString("status")
@ -145,12 +153,56 @@ func (ra *RepJobAPI) GetLog() {
if ra.jobID == 0 {
ra.CustomAbort(http.StatusBadRequest, "id is nil")
}
job, err := dao.GetRepJob(ra.jobID)
if err != nil {
ra.HandleInternalServerError(fmt.Sprintf("failed to get replication job %d: %v", ra.jobID, err))
return
}
if job == nil {
ra.HandleNotFound(fmt.Sprintf("replication job %d not found", ra.jobID))
return
}
policy, err := core.GlobalController.GetPolicy(job.PolicyID)
if err != nil {
ra.HandleInternalServerError(fmt.Sprintf("failed to get policy %d: %v", job.PolicyID, err))
return
}
if !ra.SecurityCtx.HasAllPerm(policy.ProjectIDs[0]) {
ra.HandleForbidden(ra.SecurityCtx.GetUsername())
return
}
url := buildJobLogURL(strconv.FormatInt(ra.jobID, 10), ReplicationJobType)
err := utils.RequestAsUI(http.MethodGet, url, nil, utils.NewJobLogRespHandler(&ra.BaseAPI))
err = utils.RequestAsUI(http.MethodGet, url, nil, utils.NewJobLogRespHandler(&ra.BaseAPI))
if err != nil {
ra.RenderError(http.StatusInternalServerError, err.Error())
return
}
}
// StopJobs stop replication jobs for the policy
func (ra *RepJobAPI) StopJobs() {
req := &api_models.StopJobsReq{}
ra.DecodeJSONReqAndValidate(req)
policy, err := core.GlobalController.GetPolicy(req.PolicyID)
if err != nil {
ra.HandleInternalServerError(fmt.Sprintf("failed to get policy %d: %v", req.PolicyID, err))
return
}
if policy.ID == 0 {
ra.CustomAbort(http.StatusNotFound, fmt.Sprintf("policy %d not found", req.PolicyID))
}
if err = config.GlobalJobserviceClient.StopReplicationJobs(req.PolicyID); err != nil {
ra.HandleInternalServerError(fmt.Sprintf("failed to stop replication jobs of policy %d: %v", req.PolicyID, err))
return
}
}
//TODO:add Post handler to call job service API to submit jobs by policy

View File

@ -42,7 +42,7 @@ func (pa *RepPolicyAPI) Prepare() {
return
}
if !pa.SecurityCtx.IsSysAdmin() {
if !(pa.Ctx.Request.Method == http.MethodGet || pa.SecurityCtx.IsSysAdmin()) {
pa.HandleForbidden(pa.SecurityCtx.GetUsername())
return
}
@ -61,6 +61,11 @@ func (pa *RepPolicyAPI) Get() {
pa.CustomAbort(http.StatusNotFound, http.StatusText(http.StatusNotFound))
}
if !pa.SecurityCtx.HasAllPerm(policy.ProjectIDs[0]) {
pa.HandleForbidden(pa.SecurityCtx.GetUsername())
return
}
ply, err := convertFromRepPolicy(pa.ProjectMgr, policy)
if err != nil {
pa.ParseAndHandleError(fmt.Sprintf("failed to convert from replication policy"), err)
@ -94,6 +99,9 @@ func (pa *RepPolicyAPI) List() {
}
for _, policy := range policies {
if !pa.SecurityCtx.HasAllPerm(policy.ProjectIDs[0]) {
continue
}
ply, err := convertFromRepPolicy(pa.ProjectMgr, policy)
if err != nil {
pa.ParseAndHandleError(fmt.Sprintf("failed to convert from replication policy"), err)

View File

@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/replication"
rep_models "github.com/vmware/harbor/src/replication/models"
@ -265,15 +266,28 @@ func TestRepPolicyAPIPost(t *testing.T) {
}
func TestRepPolicyAPIGet(t *testing.T) {
// 404
runCodeCheckingCases(t, &codeCheckingCase{
request: &testingRequest{
method: http.MethodGet,
url: fmt.Sprintf("%s/%d", repPolicyAPIBasePath, 10000),
credential: sysAdmin,
cases := []*codeCheckingCase{
// 404
&codeCheckingCase{
request: &testingRequest{
method: http.MethodGet,
url: fmt.Sprintf("%s/%d", repPolicyAPIBasePath, 10000),
credential: sysAdmin,
},
code: http.StatusNotFound,
},
code: http.StatusNotFound,
})
// 401
&codeCheckingCase{
request: &testingRequest{
method: http.MethodGet,
url: fmt.Sprintf("%s/%d", repPolicyAPIBasePath, policyID),
},
code: http.StatusUnauthorized,
},
}
runCodeCheckingCases(t, cases...)
// 200
policy := &api_models.ReplicationPolicy{}
@ -290,6 +304,39 @@ func TestRepPolicyAPIGet(t *testing.T) {
}
func TestRepPolicyAPIList(t *testing.T) {
projectAdmin := models.User{
Username: "project_admin",
Password: "ProjectAdmin",
Email: "project_admin@test.com",
}
projectDev := models.User{
Username: "project_dev",
Password: "ProjectDev",
Email: "project_dev@test.com",
}
proAdminID, err := dao.Register(projectAdmin)
if err != nil {
panic(err)
}
defer dao.DeleteUser(int(proAdminID))
if err = dao.AddProjectMember(1, int(proAdminID), models.PROJECTADMIN); err != nil {
panic(err)
}
defer dao.DeleteProjectMember(1, int(proAdminID))
proDevID, err := dao.Register(projectDev)
if err != nil {
panic(err)
}
defer dao.DeleteUser(int(proDevID))
if err = dao.AddProjectMember(1, int(proDevID), models.DEVELOPER); err != nil {
panic(err)
}
defer dao.DeleteProjectMember(1, int(proDevID))
// 400: invalid project ID
runCodeCheckingCases(t, &codeCheckingCase{
request: &testingRequest{
@ -305,7 +352,7 @@ func TestRepPolicyAPIList(t *testing.T) {
code: http.StatusBadRequest,
})
// 200
// 200 system admin
policies := []*api_models.ReplicationPolicy{}
resp, err := handleAndParse(
&testingRequest{
@ -326,6 +373,52 @@ func TestRepPolicyAPIList(t *testing.T) {
assert.Equal(t, policyID, policies[0].ID)
assert.Equal(t, policyName, policies[0].Name)
// 200 project admin
policies = []*api_models.ReplicationPolicy{}
resp, err = handleAndParse(
&testingRequest{
method: http.MethodGet,
url: repPolicyAPIBasePath,
queryStruct: struct {
ProjectID int64 `url:"project_id"`
Name string `url:"name"`
}{
ProjectID: projectID,
Name: policyName,
},
credential: &usrInfo{
Name: projectAdmin.Username,
Passwd: projectAdmin.Password,
},
}, &policies)
require.Nil(t, err)
assert.Equal(t, http.StatusOK, resp.Code)
require.Equal(t, 1, len(policies))
assert.Equal(t, policyID, policies[0].ID)
assert.Equal(t, policyName, policies[0].Name)
// 200 project developer
policies = []*api_models.ReplicationPolicy{}
resp, err = handleAndParse(
&testingRequest{
method: http.MethodGet,
url: repPolicyAPIBasePath,
queryStruct: struct {
ProjectID int64 `url:"project_id"`
Name string `url:"name"`
}{
ProjectID: projectID,
Name: policyName,
},
credential: &usrInfo{
Name: projectDev.Username,
Passwd: projectDev.Password,
},
}, &policies)
require.Nil(t, err)
assert.Equal(t, http.StatusOK, resp.Code)
require.Equal(t, 0, len(policies))
// 200
policies = []*api_models.ReplicationPolicy{}
resp, err = handleAndParse(

View File

@ -28,6 +28,7 @@ import (
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/secret"
"github.com/vmware/harbor/src/common/utils/log"
jobservice_client "github.com/vmware/harbor/src/jobservice/client"
"github.com/vmware/harbor/src/ui/promgr"
"github.com/vmware/harbor/src/ui/promgr/pmsdriver"
"github.com/vmware/harbor/src/ui/promgr/pmsdriver/admiral"
@ -54,6 +55,8 @@ var (
AdmiralClient *http.Client
// TokenReader is used in integration mode to read token
TokenReader admiral.TokenReader
// GlobalJobserviceClient is a global client for jobservice
GlobalJobserviceClient jobservice_client.Client
)
// Init configurations
@ -92,6 +95,11 @@ func InitByURL(adminServerURL string) error {
// init project manager based on deploy mode
initProjectManager()
GlobalJobserviceClient = jobservice_client.NewDefaultClient(InternalJobServiceURL(),
&jobservice_client.Config{
Secret: UISecret(),
})
return nil
}
@ -260,6 +268,10 @@ func InternalJobServiceURL() string {
return "http://jobservice"
}
if cfg[common.JobServiceURL] == nil {
return "http://jobservice"
}
return strings.TrimSuffix(cfg[common.JobServiceURL].(string), "/")
}

View File

@ -132,7 +132,7 @@ func main() {
notifier.Publish(notifier.ScanAllPolicyTopic, notifier.ScanPolicyNotification{Type: scanAllPolicy.Type, DailyTime: (int64)(dailyTime)})
}
if err := core.GlobalController.Init(); err != nil {
if err := core.Init(); err != nil {
log.Errorf("failed to initialize the replication controller: %v", err)
}

View File

@ -101,7 +101,7 @@ func initRouters() {
beego.Router("/api/repositories/*/tags/:tag/manifest", &api.RepositoryAPI{}, "get:GetManifests")
beego.Router("/api/repositories/*/signatures", &api.RepositoryAPI{}, "get:GetSignatures")
beego.Router("/api/repositories/top", &api.RepositoryAPI{}, "get:GetTopRepos")
beego.Router("/api/jobs/replication/", &api.RepJobAPI{}, "get:List")
beego.Router("/api/jobs/replication/", &api.RepJobAPI{}, "get:List;put:StopJobs")
beego.Router("/api/jobs/replication/:id([0-9]+)", &api.RepJobAPI{})
beego.Router("/api/jobs/replication/:id([0-9]+)/log", &api.RepJobAPI{}, "get:GetLog")
beego.Router("/api/jobs/scan/:id([0-9]+)/log", &api.ScanJobAPI{}, "get:GetLog")