mirror of
https://github.com/goharbor/harbor
synced 2025-05-06 22:58:45 +00:00
Merge pull request #6929 from ywk253100/190214_flow_controller_scheduler
Refactor the Scheduler interface
This commit is contained in:
commit
60a6113323
@ -23,7 +23,7 @@ type Manager interface {
|
||||
// Create a new execution
|
||||
Create(*model.Execution) (int64, error)
|
||||
// List the summaries of executions
|
||||
List(*model.ExecutionQuery) (int64, []*model.Execution, error)
|
||||
List(...*model.ExecutionQuery) (int64, []*model.Execution, error)
|
||||
// Get the specified execution
|
||||
Get(int64) (*model.Execution, error)
|
||||
// Update the data of the specified execution, the "props" are the
|
||||
@ -36,12 +36,17 @@ type Manager interface {
|
||||
// Create a task
|
||||
CreateTask(*model.Task) (int64, error)
|
||||
// List the tasks according to the query
|
||||
ListTasks(*model.TaskQuery) (int64, []*model.Task, error)
|
||||
ListTasks(...*model.TaskQuery) (int64, []*model.Task, error)
|
||||
// Get one specified task
|
||||
GetTask(int64) (*model.Task, error)
|
||||
// Update the task, the "props" are the properties of task
|
||||
// that need to be updated
|
||||
// that need to be updated, it cannot include "status". If
|
||||
// you want to update the status, use "UpdateTask" instead
|
||||
UpdateTask(task *model.Task, props ...string) error
|
||||
// UpdateTaskStatus only updates the task status. If "statusCondition"
|
||||
// presents, only the tasks whose status equal to "statusCondition"
|
||||
// will be updated
|
||||
UpdateTaskStatus(taskID int64, status string, statusCondition ...string) error
|
||||
// Remove one task specified by task ID
|
||||
RemoveTask(int64) error
|
||||
// Remove all tasks of one execution specified by the execution ID
|
||||
|
@ -83,7 +83,19 @@ func (d *defaultController) StartReplication(policy *model.Policy) (int64, error
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// schedule the replication
|
||||
// preprocess the resources
|
||||
if err = flow.preprocess(); err != nil {
|
||||
log.Errorf("failed to preprocess the resources for the execution %d: %v", id, err)
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// create task records in database
|
||||
if err = flow.createTasks(); err != nil {
|
||||
log.Errorf("failed to create task records for the execution %d: %v", id, err)
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// schedule the tasks
|
||||
if err = flow.schedule(); err != nil {
|
||||
log.Errorf("failed to schedule the execution %d: %v", id, err)
|
||||
return id, nil
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/adapter"
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -79,7 +80,7 @@ type fakedExecutionManager struct{}
|
||||
func (f *fakedExecutionManager) Create(*model.Execution) (int64, error) {
|
||||
return 1, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) List(*model.ExecutionQuery) (int64, []*model.Execution, error) {
|
||||
func (f *fakedExecutionManager) List(...*model.ExecutionQuery) (int64, []*model.Execution, error) {
|
||||
return 0, nil, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) Get(int64) (*model.Execution, error) {
|
||||
@ -95,9 +96,9 @@ func (f *fakedExecutionManager) RemoveAll(int64) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) CreateTask(*model.Task) (int64, error) {
|
||||
return 0, nil
|
||||
return 1, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) ListTasks(*model.TaskQuery) (int64, []*model.Task, error) {
|
||||
func (f *fakedExecutionManager) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) {
|
||||
return 0, nil, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) {
|
||||
@ -106,6 +107,9 @@ func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) {
|
||||
func (f *fakedExecutionManager) UpdateTask(*model.Task, ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) RemoveTask(int64) error {
|
||||
return nil
|
||||
}
|
||||
@ -118,13 +122,25 @@ func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
|
||||
|
||||
type fakedScheduler struct{}
|
||||
|
||||
func (f *fakedScheduler) Schedule(src []*model.Resource, dst []*model.Resource) ([]*model.Task, error) {
|
||||
return []*model.Task{
|
||||
{
|
||||
Status: model.TaskStatusPending,
|
||||
JobID: "uuid",
|
||||
},
|
||||
}, nil
|
||||
func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) {
|
||||
items := []*scheduler.ScheduleItem{}
|
||||
for i, res := range src {
|
||||
items = append(items, &scheduler.ScheduleItem{
|
||||
SrcResource: res,
|
||||
DstResource: dst[i],
|
||||
})
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
func (f *fakedScheduler) Schedule(items []*scheduler.ScheduleItem) ([]*scheduler.ScheduleResult, error) {
|
||||
results := []*scheduler.ScheduleResult{}
|
||||
for _, item := range items {
|
||||
results = append(results, &scheduler.ScheduleResult{
|
||||
TaskID: item.TaskID,
|
||||
Error: nil,
|
||||
})
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
func (f *fakedScheduler) Stop(id string) error {
|
||||
return nil
|
||||
@ -153,7 +169,7 @@ func (f *fakedAdapter) FetchResources(namespace []string, filters []*model.Filte
|
||||
{
|
||||
Type: model.ResourceTypeRepository,
|
||||
Metadata: &model.ResourceMetadata{
|
||||
Name: "hello-world",
|
||||
Name: "library/hello-world",
|
||||
Namespace: "library",
|
||||
Vtags: []string{"latest"},
|
||||
},
|
||||
|
@ -15,7 +15,9 @@
|
||||
package flow
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
||||
@ -29,15 +31,16 @@ import (
|
||||
)
|
||||
|
||||
type flow struct {
|
||||
policy *model.Policy
|
||||
srcRegistry *model.Registry
|
||||
dstRegistry *model.Registry
|
||||
srcAdapter adapter.Adapter
|
||||
dstAdapter adapter.Adapter
|
||||
executionID int64
|
||||
resources []*model.Resource
|
||||
executionMgr execution.Manager
|
||||
scheduler scheduler.Scheduler
|
||||
policy *model.Policy
|
||||
srcRegistry *model.Registry
|
||||
dstRegistry *model.Registry
|
||||
srcAdapter adapter.Adapter
|
||||
dstAdapter adapter.Adapter
|
||||
executionID int64
|
||||
resources []*model.Resource
|
||||
executionMgr execution.Manager
|
||||
scheduler scheduler.Scheduler
|
||||
scheduleItems []*scheduler.ScheduleItem
|
||||
}
|
||||
|
||||
func newFlow(policy *model.Policy, registryMgr registry.Manager,
|
||||
@ -143,7 +146,7 @@ func (f *flow) createNamespace() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *flow) schedule() error {
|
||||
func (f *flow) preprocess() error {
|
||||
dstResources := []*model.Resource{}
|
||||
for _, srcResource := range f.resources {
|
||||
dstResource := &model.Resource{
|
||||
@ -161,30 +164,78 @@ func (f *flow) schedule() error {
|
||||
dstResources = append(dstResources, dstResource)
|
||||
}
|
||||
|
||||
tasks, err := f.scheduler.Schedule(f.resources, dstResources)
|
||||
items, err := f.scheduler.Preprocess(f.resources, dstResources)
|
||||
if err != nil {
|
||||
f.markExecutionFailure(err)
|
||||
return err
|
||||
}
|
||||
f.scheduleItems = items
|
||||
log.Debugf("the preprocess for resources of the execution %d completed",
|
||||
f.executionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *flow) createTasks() error {
|
||||
for _, item := range f.scheduleItems {
|
||||
task := &model.Task{
|
||||
ExecutionID: f.executionID,
|
||||
Status: model.TaskStatusInitialized,
|
||||
ResourceType: item.SrcResource.Type,
|
||||
SrcResource: getResourceName(item.SrcResource),
|
||||
DstResource: getResourceName(item.DstResource),
|
||||
}
|
||||
id, err := f.executionMgr.CreateTask(task)
|
||||
if err != nil {
|
||||
// if failed to create the task for one of the items,
|
||||
// the whole execution is marked as failure and all
|
||||
// the items will not be submitted
|
||||
f.markExecutionFailure(err)
|
||||
return err
|
||||
}
|
||||
item.TaskID = id
|
||||
log.Debugf("task record %d for the execution %d created",
|
||||
id, f.executionID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *flow) schedule() error {
|
||||
results, err := f.scheduler.Schedule(f.scheduleItems)
|
||||
if err != nil {
|
||||
f.markExecutionFailure(err)
|
||||
return err
|
||||
}
|
||||
|
||||
allFailed := true
|
||||
for _, task := range tasks {
|
||||
if task.Status != model.TaskStatusFailed {
|
||||
allFailed = false
|
||||
for _, result := range results {
|
||||
// if the task is failed to be submitted, update the status of the
|
||||
// task as failure
|
||||
if result.Error != nil {
|
||||
log.Errorf("failed to schedule task %d: %v", result.TaskID, err)
|
||||
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusFailed); err != nil {
|
||||
log.Errorf("failed to update task status %d: %v", result.TaskID, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
task.ExecutionID = f.executionID
|
||||
taskID, err := f.executionMgr.CreateTask(task)
|
||||
if err != nil {
|
||||
f.markExecutionFailure(err)
|
||||
return err
|
||||
allFailed = false
|
||||
// if the task is submitted successfully, update the status and start time
|
||||
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusPending); err != nil {
|
||||
log.Errorf("failed to update task status %d: %v", result.TaskID, err)
|
||||
}
|
||||
log.Debugf("task record %d for execution %d created", taskID, f.executionID)
|
||||
if err = f.executionMgr.UpdateTask(&model.Task{
|
||||
ID: result.TaskID,
|
||||
StartTime: time.Now(),
|
||||
}); err != nil {
|
||||
log.Errorf("failed to update task %d: %v", result.TaskID, err)
|
||||
}
|
||||
log.Debugf("the task %d scheduled", result.TaskID)
|
||||
}
|
||||
// if all the tasks are failed, mark the execution failed
|
||||
if allFailed {
|
||||
err = errors.New("all tasks are failed")
|
||||
f.markExecutionFailure(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -193,6 +244,8 @@ func (f *flow) markExecutionFailure(err error) {
|
||||
if err != nil {
|
||||
statusText = err.Error()
|
||||
}
|
||||
log.Errorf("the execution %d is marked as failure because of the error: %s",
|
||||
f.executionID, statusText)
|
||||
err = f.executionMgr.Update(
|
||||
&model.Execution{
|
||||
ID: f.executionID,
|
||||
@ -204,3 +257,19 @@ func (f *flow) markExecutionFailure(err error) {
|
||||
log.Errorf("failed to update the execution %d: %v", f.executionID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]"
|
||||
// if the resource has vtags
|
||||
func getResourceName(res *model.Resource) string {
|
||||
if res == nil {
|
||||
return ""
|
||||
}
|
||||
meta := res.Metadata
|
||||
if meta == nil {
|
||||
return ""
|
||||
}
|
||||
if len(meta.Vtags) == 0 {
|
||||
return meta.Name
|
||||
}
|
||||
return meta.Name + ":[" + strings.Join(meta.Vtags, ",") + "]"
|
||||
}
|
||||
|
@ -31,28 +31,31 @@ const (
|
||||
ExecutionTriggerEvent string = "Event"
|
||||
ExecutionTriggerSchedule string = "Schedule"
|
||||
|
||||
TaskStatusFailed string = "Failed"
|
||||
TaskStatusSucceed string = "Succeed"
|
||||
TaskStatusStopped string = "Stopped"
|
||||
TaskStatusInProgress string = "InProgress"
|
||||
TaskStatusPending string = "Pending"
|
||||
// The task has been persisted in db but not submitted to Jobservice
|
||||
TaskStatusInitialized string = "Initialized"
|
||||
TaskStatusPending string = "Pending"
|
||||
TaskStatusInProgress string = "InProgress"
|
||||
TaskStatusSucceed string = "Succeed"
|
||||
TaskStatusFailed string = "Failed"
|
||||
TaskStatusStopped string = "Stopped"
|
||||
)
|
||||
|
||||
// Execution defines an execution of the replication
|
||||
type Execution struct {
|
||||
ID int64 `json:"id"`
|
||||
PolicyID int64 `json:"policy_id"`
|
||||
Status string `json:"status"`
|
||||
StatusText string `json:"status_text"`
|
||||
Trigger string `json:"trigger"`
|
||||
Total int `json:"total"`
|
||||
Failed int `json:"failed"`
|
||||
Succeed int `json:"succeed"`
|
||||
Pending int `json:"pending"`
|
||||
InProgress int `json:"in_progress"`
|
||||
Stopped int `json:"stopped"`
|
||||
StartTime time.Time `json:"start_time"`
|
||||
EndTime time.Time `json:"end_time"`
|
||||
ID int64 `json:"id"`
|
||||
PolicyID int64 `json:"policy_id"`
|
||||
Status string `json:"status"`
|
||||
StatusText string `json:"status_text"`
|
||||
Trigger string `json:"trigger"`
|
||||
Total int `json:"total"`
|
||||
Failed int `json:"failed"`
|
||||
Succeed int `json:"succeed"`
|
||||
Pending int `json:"pending"`
|
||||
InProgress int `json:"in_progress"`
|
||||
Stopped int `json:"stopped"`
|
||||
Initialized int `json:"initialized"`
|
||||
StartTime time.Time `json:"start_time"`
|
||||
EndTime time.Time `json:"end_time"`
|
||||
}
|
||||
|
||||
// Task holds the information of one replication task
|
||||
|
@ -18,10 +18,27 @@ import (
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
)
|
||||
|
||||
// Scheduler schedules tasks to transfer resource data
|
||||
// ScheduleItem is an item that can be scheduled
|
||||
type ScheduleItem struct {
|
||||
TaskID int64 // used as the param in the hook
|
||||
SrcResource *model.Resource
|
||||
DstResource *model.Resource
|
||||
}
|
||||
|
||||
// ScheduleResult is the result of the schedule for one item
|
||||
type ScheduleResult struct {
|
||||
TaskID int64
|
||||
Error error
|
||||
}
|
||||
|
||||
// Scheduler schedules
|
||||
type Scheduler interface {
|
||||
// Schedule tasks for one execution
|
||||
Schedule([]*model.Resource, []*model.Resource) ([]*model.Task, error)
|
||||
// Stop the task specified by ID
|
||||
// Preprocess the resources and returns the item list that can be scheduled
|
||||
Preprocess([]*model.Resource, []*model.Resource) ([]*ScheduleItem, error)
|
||||
// Schedule the items. If got error when scheduling one of the items,
|
||||
// the error should be put in the corresponding ScheduleResult and the
|
||||
// returning error of this function should be nil
|
||||
Schedule([]*ScheduleItem) ([]*ScheduleResult, error)
|
||||
// Stop the job specified by ID
|
||||
Stop(id string) error
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user