diff --git a/make/photon/prepare/templates/jobservice/config.yml.jinja b/make/photon/prepare/templates/jobservice/config.yml.jinja index f7675130e..e41846519 100644 --- a/make/photon/prepare/templates/jobservice/config.yml.jinja +++ b/make/photon/prepare/templates/jobservice/config.yml.jinja @@ -44,3 +44,10 @@ job_loggers: loggers: - name: "STD_OUTPUT" # Same with above level: "{{level}}" + +{% if metric.enabled %} +metric: + enabled: true + path: {{ metric.path }} + port: {{ metric.port }} +{% endif %} diff --git a/make/photon/prepare/templates/jobservice/env.jinja b/make/photon/prepare/templates/jobservice/env.jinja index 7fcbb47ff..b6d7f2d65 100644 --- a/make/photon/prepare/templates/jobservice/env.jinja +++ b/make/photon/prepare/templates/jobservice/env.jinja @@ -20,3 +20,8 @@ HTTPS_PROXY={{jobservice_https_proxy}} NO_PROXY={{jobservice_no_proxy}} REGISTRY_CREDENTIAL_USERNAME={{registry_username}} REGISTRY_CREDENTIAL_PASSWORD={{registry_password}} + +{% if metric.enabled %} +METRIC_NAMESPACE=harbor +METRIC_SUBSYSTEM=jobservice +{% endif %} diff --git a/make/photon/prepare/templates/nginx/nginx.http.conf.jinja b/make/photon/prepare/templates/nginx/nginx.http.conf.jinja index e8d1184f4..8157c5b5e 100644 --- a/make/photon/prepare/templates/nginx/nginx.http.conf.jinja +++ b/make/photon/prepare/templates/nginx/nginx.http.conf.jinja @@ -206,6 +206,10 @@ http { server core:{{ metric.port }}; } + upstream js_metrics { + server jobservice:{{ metric.port }}; + } + upstream registry_metrics { server registry:{{ metric.port }}; } @@ -218,6 +222,7 @@ http { listen 9090; location = /metrics { if ($arg_comp = core) { proxy_pass http://core_metrics; } + if ($arg_comp = jobservice) { proxy_pass http://js_metrics; } if ($arg_comp = registry) { proxy_pass http://registry_metrics; } proxy_pass http://harbor_exporter; } diff --git a/make/photon/prepare/templates/nginx/nginx.https.conf.jinja b/make/photon/prepare/templates/nginx/nginx.https.conf.jinja index 76b4b94c2..7a9f34207 100644 --- a/make/photon/prepare/templates/nginx/nginx.https.conf.jinja +++ b/make/photon/prepare/templates/nginx/nginx.https.conf.jinja @@ -238,6 +238,10 @@ http { server core:{{ metric.port }}; } + upstream js_metrics { + server jobservice:{{ metric.port }}; + } + upstream registry_metrics { server registry:{{ metric.port }}; } @@ -250,6 +254,7 @@ http { listen 9090; location = {{ metric.path }} { if ($arg_comp = core) { proxy_pass http://core_metrics; } + if ($arg_comp = jobservice) { proxy_pass http://js_metrics; } if ($arg_comp = registry) { proxy_pass http://registry_metrics; } proxy_pass http://harbor_exporter; } diff --git a/make/photon/prepare/utils/jobservice.py b/make/photon/prepare/utils/jobservice.py index 4082af82f..133f047bc 100644 --- a/make/photon/prepare/utils/jobservice.py +++ b/make/photon/prepare/utils/jobservice.py @@ -33,4 +33,5 @@ def prepare_job_service(config_dict): internal_tls=config_dict['internal_tls'], max_job_workers=config_dict['max_job_workers'], redis_url=config_dict['redis_url_js'], - level=log_level) + level=log_level, + metric=config_dict['metric']) diff --git a/src/jobservice/config/config.go b/src/jobservice/config/config.go index 877b38ce3..edf87c05b 100644 --- a/src/jobservice/config/config.go +++ b/src/jobservice/config/config.go @@ -78,6 +78,9 @@ type Configuration struct { // Logger configurations LoggerConfigs []*LoggerConfig `yaml:"loggers,omitempty"` + + // Metric configurations + Metric *MetricConfig `yaml:"metric,omitempty"` } // HTTPSConfig keeps additional configurations when using https protocol @@ -104,6 +107,13 @@ type PoolConfig struct { RedisPoolCfg *RedisPoolConfig `yaml:"redis_pool,omitempty"` } +// MetricConfig used for configure metrics +type MetricConfig struct { + Enabled bool `yaml:"enabled"` + Path string `yaml:"path"` + Port int `yaml:"port"` +} + // CustomizedSettings keeps the customized settings of logger type CustomizedSettings map[string]interface{} diff --git a/src/jobservice/core/controller_test.go b/src/jobservice/core/controller_test.go index d8bc11bb9..072693bf7 100644 --- a/src/jobservice/core/controller_test.go +++ b/src/jobservice/core/controller_test.go @@ -14,6 +14,8 @@ package core import ( + "testing" + "github.com/goharbor/harbor/src/jobservice/common/query" "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/job" @@ -23,7 +25,6 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "testing" ) // ControllerTestSuite tests functions of core controller @@ -229,6 +230,10 @@ func (suite *ControllerTestSuite) Start() error { return suite.worker.Start() } +func (suite *ControllerTestSuite) GetPoolID() string { + return suite.worker.GetPoolID() +} + func (suite *ControllerTestSuite) RegisterJobs(jobs map[string]interface{}) error { return suite.worker.RegisterJobs(jobs) } @@ -295,6 +300,10 @@ func (f *fakeWorker) Start() error { return f.Called().Error(0) } +func (f *fakeWorker) GetPoolID() string { + return f.Called().String() +} + func (f *fakeWorker) RegisterJobs(jobs map[string]interface{}) error { return f.Called(jobs).Error(0) } diff --git a/src/jobservice/runner/redis.go b/src/jobservice/runner/redis.go index 2355baf85..b35068bc7 100644 --- a/src/jobservice/runner/redis.go +++ b/src/jobservice/runner/redis.go @@ -27,6 +27,7 @@ import ( "github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/period" "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/metric" ) const ( @@ -56,10 +57,11 @@ func (rj *RedisJob) Run(j *work.Job) (err error) { execContext job.Context tracker job.Tracker ) - // Track the running job now jID := j.ID + // used to instrument process time + now := time.Now() // Check if the job is a periodic one as periodic job has its own ID format if eID, yes := isPeriodicJobExecution(j); yes { jID = eID @@ -109,7 +111,8 @@ func (rj *RedisJob) Run(j *work.Job) (err error) { if err != nil { // log error logger.Errorf("Job '%s:%s' exit with error: %s", j.Name, j.ID, err) - + metric.JobserviceTotalTask.WithLabelValues(j.Name, "fail").Inc() + metric.JobservieTaskProcessTimeSummary.WithLabelValues(j.Name, "fail").Observe(time.Since(now).Seconds()) if er := tracker.Fail(); er != nil { logger.Errorf("Error occurred when marking the status of job %s:%s to failure: %s", j.Name, j.ID, er) } @@ -124,11 +127,14 @@ func (rj *RedisJob) Run(j *work.Job) (err error) { } else { if latest == job.StoppedStatus { // Logged + metric.JobserviceTotalTask.WithLabelValues(j.Name, "stop").Inc() + metric.JobservieTaskProcessTimeSummary.WithLabelValues(j.Name, "stop").Observe(time.Since(now).Seconds()) logger.Infof("Job %s:%s is stopped", j.Name, j.ID) return } } - + metric.JobserviceTotalTask.WithLabelValues(j.Name, "success").Inc() + metric.JobservieTaskProcessTimeSummary.WithLabelValues(j.Name, "success").Observe(time.Since(now).Seconds()) // Mark job status to success. logger.Infof("Job '%s:%s' exit with success", j.Name, j.ID) if er := tracker.Succeed(); er != nil { diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index 951e7a3d4..59314c412 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -44,6 +44,7 @@ import ( "github.com/goharbor/harbor/src/jobservice/worker" "github.com/goharbor/harbor/src/jobservice/worker/cworker" "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/metric" redislib "github.com/goharbor/harbor/src/lib/redis" "github.com/goharbor/harbor/src/pkg/p2p/preheat" "github.com/goharbor/harbor/src/pkg/retention" @@ -61,6 +62,9 @@ const ( // JobService ... var JobService = &Bootstrap{} +// workerPoolID +var workerPoolID string + // Bootstrap is coordinating process to help load and start the other components to serve. type Bootstrap struct { jobContextInitializer job.ContextInitializer @@ -174,6 +178,8 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) // Initialize controller ctl := core.NewController(backendWorker, manager) + // Initialize Prometheus backend + go bs.createMetricServer(cfg) // Start the API server apiServer := bs.createAPIServer(ctx, cfg, ctl) @@ -205,6 +211,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) node := ctx.Value(utils.NodeID) // Blocking here logger.Infof("API server is serving at %d with [%s] mode at node [%s]", cfg.Port, cfg.Protocol, node) + metric.JobserviceInfo.WithLabelValues(node.(string), workerPoolID, fmt.Sprint(cfg.PoolConfig.WorkerCount)).Set(1) if er := apiServer.Start(); er != nil { if !terminated { // Tell the listening goroutine @@ -221,6 +228,14 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) return } +func (bs *Bootstrap) createMetricServer(cfg *config.Configuration) { + if cfg.Metric != nil && cfg.Metric.Enabled { + metric.RegisterJobServiceCollectors() + metric.ServeProm(cfg.Metric.Path, cfg.Metric.Port) + logger.Infof("Prom backend is serving at %s:%d", cfg.Metric.Path, cfg.Metric.Port) + } +} + // Load and run the API server. func (bs *Bootstrap) createAPIServer(ctx context.Context, cfg *config.Configuration, ctl core.Interface) *api.Server { // Initialized API server @@ -249,6 +264,8 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool( lcmCtl lcm.Controller, ) (worker.Interface, error) { redisWorker := cworker.NewWorker(ctx, ns, workers, redisPool, lcmCtl) + workerPoolID = redisWorker.GetPoolID() + // Register jobs here if err := redisWorker.RegisterJobs( map[string]interface{}{ @@ -274,11 +291,9 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool( // exit return nil, err } - if err := redisWorker.Start(); err != nil { return nil, err } - return redisWorker, nil } diff --git a/src/jobservice/worker/cworker/c_worker.go b/src/jobservice/worker/cworker/c_worker.go index 82cfd7887..df7ee987a 100644 --- a/src/jobservice/worker/cworker/c_worker.go +++ b/src/jobservice/worker/cworker/c_worker.go @@ -163,6 +163,12 @@ func (w *basicWorker) Start() error { return nil } +// GetPoolID returns the worker pool id +func (w *basicWorker) GetPoolID() string { + v := reflect.ValueOf(*w.pool) + return v.FieldByName("workerPoolID").String() +} + // RegisterJobs is used to register multiple jobs to worker. func (w *basicWorker) RegisterJobs(jobs map[string]interface{}) error { if jobs == nil || len(jobs) == 0 { diff --git a/src/jobservice/worker/interface.go b/src/jobservice/worker/interface.go index 776b2a0f2..064b3ca9d 100644 --- a/src/jobservice/worker/interface.go +++ b/src/jobservice/worker/interface.go @@ -32,6 +32,12 @@ type Interface interface { // error if failed to register RegisterJobs(jobs map[string]interface{}) error + // Get the worker pool ID + // + // Return: + // string : the pool ID + GetPoolID() string + // Enqueue job // // jobName string : the name of enqueuing job diff --git a/src/lib/metric/jobservice.go b/src/lib/metric/jobservice.go new file mode 100644 index 000000000..54112c3ce --- /dev/null +++ b/src/lib/metric/jobservice.go @@ -0,0 +1,49 @@ +package metric + +import ( + "os" + + "github.com/prometheus/client_golang/prometheus" +) + +// RegisterJobServiceCollectors ... +func RegisterJobServiceCollectors() { + prometheus.MustRegister([]prometheus.Collector{ + JobserviceInfo, + JobserviceTotalTask, + JobservieTaskProcessTimeSummary, + }...) +} + +var ( + // JobserviceInfo used for collect jobservice information + JobserviceInfo = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: os.Getenv(NamespaceEnvKey), + Subsystem: os.Getenv(SubsystemEnvKey), + Name: "info", + Help: "the information of jobservice", + }, + []string{"node", "pool_id", "workers"}, + ) + // JobserviceTotalTask used for collect data + JobserviceTotalTask = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: os.Getenv(NamespaceEnvKey), + Subsystem: os.Getenv(SubsystemEnvKey), + Name: "task_total", + Help: "The total number of requests", + }, + []string{"type", "status"}, + ) + // JobservieTaskProcessTimeSummary used for instrument task running time + JobservieTaskProcessTimeSummary = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: os.Getenv(NamespaceEnvKey), + Subsystem: os.Getenv(SubsystemEnvKey), + Name: "task_process_time_seconds", + Help: "The time duration of the task process time", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, + []string{"type", "status"}) +)