Merge remote-tracking branch 'upstream/job-service' into job-service

This commit is contained in:
Tan Jiang 2016-05-30 15:53:55 +08:00
commit e53d2d9900
8 changed files with 137 additions and 60 deletions

View File

@ -3,6 +3,11 @@ package api
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httputil"
"strconv"
"github.com/vmware/harbor/api"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job"
@ -10,16 +15,15 @@ import (
"github.com/vmware/harbor/job/utils"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
"io/ioutil"
"net/http"
"net/http/httputil"
"strconv"
)
// ReplicationJob handles /api/replicationJobs /api/replicationJobs/:id/log
// /api/replicationJobs/actions
type ReplicationJob struct {
api.BaseAPI
}
// ReplicationReq holds informations of request for /api/replicationJobs
type ReplicationReq struct {
PolicyID int64 `json:"policy_id"`
Repo string `json:"repository"`
@ -27,6 +31,7 @@ type ReplicationReq struct {
TagList []string `json:"tags"`
}
// Post ...
func (rj *ReplicationJob) Post() {
var data ReplicationReq
rj.DecodeJSONReq(&data)
@ -91,11 +96,13 @@ func (rj *ReplicationJob) addJob(repo string, policyID int64, operation string,
return nil
}
// RepActionReq holds informations of request for /api/replicationJobs/actions
type RepActionReq struct {
PolicyID int64 `json:"policy_id"`
Action string `json:"action"`
}
// HandleAction stops jobs of policy
func (rj *ReplicationJob) HandleAction() {
var data RepActionReq
rj.DecodeJSONReq(&data)
@ -118,6 +125,7 @@ func (rj *ReplicationJob) HandleAction() {
job.WorkerPool.StopJobs(jobIDList)
}
// GetLog gets log of a job
func (rj *ReplicationJob) GetLog() {
idStr := rj.Ctx.Input.Param(":id")
jid, err := strconv.ParseInt(idStr, 10, 64)

View File

@ -1,79 +1,119 @@
package api
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
// RepJobAPI handles request to /api/replicationJobs /api/replicationJobs/:id/log
type RepJobAPI struct {
BaseAPI
jobID int64
}
func (ja *RepJobAPI) Prepare() {
uid := ja.ValidateUser()
// Prepare validates that whether user has system admin role
func (ra *RepJobAPI) Prepare() {
uid := ra.ValidateUser()
isAdmin, err := dao.IsAdminRole(uid)
if err != nil {
log.Errorf("Failed to Check if the user is admin, error: %v, uid: %d", err, uid)
}
if !isAdmin {
ja.CustomAbort(http.StatusForbidden, "")
ra.CustomAbort(http.StatusForbidden, "")
}
idStr := ra.Ctx.Input.Param(":id")
if len(idStr) != 0 {
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
ra.CustomAbort(http.StatusBadRequest, "ID is invalid")
}
ra.jobID = id
}
}
func (ja *RepJobAPI) Get() {
policyID, err := ja.GetInt64("policy_id")
// Get ...
func (ra *RepJobAPI) Get() {
policyID, err := ra.GetInt64("policy_id")
if err != nil {
log.Errorf("Failed to get policy id, error: %v", err)
ja.RenderError(http.StatusBadRequest, "Invalid policy id")
ra.RenderError(http.StatusBadRequest, "Invalid policy id")
return
}
jobs, err := dao.GetRepJobByPolicy(policyID)
if err != nil {
log.Errorf("Failed to query job from db, error: %v", err)
ja.RenderError(http.StatusInternalServerError, "Failed to query job")
ra.RenderError(http.StatusInternalServerError, "Failed to query job")
return
}
ja.Data["json"] = jobs
ja.ServeJSON()
ra.Data["json"] = jobs
ra.ServeJSON()
}
// Delete ...
func (ra *RepJobAPI) Delete() {
if ra.jobID == 0 {
ra.CustomAbort(http.StatusBadRequest, "id is nil")
}
job, err := dao.GetRepJob(ra.jobID)
if err != nil {
log.Errorf("failed to get job %d: %v", ra.jobID, err)
ra.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if job.Status == models.JobPending || job.Status == models.JobRunning {
ra.CustomAbort(http.StatusBadRequest, fmt.Sprintf("job is %s, can not be deleted", job.Status))
}
if err = dao.DeleteRepJob(ra.jobID); err != nil {
log.Errorf("failed to deleted job %d: %v", ra.jobID, err)
ra.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
}
// GetLog ...
func (ja *RepJobAPI) GetLog() {
id := ja.Ctx.Input.Param(":id")
if len(id) == 0 {
ja.CustomAbort(http.StatusBadRequest, "id is nil")
func (ra *RepJobAPI) GetLog() {
if ra.jobID == 0 {
ra.CustomAbort(http.StatusBadRequest, "id is nil")
}
resp, err := http.Get(buildJobLogURL(id))
resp, err := http.Get(buildJobLogURL(strconv.FormatInt(ra.jobID, 10)))
if err != nil {
log.Errorf("failed to get log for job %s: %v", id, err)
ja.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
log.Errorf("failed to get log for job %d: %v", ra.jobID, err)
ra.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if resp.StatusCode == http.StatusOK {
for key, values := range resp.Header {
for _, value := range values {
ra.Ctx.ResponseWriter.Header().Set(key, value)
}
}
if _, err = io.Copy(ra.Ctx.ResponseWriter, resp.Body); err != nil {
log.Errorf("failed to write log to response; %v", err)
ra.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
return
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Errorf("failed to read response body for job %s: %v", id, err)
ja.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
log.Errorf("failed to read reponse body: %v", err)
ra.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if resp.StatusCode == http.StatusOK {
ja.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Disposition"), "attachment; filename=replication_job.log")
ja.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Type"), resp.Header.Get(http.CanonicalHeaderKey("Content-Type")))
ja.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Length"), strconv.Itoa(len(b)))
if _, err = ja.Ctx.ResponseWriter.Write(b); err != nil {
log.Errorf("failed to write log to response; %v", err)
ja.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
return
}
ja.CustomAbort(resp.StatusCode, string(b))
ra.CustomAbort(resp.StatusCode, string(b))
}
//TODO:add Post handler to call job service API to submit jobs by policy

View File

@ -11,12 +11,15 @@ import (
"github.com/vmware/harbor/utils/log"
)
// RepPolicyAPI handles /api/replicationPolicies /api/replicationPolicies/:id/enablement
type RepPolicyAPI struct {
BaseAPI
policyID int64
policy *models.RepPolicy
}
// Prepare validates whether the user has system admin role
// and parsed the policy ID if it exists
func (pa *RepPolicyAPI) Prepare() {
uid := pa.ValidateUser()
var err error
@ -92,6 +95,7 @@ type enablementReq struct {
Enabled int `json:"enabled"`
}
// UpdateEnablement changes the enablement of policy
func (pa *RepPolicyAPI) UpdateEnablement() {
e := enablementReq{}
pa.DecodeJSONReq(&e)

View File

@ -139,14 +139,7 @@ func (t *TargetAPI) Get() {
}
for _, target := range targets {
if len(target.Password) != 0 {
str, err := utils.ReversibleDecrypt(target.Password)
if err != nil {
log.Errorf("failed to decrypt password: %v", err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
target.Password = str
}
target.Password = ""
}
t.Data["json"] = targets
@ -177,7 +170,7 @@ func (t *TargetAPI) Get() {
t.ServeJSON()
}
// Post
// Post ...
func (t *TargetAPI) Post() {
target := &models.RepTarget{}
t.DecodeJSONReq(target)
@ -209,8 +202,8 @@ func (t *TargetAPI) Put() {
target := &models.RepTarget{}
t.DecodeJSONReq(target)
if target.ID != id {
t.CustomAbort(http.StatusBadRequest, "IDs mismatch")
if target.ID == 0 {
target.ID = id
}
if len(target.Password) != 0 {
@ -223,6 +216,7 @@ func (t *TargetAPI) Put() {
}
}
// Delete ...
func (t *TargetAPI) Delete() {
id := t.getIDFromURL()
if id == 0 {

View File

@ -150,6 +150,7 @@ func GetPoliciesByRepository(repository string) ([]*models.RepPolicy, error) {
return policies, nil
}
// TriggerReplicationByRepository triggers the replication according to the repository
func TriggerReplicationByRepository(repository string, tags []string, operation string) {
policies, err := GetPoliciesByRepository(repository)
if err != nil {

View File

@ -793,7 +793,7 @@ func TestAddRepPolicy(t *testing.T) {
t.Errorf("The data does not match, expected: Name: mypolicy, TargetID: %d, Enabled: 1, Description: whatever;\n result: Name: %s, TargetID: %d, Enabled: %d, Description: %s",
targetID, p.Name, p.TargetID, p.Enabled, p.Description)
}
var tm time.Time = time.Now().AddDate(0, 0, -1)
var tm = time.Now().AddDate(0, 0, -1)
if !p.StartTime.After(tm) {
t.Errorf("Unexpected start_time: %v", p.StartTime)
}
@ -871,9 +871,9 @@ func TestAddRepJob(t *testing.T) {
if err != nil {
t.Errorf("Error occurred in AddRepJob: %v", err)
return
} else {
jobID = id
}
jobID = id
j, err := GetRepJob(id)
if err != nil {
t.Errorf("Error occurred in GetRepJob: %v, id: %d", err, id)

View File

@ -9,10 +9,13 @@ import (
"github.com/vmware/harbor/models"
)
// AddRepTarget ...
func AddRepTarget(target models.RepTarget) (int64, error) {
o := orm.NewOrm()
return o.Insert(&target)
}
// GetRepTarget ...
func GetRepTarget(id int64) (*models.RepTarget, error) {
o := orm.NewOrm()
t := models.RepTarget{ID: id}
@ -22,18 +25,27 @@ func GetRepTarget(id int64) (*models.RepTarget, error) {
}
return &t, err
}
// DeleteRepTarget ...
func DeleteRepTarget(id int64) error {
o := orm.NewOrm()
_, err := o.Delete(&models.RepTarget{ID: id})
return err
}
// UpdateRepTarget ...
func UpdateRepTarget(target models.RepTarget) error {
o := orm.NewOrm()
_, err := o.Update(&target)
if len(target.Password) != 0 {
_, err := o.Update(&target)
return err
}
_, err := o.Update(&target, "URL", "Name", "Username")
return err
}
// GetAllRepTargets ...
func GetAllRepTargets() ([]*models.RepTarget, error) {
o := orm.NewOrm()
qs := o.QueryTable(&models.RepTarget{})
@ -42,6 +54,7 @@ func GetAllRepTargets() ([]*models.RepTarget, error) {
return targets, err
}
// AddRepPolicy ...
func AddRepPolicy(policy models.RepPolicy) (int64, error) {
o := orm.NewOrm()
sqlTpl := `insert into replication_policy (name, project_id, target_id, enabled, description, cron_str, start_time, creation_time, update_time ) values (?, ?, ?, ?, ?, ?, %s, NOW(), NOW())`
@ -62,6 +75,8 @@ func AddRepPolicy(policy models.RepPolicy) (int64, error) {
id, err := r.LastInsertId()
return id, err
}
// GetRepPolicy ...
func GetRepPolicy(id int64) (*models.RepPolicy, error) {
o := orm.NewOrm()
p := models.RepPolicy{ID: id}
@ -71,17 +86,23 @@ func GetRepPolicy(id int64) (*models.RepPolicy, error) {
}
return &p, err
}
// GetRepPolicyByProject ...
func GetRepPolicyByProject(projectID int64) ([]*models.RepPolicy, error) {
var res []*models.RepPolicy
o := orm.NewOrm()
_, err := o.QueryTable("replication_policy").Filter("project_id", projectID).All(&res)
return res, err
}
// DeleteRepPolicy ...
func DeleteRepPolicy(id int64) error {
o := orm.NewOrm()
_, err := o.Delete(&models.RepPolicy{ID: id})
return err
}
// UpdateRepPolicyEnablement ...
func UpdateRepPolicyEnablement(id int64, enabled int) error {
o := orm.NewOrm()
p := models.RepPolicy{
@ -91,14 +112,18 @@ func UpdateRepPolicyEnablement(id int64, enabled int) error {
return err
}
// EnableRepPolicy ...
func EnableRepPolicy(id int64) error {
return UpdateRepPolicyEnablement(id, 1)
}
// DisableRepPolicy ...
func DisableRepPolicy(id int64) error {
return UpdateRepPolicyEnablement(id, 0)
}
// AddRepJob ...
func AddRepJob(job models.RepJob) (int64, error) {
o := orm.NewOrm()
if len(job.Status) == 0 {
@ -110,6 +135,7 @@ func AddRepJob(job models.RepJob) (int64, error) {
return o.Insert(&job)
}
// GetRepJob ...
func GetRepJob(id int64) (*models.RepJob, error) {
o := orm.NewOrm()
j := models.RepJob{ID: id}
@ -121,6 +147,7 @@ func GetRepJob(id int64) (*models.RepJob, error) {
return &j, nil
}
// GetRepJobByPolicy ...
func GetRepJobByPolicy(policyID int64) ([]*models.RepJob, error) {
var res []*models.RepJob
_, err := repJobPolicyIDQs(policyID).All(&res)
@ -141,11 +168,14 @@ func repJobPolicyIDQs(policyID int64) orm.QuerySeter {
return o.QueryTable("replication_job").Filter("policy_id", policyID)
}
// DeleteRepJob ...
func DeleteRepJob(id int64) error {
o := orm.NewOrm()
_, err := o.Delete(&models.RepJob{ID: id})
return err
}
// UpdateRepJobStatus ...
func UpdateRepJobStatus(id int64, status string) error {
o := orm.NewOrm()
j := models.RepJob{

View File

@ -52,23 +52,23 @@ func initRouters() {
//API:
beego.Router("/api/search", &api.SearchAPI{})
beego.Router("/api/projects/:pid/members/?:mid", &api.ProjectMemberAPI{})
beego.Router("/api/projects/:pid([0-9]+)/members/?:mid([0-9]+)", &api.ProjectMemberAPI{})
beego.Router("/api/projects/", &api.ProjectAPI{}, "get:List")
beego.Router("/api/projects/?:id", &api.ProjectAPI{})
beego.Router("/api/projects/?:id([0-9]+)", &api.ProjectAPI{})
beego.Router("/api/statistics", &api.StatisticAPI{})
beego.Router("/api/projects/:id/logs/filter", &api.ProjectAPI{}, "post:FilterAccessLog")
beego.Router("/api/projects/:id([0-9]+)/logs/filter", &api.ProjectAPI{}, "post:FilterAccessLog")
beego.Router("/api/users", &api.UserAPI{})
beego.Router("/api/users/?:id", &api.UserAPI{})
beego.Router("/api/users/:id/password", &api.UserAPI{}, "put:ChangePassword")
beego.Router("/api/users/?:id([0-9]+)", &api.UserAPI{})
beego.Router("/api/users/:id([0-9]+)/password", &api.UserAPI{}, "put:ChangePassword")
beego.Router("/api/repositories", &api.RepositoryAPI{})
beego.Router("/api/repositories/tags", &api.RepositoryAPI{}, "get:GetTags")
beego.Router("/api/repositories/manifests", &api.RepositoryAPI{}, "get:GetManifests")
beego.Router("/api/replicationJobs", &api.RepJobAPI{})
beego.Router("/api/replicationJobs/:id/log", &api.RepJobAPI{}, "get:GetLog")
beego.Router("/api/replicationPolicies", &api.RepPolicyAPI{})
beego.Router("/api/replicationPolicies/:id/enablement", &api.RepPolicyAPI{}, "put:UpdateEnablement")
beego.Router("/api/targets/?:id", &api.TargetAPI{})
beego.Router("/api/target_ping", &api.TargetAPI{}, "get:Ping")
beego.Router("/api/jobs/replication/?:id([0-9]+)", &api.RepJobAPI{})
beego.Router("/api/jobs/replication/:id([0-9]+)/log", &api.RepJobAPI{}, "get:GetLog")
beego.Router("/api/policies/replication", &api.RepPolicyAPI{})
beego.Router("/api/policies/replication/:id([0-9]+)/enablement", &api.RepPolicyAPI{}, "put:UpdateEnablement")
beego.Router("/api/targets/?:id([0-9]+)", &api.TargetAPI{})
beego.Router("/api/targets/ping", &api.TargetAPI{}, "post:Ping")
//external service that hosted on harbor process:
beego.Router("/service/notifications", &service.NotificationHandler{})