diff --git a/make/harbor.yml b/make/harbor.yml index 515ac72c5..347ef0c8c 100644 --- a/make/harbor.yml +++ b/make/harbor.yml @@ -64,6 +64,10 @@ jobservice: # Maximum number of job workers in job service max_job_workers: 10 +notification: + # Maximum retry count for webhook job + webhook_job_max_retry: 10 + chart: # Change the value of absolute_url to enabled can enable absolute url in chart absolute_url: disabled diff --git a/make/photon/prepare/templates/jobservice/env.jinja b/make/photon/prepare/templates/jobservice/env.jinja index 2f4923248..d9e32c521 100644 --- a/make/photon/prepare/templates/jobservice/env.jinja +++ b/make/photon/prepare/templates/jobservice/env.jinja @@ -1,3 +1,4 @@ CORE_SECRET={{core_secret}} JOBSERVICE_SECRET={{jobservice_secret}} CORE_URL={{core_url}} +JOBSERVICE_WEBHOOK_JOB_MAX_RETRY={{notification_webhook_job_max_retry}} diff --git a/make/photon/prepare/utils/configs.py b/make/photon/prepare/utils/configs.py index c2e8b41fc..c57856845 100644 --- a/make/photon/prepare/utils/configs.py +++ b/make/photon/prepare/utils/configs.py @@ -188,6 +188,9 @@ def parse_yaml_config(config_file_path): config_dict['max_job_workers'] = js_config["max_job_workers"] config_dict['jobservice_secret'] = generate_random_string(16) + # notification config + notification_config = configs.get('notification') or {} + config_dict['notification_webhook_job_max_retry'] = notification_config["webhook_job_max_retry"] # Log configs allowed_levels = ['debug', 'info', 'warning', 'error', 'fatal'] diff --git a/src/jobservice/job/impl/notification/webhook_job.go b/src/jobservice/job/impl/notification/webhook_job.go new file mode 100644 index 000000000..b8c56966b --- /dev/null +++ b/src/jobservice/job/impl/notification/webhook_job.go @@ -0,0 +1,99 @@ +package notification + +import ( + "bytes" + "fmt" + commonhttp "github.com/goharbor/harbor/src/common/http" + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/logger" + "net/http" + "os" + "strconv" +) + +// Max retry has the same meaning as max fails. +const maxFails = "JOBSERVICE_WEBHOOK_JOB_MAX_RETRY" + +// WebhookJob implements the job interface, which send notification by http or https. +type WebhookJob struct { + client *http.Client + logger logger.Interface + ctx job.Context +} + +// MaxFails returns that how many times this job can fail, get this value from ctx. +func (wj *WebhookJob) MaxFails() uint { + if maxFails, exist := os.LookupEnv(maxFails); exist { + result, err := strconv.ParseUint(maxFails, 10, 32) + // Unable to log error message because the logger isn't initialized when calling this function. + if err == nil { + return uint(result) + } + } + + // Default max fails count is 10, and its max retry interval is around 3h + // Large enough to ensure most situations can notify successfully + return 10 +} + +// ShouldRetry ... +func (wj *WebhookJob) ShouldRetry() bool { + return true +} + +// Validate implements the interface in job/Interface +func (wj *WebhookJob) Validate(params job.Parameters) error { + return nil +} + +// Run implements the interface in job/Interface +func (wj *WebhookJob) Run(ctx job.Context, params job.Parameters) error { + if err := wj.init(ctx, params); err != nil { + return err + } + + return wj.execute(ctx, params) +} + +// init webhook job +func (wj *WebhookJob) init(ctx job.Context, params map[string]interface{}) error { + wj.logger = ctx.GetLogger() + wj.ctx = ctx + + // default insecureSkipVerify is false + insecureSkipVerify := false + if v, ok := params["skip_cert_verify"]; ok { + insecureSkipVerify = v.(bool) + } + wj.client = &http.Client{ + Transport: commonhttp.GetHTTPTransport(insecureSkipVerify), + } + + return nil +} + +// execute webhook job +func (wj *WebhookJob) execute(ctx job.Context, params map[string]interface{}) error { + payload := params["payload"].(string) + address := params["address"].(string) + + req, err := http.NewRequest(http.MethodPost, address, bytes.NewReader([]byte(payload))) + if err != nil { + return err + } + if v, ok := params["auth_header"]; ok && len(v.(string)) > 0 { + req.Header.Set("Authorization", v.(string)) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := wj.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("webhook job(target: %s) response code is %d", address, resp.StatusCode) + } + + return nil +} diff --git a/src/jobservice/job/impl/notification/webhook_job_test.go b/src/jobservice/job/impl/notification/webhook_job_test.go new file mode 100644 index 000000000..d5a1db69a --- /dev/null +++ b/src/jobservice/job/impl/notification/webhook_job_test.go @@ -0,0 +1,75 @@ +package notification + +import ( + "github.com/goharbor/harbor/src/jobservice/job/impl" + "github.com/stretchr/testify/assert" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "testing" +) + +func TestMaxFails(t *testing.T) { + rep := &WebhookJob{} + // test default max fails + assert.Equal(t, uint(10), rep.MaxFails()) + + // test user defined max fails + _ = os.Setenv(maxFails, "15") + assert.Equal(t, uint(15), rep.MaxFails()) + + // test user defined wrong max fails + _ = os.Setenv(maxFails, "abc") + assert.Equal(t, uint(10), rep.MaxFails()) +} + +func TestShouldRetry(t *testing.T) { + rep := &WebhookJob{} + assert.True(t, rep.ShouldRetry()) +} + +func TestValidate(t *testing.T) { + rep := &WebhookJob{} + assert.Nil(t, rep.Validate(nil)) +} + +func TestRun(t *testing.T) { + rep := &WebhookJob{} + + // test webhook request + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := ioutil.ReadAll(r.Body) + + // test request method + assert.Equal(t, http.MethodPost, r.Method) + // test request header + assert.Equal(t, "auth_test", r.Header.Get("Authorization")) + // test request body + assert.Equal(t, string(body), `{"key": "value"}`) + })) + defer ts.Close() + params := map[string]interface{}{ + "skip_cert_verify": true, + "payload": `{"key": "value"}`, + "address": ts.URL, + "auth_header": "auth_test", + } + // test correct webhook response + assert.Nil(t, rep.Run(&impl.Context{}, params)) + + tsWrong := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + })) + defer tsWrong.Close() + paramsWrong := map[string]interface{}{ + "skip_cert_verify": true, + "payload": `{"key": "value"}`, + "address": tsWrong.URL, + "auth_header": "auth_test", + } + // test incorrect webhook response + assert.NotNil(t, rep.Run(&impl.Context{}, paramsWrong)) +} diff --git a/src/jobservice/job/known_jobs.go b/src/jobservice/job/known_jobs.go index 60baa4ff9..307141e2d 100644 --- a/src/jobservice/job/known_jobs.go +++ b/src/jobservice/job/known_jobs.go @@ -30,6 +30,8 @@ const ( Replication = "REPLICATION" // ReplicationScheduler : the name of the replication scheduler job in job service ReplicationScheduler = "IMAGE_REPLICATE" + // WebhookJob : the name of the webhook job in job service + WebhookJob = "WEBHOOK" // Retention : the name of the retention job Retention = "RETENTION" ) diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index eb645c623..88dac6081 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -33,6 +33,7 @@ import ( "github.com/goharbor/harbor/src/jobservice/hook" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job/impl/gc" + "github.com/goharbor/harbor/src/jobservice/job/impl/notification" "github.com/goharbor/harbor/src/jobservice/job/impl/replication" "github.com/goharbor/harbor/src/jobservice/job/impl/sample" "github.com/goharbor/harbor/src/jobservice/job/impl/scan" @@ -248,6 +249,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool( job.ReplicationScheduler: (*replication.Scheduler)(nil), job.Retention: (*retention.Job)(nil), scheduler.JobNameScheduler: (*scheduler.PeriodicJob)(nil), + job.WebhookJob: (*notification.WebhookJob)(nil), }); err != nil { // exit return nil, err