Extarct out redis pool initialization code to bootstrap

This commit is contained in:
Steven Zou 2018-03-26 16:38:53 +08:00
parent 7663592551
commit e966698939
2 changed files with 36 additions and 42 deletions

View File

@ -19,10 +19,6 @@ import (
) )
var ( var (
dialConnectionTimeout = 30 * time.Second
healthCheckPeriod = time.Minute
dialReadTimeout = healthCheckPeriod + 10*time.Second
dialWriteTimeout = 10 * time.Second
workerPoolDeadTime = 10 * time.Second workerPoolDeadTime = 10 * time.Second
) )
@ -50,43 +46,21 @@ type GoCraftWorkPool struct {
knownJobs map[string]interface{} knownJobs map[string]interface{}
} }
//RedisPoolConfig defines configurations for GoCraftWorkPool.
type RedisPoolConfig struct {
RedisHost string
RedisPort uint
Namespace string
WorkerCount uint
}
//RedisPoolContext ... //RedisPoolContext ...
//We did not use this context to pass context info so far, just a placeholder. //We did not use this context to pass context info so far, just a placeholder.
type RedisPoolContext struct{} type RedisPoolContext struct{}
//NewGoCraftWorkPool is constructor of goCraftWorkPool. //NewGoCraftWorkPool is constructor of goCraftWorkPool.
func NewGoCraftWorkPool(ctx *env.Context, cfg RedisPoolConfig) *GoCraftWorkPool { func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, redisPool *redis.Pool) *GoCraftWorkPool {
redisPool := &redis.Pool{ pool := work.NewWorkerPool(RedisPoolContext{}, workerCount, namespace, redisPool)
MaxActive: 6, enqueuer := work.NewEnqueuer(namespace, redisPool)
MaxIdle: 6, client := work.NewClient(namespace, redisPool)
Wait: true, scheduler := period.NewRedisPeriodicScheduler(ctx, namespace, redisPool)
Dial: func() (redis.Conn, error) { sweeper := period.NewSweeper(namespace, redisPool, client)
return redis.Dial( statsMgr := opm.NewRedisJobStatsManager(ctx.SystemContext, namespace, redisPool, client, scheduler)
"tcp", msgServer := NewMessageServer(ctx.SystemContext, namespace, redisPool)
fmt.Sprintf("%s:%d", cfg.RedisHost, cfg.RedisPort),
redis.DialConnectTimeout(dialConnectionTimeout),
redis.DialReadTimeout(dialReadTimeout),
redis.DialWriteTimeout(dialWriteTimeout),
)
},
}
pool := work.NewWorkerPool(RedisPoolContext{}, cfg.WorkerCount, cfg.Namespace, redisPool)
enqueuer := work.NewEnqueuer(cfg.Namespace, redisPool)
client := work.NewClient(cfg.Namespace, redisPool)
scheduler := period.NewRedisPeriodicScheduler(ctx, cfg.Namespace, redisPool)
sweeper := period.NewSweeper(cfg.Namespace, redisPool, client)
statsMgr := opm.NewRedisJobStatsManager(ctx.SystemContext, cfg.Namespace, redisPool, client, scheduler)
msgServer := NewMessageServer(ctx.SystemContext, cfg.Namespace, redisPool)
return &GoCraftWorkPool{ return &GoCraftWorkPool{
namespace: cfg.Namespace, namespace: namespace,
redisPool: redisPool, redisPool: redisPool,
pool: pool, pool: pool,
enqueuer: enqueuer, enqueuer: enqueuer,

View File

@ -4,12 +4,14 @@ package runtime
import ( import (
"context" "context"
"fmt"
"os" "os"
"os/signal" "os/signal"
"sync" "sync"
"syscall" "syscall"
"time" "time"
"github.com/garyburd/redigo/redis"
"github.com/vmware/harbor/src/common/job" "github.com/vmware/harbor/src/common/job"
"github.com/vmware/harbor/src/jobservice_v2/api" "github.com/vmware/harbor/src/jobservice_v2/api"
"github.com/vmware/harbor/src/jobservice_v2/config" "github.com/vmware/harbor/src/jobservice_v2/config"
@ -22,6 +24,13 @@ import (
"github.com/vmware/harbor/src/jobservice_v2/pool" "github.com/vmware/harbor/src/jobservice_v2/pool"
) )
const (
dialConnectionTimeout = 30 * time.Second
healthCheckPeriod = time.Minute
dialReadTimeout = healthCheckPeriod + 10*time.Second
dialWriteTimeout = 10 * time.Second
)
//JobService ... //JobService ...
var JobService = &Bootstrap{} var JobService = &Bootstrap{}
@ -136,14 +145,25 @@ func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg *config.Configura
//Load and run the worker pool //Load and run the worker pool
func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Configuration) pool.Interface { func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Configuration) pool.Interface {
redisPoolCfg := pool.RedisPoolConfig{ redisPool := &redis.Pool{
RedisHost: cfg.PoolConfig.RedisPoolCfg.Host, MaxActive: 6,
RedisPort: cfg.PoolConfig.RedisPoolCfg.Port, MaxIdle: 6,
Namespace: cfg.PoolConfig.RedisPoolCfg.Namespace, Wait: true,
WorkerCount: cfg.PoolConfig.WorkerCount, Dial: func() (redis.Conn, error) {
return redis.Dial(
"tcp",
fmt.Sprintf("%s:%d", cfg.PoolConfig.RedisPoolCfg.Host, cfg.PoolConfig.RedisPoolCfg.Port),
redis.DialConnectTimeout(dialConnectionTimeout),
redis.DialReadTimeout(dialReadTimeout),
redis.DialWriteTimeout(dialWriteTimeout),
)
},
} }
redisWorkerPool := pool.NewGoCraftWorkPool(ctx, redisPoolCfg) redisWorkerPool := pool.NewGoCraftWorkPool(ctx,
cfg.PoolConfig.RedisPoolCfg.Namespace,
cfg.PoolConfig.WorkerCount,
redisPool)
//Register jobs here //Register jobs here
if err := redisWorkerPool.RegisterJob(impl.KnownJobReplication, (*impl.ReplicationJob)(nil)); err != nil { if err := redisWorkerPool.RegisterJob(impl.KnownJobReplication, (*impl.ReplicationJob)(nil)); err != nil {
//exit //exit