Setup/Unset trigger when CURD policies

This commit is contained in:
Wenkai Yin 2017-11-20 15:47:44 +08:00
parent 76154a233a
commit 59c1160edd
7 changed files with 97 additions and 39 deletions

View File

@ -91,22 +91,61 @@ func (ctl *Controller) Init() error {
//CreatePolicy is used to create a new policy and enable it if necessary
func (ctl *Controller) CreatePolicy(newPolicy models.ReplicationPolicy) (int64, error) {
//Validate policy
// TODO
id, err := ctl.policyManager.CreatePolicy(newPolicy)
if err != nil {
return 0, err
}
return ctl.policyManager.CreatePolicy(newPolicy)
if err = ctl.triggerManager.SetupTrigger(id, *newPolicy.Trigger); err != nil {
return 0, err
}
return id, nil
}
//UpdatePolicy will update the policy with new content.
//Parameter updatedPolicy must have the ID of the updated policy.
func (ctl *Controller) UpdatePolicy(updatedPolicy models.ReplicationPolicy) error {
// TODO check pre-conditions
return ctl.policyManager.UpdatePolicy(updatedPolicy)
id := updatedPolicy.ID
originPolicy, err := ctl.policyManager.GetPolicy(id)
if err != nil {
return err
}
if originPolicy.ID == 0 {
return fmt.Errorf("policy %d not found", id)
}
if err = ctl.triggerManager.UnsetTrigger(id, *originPolicy.Trigger); err != nil {
return err
}
if err = ctl.policyManager.UpdatePolicy(updatedPolicy); err != nil {
return err
}
return ctl.triggerManager.SetupTrigger(id, *updatedPolicy.Trigger)
}
//RemovePolicy will remove the specified policy and clean the related settings
func (ctl *Controller) RemovePolicy(policyID int64) error {
// TODO check pre-conditions
policy, err := ctl.policyManager.GetPolicy(policyID)
if err != nil {
return err
}
if policy.ID == 0 {
return fmt.Errorf("policy %d not found", policyID)
}
if err = ctl.triggerManager.UnsetTrigger(policyID, *policy.Trigger); err != nil {
return err
}
return ctl.policyManager.RemovePolicy(policyID)
}
@ -122,6 +161,9 @@ func (ctl *Controller) GetPolicies(query models.QueryParameter) ([]models.Replic
//Replicate starts one replication defined in the specified policy;
//Can be launched by the API layer and related triggers.
func (ctl *Controller) Replicate(policyID int64) error {
func (ctl *Controller) Replicate(policyID int64, item ...*models.FilterItem) error {
fmt.Printf("replicating %d ...\n", policyID)
return nil
}

View File

@ -43,5 +43,5 @@ func (st *ImmediateTrigger) Setup() error {
//Unset is the implementation of same method defined in Trigger interface
func (st *ImmediateTrigger) Unset() error {
return errors.New("Not implemented")
return DefaultWatchList.Remove(st.params.PolicyID)
}

View File

@ -12,17 +12,18 @@ import (
//with json format.
type Manager struct {
//Cache for triggers
cache *Cache
//cache *Cache
}
//NewManager is the constructor of trigger manager.
//capacity is the max number of trigger references manager can keep in memory
func NewManager(capacity int) *Manager {
return &Manager{
cache: NewCache(capacity),
//cache: NewCache(capacity),
}
}
/*
//GetTrigger returns the enabled trigger reference if existing in the cache.
func (m *Manager) GetTrigger(policyID int64) Interface {
return m.cache.Get(policyID)
@ -47,6 +48,7 @@ func (m *Manager) RemoveTrigger(policyID int64) error {
return nil
}
*/
//SetupTrigger will create the new trigger based on the provided json parameters.
//If failed, an error will be returned.

View File

@ -28,16 +28,11 @@ func (wl *WatchList) Add(item WatchItem) error {
}
//Remove the specified watch item from list
func (wl *WatchList) Remove() WatchItem {
return WatchItem{}
}
//Update the watch item in the list
func (wl *WatchList) Update(updatedItem WatchItem) error {
func (wl *WatchList) Remove(policyID int64) error {
return nil
}
//Get the specified watch item
func (wl *WatchList) Get(namespace string) WatchItem {
return WatchItem{}
//Get the watch items according to the namespace and operation
func (wl *WatchList) Get(namespace, operation string) ([]WatchItem, error) {
return []WatchItem{}, nil
}

View File

@ -255,7 +255,7 @@ func (ra *RepositoryAPI) Delete() {
}
log.Infof("delete tag: %s:%s", repoName, t)
go TriggerReplicationByRepository(project.ProjectID, repoName, []string{t}, models.RepOpDelete)
go CheckAndTriggerReplication(repoName+":"+t, "delete")
go func(tag string) {
if err := dao.AddAccessLog(models.AccessLog{

View File

@ -32,6 +32,10 @@ import (
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/common/utils/registry"
"github.com/vmware/harbor/src/common/utils/registry/auth"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/core"
rep_models "github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/trigger"
"github.com/vmware/harbor/src/ui/config"
"github.com/vmware/harbor/src/ui/promgr"
"github.com/vmware/harbor/src/ui/service/token"
@ -77,7 +81,41 @@ func checkUserExists(name string) int {
return 0
}
// CheckAndTriggerReplication checks whether replication policy is set
// on the resource, if is, trigger the replication
func CheckAndTriggerReplication(image, operation string) {
project, _ := utils.ParseRepository(image)
watchItems, err := trigger.DefaultWatchList.Get(project, operation)
if err != nil {
log.Errorf("failed to get watch list for resource %s, operation %s: %v", image, operation, err)
return
}
if len(watchItems) == 0 {
log.Debugf("no replication should be triggered for resource %s, operation %s, skip", image, operation)
return
}
for _, watchItem := range watchItems {
// TODO define a new type ReplicationItem to wrap FilterItem and operation.
// Maybe change the FilterItem to interface and define a type Resource to
// implement FilterItem is better?
item := &rep_models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: image,
Metadata: map[string]interface{}{
"operation": operation,
},
}
if err := core.DefaultController.Replicate(watchItem.PolicyID, item); err != nil {
log.Errorf("failed to trigger replication for resource: %s, operation: %s: %v", image, operation, err)
return
}
log.Infof("replication for resource: %s, operation: %s triggered", image, operation)
}
}
// TriggerReplication triggers the replication according to the policy
// TODO remove
func TriggerReplication(policyID int64, repository string,
tags []string, operation string) error {
data := struct {
@ -101,26 +139,7 @@ func TriggerReplication(policyID int64, repository string,
return uiutils.RequestAsUI("POST", url, bytes.NewBuffer(b), uiutils.NewStatusRespHandler(http.StatusOK))
}
// TriggerReplicationByRepository triggers the replication according to the repository
func TriggerReplicationByRepository(projectID int64, repository string, tags []string, operation string) {
policies, err := dao.GetRepPolicyByProject(projectID)
if err != nil {
log.Errorf("failed to get policies for repository %s: %v", repository, err)
return
}
for _, policy := range policies {
if policy.Enabled == 0 {
continue
}
if err := TriggerReplication(policy.ID, repository, tags, operation); err != nil {
log.Errorf("failed to trigger replication of policy %d for %s: %v", policy.ID, repository, err)
} else {
log.Infof("replication of policy %d for %s triggered", policy.ID, repository)
}
}
}
// TODO remove
func postReplicationAction(policyID int64, acton string) error {
data := struct {
PolicyID int64 `json:"policy_id"`

View File

@ -104,7 +104,7 @@ func (n *NotificationHandler) Post() {
}
}()
go api.TriggerReplicationByRepository(pro.ProjectID, repository, []string{tag}, models.RepOpTransfer)
go api.CheckAndTriggerReplication(repository+":"+tag, "push")
if autoScanEnabled(pro) {
last, err := clairdao.GetLastUpdate()