Merge pull request #7522 from ywk253100/190426_start_replication

Return immediately after creating the execution record when starting the replication
This commit is contained in:
Steven Zou 2019-04-29 15:01:52 +08:00 committed by GitHub
commit e07e25eeef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 96 additions and 26 deletions

View File

@ -41,16 +41,26 @@ type Controller interface {
GetTaskLog(int64) ([]byte, error) GetTaskLog(int64) ([]byte, error)
} }
const (
maxReplicators = 1024
)
// NewController returns a controller implementation // NewController returns a controller implementation
func NewController(js job.Client) Controller { func NewController(js job.Client) Controller {
return &controller{ ctl := &controller{
replicators: make(chan struct{}, maxReplicators),
executionMgr: execution.NewDefaultManager(), executionMgr: execution.NewDefaultManager(),
scheduler: scheduler.NewScheduler(js), scheduler: scheduler.NewScheduler(js),
flowCtl: flow.NewController(), flowCtl: flow.NewController(),
} }
for i := 0; i < maxReplicators; i++ {
ctl.replicators <- struct{}{}
}
return ctl
} }
type controller struct { type controller struct {
replicators chan struct{}
flowCtl flow.Controller flowCtl flow.Controller
executionMgr execution.Manager executionMgr execution.Manager
scheduler scheduler.Scheduler scheduler scheduler.Scheduler
@ -67,24 +77,31 @@ func (c *controller) StartReplication(policy *model.Policy, resource *model.Reso
if err != nil { if err != nil {
return 0, err return 0, err
} }
// control the count of concurrent replication requests
flow := c.createFlow(id, policy, resource) log.Debugf("waiting for the available replicator ...")
if n, err := c.flowCtl.Start(flow); err != nil { <-c.replicators
// only update the execution when got error. log.Debugf("got an available replicator, starting the replication ...")
// if got no error, it will be updated automatically go func() {
// when listing the execution records defer func() {
if e := c.executionMgr.Update(&models.Execution{ c.replicators <- struct{}{}
ID: id, }()
Status: models.ExecutionStatusFailed, flow := c.createFlow(id, policy, resource)
StatusText: err.Error(), if n, err := c.flowCtl.Start(flow); err != nil {
Total: n, // only update the execution when got error.
Failed: n, // if got no error, it will be updated automatically
}, "Status", "StatusText", "Total", "Failed"); e != nil { // when listing the execution records
log.Errorf("failed to update the execution %d: %v", id, e) if e := c.executionMgr.Update(&models.Execution{
ID: id,
Status: models.ExecutionStatusFailed,
StatusText: err.Error(),
Total: n,
Failed: n,
}, "Status", "StatusText", "Total", "Failed"); e != nil {
log.Errorf("failed to update the execution %d: %v", id, e)
}
log.Errorf("the execution %d failed: %v", id, err)
} }
log.Errorf("the execution %d failed: %v", id, err) }()
}
return id, nil return id, nil
} }

View File

@ -16,6 +16,7 @@ package operation
import ( import (
"io" "io"
"os"
"testing" "testing"
"github.com/docker/distribution" "github.com/docker/distribution"
@ -197,16 +198,25 @@ func (f *fakedAdapter) DeleteChart(name, version string) error {
return nil return nil
} }
var ctl = &controller{ var ctl *controller
executionMgr: &fakedExecutionManager{},
scheduler: &fakedScheduler{}, func TestMain(m *testing.M) {
flowCtl: flow.NewController(), ctl = &controller{
replicators: make(chan struct{}, 1),
executionMgr: &fakedExecutionManager{},
scheduler: &fakedScheduler{},
flowCtl: flow.NewController(),
}
ctl.replicators <- struct{}{}
os.Exit(m.Run())
} }
func TestStartReplication(t *testing.T) { func TestStartReplication(t *testing.T) {
err := adapter.RegisterFactory(model.RegistryTypeHarbor, fakedAdapterFactory) err := adapter.RegisterFactory(model.RegistryTypeHarbor, fakedAdapterFactory)
require.Nil(t, err) require.Nil(t, err)
config.Config = &config.Configuration{} config.Config = &config.Configuration{}
// policy is disabled
policy := &model.Policy{ policy := &model.Policy{
SrcRegistry: &model.Registry{ SrcRegistry: &model.Registry{
Type: model.RegistryTypeHarbor, Type: model.RegistryTypeHarbor,
@ -224,24 +234,67 @@ func TestStartReplication(t *testing.T) {
Vtags: []string{"1.0", "2.0"}, Vtags: []string{"1.0", "2.0"},
}, },
} }
// policy is disabled
_, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased) _, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
require.NotNil(t, err) require.NotNil(t, err)
policy.Enabled = true
// replicate resource deletion // replicate resource deletion
resource.Deleted = true policy = &model.Policy{
SrcRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
DestRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
Enabled: true,
}
resource = &model.Resource{
Type: model.ResourceTypeImage,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: "library/hello-world",
},
Vtags: []string{"1.0", "2.0"},
},
Deleted: true,
}
id, err := ctl.StartReplication(policy, resource, model.TriggerTypeEventBased) id, err := ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
require.Nil(t, err) require.Nil(t, err)
assert.Equal(t, int64(1), id) assert.Equal(t, int64(1), id)
// replicate resource copy // replicate resource copy
resource.Deleted = false policy = &model.Policy{
SrcRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
DestRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
Enabled: true,
}
resource = &model.Resource{
Type: model.ResourceTypeImage,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: "library/hello-world",
},
Vtags: []string{"1.0", "2.0"},
},
Deleted: false,
}
id, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased) id, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
require.Nil(t, err) require.Nil(t, err)
assert.Equal(t, int64(1), id) assert.Equal(t, int64(1), id)
// nil resource // nil resource
policy = &model.Policy{
SrcRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
DestRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
Enabled: true,
}
id, err = ctl.StartReplication(policy, nil, model.TriggerTypeEventBased) id, err = ctl.StartReplication(policy, nil, model.TriggerTypeEventBased)
require.Nil(t, err) require.Nil(t, err)
assert.Equal(t, int64(1), id) assert.Equal(t, int64(1), id)