Refine replication schedule trigger API

This commit is contained in:
Wenkai Yin 2017-12-12 15:56:46 +08:00
parent 1d2e206ce4
commit 055ab0ba15
9 changed files with 154 additions and 83 deletions

View File

@ -160,7 +160,7 @@ func (ctl *DefaultController) UpdatePolicy(updatedPolicy models.ReplicationPolic
} else { } else {
switch updatedPolicy.Trigger.Kind { switch updatedPolicy.Trigger.Kind {
case replication.TriggerKindSchedule: case replication.TriggerKindSchedule:
if updatedPolicy.Trigger.Param != originPolicy.Trigger.Param { if !originPolicy.Trigger.ScheduleParam.Equal(updatedPolicy.Trigger.ScheduleParam) {
reset = true reset = true
} }
case replication.TriggerKindImmediate: case replication.TriggerKindImmediate:
@ -176,7 +176,7 @@ func (ctl *DefaultController) UpdatePolicy(updatedPolicy models.ReplicationPolic
} }
if reset { if reset {
if err = ctl.triggerManager.UnsetTrigger(id, *originPolicy.Trigger); err != nil { if err = ctl.triggerManager.UnsetTrigger(&originPolicy); err != nil {
return err return err
} }
@ -199,7 +199,7 @@ func (ctl *DefaultController) RemovePolicy(policyID int64) error {
return fmt.Errorf("policy %d not found", policyID) return fmt.Errorf("policy %d not found", policyID)
} }
if err = ctl.triggerManager.UnsetTrigger(policyID, *policy.Trigger); err != nil { if err = ctl.triggerManager.UnsetTrigger(&policy); err != nil {
return err return err
} }
@ -230,7 +230,6 @@ func (ctl *DefaultController) Replicate(policyID int64, metadata ...map[string]i
// prepare candidates for replication // prepare candidates for replication
candidates := getCandidates(&policy, ctl.sourcer, metadata...) candidates := getCandidates(&policy, ctl.sourcer, metadata...)
// TODO
/* /*
targets := []*common_models.RepTarget{} targets := []*common_models.RepTarget{}
for _, targetID := range policy.TargetIDs { for _, targetID := range policy.TargetIDs {

View File

@ -42,7 +42,11 @@ func TestInit(t *testing.T) {
} }
func TestCreatePolicy(t *testing.T) { func TestCreatePolicy(t *testing.T) {
_, err := GlobalController.CreatePolicy(models.ReplicationPolicy{}) _, err := GlobalController.CreatePolicy(models.ReplicationPolicy{
Trigger: &models.Trigger{
Kind: replication.TriggerKindManual,
},
})
assert.Nil(t, err) assert.Nil(t, err)
} }

View File

@ -23,11 +23,8 @@ import (
//Trigger is replication launching approach definition //Trigger is replication launching approach definition
type Trigger struct { type Trigger struct {
//The name of the trigger Kind string `json:"kind"` // the type of the trigger
Kind string `json:"kind"` ScheduleParam *ScheduleParam `json:"schedule_param"` // optional, only used when kind is 'schedule'
//The parameters with json text format required by the trigger
Param string `json:"param"`
} }
// Valid ... // Valid ...
@ -37,4 +34,46 @@ func (t *Trigger) Valid(v *validation.Validation) {
t.Kind == replication.TriggerKindSchedule) { t.Kind == replication.TriggerKindSchedule) {
v.SetError("kind", fmt.Sprintf("invalid trigger kind: %s", t.Kind)) v.SetError("kind", fmt.Sprintf("invalid trigger kind: %s", t.Kind))
} }
if t.Kind == replication.TriggerKindSchedule {
if t.ScheduleParam == nil {
v.SetError("schedule_param", "empty schedule_param")
} else {
t.ScheduleParam.Valid(v)
}
}
}
// ScheduleParam defines the parameters used by schedule trigger
type ScheduleParam struct {
Type string `json:"type"` //daily or weekly
Weekday int8 `json:"weekday"` //Optional, only used when type is 'weekly'
Offtime int64 `json:"offtime"` //The time offset with the UTC 00:00 in seconds
}
// Valid ...
func (s *ScheduleParam) Valid(v *validation.Validation) {
if !(s.Type == replication.TriggerScheduleDaily ||
s.Type == replication.TriggerScheduleWeekly) {
v.SetError("type", fmt.Sprintf("invalid schedule trigger parameter type: %s", s.Type))
}
if s.Type == replication.TriggerScheduleWeekly {
if s.Weekday < 1 || s.Weekday > 7 {
v.SetError("weekday", fmt.Sprintf("invalid schedule trigger parameter weekday: %d", s.Weekday))
}
}
if s.Offtime < 0 || s.Offtime > 3600*24 {
v.SetError("offtime", fmt.Sprintf("invalid schedule trigger parameter offtime: %d", s.Offtime))
}
}
// Equal ...
func (s *ScheduleParam) Equal(param *ScheduleParam) bool {
if param == nil {
return false
}
return s.Type == param.Type && s.Weekday == param.Weekday && s.Offtime == param.Offtime
} }

View File

@ -31,6 +31,9 @@ func TestValidOfTrigger(t *testing.T) {
&Trigger{ &Trigger{
Kind: replication.TriggerKindImmediate, Kind: replication.TriggerKindImmediate,
}: false, }: false,
&Trigger{
Kind: replication.TriggerKindSchedule,
}: true,
} }
for filter, hasError := range cases { for filter, hasError := range cases {
@ -39,3 +42,36 @@ func TestValidOfTrigger(t *testing.T) {
assert.Equal(t, hasError, v.HasErrors()) assert.Equal(t, hasError, v.HasErrors())
} }
} }
func TestValidOfScheduleParam(t *testing.T) {
cases := map[*ScheduleParam]bool{
&ScheduleParam{}: true,
&ScheduleParam{
Type: "invalid_type",
}: true,
&ScheduleParam{
Type: replication.TriggerScheduleDaily,
Offtime: 3600*24 + 1,
}: true,
&ScheduleParam{
Type: replication.TriggerScheduleDaily,
Offtime: 3600 * 2,
}: false,
&ScheduleParam{
Type: replication.TriggerScheduleWeekly,
Weekday: 0,
Offtime: 3600 * 2,
}: true,
&ScheduleParam{
Type: replication.TriggerScheduleWeekly,
Weekday: 7,
Offtime: 3600 * 2,
}: false,
}
for param, hasError := range cases {
v := &validation.Validation{}
param.Valid(v)
assert.Equal(t, hasError, v.HasErrors())
}
}

View File

@ -49,6 +49,12 @@ func (r *RepositoryFilter) GetConvertor() Convertor {
// DoFilter filters repository and image(according to the repository part) and drops any other resource types // DoFilter filters repository and image(according to the repository part) and drops any other resource types
func (r *RepositoryFilter) DoFilter(items []models.FilterItem) []models.FilterItem { func (r *RepositoryFilter) DoFilter(items []models.FilterItem) []models.FilterItem {
candidates := []string{}
for _, item := range items {
candidates = append(candidates, item.Value)
}
log.Debugf("repository filter candidates: %v", candidates)
result := []models.FilterItem{} result := []models.FilterItem{}
for _, item := range items { for _, item := range items {
if item.Kind != replication.FilterItemKindRepository && item.Kind != replication.FilterItemKindTag { if item.Kind != replication.FilterItemKindRepository && item.Kind != replication.FilterItemKindTag {

View File

@ -49,6 +49,12 @@ func (t *TagFilter) GetConvertor() Convertor {
// DoFilter filters tag of the image // DoFilter filters tag of the image
func (t *TagFilter) DoFilter(items []models.FilterItem) []models.FilterItem { func (t *TagFilter) DoFilter(items []models.FilterItem) []models.FilterItem {
candidates := []string{}
for _, item := range items {
candidates = append(candidates, item.Value)
}
log.Debugf("tag filter candidates: %v", candidates)
result := []models.FilterItem{} result := []models.FilterItem{}
for _, item := range items { for _, item := range items {
if item.Kind != replication.FilterItemKindTag { if item.Kind != replication.FilterItemKindTag {

View File

@ -1,7 +1,6 @@
package trigger package trigger
import ( import (
"errors"
"fmt" "fmt"
"github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/common/utils/log"
@ -55,88 +54,71 @@ func (m *Manager) RemoveTrigger(policyID int64) error {
//SetupTrigger will create the new trigger based on the provided policy. //SetupTrigger will create the new trigger based on the provided policy.
//If failed, an error will be returned. //If failed, an error will be returned.
func (m *Manager) SetupTrigger(policy *models.ReplicationPolicy) error { func (m *Manager) SetupTrigger(policy *models.ReplicationPolicy) error {
if policy == nil || policy.Trigger == nil { trigger, err := createTrigger(policy)
log.Debug("empty policy or trigger, skip trigger setup") if err != nil {
return err
}
// manual trigger, do nothing
if trigger == nil {
return nil return nil
} }
tg := trigger.(Interface)
if err = tg.Setup(); err != nil {
return err
}
log.Debugf("%s trigger for policy %d is set", tg.Kind(), policy.ID)
return nil
}
//UnsetTrigger will disable the trigger which is not cached in the trigger cache.
func (m *Manager) UnsetTrigger(policy *models.ReplicationPolicy) error {
trigger, err := createTrigger(policy)
if err != nil {
return err
}
// manual trigger, do nothing
if trigger == nil {
return nil
}
tg := trigger.(Interface)
if err = tg.Unset(); err != nil {
return err
}
log.Debugf("%s trigger for policy %d is unset", tg.Kind(), policy.ID)
return nil
}
func createTrigger(policy *models.ReplicationPolicy) (interface{}, error) {
if policy == nil || policy.Trigger == nil {
return nil, fmt.Errorf("empty policy or trigger")
}
trigger := policy.Trigger trigger := policy.Trigger
switch trigger.Kind { switch trigger.Kind {
case replication.TriggerKindSchedule: case replication.TriggerKindSchedule:
param := ScheduleParam{} param := ScheduleParam{}
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID and whether replicate deletion
param.PolicyID = policy.ID param.PolicyID = policy.ID
param.OnDeletion = policy.ReplicateDeletion param.Type = trigger.ScheduleParam.Type
param.Weekday = trigger.ScheduleParam.Weekday
param.Offtime = trigger.ScheduleParam.Offtime
newTrigger := NewScheduleTrigger(param) return NewScheduleTrigger(param), nil
if err := newTrigger.Setup(); err != nil {
return err
}
case replication.TriggerKindImmediate: case replication.TriggerKindImmediate:
param := ImmediateParam{} param := ImmediateParam{}
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID and whether replicate deletion
param.PolicyID = policy.ID param.PolicyID = policy.ID
param.OnDeletion = policy.ReplicateDeletion param.OnDeletion = policy.ReplicateDeletion
param.Namespaces = policy.Namespaces param.Namespaces = policy.Namespaces
newTrigger := NewImmediateTrigger(param) return NewImmediateTrigger(param), nil
if err := newTrigger.Setup(); err != nil {
return err
}
case replication.TriggerKindManual: case replication.TriggerKindManual:
// do nothing return nil, nil
default: default:
return fmt.Errorf("invalid trigger type: %s", policy.Trigger.Kind) return nil, fmt.Errorf("invalid trigger type: %s", trigger.Kind)
} }
return nil
}
//UnsetTrigger will disable the trigger which is not cached in the trigger cache.
func (m *Manager) UnsetTrigger(policyID int64, trigger models.Trigger) error {
if policyID <= 0 {
return errors.New("Invalid policy ID")
}
if len(trigger.Kind) == 0 {
return errors.New("Invalid replication trigger definition")
}
switch trigger.Kind {
case replication.TriggerKindSchedule:
param := ScheduleParam{}
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID info
param.PolicyID = policyID
newTrigger := NewScheduleTrigger(param)
if err := newTrigger.Unset(); err != nil {
return err
}
case replication.TriggerKindImmediate:
param := ImmediateParam{}
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID info
param.PolicyID = policyID
newTrigger := NewImmediateTrigger(param)
if err := newTrigger.Unset(); err != nil {
return err
}
default:
//Treat as manual trigger
break
}
return nil
} }

View File

@ -2,6 +2,7 @@ package trigger
import ( import (
"fmt" "fmt"
"time"
"github.com/vmware/harbor/src/common/scheduler" "github.com/vmware/harbor/src/common/scheduler"
"github.com/vmware/harbor/src/common/scheduler/policy" "github.com/vmware/harbor/src/common/scheduler/policy"
@ -31,10 +32,10 @@ func (st *ScheduleTrigger) Setup() error {
config := &policy.AlternatePolicyConfiguration{} config := &policy.AlternatePolicyConfiguration{}
switch st.params.Type { switch st.params.Type {
case replication.TriggerScheduleDaily: case replication.TriggerScheduleDaily:
config.Duration = 24 * 3600 config.Duration = 24 * 3600 * time.Second
config.OffsetTime = st.params.Offtime config.OffsetTime = st.params.Offtime
case replication.TriggerScheduleWeekly: case replication.TriggerScheduleWeekly:
config.Duration = 7 * 24 * 3600 config.Duration = 7 * 24 * 3600 * time.Second
config.OffsetTime = st.params.Offtime config.OffsetTime = st.params.Offtime
config.Weekday = st.params.Weekday config.Weekday = st.params.Weekday
default: default:

View File

@ -471,8 +471,7 @@ func TestConvertToRepPolicy(t *testing.T) {
}, },
ReplicateDeletion: true, ReplicateDeletion: true,
Trigger: &rep_models.Trigger{ Trigger: &rep_models.Trigger{
Kind: "trigger_kind_01", Kind: "trigger_kind_01",
Param: "{param}",
}, },
Projects: []*models.Project{ Projects: []*models.Project{
&models.Project{ &models.Project{
@ -498,8 +497,7 @@ func TestConvertToRepPolicy(t *testing.T) {
}, },
ReplicateDeletion: true, ReplicateDeletion: true,
Trigger: &rep_models.Trigger{ Trigger: &rep_models.Trigger{
Kind: "trigger_kind_01", Kind: "trigger_kind_01",
Param: "{param}",
}, },
ProjectIDs: []int64{1}, ProjectIDs: []int64{1},
Namespaces: []string{"library"}, Namespaces: []string{"library"},