Return immediately after creating the execution record when starting the replication

When fetching resources is slow, the starting replication request may timeout, this commit returns immediately after creating the execution record when starting the replication

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-04-26 10:43:15 +08:00
parent 6511417ba6
commit 621d2f20f3
2 changed files with 96 additions and 26 deletions

View File

@ -42,16 +42,26 @@ type Controller interface {
GetTaskLog(int64) ([]byte, error)
}
const (
maxReplicators = 1024
)
// NewController returns a controller implementation
func NewController(js job.Client) Controller {
return &controller{
ctl := &controller{
replicators: make(chan struct{}, maxReplicators),
executionMgr: execution.NewDefaultManager(),
scheduler: scheduler.NewScheduler(js),
flowCtl: flow.NewController(),
}
for i := 0; i < maxReplicators; i++ {
ctl.replicators <- struct{}{}
}
return ctl
}
type controller struct {
replicators chan struct{}
flowCtl flow.Controller
executionMgr execution.Manager
scheduler scheduler.Scheduler
@ -68,24 +78,31 @@ func (c *controller) StartReplication(policy *model.Policy, resource *model.Reso
if err != nil {
return 0, err
}
flow := c.createFlow(id, policy, resource)
if n, err := c.flowCtl.Start(flow); err != nil {
// only update the execution when got error.
// if got no error, it will be updated automatically
// when listing the execution records
if e := c.executionMgr.Update(&models.Execution{
ID: id,
Status: models.ExecutionStatusFailed,
StatusText: err.Error(),
Total: n,
Failed: n,
}, "Status", "StatusText", "Total", "Failed"); e != nil {
log.Errorf("failed to update the execution %d: %v", id, e)
// control the count of concurrent replication requests
log.Debugf("waiting for the available replicator ...")
<-c.replicators
log.Debugf("got an available replicator, starting the replication ...")
go func() {
defer func() {
c.replicators <- struct{}{}
}()
flow := c.createFlow(id, policy, resource)
if n, err := c.flowCtl.Start(flow); err != nil {
// only update the execution when got error.
// if got no error, it will be updated automatically
// when listing the execution records
if e := c.executionMgr.Update(&models.Execution{
ID: id,
Status: models.ExecutionStatusFailed,
StatusText: err.Error(),
Total: n,
Failed: n,
}, "Status", "StatusText", "Total", "Failed"); e != nil {
log.Errorf("failed to update the execution %d: %v", id, e)
}
log.Errorf("the execution %d failed: %v", id, err)
}
log.Errorf("the execution %d failed: %v", id, err)
}
}()
return id, nil
}

View File

@ -17,6 +17,7 @@ package operation
import (
"errors"
"io"
"os"
"testing"
"github.com/docker/distribution"
@ -198,16 +199,25 @@ func (f *fakedAdapter) DeleteChart(name, version string) error {
return nil
}
var ctl = &controller{
executionMgr: &fakedExecutionManager{},
scheduler: &fakedScheduler{},
flowCtl: flow.NewController(),
var ctl *controller
func TestMain(m *testing.M) {
ctl = &controller{
replicators: make(chan struct{}, 1),
executionMgr: &fakedExecutionManager{},
scheduler: &fakedScheduler{},
flowCtl: flow.NewController(),
}
ctl.replicators <- struct{}{}
os.Exit(m.Run())
}
func TestStartReplication(t *testing.T) {
err := adapter.RegisterFactory(model.RegistryTypeHarbor, fakedAdapterFactory)
require.Nil(t, err)
config.Config = &config.Configuration{}
// policy is disabled
policy := &model.Policy{
SrcRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
@ -225,24 +235,67 @@ func TestStartReplication(t *testing.T) {
Vtags: []string{"1.0", "2.0"},
},
}
// policy is disabled
_, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
require.NotNil(t, err)
policy.Enabled = true
// replicate resource deletion
resource.Deleted = true
policy = &model.Policy{
SrcRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
DestRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
Enabled: true,
}
resource = &model.Resource{
Type: model.ResourceTypeImage,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: "library/hello-world",
},
Vtags: []string{"1.0", "2.0"},
},
Deleted: true,
}
id, err := ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
require.Nil(t, err)
assert.Equal(t, int64(1), id)
// replicate resource copy
resource.Deleted = false
policy = &model.Policy{
SrcRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
DestRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
Enabled: true,
}
resource = &model.Resource{
Type: model.ResourceTypeImage,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: "library/hello-world",
},
Vtags: []string{"1.0", "2.0"},
},
Deleted: false,
}
id, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
require.Nil(t, err)
assert.Equal(t, int64(1), id)
// nil resource
policy = &model.Policy{
SrcRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
DestRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
Enabled: true,
}
id, err = ctl.StartReplication(policy, nil, model.TriggerTypeEventBased)
require.Nil(t, err)
assert.Equal(t, int64(1), id)