From aa9fc2a0833ab87b03acbc0a657805ba20ed2e83 Mon Sep 17 00:00:00 2001 From: Tan Jiang Date: Fri, 17 Jun 2016 18:54:29 +0800 Subject: [PATCH] resume running jobs when jobservice starts --- Deploy/db/registry.sql | 6 +++ dao/dao_test.go | 103 ++++++++++++++++++++++++++++++++++---- dao/replication_job.go | 52 +++++++++++++------ jobservice/main.go | 44 +++++++++++----- jobservice/populate.sql | 1 + models/replication_job.go | 25 ++++----- 6 files changed, 182 insertions(+), 49 deletions(-) diff --git a/Deploy/db/registry.sql b/Deploy/db/registry.sql index 17f075e11..2d32b4cbb 100644 --- a/Deploy/db/registry.sql +++ b/Deploy/db/registry.sql @@ -123,6 +123,12 @@ create table replication_target ( url varchar(64), username varchar(40), password varchar(40), + /* + target_type indicates the type of target registry, + 0 means it's a harbor instance, + 1 means it's a regulart registry + */ + target_type tinyint(1) NOT NULL DEFAULT 0, creation_time timestamp default CURRENT_TIMESTAMP, update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP, PRIMARY KEY (id) diff --git a/dao/dao_test.go b/dao/dao_test.go index 343c38b57..837ff01bd 100644 --- a/dao/dao_test.go +++ b/dao/dao_test.go @@ -1092,6 +1092,22 @@ func TestGetRepJobByPolicy(t *testing.T) { } } +func TestDeleteRepJob(t *testing.T) { + err := DeleteRepJob(jobID) + if err != nil { + t.Errorf("Error occured in DeleteRepJob: %v, id: %d", err, jobID) + return + } + t.Logf("deleted rep job, id: %d", jobID) + j, err := GetRepJob(jobID) + if err != nil { + t.Errorf("Error occured in GetRepJob:%v", err) + } + if j != nil { + t.Errorf("Able to find rep job after deletion, id: %d", jobID) + } +} + func TestGetRepoJobToStop(t *testing.T) { jobs := [...]models.RepJob{ models.RepJob{ @@ -1114,8 +1130,11 @@ func TestGetRepoJobToStop(t *testing.T) { }, } var err error + var i int64 + ids := make([]int64, 0) for _, j := range jobs { - _, err = AddRepJob(j) + i, err = AddRepJob(j) + ids = append(ids, i) if err != nil { log.Errorf("Failed to add Job: %+v, error: %v", j, err) return @@ -1127,10 +1146,17 @@ func TestGetRepoJobToStop(t *testing.T) { return } //time.Sleep(15 * time.Second) - if len(res) != 2 { - log.Errorf("Expected length of stoppable jobs, expected:2, in fact: %d", len(res)) + if len(res) != 1 { + log.Errorf("Expected length of stoppable jobs, expected:1, in fact: %d", len(res)) return } + for _, id := range ids { + err = DeleteRepJob(id) + if err != nil { + log.Errorf("Failed to delete job, id: %d, error: %v", id, err) + return + } + } } func TestDeleteRepTarget(t *testing.T) { @@ -1182,19 +1208,74 @@ func TestDeleteRepPolicy(t *testing.T) { } } -func TestDeleteRepJob(t *testing.T) { - err := DeleteRepJob(jobID) +func TestResetRepJobs(t *testing.T) { + + job1 := models.RepJob{ + Repository: "library/ubuntua", + PolicyID: policyID, + Operation: "transfer", + Status: models.JobRunning, + } + job2 := models.RepJob{ + Repository: "library/ubuntub", + PolicyID: policyID, + Operation: "transfer", + Status: models.JobCanceled, + } + id1, err := AddRepJob(job1) if err != nil { - t.Errorf("Error occured in DeleteRepJob: %v, id: %d", err, jobID) + t.Errorf("Failed to add job: %+v, error: %v", job1, err) return } - t.Logf("deleted rep job, id: %d", jobID) - j, err := GetRepJob(jobID) + id2, err := AddRepJob(job2) if err != nil { - t.Errorf("Error occured in GetRepJob:%v", err) + t.Errorf("Failed to add job: %+v, error: %v", job2, err) + return } - if j != nil { - t.Errorf("Able to find rep job after deletion, id: %d", jobID) + err = ResetRunningJobs() + if err != nil { + t.Errorf("Failed to reset running jobs, error: %v", err) + } + j1, err := GetRepJob(id1) + if err != nil { + t.Errorf("Failed to get rep job, id: %d, error: %v", id1, err) + return + } + if j1.Status != models.JobPending { + t.Errorf("The rep job: %d, status should be Pending, but infact: %s", id1, j1.Status) + return + } + j2, err := GetRepJob(id2) + if err != nil { + t.Errorf("Failed to get rep job, id: %d, error: %v", id2, err) + return + } + if j2.Status == models.JobPending { + t.Errorf("The rep job: %d, status should be Canceled, but infact: %s", id2, j2.Status) + return + } +} + +func TestGetJobByStatus(t *testing.T) { + r1, err := GetRepJobByStatus(models.JobPending, models.JobRunning) + if err != nil { + t.Errorf("Failed to run GetRepJobByStatus, error: %v", err) + } + if len(r1) != 1 { + t.Errorf("Unexpected length of result, expected 1, but in fact:%d", len(r1)) + return + } + + r2, err := GetRepJobByStatus(models.JobPending, models.JobCanceled) + if err != nil { + t.Errorf("Failed to run GetRepJobByStatus, error: %v", err) + } + if len(r2) != 2 { + t.Errorf("Unexpected length of result, expected 2, but in fact:%d", len(r2)) + return + } + for _, j := range r2 { + DeleteRepJob(j.ID) } } diff --git a/dao/replication_job.go b/dao/replication_job.go index 83a5b9d11..b23db707f 100644 --- a/dao/replication_job.go +++ b/dao/replication_job.go @@ -1,16 +1,16 @@ /* - Copyright (c) 2016 VMware, Inc. All Rights Reserved. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + Copyright (c) 2016 VMware, Inc. All Rights Reserved. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ package dao @@ -267,9 +267,13 @@ func GetRepJobToStop(policyID int64) ([]*models.RepJob, error) { return res, err } -func repJobPolicyIDQs(policyID int64) orm.QuerySeter { +func repJobQs() orm.QuerySeter { o := GetOrmer() - return o.QueryTable("replication_job").Filter("policy_id", policyID) + return o.QueryTable("replication_job") +} + +func repJobPolicyIDQs(policyID int64) orm.QuerySeter { + return repJobQs().Filter("policy_id", policyID) } // DeleteRepJob ... @@ -293,6 +297,26 @@ func UpdateRepJobStatus(id int64, status string) error { return err } +// ResetRunningJobs update all running jobs status to pending +func ResetRunningJobs() error { + o := GetOrmer() + sql := fmt.Sprintf("update replication_job set status = '%s' where status = '%s'", models.JobPending, models.JobRunning) + _, err := o.Raw(sql).Exec() + return err +} + +// GetRepJobsByStatus get jobs of certain statuses +func GetRepJobByStatus(status ...string) ([]*models.RepJob, error) { + var res []*models.RepJob + t := make([]interface{}, 0) + for _, s := range status { + t = append(t, interface{}(s)) + } + _, err := repJobQs().Filter("status__in", t...).All(&res) + genTagListForJob(res...) + return res, err +} + func genTagListForJob(jobs ...*models.RepJob) { for _, j := range jobs { if len(j.Tags) > 0 { diff --git a/jobservice/main.go b/jobservice/main.go index 6f17eb9c5..d960a0904 100644 --- a/jobservice/main.go +++ b/jobservice/main.go @@ -1,16 +1,16 @@ /* - Copyright (c) 2016 VMware, Inc. All Rights Reserved. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + Copyright (c) 2016 VMware, Inc. All Rights Reserved. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ package main @@ -19,6 +19,8 @@ import ( "github.com/astaxie/beego" "github.com/vmware/harbor/dao" "github.com/vmware/harbor/job" + "github.com/vmware/harbor/models" + "github.com/vmware/harbor/utils/log" ) func main() { @@ -26,5 +28,23 @@ func main() { initRouters() job.InitWorkerPool() go job.Dispatch() + resumeJobs() beego.Run() } + +func resumeJobs() { + log.Debugf("Trying to resume halted jobs...") + err := dao.ResetRunningJobs() + if err != nil { + log.Warningf("Failed to reset all running jobs to pending, error: %v", err) + } + jobs, err := dao.GetRepJobByStatus(models.JobPending) + if err == nil { + for _, j := range jobs { + log.Debugf("Rescheduling job: %d", j.ID) + job.Schedule(j.ID) + } + } else { + log.Warningf("Failed to get pending jobs, error: %v", err) + } +} diff --git a/jobservice/populate.sql b/jobservice/populate.sql index 10611c647..4cb43fb5d 100644 --- a/jobservice/populate.sql +++ b/jobservice/populate.sql @@ -1,3 +1,4 @@ use registry; insert into replication_target (name, url, username, password) values ('test', 'http://10.117.171.31', 'admin', 'Harbor12345'); insert into replication_policy (name, project_id, target_id, enabled, start_time) value ('test_policy', 1, 1, 1, NOW()); +insert into replication_job (status, policy_id, repository, operation) value ('running', 1, 'library/whatever', 'transfer') diff --git a/models/replication_job.go b/models/replication_job.go index 80eb1b36b..4203705f1 100644 --- a/models/replication_job.go +++ b/models/replication_job.go @@ -1,16 +1,16 @@ /* - Copyright (c) 2016 VMware, Inc. All Rights Reserved. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + Copyright (c) 2016 VMware, Inc. All Rights Reserved. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ package models @@ -110,6 +110,7 @@ type RepTarget struct { Name string `orm:"column(name)" json:"name"` Username string `orm:"column(username)" json:"username"` Password string `orm:"column(password)" json:"password"` + Type int `orm:"column(target_type)" json:"type"` CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"` UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"` }