fix slack rate limit issue (#11623)

Signed-off-by: peimingming <peimingming@corp.netease.com>
This commit is contained in:
mmpei 2020-04-21 11:44:58 +08:00 committed by GitHub
parent ff2a7e61c9
commit 4b6196a00d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 296 additions and 26 deletions

View File

@ -0,0 +1,41 @@
package notification
import (
"net/http"
commonhttp "github.com/goharbor/harbor/src/common/http"
)
const (
secure = "secure"
insecure = "insecure"
)
var (
httpHelper *HTTPHelper
)
// HTTPHelper in charge of sending notification messages to remote endpoint
type HTTPHelper struct {
clients map[string]*http.Client
}
func init() {
httpHelper = &HTTPHelper{
clients: map[string]*http.Client{},
}
httpHelper.clients[secure] = &http.Client{
Transport: commonhttp.GetHTTPTransport(commonhttp.SecureTransport),
}
httpHelper.clients[insecure] = &http.Client{
Transport: commonhttp.GetHTTPTransport(commonhttp.InsecureTransport),
}
}
// GetHTTPInstance ...
func GetHTTPInstance(insec bool) *http.Client {
if insec {
return httpHelper.clients[insecure]
}
return httpHelper.clients[secure]
}

View File

@ -0,0 +1,17 @@
package notification
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestHttpHelper(t *testing.T) {
c1 := GetHTTPInstance(true)
assert.NotNil(t, c1)
c2 := GetHTTPInstance(false)
assert.NotNil(t, c2)
_, ok := httpHelper.clients["notExists"]
assert.False(t, ok)
}

View File

@ -0,0 +1,128 @@
package notification
import (
"bytes"
"fmt"
"net/http"
"os"
"strconv"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/pkg/errors"
"reflect"
"time"
)
// SlackJob implements the job interface, which send notification to slack by slack incoming webhooks.
type SlackJob struct {
client *http.Client
logger logger.Interface
}
// MaxFails returns that how many times this job can fail.
func (sj *SlackJob) MaxFails() (result uint) {
// Default max fails count is 10, and its max retry interval is around 3h
// Large enough to ensure most situations can notify successfully
result = 10
if maxFails, exist := os.LookupEnv(maxFails); exist {
mf, err := strconv.ParseUint(maxFails, 10, 32)
if err != nil {
logger.Warningf("Fetch slack job maxFails error: %s", err.Error())
return result
}
result = uint(mf)
}
return result
}
// MaxCurrency is implementation of same method in Interface.
func (sj *SlackJob) MaxCurrency() uint {
return 1
}
// ShouldRetry ...
func (sj *SlackJob) ShouldRetry() bool {
return true
}
// Validate implements the interface in job/Interface
func (sj *SlackJob) Validate(params job.Parameters) error {
if params == nil {
// Params are required
return errors.New("missing parameter of slack job")
}
payload, ok := params["payload"]
if !ok {
return errors.Errorf("missing job parameter 'payload'")
}
_, ok = payload.(string)
if !ok {
return errors.Errorf("malformed job parameter 'payload', expecting string but got %s", reflect.TypeOf(payload).String())
}
address, ok := params["address"]
if !ok {
return errors.Errorf("missing job parameter 'address'")
}
_, ok = address.(string)
if !ok {
return errors.Errorf("malformed job parameter 'address', expecting string but got %s", reflect.TypeOf(address).String())
}
return nil
}
// Run implements the interface in job/Interface
func (sj *SlackJob) Run(ctx job.Context, params job.Parameters) error {
if err := sj.init(ctx, params); err != nil {
return err
}
err := sj.execute(params)
if err != nil {
sj.logger.Error(err)
}
// Wait a second for slack rate limit, refer to https://api.slack.com/docs/rate-limits
time.Sleep(time.Second)
return err
}
// init slack job
func (sj *SlackJob) init(ctx job.Context, params map[string]interface{}) error {
sj.logger = ctx.GetLogger()
// default use insecure transport
sj.client = GetHTTPInstance(true)
if v, ok := params["skip_cert_verify"]; ok {
if insecure, ok := v.(bool); ok {
if insecure {
sj.client = GetHTTPInstance(false)
}
}
}
return nil
}
// execute slack job
func (sj *SlackJob) execute(params map[string]interface{}) error {
payload := params["payload"].(string)
address := params["address"].(string)
req, err := http.NewRequest(http.MethodPost, address, bytes.NewReader([]byte(payload)))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := sj.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("slack job(target: %s) response code is %d", address, resp.StatusCode)
}
return nil
}

View File

@ -0,0 +1,83 @@
package notification
import (
"github.com/goharbor/harbor/src/jobservice/job"
mockjobservice "github.com/goharbor/harbor/src/testing/jobservice"
"github.com/stretchr/testify/assert"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"testing"
)
func TestSlackJobMaxFails(t *testing.T) {
rep := &SlackJob{}
// test default max fails
assert.Equal(t, uint(10), rep.MaxFails())
// test user defined max fails
_ = os.Setenv(maxFails, "15")
assert.Equal(t, uint(15), rep.MaxFails())
// test user defined wrong max fails
_ = os.Setenv(maxFails, "abc")
assert.Equal(t, uint(10), rep.MaxFails())
}
func TestSlackJobShouldRetry(t *testing.T) {
rep := &SlackJob{}
assert.True(t, rep.ShouldRetry())
}
func TestSlackJobValidate(t *testing.T) {
rep := &SlackJob{}
assert.NotNil(t, rep.Validate(nil))
jp := job.Parameters{
"address": "https://webhook.slack.com/hsdouihhsd988",
"payload": "slack payload",
}
assert.Nil(t, rep.Validate(jp))
}
func TestSlackJobRun(t *testing.T) {
ctx := &mockjobservice.MockJobContext{}
logger := &mockjobservice.MockJobLogger{}
ctx.On("GetLogger").Return(logger)
rep := &SlackJob{}
// test slack request
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := ioutil.ReadAll(r.Body)
// test request method
assert.Equal(t, http.MethodPost, r.Method)
// test request body
assert.Equal(t, string(body), `{"key": "value"}`)
}))
defer ts.Close()
params := map[string]interface{}{
"skip_cert_verify": true,
"payload": `{"key": "value"}`,
"address": ts.URL,
}
// test correct slack response
assert.Nil(t, rep.Run(ctx, params))
tsWrong := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusUnauthorized)
}))
defer tsWrong.Close()
paramsWrong := map[string]interface{}{
"skip_cert_verify": true,
"payload": `{"key": "value"}`,
"address": tsWrong.URL,
}
// test incorrect slack response
assert.NotNil(t, rep.Run(ctx, paramsWrong))
}

View File

@ -7,7 +7,6 @@ import (
"os"
"strconv"
commonhttp "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
)
@ -23,18 +22,19 @@ type WebhookJob struct {
}
// MaxFails returns that how many times this job can fail, get this value from ctx.
func (wj *WebhookJob) MaxFails() uint {
if maxFails, exist := os.LookupEnv(maxFails); exist {
result, err := strconv.ParseUint(maxFails, 10, 32)
// Unable to log error message because the logger isn't initialized when calling this function.
if err == nil {
return uint(result)
}
}
func (wj *WebhookJob) MaxFails() (result uint) {
// Default max fails count is 10, and its max retry interval is around 3h
// Large enough to ensure most situations can notify successfully
return 10
result = 10
if maxFails, exist := os.LookupEnv(maxFails); exist {
mf, err := strconv.ParseUint(maxFails, 10, 32)
if err != nil {
logger.Warningf("Fetch webhook job maxFails error: %s", err.Error())
return result
}
result = uint(mf)
}
return result
}
// MaxCurrency is implementation of same method in Interface.
@ -58,9 +58,9 @@ func (wj *WebhookJob) Run(ctx job.Context, params job.Parameters) error {
return err
}
// does not throw err in the notification job
if err := wj.execute(ctx, params); err != nil {
wj.logger.Error(err)
return err
}
return nil
@ -72,18 +72,14 @@ func (wj *WebhookJob) init(ctx job.Context, params map[string]interface{}) error
wj.ctx = ctx
// default use insecure transport
tr := commonhttp.GetHTTPTransport(commonhttp.InsecureTransport)
wj.client = GetHTTPInstance(true)
if v, ok := params["skip_cert_verify"]; ok {
if insecure, ok := v.(bool); ok {
if insecure {
tr = commonhttp.GetHTTPTransport(commonhttp.SecureTransport)
wj.client = GetHTTPInstance(false)
}
}
}
wj.client = &http.Client{
Transport: tr,
}
return nil
}

View File

@ -76,5 +76,5 @@ func TestRun(t *testing.T) {
"auth_header": "auth_test",
}
// test incorrect webhook response
assert.Nil(t, rep.Run(ctx, paramsWrong))
assert.NotNil(t, rep.Run(ctx, paramsWrong))
}

View File

@ -32,6 +32,8 @@ const (
ReplicationScheduler = "IMAGE_REPLICATE"
// WebhookJob : the name of the webhook job in job service
WebhookJob = "WEBHOOK"
// SlackJob : the name of the slack job in job service
SlackJob = "SLACK"
// Retention : the name of the retention job
Retention = "RETENTION"
)

View File

@ -42,6 +42,8 @@ func (ps *defaultSampler) For(job string) uint {
// As an example, sample job has the lowest priority
case SampleJob:
return 1
case SlackJob:
return 1
// add more cases here if specified job priority is required
// case XXX:
// return 2000

View File

@ -47,4 +47,7 @@ func (suite *PrioritySamplerSuite) Test() {
p3 := suite.sampler.For(Replication)
suite.Equal(defaultPriority, p3, "Job priority for %s", Replication)
p4 := suite.sampler.For(SlackJob)
suite.Equal((uint)(1), p4, "Job priority for %s", SlackJob)
}

View File

@ -261,6 +261,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(
job.Retention: (*retention.Job)(nil),
scheduler.JobNameScheduler: (*scheduler.PeriodicJob)(nil),
job.WebhookJob: (*notification.WebhookJob)(nil),
job.SlackJob: (*notification.SlackJob)(nil),
}); err != nil {
// exit
return nil, err

View File

@ -89,8 +89,8 @@ func (s *SlackHandler) process(event *model.HookEvent) error {
JobKind: job.KindGeneric,
},
}
// Create a webhookJob to send message to slack
j.Name = job.WebhookJob
// Create a slackJob to send message to slack
j.Name = job.SlackJob
// Convert payload to slack format
payload, err := s.convert(event.Payload)
@ -99,11 +99,8 @@ func (s *SlackHandler) process(event *model.HookEvent) error {
}
j.Parameters = map[string]interface{}{
"payload": payload,
"address": event.Target.Address,
// Users can define a auth header in http statement in notification(webhook) policy.
// So it will be sent in header in http request.
"auth_header": event.Target.AuthHeader,
"payload": payload,
"address": event.Target.Address,
"skip_cert_verify": event.Target.SkipCertVerify,
}
return notification.HookManager.StartHook(event, j)