From 0b57345a4dfccd164ad22f566791b968447c1991 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Wed, 25 May 2016 15:25:16 +0800 Subject: [PATCH] 1.add API for getting logs of job 2.trigger replication by creating policy or enabling policy 3.trigger replication by webhook --- api/replication_job.go | 36 +++++++++++++++- api/replication_policy.go | 25 ++++++++++- api/utils.go | 88 +++++++++++++++++++++++++++++++++++++++ service/notification.go | 3 ++ 4 files changed, 149 insertions(+), 3 deletions(-) diff --git a/api/replication_job.go b/api/replication_job.go index 0c6f0f67b..c15e1f1f8 100644 --- a/api/replication_job.go +++ b/api/replication_job.go @@ -1,9 +1,11 @@ package api import ( + "io/ioutil" + "net/http" + "github.com/vmware/harbor/dao" "github.com/vmware/harbor/utils/log" - "net/http" ) type RepJobAPI struct { @@ -19,6 +21,7 @@ func (ja *RepJobAPI) Prepare() { if !isAdmin { ja.CustomAbort(http.StatusForbidden, "") } + } func (ja *RepJobAPI) Get() { @@ -38,4 +41,35 @@ func (ja *RepJobAPI) Get() { ja.ServeJSON() } +// GetLog ... +func (ja *RepJobAPI) GetLog() { + id := ja.Ctx.Input.Param(":id") + if len(id) == 0 { + ja.CustomAbort(http.StatusBadRequest, "id is nil") + } + + resp, err := http.Get(buildJobLogURL(id)) + if err != nil { + log.Errorf("failed to get log for job %s: %v", id, err) + ja.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)) + } + + defer resp.Body.Close() + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Errorf("failed to read response body for job %s: %v", id, err) + ja.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)) + } + + if resp.StatusCode == http.StatusOK { + if _, err = ja.Ctx.ResponseWriter.Write(b); err != nil { + log.Errorf("failed to write log to response; %v", err) + ja.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)) + } + return + } + + ja.CustomAbort(resp.StatusCode, string(b)) +} + //TODO:add Post handler to call job service API to submit jobs by policy diff --git a/api/replication_policy.go b/api/replication_policy.go index 174e898f0..2e78100e8 100644 --- a/api/replication_policy.go +++ b/api/replication_policy.go @@ -2,11 +2,13 @@ package api import ( "fmt" + + "net/http" + "strconv" + "github.com/vmware/harbor/dao" "github.com/vmware/harbor/models" "github.com/vmware/harbor/utils/log" - "net/http" - "strconv" ) type RepPolicyAPI struct { @@ -70,6 +72,15 @@ func (pa *RepPolicyAPI) Post() { pa.RenderError(http.StatusInternalServerError, "Internal Error") return } + + go func() { + if err := TriggerReplication(pid, "", models.RepOpTransfer); err != nil { + log.Errorf("failed to trigger replication of %d: %v", pid, err) + } else { + log.Infof("replication of %d triggered", pid) + } + }() + pa.Redirect(http.StatusCreated, strconv.FormatInt(pid, 10)) } @@ -94,4 +105,14 @@ func (pa *RepPolicyAPI) UpdateEnablement() { pa.RenderError(http.StatusInternalServerError, "Internal Error") return } + + if e.Enabled == 1 { + go func() { + if err := TriggerReplication(pa.policyID, "", models.RepOpTransfer); err != nil { + log.Errorf("failed to trigger replication of %d: %v", pa.policyID, err) + } else { + log.Infof("replication of %d triggered", pa.policyID) + } + }() + } } diff --git a/api/utils.go b/api/utils.go index 826484ac3..92590c097 100644 --- a/api/utils.go +++ b/api/utils.go @@ -16,6 +16,13 @@ package api import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + "github.com/vmware/harbor/dao" "github.com/vmware/harbor/models" "github.com/vmware/harbor/utils/log" @@ -49,3 +56,84 @@ func checkUserExists(name string) int { } return 0 } + +// TriggerReplication triggers the replication according to the policy +func TriggerReplication(policyID int64, repository, operation string) error { + data := struct { + PolicyID int64 `json:"policy_id"` + Repo string `json:"repository"` + Operation string `json:"operation"` + }{ + PolicyID: policyID, + Repo: repository, + Operation: operation, + } + + b, err := json.Marshal(data) + if err != nil { + return err + } + + url := buildReplicationURL() + + resp, err := http.DefaultClient.Post(url, "application/json", bytes.NewBuffer(b)) + if err != nil { + return err + } + + if resp.StatusCode == http.StatusOK { + return nil + } + + defer resp.Body.Close() + + b, err = ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + return fmt.Errorf("%d %s", resp.StatusCode, string(b)) +} + +// GetPoliciesByRepository returns policies according the repository +func GetPoliciesByRepository(repository string) ([]*models.RepPolicy, error) { + repository = strings.TrimSpace(repository) + repository = strings.TrimRight(repository, "/") + projectName := repository[:strings.LastIndex(repository, "/")] + + project, err := dao.GetProjectByName(projectName) + if err != nil { + return nil, err + } + + policies, err := dao.GetRepPolicyByProject(project.ProjectID) + if err != nil { + return nil, err + } + + return policies, nil +} + +func TriggerReplicationByRepository(repository, operation string) { + policies, err := GetPoliciesByRepository(repository) + if err != nil { + log.Errorf("failed to get policies for repository %s: %v", repository, err) + return + } + + for _, policy := range policies { + if err := TriggerReplication(policy.ProjectID, repository, operation); err != nil { + log.Errorf("failed to trigger replication of %d for %s: %v", policy.ID, repository, err) + } else { + log.Infof("replication of %d for %s triggered", policy.ID, repository) + } + } +} + +func buildReplicationURL() string { + return "http://job_service/api/replicationJobs" +} + +func buildJobLogURL(jobID string) string { + return fmt.Sprintf("http://job_service/api/replicationJobs/%s/log", jobID) +} diff --git a/service/notification.go b/service/notification.go index 916b4901b..629bda987 100644 --- a/service/notification.go +++ b/service/notification.go @@ -20,6 +20,7 @@ import ( "regexp" "strings" + "github.com/vmware/harbor/api" "github.com/vmware/harbor/dao" "github.com/vmware/harbor/models" svc_utils "github.com/vmware/harbor/service/utils" @@ -77,6 +78,8 @@ func (n *NotificationHandler) Post() { log.Errorf("Error happens when refreshing cache: %v", err2) } }() + + go api.TriggerReplicationByRepository(repo, models.RepOpTransfer) } } }