diff --git a/src/common/dao/base.go b/src/common/dao/base.go index a22a338e5..c5fd00471 100644 --- a/src/common/dao/base.go +++ b/src/common/dao/base.go @@ -133,6 +133,16 @@ func paginateForRawSQL(sql string, limit, offset int64) string { return fmt.Sprintf("%s limit %d offset %d", sql, limit, offset) } +func paginateForQuerySetter(qs orm.QuerySeter, page, size int64) orm.QuerySeter { + if size > 0 { + qs = qs.Limit(size) + if page > 0 { + qs = qs.Offset((page - 1) * size) + } + } + return qs +} + //Escape .. func Escape(str string) string { str = strings.Replace(str, `%`, `\%`, -1) diff --git a/src/common/dao/dao_test.go b/src/common/dao/dao_test.go index 238b33505..c65900f1b 100644 --- a/src/common/dao/dao_test.go +++ b/src/common/dao/dao_test.go @@ -1145,45 +1145,74 @@ func TestGetRepPolicyByProject(t *testing.T) { } } -func TestGetRepJobByPolicy(t *testing.T) { - jobs, err := GetRepJobByPolicy(999) - if err != nil { - t.Errorf("Error occurred in GetRepJobByPolicy: %v, policy ID: %d", err, 999) - return - } - if len(jobs) > 0 { - t.Errorf("Unexpected length of jobs, expected: 0, in fact: %d", len(jobs)) - return - } - jobs, err = GetRepJobByPolicy(policyID) - if err != nil { - t.Errorf("Error occurred in GetRepJobByPolicy: %v, policy ID: %d", err, policyID) - return - } - if len(jobs) != 1 { - t.Errorf("Unexpected length of jobs, expected: 1, in fact: %d", len(jobs)) - return - } - if jobs[0].ID != jobID { - t.Errorf("Unexpected job ID in the result, expected: %d, in fact: %d", jobID, jobs[0].ID) - return - } -} +func TestGetRepJobs(t *testing.T) { + var policyID int64 = 10000 + repository := "repository_for_test_get_rep_jobs" + operation := "operation_for_test" + status := "status_for_test" + now := time.Now().Add(1 * time.Minute) + id, err := AddRepJob(models.RepJob{ + PolicyID: policyID, + Repository: repository, + Operation: operation, + Status: status, + CreationTime: now, + UpdateTime: now, + }) + require.Nil(t, err) + defer DeleteRepJob(id) -func TestFilterRepJobs(t *testing.T) { - jobs, _, err := FilterRepJobs(policyID, "", []string{}, nil, nil, 1000, 0) - if err != nil { - t.Errorf("Error occurred in FilterRepJobs: %v, policy ID: %d", err, policyID) - return - } - if len(jobs) != 1 { - t.Errorf("Unexpected length of jobs, expected: 1, in fact: %d", len(jobs)) - return - } - if jobs[0].ID != jobID { - t.Errorf("Unexpected job ID in the result, expected: %d, in fact: %d", jobID, jobs[0].ID) - return + // no query + jobs, err := GetRepJobs() + require.Nil(t, err) + found := false + for _, job := range jobs { + if job.ID == id { + found = true + break + } } + assert.True(t, found) + + // query by policy ID + jobs, err = GetRepJobs(&models.RepJobQuery{ + PolicyID: policyID, + }) + require.Nil(t, err) + require.Equal(t, 1, len(jobs)) + assert.Equal(t, id, jobs[0].ID) + + // query by repository + jobs, err = GetRepJobs(&models.RepJobQuery{ + Repository: repository, + }) + require.Nil(t, err) + require.Equal(t, 1, len(jobs)) + assert.Equal(t, id, jobs[0].ID) + + // query by operation + jobs, err = GetRepJobs(&models.RepJobQuery{ + Operations: []string{operation}, + }) + require.Nil(t, err) + require.Equal(t, 1, len(jobs)) + assert.Equal(t, id, jobs[0].ID) + + // query by status + jobs, err = GetRepJobs(&models.RepJobQuery{ + Statuses: []string{status}, + }) + require.Nil(t, err) + require.Equal(t, 1, len(jobs)) + assert.Equal(t, id, jobs[0].ID) + + // query by creation time + jobs, err = GetRepJobs(&models.RepJobQuery{ + StartTime: &now, + }) + require.Nil(t, err) + require.Equal(t, 1, len(jobs)) + assert.Equal(t, id, jobs[0].ID) } func TestDeleteRepJob(t *testing.T) { @@ -1204,57 +1233,6 @@ func TestDeleteRepJob(t *testing.T) { } } -func TestGetRepoJobToStop(t *testing.T) { - jobs := [...]models.RepJob{ - models.RepJob{ - Repository: "library/ubuntu", - PolicyID: policyID, - Operation: "transfer", - Status: models.JobRunning, - }, - models.RepJob{ - Repository: "library/ubuntu", - PolicyID: policyID, - Operation: "transfer", - Status: models.JobFinished, - }, - models.RepJob{ - Repository: "library/ubuntu", - PolicyID: policyID, - Operation: "transfer", - Status: models.JobCanceled, - }, - } - var err error - var i int64 - var ids []int64 - for _, j := range jobs { - i, err = AddRepJob(j) - ids = append(ids, i) - if err != nil { - log.Errorf("Failed to add Job: %+v, error: %v", j, err) - return - } - } - res, err := GetRepJobToStop(policyID) - if err != nil { - log.Errorf("Failed to Get Jobs, error: %v", err) - return - } - //time.Sleep(15 * time.Second) - if len(res) != 1 { - log.Errorf("Expected length of stoppable jobs, expected:1, in fact: %d", len(res)) - return - } - for _, id := range ids { - err = DeleteRepJob(id) - if err != nil { - log.Errorf("Failed to delete job, id: %d, error: %v", id, err) - return - } - } -} - func TestDeleteRepTarget(t *testing.T) { err := DeleteRepTarget(targetID) if err != nil { @@ -1309,77 +1287,6 @@ func TestDeleteRepPolicy(t *testing.T) { } } -func TestResetRepJobs(t *testing.T) { - - job1 := models.RepJob{ - Repository: "library/ubuntua", - PolicyID: policyID, - Operation: "transfer", - Status: models.JobRunning, - } - job2 := models.RepJob{ - Repository: "library/ubuntub", - PolicyID: policyID, - Operation: "transfer", - Status: models.JobCanceled, - } - id1, err := AddRepJob(job1) - if err != nil { - t.Errorf("Failed to add job: %+v, error: %v", job1, err) - return - } - id2, err := AddRepJob(job2) - if err != nil { - t.Errorf("Failed to add job: %+v, error: %v", job2, err) - return - } - err = ResetRunningJobs() - if err != nil { - t.Errorf("Failed to reset running jobs, error: %v", err) - } - j1, err := GetRepJob(id1) - if err != nil { - t.Errorf("Failed to get rep job, id: %d, error: %v", id1, err) - return - } - if j1.Status != models.JobPending { - t.Errorf("The rep job: %d, status should be Pending, but infact: %s", id1, j1.Status) - return - } - j2, err := GetRepJob(id2) - if err != nil { - t.Errorf("Failed to get rep job, id: %d, error: %v", id2, err) - return - } - if j2.Status == models.JobPending { - t.Errorf("The rep job: %d, status should be Canceled, but infact: %s", id2, j2.Status) - return - } -} - -func TestGetJobByStatus(t *testing.T) { - r1, err := GetRepJobByStatus(models.JobPending, models.JobRunning) - if err != nil { - t.Errorf("Failed to run GetRepJobByStatus, error: %v", err) - } - if len(r1) != 1 { - t.Errorf("Unexpected length of result, expected 1, but in fact:%d", len(r1)) - return - } - - r2, err := GetRepJobByStatus(models.JobPending, models.JobCanceled) - if err != nil { - t.Errorf("Failed to run GetRepJobByStatus, error: %v", err) - } - if len(r2) != 2 { - t.Errorf("Unexpected length of result, expected 2, but in fact:%d", len(r2)) - return - } - for _, j := range r2 { - DeleteRepJob(j.ID) - } -} - func TestGetOrmer(t *testing.T) { o := GetOrmer() if o == nil { diff --git a/src/common/dao/replication_job.go b/src/common/dao/replication_job.go index 9aa3634bd..51023da3e 100644 --- a/src/common/dao/replication_job.go +++ b/src/common/dao/replication_job.go @@ -15,7 +15,6 @@ package dao import ( - "fmt" "time" "strings" @@ -302,74 +301,58 @@ func GetRepJob(id int64) (*models.RepJob, error) { return &j, nil } -// GetRepJobByPolicy ... -func GetRepJobByPolicy(policyID int64) ([]*models.RepJob, error) { - var res []*models.RepJob - _, err := repJobPolicyIDQs(policyID).All(&res) - genTagListForJob(res...) - return res, err +// GetTotalCountOfRepJobs ... +func GetTotalCountOfRepJobs(query ...*models.RepJobQuery) (int64, error) { + qs := repJobQueryConditions(query...) + return qs.Count() } -// FilterRepJobs ... -func FilterRepJobs(policyID int64, repository string, status []string, startTime, - endTime *time.Time, limit, offset int64, operations ...string) ([]*models.RepJob, int64, error) { - +// GetRepJobs ... +func GetRepJobs(query ...*models.RepJobQuery) ([]*models.RepJob, error) { jobs := []*models.RepJob{} - qs := GetOrmer().QueryTable(new(models.RepJob)) - - if policyID != 0 { - qs = qs.Filter("PolicyID", policyID) - } - if len(repository) != 0 { - qs = qs.Filter("Repository__icontains", repository) - } - if len(status) != 0 { - qs = qs.Filter("Status__in", status) - } - if startTime != nil { - qs = qs.Filter("CreationTime__gte", startTime) - } - if endTime != nil { - qs = qs.Filter("CreationTime__lte", endTime) - } - if len(operations) != 0 { - qs = qs.Filter("Operation__in", operations) - } - - total, err := qs.Count() - if err != nil { - return jobs, 0, err + qs := repJobQueryConditions(query...) + if len(query) > 0 && query[0] != nil { + qs = paginateForQuerySetter(qs, query[0].Page, query[0].Size) } qs = qs.OrderBy("-UpdateTime") - _, err = qs.Limit(limit).Offset(offset).All(&jobs) - if err != nil { - return jobs, 0, err + if _, err := qs.All(&jobs); err != nil { + return jobs, err } genTagListForJob(jobs...) - return jobs, total, nil + return jobs, nil } -// GetRepJobToStop get jobs that are possibly being handled by workers of a certain policy. -func GetRepJobToStop(policyID int64) ([]*models.RepJob, error) { - var res []*models.RepJob - _, err := repJobPolicyIDQs(policyID).Filter("status__in", - models.JobPending, models.JobRunning, models.JobRetrying).All(&res) - genTagListForJob(res...) - return res, err -} +func repJobQueryConditions(query ...*models.RepJobQuery) orm.QuerySeter { + qs := GetOrmer().QueryTable(new(models.RepJob)) + if len(query) == 0 || query[0] == nil { + return qs + } -func repJobQs() orm.QuerySeter { - o := GetOrmer() - return o.QueryTable("replication_job") -} - -func repJobPolicyIDQs(policyID int64) orm.QuerySeter { - return repJobQs().Filter("policy_id", policyID) + q := query[0] + if q.PolicyID != 0 { + qs = qs.Filter("PolicyID", q.PolicyID) + } + if len(q.Repository) > 0 { + qs = qs.Filter("Repository__icontains", q.Repository) + } + if len(q.Statuses) > 0 { + qs = qs.Filter("Status__in", q.Statuses) + } + if len(q.Operations) > 0 { + qs = qs.Filter("Operation__in", q.Operations) + } + if q.StartTime != nil { + qs = qs.Filter("CreationTime__gte", q.StartTime) + } + if q.EndTime != nil { + qs = qs.Filter("CreationTime__lte", q.EndTime) + } + return qs } // DeleteRepJob ... @@ -408,31 +391,6 @@ func SetRepJobUUID(id int64, uuid string) error { return err } -// ResetRunningJobs update all running jobs status to pending, including replication jobs and scan jobs. -func ResetRunningJobs() error { - o := GetOrmer() - sql := fmt.Sprintf("update replication_job set status = '%s', update_time = ? where status = '%s'", models.JobPending, models.JobRunning) - _, err := o.Raw(sql, time.Now()).Exec() - if err != nil { - return err - } - sql = fmt.Sprintf("update %s set status = '%s', update_time = ? where status = '%s'", models.ScanJobTable, models.JobPending, models.JobRunning) - _, err = o.Raw(sql, time.Now()).Exec() - return err -} - -// GetRepJobByStatus get jobs of certain statuses -func GetRepJobByStatus(status ...string) ([]*models.RepJob, error) { - var res []*models.RepJob - var t []interface{} - for _, s := range status { - t = append(t, interface{}(s)) - } - _, err := repJobQs().Filter("status__in", t...).All(&res) - genTagListForJob(res...) - return res, err -} - func genTagListForJob(jobs ...*models.RepJob) { for _, j := range jobs { if len(j.Tags) > 0 { diff --git a/src/common/models/replication_job.go b/src/common/models/replication_job.go index 8843c7470..08fda9da8 100644 --- a/src/common/models/replication_job.go +++ b/src/common/models/replication_job.go @@ -124,3 +124,14 @@ func (r *RepJob) TableName() string { func (r *RepPolicy) TableName() string { return RepPolicyTable } + +// RepJobQuery holds query conditions for replication job +type RepJobQuery struct { + PolicyID int64 + Repository string + Statuses []string + Operations []string + StartTime *time.Time + EndTime *time.Time + Pagination +} diff --git a/src/replication/trigger/schedule.go b/src/replication/trigger/schedule.go index eeefbd159..43891bff5 100644 --- a/src/replication/trigger/schedule.go +++ b/src/replication/trigger/schedule.go @@ -79,7 +79,10 @@ func (st *ScheduleTrigger) Setup() error { //Unset is the implementation of same method defined in Trigger interface func (st *ScheduleTrigger) Unset() error { - jobs, _, err := dao.FilterRepJobs(st.params.PolicyID, "", nil, nil, nil, 0, 0, models.RepOpSchedule) + jobs, err := dao.GetRepJobs(&models.RepJobQuery{ + PolicyID: st.params.PolicyID, + Operations: []string{models.RepOpSchedule}, + }) if err != nil { return err } diff --git a/src/ui/api/replication.go b/src/ui/api/replication.go index c920520bf..87234807a 100644 --- a/src/ui/api/replication.go +++ b/src/ui/api/replication.go @@ -63,9 +63,10 @@ func (r *ReplicationAPI) Post() { return } - _, count, err := dao.FilterRepJobs(replication.PolicyID, "", - []string{models.JobRunning, models.JobPending}, nil, nil, 1, 0, - models.RepOpTransfer, models.RepOpDelete) + count, err := dao.GetTotalCountOfRepJobs(&models.RepJobQuery{ + PolicyID: replication.PolicyID, + Statuses: []string{models.RepOpTransfer, models.RepOpDelete}, + }) if err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to filter jobs of policy %d: %v", replication.PolicyID, err)) diff --git a/src/ui/api/replication_job.go b/src/ui/api/replication_job.go index c4ea967ca..ebc0282ab 100644 --- a/src/ui/api/replication_job.go +++ b/src/ui/api/replication_job.go @@ -81,10 +81,16 @@ func (ra *RepJobAPI) List() { return } - repository := ra.GetString("repository") - statuses := ra.GetStrings("status") + query := &models.RepJobQuery{ + PolicyID: policyID, + // hide the schedule job, the schedule job is used to trigger replication + // for scheduled policy + Operations: []string{models.RepOpTransfer, models.RepOpDelete}, + } + + query.Repository = ra.GetString("repository") + query.Statuses = ra.GetStrings("status") - var startTime *time.Time startTimeStr := ra.GetString("start_time") if len(startTimeStr) != 0 { i, err := strconv.ParseInt(startTimeStr, 10, 64) @@ -92,10 +98,9 @@ func (ra *RepJobAPI) List() { ra.CustomAbort(http.StatusBadRequest, "invalid start_time") } t := time.Unix(i, 0) - startTime = &t + query.StartTime = &t } - var endTime *time.Time endTimeStr := ra.GetString("end_time") if len(endTimeStr) != 0 { i, err := strconv.ParseInt(endTimeStr, 10, 64) @@ -103,20 +108,23 @@ func (ra *RepJobAPI) List() { ra.CustomAbort(http.StatusBadRequest, "invalid end_time") } t := time.Unix(i, 0) - endTime = &t + query.EndTime = &t } - page, pageSize := ra.GetPaginationParams() + query.Page, query.Size = ra.GetPaginationParams() - jobs, total, err := dao.FilterRepJobs(policyID, repository, statuses, - startTime, endTime, pageSize, pageSize*(page-1)) + total, err := dao.GetTotalCountOfRepJobs(query) if err != nil { - log.Errorf("failed to filter jobs according policy ID %d, repository %s, status %v, start time %v, end time %v: %v", - policyID, repository, statuses, startTime, endTime, err) - ra.CustomAbort(http.StatusInternalServerError, "") + ra.HandleInternalServerError(fmt.Sprintf("failed to get total count of repository jobs of policy %d: %v", policyID, err)) + return + } + jobs, err := dao.GetRepJobs(query) + if err != nil { + ra.HandleInternalServerError(fmt.Sprintf("failed to get repository jobs, query: %v :%v", query, err)) + return } - ra.SetPaginationHeader(total, page, pageSize) + ra.SetPaginationHeader(total, query.Page, query.Size) ra.Data["json"] = jobs ra.ServeJSON() @@ -206,7 +214,10 @@ func (ra *RepJobAPI) StopJobs() { ra.CustomAbort(http.StatusNotFound, fmt.Sprintf("policy %d not found", req.PolicyID)) } - jobs, err := dao.GetRepJobByPolicy(policy.ID) + jobs, err := dao.GetRepJobs(&models.RepJobQuery{ + PolicyID: policy.ID, + Operations: []string{models.RepOpTransfer, models.RepOpDelete}, + }) if err != nil { ra.HandleInternalServerError(fmt.Sprintf("failed to list jobs of policy %d: %v", policy.ID, err)) return diff --git a/src/ui/api/replication_policy.go b/src/ui/api/replication_policy.go index a182e1d2e..1c16db388 100644 --- a/src/ui/api/replication_policy.go +++ b/src/ui/api/replication_policy.go @@ -249,8 +249,10 @@ func (pa *RepPolicyAPI) Delete() { pa.CustomAbort(http.StatusNotFound, http.StatusText(http.StatusNotFound)) } - _, count, err := dao.FilterRepJobs(id, "", - []string{models.JobRunning, models.JobRetrying, models.JobPending}, nil, nil, 1, 0) + count, err := dao.GetTotalCountOfRepJobs(&models.RepJobQuery{ + PolicyID: id, + Statuses: []string{models.JobRunning, models.JobRetrying, models.JobPending}, + }) if err != nil { log.Errorf("failed to filter jobs of policy %d: %v", id, err) pa.CustomAbort(http.StatusInternalServerError, "") @@ -303,7 +305,10 @@ func convertFromRepPolicy(projectMgr promgr.ProjectManager, policy rep_models.Re } // TODO call the method from replication controller - _, errJobCount, err := dao.FilterRepJobs(policy.ID, "", []string{"error"}, nil, nil, 0, 0) + errJobCount, err := dao.GetTotalCountOfRepJobs(&models.RepJobQuery{ + PolicyID: policy.ID, + Statuses: []string{models.JobError}, + }) if err != nil { return nil, err }