1.add API for getting logs of job 2.trigger replication by creating policy or enabling policy 3.trigger replication by webhook

This commit is contained in:
Wenkai Yin 2016-05-25 15:25:16 +08:00
parent 00b2880cdd
commit 0b57345a4d
4 changed files with 149 additions and 3 deletions

View File

@ -1,9 +1,11 @@
package api
import (
"io/ioutil"
"net/http"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/utils/log"
"net/http"
)
type RepJobAPI struct {
@ -19,6 +21,7 @@ func (ja *RepJobAPI) Prepare() {
if !isAdmin {
ja.CustomAbort(http.StatusForbidden, "")
}
}
func (ja *RepJobAPI) Get() {
@ -38,4 +41,35 @@ func (ja *RepJobAPI) Get() {
ja.ServeJSON()
}
// GetLog ...
func (ja *RepJobAPI) GetLog() {
id := ja.Ctx.Input.Param(":id")
if len(id) == 0 {
ja.CustomAbort(http.StatusBadRequest, "id is nil")
}
resp, err := http.Get(buildJobLogURL(id))
if err != nil {
log.Errorf("failed to get log for job %s: %v", id, err)
ja.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Errorf("failed to read response body for job %s: %v", id, err)
ja.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if resp.StatusCode == http.StatusOK {
if _, err = ja.Ctx.ResponseWriter.Write(b); err != nil {
log.Errorf("failed to write log to response; %v", err)
ja.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
return
}
ja.CustomAbort(resp.StatusCode, string(b))
}
//TODO:add Post handler to call job service API to submit jobs by policy

View File

@ -2,11 +2,13 @@ package api
import (
"fmt"
"net/http"
"strconv"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
"net/http"
"strconv"
)
type RepPolicyAPI struct {
@ -70,6 +72,15 @@ func (pa *RepPolicyAPI) Post() {
pa.RenderError(http.StatusInternalServerError, "Internal Error")
return
}
go func() {
if err := TriggerReplication(pid, "", models.RepOpTransfer); err != nil {
log.Errorf("failed to trigger replication of %d: %v", pid, err)
} else {
log.Infof("replication of %d triggered", pid)
}
}()
pa.Redirect(http.StatusCreated, strconv.FormatInt(pid, 10))
}
@ -94,4 +105,14 @@ func (pa *RepPolicyAPI) UpdateEnablement() {
pa.RenderError(http.StatusInternalServerError, "Internal Error")
return
}
if e.Enabled == 1 {
go func() {
if err := TriggerReplication(pa.policyID, "", models.RepOpTransfer); err != nil {
log.Errorf("failed to trigger replication of %d: %v", pa.policyID, err)
} else {
log.Infof("replication of %d triggered", pa.policyID)
}
}()
}
}

View File

@ -16,6 +16,13 @@
package api
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
@ -49,3 +56,84 @@ func checkUserExists(name string) int {
}
return 0
}
// TriggerReplication triggers the replication according to the policy
func TriggerReplication(policyID int64, repository, operation string) error {
data := struct {
PolicyID int64 `json:"policy_id"`
Repo string `json:"repository"`
Operation string `json:"operation"`
}{
PolicyID: policyID,
Repo: repository,
Operation: operation,
}
b, err := json.Marshal(data)
if err != nil {
return err
}
url := buildReplicationURL()
resp, err := http.DefaultClient.Post(url, "application/json", bytes.NewBuffer(b))
if err != nil {
return err
}
if resp.StatusCode == http.StatusOK {
return nil
}
defer resp.Body.Close()
b, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("%d %s", resp.StatusCode, string(b))
}
// GetPoliciesByRepository returns policies according the repository
func GetPoliciesByRepository(repository string) ([]*models.RepPolicy, error) {
repository = strings.TrimSpace(repository)
repository = strings.TrimRight(repository, "/")
projectName := repository[:strings.LastIndex(repository, "/")]
project, err := dao.GetProjectByName(projectName)
if err != nil {
return nil, err
}
policies, err := dao.GetRepPolicyByProject(project.ProjectID)
if err != nil {
return nil, err
}
return policies, nil
}
func TriggerReplicationByRepository(repository, operation string) {
policies, err := GetPoliciesByRepository(repository)
if err != nil {
log.Errorf("failed to get policies for repository %s: %v", repository, err)
return
}
for _, policy := range policies {
if err := TriggerReplication(policy.ProjectID, repository, operation); err != nil {
log.Errorf("failed to trigger replication of %d for %s: %v", policy.ID, repository, err)
} else {
log.Infof("replication of %d for %s triggered", policy.ID, repository)
}
}
}
func buildReplicationURL() string {
return "http://job_service/api/replicationJobs"
}
func buildJobLogURL(jobID string) string {
return fmt.Sprintf("http://job_service/api/replicationJobs/%s/log", jobID)
}

View File

@ -20,6 +20,7 @@ import (
"regexp"
"strings"
"github.com/vmware/harbor/api"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
svc_utils "github.com/vmware/harbor/service/utils"
@ -77,6 +78,8 @@ func (n *NotificationHandler) Post() {
log.Errorf("Error happens when refreshing cache: %v", err2)
}
}()
go api.TriggerReplicationByRepository(repo, models.RepOpTransfer)
}
}
}