Merge pull request #4638 from vmware/use_redis_url_addr

Use redis URL address to replace host:port when connecting to redis server
This commit is contained in:
Steven Zou 2018-04-17 10:12:51 +08:00 committed by GitHub
commit ca8d3bdcc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 101 additions and 47 deletions

View File

@ -13,13 +13,13 @@ port: 8080
#Worker pool #Worker pool
worker_pool: worker_pool:
#Worker concurrency #Worker concurrency
workers: 50 workers: $max_job_workers
backend: "redis" backend: "redis"
#Additional config if use 'redis' backend #Additional config if use 'redis' backend
#TODO: switch to internal redis endpoint and namespace.
redis_pool: redis_pool:
host: "redis" #redis://[arbitrary_username:password@]ipaddress:port/database_index
port: 6379 #or ipaddress:port[,weight,password,database_index]
redis_url: $redis_url
namespace: "harbor_job_service_namespace" namespace: "harbor_job_service_namespace"
#Logger for job #Logger for job
logger: logger:

View File

@ -11,7 +11,7 @@ hostname = reg.mydomain.com
ui_url_protocol = http ui_url_protocol = http
#Maximum number of job workers in job service #Maximum number of job workers in job service
max_job_workers = 3 max_job_workers = 50
#Determine whether or not to generate certificate for the registry's token. #Determine whether or not to generate certificate for the registry's token.
#If the value is on, the prepare script creates new root cert and private key #If the value is on, the prepare script creates new root cert and private key
@ -141,7 +141,8 @@ db_user = root
##### End of Harbor DB configuration####### ##### End of Harbor DB configuration#######
#The redis server address. Only needed in HA installation. #The redis server address. Only needed in HA installation.
redis_url = #address:port[,weight,password,db_index]
redis_url = redis:6379
##########Clair DB configuration############ ##########Clair DB configuration############

View File

@ -413,13 +413,17 @@ render(os.path.join(templates_dir, "jobservice", "env"),
jobservice_secret=jobservice_secret, jobservice_secret=jobservice_secret,
adminserver_url=adminserver_url) adminserver_url=adminserver_url)
render(os.path.join(templates_dir, "jobservice", "config.yml"),
jobservice_conf,
max_job_workers=max_job_workers,
redis_url=redis_url)
render(os.path.join(templates_dir, "log", "logrotate.conf"), render(os.path.join(templates_dir, "log", "logrotate.conf"),
log_rotate_config, log_rotate_config,
log_rotate_count=log_rotate_count, log_rotate_count=log_rotate_count,
log_rotate_size=log_rotate_size) log_rotate_size=log_rotate_size)
print("Generated configuration file: %s" % jobservice_conf) print("Generated configuration file: %s" % jobservice_conf)
shutil.copyfile(os.path.join(templates_dir, "jobservice", "config.yml"), jobservice_conf)
print("Generated configuration file: %s" % ui_conf) print("Generated configuration file: %s" % ui_conf)
shutil.copyfile(os.path.join(templates_dir, "ui", "app.conf"), ui_conf) shutil.copyfile(os.path.join(templates_dir, "ui", "app.conf"), ui_conf)

View File

@ -17,8 +17,9 @@ worker_pool:
backend: "redis" backend: "redis"
#Additional config if use 'redis' backend #Additional config if use 'redis' backend
redis_pool: redis_pool:
host: "10.160.178.186" #redis://[arbitrary_username:password@]ipaddress:port/database_index
port: 6379 #or ipaddress:port[,weight,password,database_index]
redis_url: "redis:6379"
namespace: "harbor_job_service" namespace: "harbor_job_service"
#Logger for job #Logger for job

View File

@ -22,8 +22,7 @@ const (
jobServiceHTTPKey = "JOB_SERVICE_HTTPS_KEY" jobServiceHTTPKey = "JOB_SERVICE_HTTPS_KEY"
jobServiceWorkerPoolBackend = "JOB_SERVICE_POOL_BACKEND" jobServiceWorkerPoolBackend = "JOB_SERVICE_POOL_BACKEND"
jobServiceWorkers = "JOB_SERVICE_POOL_WORKERS" jobServiceWorkers = "JOB_SERVICE_POOL_WORKERS"
jobServiceRedisHost = "JOB_SERVICE_POOL_REDIS_HOST" jobServiceRedisURL = "JOB_SERVICE_POOL_REDIS_URL"
jobServiceRedisPort = "JOB_SERVICE_POOL_REDIS_PORT"
jobServiceRedisNamespace = "JOB_SERVICE_POOL_REDIS_NAMESPACE" jobServiceRedisNamespace = "JOB_SERVICE_POOL_REDIS_NAMESPACE"
jobServiceLoggerBasePath = "JOB_SERVICE_LOGGER_BASE_PATH" jobServiceLoggerBasePath = "JOB_SERVICE_LOGGER_BASE_PATH"
jobServiceLoggerLevel = "JOB_SERVICE_LOGGER_LEVEL" jobServiceLoggerLevel = "JOB_SERVICE_LOGGER_LEVEL"
@ -41,6 +40,9 @@ const (
//secret of UI //secret of UI
uiAuthSecret = "UI_SECRET" uiAuthSecret = "UI_SECRET"
//redis protocol schema
redisSchema = "redis://"
) )
//DefaultConfig is the default configuration reference //DefaultConfig is the default configuration reference
@ -74,14 +76,13 @@ type HTTPSConfig struct {
//RedisPoolConfig keeps redis pool info. //RedisPoolConfig keeps redis pool info.
type RedisPoolConfig struct { type RedisPoolConfig struct {
Host string `yaml:"host"` RedisURL string `yaml:"redis_url"`
Port uint `yaml:"port"`
Namespace string `yaml:"namespace"` Namespace string `yaml:"namespace"`
} }
//PoolConfig keeps worker pool configurations. //PoolConfig keeps worker pool configurations.
type PoolConfig struct { type PoolConfig struct {
//0 means unlimited //Worker concurrency
WorkerCount uint `yaml:"workers"` WorkerCount uint `yaml:"workers"`
Backend string `yaml:"backend"` Backend string `yaml:"backend"`
RedisPoolCfg *RedisPoolConfig `yaml:"redis_pool,omitempty"` RedisPoolCfg *RedisPoolConfig `yaml:"redis_pool,omitempty"`
@ -118,6 +119,22 @@ func (c *Configuration) Load(yamlFilePath string, detectEnv bool) error {
c.loadEnvs() c.loadEnvs()
} }
//translate redis url if needed
if c.PoolConfig != nil && c.PoolConfig.RedisPoolCfg != nil {
redisAddress := c.PoolConfig.RedisPoolCfg.RedisURL
if !utils.IsEmptyStr(redisAddress) {
if _, err := url.Parse(redisAddress); err != nil {
if redisURL, ok := utils.TranslateRedisAddress(redisAddress); ok {
c.PoolConfig.RedisPoolCfg.RedisURL = redisURL
}
} else {
if !strings.HasPrefix(redisAddress, redisSchema) {
c.PoolConfig.RedisPoolCfg.RedisURL = fmt.Sprintf("%s%s", redisSchema, redisAddress)
}
}
}
}
//Validate settings //Validate settings
return c.validate() return c.validate()
} }
@ -222,22 +239,12 @@ func (c *Configuration) loadEnvs() {
} }
if c.PoolConfig != nil && c.PoolConfig.Backend == JobServicePoolBackendRedis { if c.PoolConfig != nil && c.PoolConfig.Backend == JobServicePoolBackendRedis {
rh := utils.ReadEnv(jobServiceRedisHost) redisURL := utils.ReadEnv(jobServiceRedisURL)
if !utils.IsEmptyStr(rh) { if !utils.IsEmptyStr(redisURL) {
if c.PoolConfig.RedisPoolCfg == nil { if c.PoolConfig.RedisPoolCfg == nil {
c.PoolConfig.RedisPoolCfg = &RedisPoolConfig{} c.PoolConfig.RedisPoolCfg = &RedisPoolConfig{}
} }
c.PoolConfig.RedisPoolCfg.Host = rh c.PoolConfig.RedisPoolCfg.RedisURL = redisURL
}
rp := utils.ReadEnv(jobServiceRedisPort)
if !utils.IsEmptyStr(rp) {
if rport, err := strconv.Atoi(rp); err == nil {
if c.PoolConfig.RedisPoolCfg == nil {
c.PoolConfig.RedisPoolCfg = &RedisPoolConfig{}
}
c.PoolConfig.RedisPoolCfg.Port = uint(rport)
}
} }
rn := utils.ReadEnv(jobServiceRedisNamespace) rn := utils.ReadEnv(jobServiceRedisNamespace)
@ -321,12 +328,18 @@ func (c *Configuration) validate() error {
if c.PoolConfig.RedisPoolCfg == nil { if c.PoolConfig.RedisPoolCfg == nil {
return fmt.Errorf("redis pool must be configured when backend is set to '%s'", c.PoolConfig.Backend) return fmt.Errorf("redis pool must be configured when backend is set to '%s'", c.PoolConfig.Backend)
} }
if utils.IsEmptyStr(c.PoolConfig.RedisPoolCfg.Host) { if utils.IsEmptyStr(c.PoolConfig.RedisPoolCfg.RedisURL) {
return errors.New("host of redis pool is empty") return errors.New("URL of redis pool is empty")
} }
if !utils.IsValidPort(c.PoolConfig.RedisPoolCfg.Port) {
return fmt.Errorf("redis port number should be a none zero integer and less or equal 65535, but current is %d", c.PoolConfig.RedisPoolCfg.Port) if !strings.HasPrefix(c.PoolConfig.RedisPoolCfg.RedisURL, redisSchema) {
return errors.New("Invalid redis URL")
} }
if _, err := url.Parse(c.PoolConfig.RedisPoolCfg.RedisURL); err != nil {
return fmt.Errorf("Invalid redis URL: %s", err.Error())
}
if utils.IsEmptyStr(c.PoolConfig.RedisPoolCfg.Namespace) { if utils.IsEmptyStr(c.PoolConfig.RedisPoolCfg.Namespace) {
return errors.New("namespace of redis pool is required") return errors.New("namespace of redis pool is required")
} }

View File

@ -48,11 +48,8 @@ func TestConfigLoadingWithEnv(t *testing.T) {
if cfg.PoolConfig.WorkerCount != 8 { if cfg.PoolConfig.WorkerCount != 8 {
t.Fatalf("expect workcount 8 but go '%d'\n", cfg.PoolConfig.WorkerCount) t.Fatalf("expect workcount 8 but go '%d'\n", cfg.PoolConfig.WorkerCount)
} }
if cfg.PoolConfig.RedisPoolCfg.Host != "localhost" { if cfg.PoolConfig.RedisPoolCfg.RedisURL != "redis://arbitrary_username:password@8.8.8.8:6379/0" {
t.Fatalf("expect redis host 'localhost' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Host) t.Fatalf("expect redis URL 'localhost' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.RedisURL)
}
if cfg.PoolConfig.RedisPoolCfg.Port != 7379 {
t.Fatalf("expect redis port '7379' but got '%d'\n", cfg.PoolConfig.RedisPoolCfg.Port)
} }
if cfg.PoolConfig.RedisPoolCfg.Namespace != "ut_namespace" { if cfg.PoolConfig.RedisPoolCfg.Namespace != "ut_namespace" {
t.Fatalf("expect redis namespace 'ut_namespace' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Namespace) t.Fatalf("expect redis namespace 'ut_namespace' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Namespace)
@ -98,6 +95,11 @@ func TestDefaultConfig(t *testing.T) {
t.Fatalf("expect default log archive period 1 but got '%d'\n", period) t.Fatalf("expect default log archive period 1 but got '%d'\n", period)
} }
redisURL := DefaultConfig.PoolConfig.RedisPoolCfg.RedisURL
if redisURL != "redis://redis:6379" {
t.Fatalf("expect redisURL '%s' but got '%s'\n", "redis://redis:6379", redisURL)
}
if err := RemoveLogDir(); err != nil { if err := RemoveLogDir(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -110,8 +112,7 @@ func setENV() {
os.Setenv("JOB_SERVICE_HTTPS_KEY", "../server.key") os.Setenv("JOB_SERVICE_HTTPS_KEY", "../server.key")
os.Setenv("JOB_SERVICE_POOL_BACKEND", "redis") os.Setenv("JOB_SERVICE_POOL_BACKEND", "redis")
os.Setenv("JOB_SERVICE_POOL_WORKERS", "8") os.Setenv("JOB_SERVICE_POOL_WORKERS", "8")
os.Setenv("JOB_SERVICE_POOL_REDIS_HOST", "localhost") os.Setenv("JOB_SERVICE_POOL_REDIS_URL", "8.8.8.8:6379,100,password,0")
os.Setenv("JOB_SERVICE_POOL_REDIS_PORT", "7379")
os.Setenv("JOB_SERVICE_POOL_REDIS_NAMESPACE", "ut_namespace") os.Setenv("JOB_SERVICE_POOL_REDIS_NAMESPACE", "ut_namespace")
os.Setenv("JOB_SERVICE_LOGGER_BASE_PATH", "/tmp") os.Setenv("JOB_SERVICE_LOGGER_BASE_PATH", "/tmp")
os.Setenv("JOB_SERVICE_LOGGER_LEVEL", "DEBUG") os.Setenv("JOB_SERVICE_LOGGER_LEVEL", "DEBUG")
@ -125,8 +126,7 @@ func unsetENV() {
os.Unsetenv("JOB_SERVICE_HTTPS_KEY") os.Unsetenv("JOB_SERVICE_HTTPS_KEY")
os.Unsetenv("JOB_SERVICE_POOL_BACKEND") os.Unsetenv("JOB_SERVICE_POOL_BACKEND")
os.Unsetenv("JOB_SERVICE_POOL_WORKERS") os.Unsetenv("JOB_SERVICE_POOL_WORKERS")
os.Unsetenv("JOB_SERVICE_POOL_REDIS_HOST") os.Unsetenv("JOB_SERVICE_POOL_REDIS_URL")
os.Unsetenv("JOB_SERVICE_POOL_REDIS_PORT")
os.Unsetenv("JOB_SERVICE_POOL_REDIS_NAMESPACE") os.Unsetenv("JOB_SERVICE_POOL_REDIS_NAMESPACE")
os.Unsetenv("JOB_SERVICE_LOGGER_BASE_PATH") os.Unsetenv("JOB_SERVICE_LOGGER_BASE_PATH")
os.Unsetenv("JOB_SERVICE_LOGGER_LEVEL") os.Unsetenv("JOB_SERVICE_LOGGER_LEVEL")

View File

@ -12,13 +12,14 @@ port: 9443
#Worker pool #Worker pool
worker_pool: worker_pool:
#0 means unlimited #Worker concurrency
workers: 10 workers: 10
backend: "redis" backend: "redis"
#Additional config if use 'redis' backend #Additional config if use 'redis' backend
redis_pool: redis_pool:
host: "10.160.178.186" #redis://[arbitrary_username:password@]ipaddress:port/database_index
port: 6379 #or ipaddress:port[,weight,password,database_index]
redis_url: "redis:6379"
namespace: "harbor_job_service" namespace: "harbor_job_service"
#Logger for job #Logger for job

View File

@ -4,7 +4,6 @@ package runtime
import ( import (
"context" "context"
"fmt"
"os" "os"
"os/signal" "os/signal"
"sync" "sync"
@ -151,9 +150,8 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con
MaxIdle: 6, MaxIdle: 6,
Wait: true, Wait: true,
Dial: func() (redis.Conn, error) { Dial: func() (redis.Conn, error) {
return redis.Dial( return redis.DialURL(
"tcp", cfg.PoolConfig.RedisPoolCfg.RedisURL,
fmt.Sprintf("%s:%d", cfg.PoolConfig.RedisPoolCfg.Host, cfg.PoolConfig.RedisPoolCfg.Port),
redis.DialConnectTimeout(dialConnectionTimeout), redis.DialConnectTimeout(dialConnectionTimeout),
redis.DialReadTimeout(dialReadTimeout), redis.DialReadTimeout(dialReadTimeout),
redis.DialWriteTimeout(dialWriteTimeout), redis.DialWriteTimeout(dialWriteTimeout),

View File

@ -5,8 +5,10 @@ package utils
import ( import (
"errors" "errors"
"fmt"
"net/url" "net/url"
"os" "os"
"strconv"
"strings" "strings"
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
@ -71,6 +73,40 @@ func IsValidURL(address string) bool {
return true return true
} }
//TranslateRedisAddress translates the comma format to redis URL
func TranslateRedisAddress(commaFormat string) (string, bool) {
if IsEmptyStr(commaFormat) {
return "", false
}
sections := strings.Split(commaFormat, ",")
totalSections := len(sections)
if totalSections == 0 {
return "", false
}
urlParts := []string{}
//section[0] should be host:port
redisURL := fmt.Sprintf("redis://%s", sections[0])
if _, err := url.Parse(redisURL); err != nil {
return "", false
}
urlParts = append(urlParts, "redis://", sections[0])
//Ignore weight
//Check password
if totalSections >= 3 && !IsEmptyStr(sections[2]) {
urlParts = []string{urlParts[0], fmt.Sprintf("%s:%s@", "arbitrary_username", sections[2]), urlParts[1]}
}
if totalSections >= 4 && !IsEmptyStr(sections[3]) {
if _, err := strconv.Atoi(sections[3]); err == nil {
urlParts = append(urlParts, "/", sections[3])
}
}
return strings.Join(urlParts, ""), true
}
//JobScore represents the data item with score in the redis db. //JobScore represents the data item with score in the redis db.
type JobScore struct { type JobScore struct {
JobBytes []byte JobBytes []byte