Hide schedule job when listing replication jobs

This commit is contained in:
Wenkai Yin 2018-04-03 01:11:55 +08:00
parent 44d63fe935
commit 500651a5a1
8 changed files with 165 additions and 259 deletions

View File

@ -133,6 +133,16 @@ func paginateForRawSQL(sql string, limit, offset int64) string {
return fmt.Sprintf("%s limit %d offset %d", sql, limit, offset)
}
func paginateForQuerySetter(qs orm.QuerySeter, page, size int64) orm.QuerySeter {
if size > 0 {
qs = qs.Limit(size)
if page > 0 {
qs = qs.Offset((page - 1) * size)
}
}
return qs
}
//Escape ..
func Escape(str string) string {
str = strings.Replace(str, `%`, `\%`, -1)

View File

@ -1145,45 +1145,74 @@ func TestGetRepPolicyByProject(t *testing.T) {
}
}
func TestGetRepJobByPolicy(t *testing.T) {
jobs, err := GetRepJobByPolicy(999)
if err != nil {
t.Errorf("Error occurred in GetRepJobByPolicy: %v, policy ID: %d", err, 999)
return
}
if len(jobs) > 0 {
t.Errorf("Unexpected length of jobs, expected: 0, in fact: %d", len(jobs))
return
}
jobs, err = GetRepJobByPolicy(policyID)
if err != nil {
t.Errorf("Error occurred in GetRepJobByPolicy: %v, policy ID: %d", err, policyID)
return
}
if len(jobs) != 1 {
t.Errorf("Unexpected length of jobs, expected: 1, in fact: %d", len(jobs))
return
}
if jobs[0].ID != jobID {
t.Errorf("Unexpected job ID in the result, expected: %d, in fact: %d", jobID, jobs[0].ID)
return
}
}
func TestGetRepJobs(t *testing.T) {
var policyID int64 = 10000
repository := "repository_for_test_get_rep_jobs"
operation := "operation_for_test"
status := "status_for_test"
now := time.Now().Add(1 * time.Minute)
id, err := AddRepJob(models.RepJob{
PolicyID: policyID,
Repository: repository,
Operation: operation,
Status: status,
CreationTime: now,
UpdateTime: now,
})
require.Nil(t, err)
defer DeleteRepJob(id)
func TestFilterRepJobs(t *testing.T) {
jobs, _, err := FilterRepJobs(policyID, "", []string{}, nil, nil, 1000, 0)
if err != nil {
t.Errorf("Error occurred in FilterRepJobs: %v, policy ID: %d", err, policyID)
return
}
if len(jobs) != 1 {
t.Errorf("Unexpected length of jobs, expected: 1, in fact: %d", len(jobs))
return
}
if jobs[0].ID != jobID {
t.Errorf("Unexpected job ID in the result, expected: %d, in fact: %d", jobID, jobs[0].ID)
return
// no query
jobs, err := GetRepJobs()
require.Nil(t, err)
found := false
for _, job := range jobs {
if job.ID == id {
found = true
break
}
}
assert.True(t, found)
// query by policy ID
jobs, err = GetRepJobs(&models.RepJobQuery{
PolicyID: policyID,
})
require.Nil(t, err)
require.Equal(t, 1, len(jobs))
assert.Equal(t, id, jobs[0].ID)
// query by repository
jobs, err = GetRepJobs(&models.RepJobQuery{
Repository: repository,
})
require.Nil(t, err)
require.Equal(t, 1, len(jobs))
assert.Equal(t, id, jobs[0].ID)
// query by operation
jobs, err = GetRepJobs(&models.RepJobQuery{
Operations: []string{operation},
})
require.Nil(t, err)
require.Equal(t, 1, len(jobs))
assert.Equal(t, id, jobs[0].ID)
// query by status
jobs, err = GetRepJobs(&models.RepJobQuery{
Statuses: []string{status},
})
require.Nil(t, err)
require.Equal(t, 1, len(jobs))
assert.Equal(t, id, jobs[0].ID)
// query by creation time
jobs, err = GetRepJobs(&models.RepJobQuery{
StartTime: &now,
})
require.Nil(t, err)
require.Equal(t, 1, len(jobs))
assert.Equal(t, id, jobs[0].ID)
}
func TestDeleteRepJob(t *testing.T) {
@ -1204,57 +1233,6 @@ func TestDeleteRepJob(t *testing.T) {
}
}
func TestGetRepoJobToStop(t *testing.T) {
jobs := [...]models.RepJob{
models.RepJob{
Repository: "library/ubuntu",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobRunning,
},
models.RepJob{
Repository: "library/ubuntu",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobFinished,
},
models.RepJob{
Repository: "library/ubuntu",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobCanceled,
},
}
var err error
var i int64
var ids []int64
for _, j := range jobs {
i, err = AddRepJob(j)
ids = append(ids, i)
if err != nil {
log.Errorf("Failed to add Job: %+v, error: %v", j, err)
return
}
}
res, err := GetRepJobToStop(policyID)
if err != nil {
log.Errorf("Failed to Get Jobs, error: %v", err)
return
}
//time.Sleep(15 * time.Second)
if len(res) != 1 {
log.Errorf("Expected length of stoppable jobs, expected:1, in fact: %d", len(res))
return
}
for _, id := range ids {
err = DeleteRepJob(id)
if err != nil {
log.Errorf("Failed to delete job, id: %d, error: %v", id, err)
return
}
}
}
func TestDeleteRepTarget(t *testing.T) {
err := DeleteRepTarget(targetID)
if err != nil {
@ -1309,77 +1287,6 @@ func TestDeleteRepPolicy(t *testing.T) {
}
}
func TestResetRepJobs(t *testing.T) {
job1 := models.RepJob{
Repository: "library/ubuntua",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobRunning,
}
job2 := models.RepJob{
Repository: "library/ubuntub",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobCanceled,
}
id1, err := AddRepJob(job1)
if err != nil {
t.Errorf("Failed to add job: %+v, error: %v", job1, err)
return
}
id2, err := AddRepJob(job2)
if err != nil {
t.Errorf("Failed to add job: %+v, error: %v", job2, err)
return
}
err = ResetRunningJobs()
if err != nil {
t.Errorf("Failed to reset running jobs, error: %v", err)
}
j1, err := GetRepJob(id1)
if err != nil {
t.Errorf("Failed to get rep job, id: %d, error: %v", id1, err)
return
}
if j1.Status != models.JobPending {
t.Errorf("The rep job: %d, status should be Pending, but infact: %s", id1, j1.Status)
return
}
j2, err := GetRepJob(id2)
if err != nil {
t.Errorf("Failed to get rep job, id: %d, error: %v", id2, err)
return
}
if j2.Status == models.JobPending {
t.Errorf("The rep job: %d, status should be Canceled, but infact: %s", id2, j2.Status)
return
}
}
func TestGetJobByStatus(t *testing.T) {
r1, err := GetRepJobByStatus(models.JobPending, models.JobRunning)
if err != nil {
t.Errorf("Failed to run GetRepJobByStatus, error: %v", err)
}
if len(r1) != 1 {
t.Errorf("Unexpected length of result, expected 1, but in fact:%d", len(r1))
return
}
r2, err := GetRepJobByStatus(models.JobPending, models.JobCanceled)
if err != nil {
t.Errorf("Failed to run GetRepJobByStatus, error: %v", err)
}
if len(r2) != 2 {
t.Errorf("Unexpected length of result, expected 2, but in fact:%d", len(r2))
return
}
for _, j := range r2 {
DeleteRepJob(j.ID)
}
}
func TestGetOrmer(t *testing.T) {
o := GetOrmer()
if o == nil {

View File

@ -15,7 +15,6 @@
package dao
import (
"fmt"
"time"
"strings"
@ -302,74 +301,58 @@ func GetRepJob(id int64) (*models.RepJob, error) {
return &j, nil
}
// GetRepJobByPolicy ...
func GetRepJobByPolicy(policyID int64) ([]*models.RepJob, error) {
var res []*models.RepJob
_, err := repJobPolicyIDQs(policyID).All(&res)
genTagListForJob(res...)
return res, err
// GetTotalCountOfRepJobs ...
func GetTotalCountOfRepJobs(query ...*models.RepJobQuery) (int64, error) {
qs := repJobQueryConditions(query...)
return qs.Count()
}
// FilterRepJobs ...
func FilterRepJobs(policyID int64, repository string, status []string, startTime,
endTime *time.Time, limit, offset int64, operations ...string) ([]*models.RepJob, int64, error) {
// GetRepJobs ...
func GetRepJobs(query ...*models.RepJobQuery) ([]*models.RepJob, error) {
jobs := []*models.RepJob{}
qs := GetOrmer().QueryTable(new(models.RepJob))
if policyID != 0 {
qs = qs.Filter("PolicyID", policyID)
}
if len(repository) != 0 {
qs = qs.Filter("Repository__icontains", repository)
}
if len(status) != 0 {
qs = qs.Filter("Status__in", status)
}
if startTime != nil {
qs = qs.Filter("CreationTime__gte", startTime)
}
if endTime != nil {
qs = qs.Filter("CreationTime__lte", endTime)
}
if len(operations) != 0 {
qs = qs.Filter("Operation__in", operations)
}
total, err := qs.Count()
if err != nil {
return jobs, 0, err
qs := repJobQueryConditions(query...)
if len(query) > 0 && query[0] != nil {
qs = paginateForQuerySetter(qs, query[0].Page, query[0].Size)
}
qs = qs.OrderBy("-UpdateTime")
_, err = qs.Limit(limit).Offset(offset).All(&jobs)
if err != nil {
return jobs, 0, err
if _, err := qs.All(&jobs); err != nil {
return jobs, err
}
genTagListForJob(jobs...)
return jobs, total, nil
return jobs, nil
}
// GetRepJobToStop get jobs that are possibly being handled by workers of a certain policy.
func GetRepJobToStop(policyID int64) ([]*models.RepJob, error) {
var res []*models.RepJob
_, err := repJobPolicyIDQs(policyID).Filter("status__in",
models.JobPending, models.JobRunning, models.JobRetrying).All(&res)
genTagListForJob(res...)
return res, err
}
func repJobQueryConditions(query ...*models.RepJobQuery) orm.QuerySeter {
qs := GetOrmer().QueryTable(new(models.RepJob))
if len(query) == 0 || query[0] == nil {
return qs
}
func repJobQs() orm.QuerySeter {
o := GetOrmer()
return o.QueryTable("replication_job")
}
func repJobPolicyIDQs(policyID int64) orm.QuerySeter {
return repJobQs().Filter("policy_id", policyID)
q := query[0]
if q.PolicyID != 0 {
qs = qs.Filter("PolicyID", q.PolicyID)
}
if len(q.Repository) > 0 {
qs = qs.Filter("Repository__icontains", q.Repository)
}
if len(q.Statuses) > 0 {
qs = qs.Filter("Status__in", q.Statuses)
}
if len(q.Operations) > 0 {
qs = qs.Filter("Operation__in", q.Operations)
}
if q.StartTime != nil {
qs = qs.Filter("CreationTime__gte", q.StartTime)
}
if q.EndTime != nil {
qs = qs.Filter("CreationTime__lte", q.EndTime)
}
return qs
}
// DeleteRepJob ...
@ -408,31 +391,6 @@ func SetRepJobUUID(id int64, uuid string) error {
return err
}
// ResetRunningJobs update all running jobs status to pending, including replication jobs and scan jobs.
func ResetRunningJobs() error {
o := GetOrmer()
sql := fmt.Sprintf("update replication_job set status = '%s', update_time = ? where status = '%s'", models.JobPending, models.JobRunning)
_, err := o.Raw(sql, time.Now()).Exec()
if err != nil {
return err
}
sql = fmt.Sprintf("update %s set status = '%s', update_time = ? where status = '%s'", models.ScanJobTable, models.JobPending, models.JobRunning)
_, err = o.Raw(sql, time.Now()).Exec()
return err
}
// GetRepJobByStatus get jobs of certain statuses
func GetRepJobByStatus(status ...string) ([]*models.RepJob, error) {
var res []*models.RepJob
var t []interface{}
for _, s := range status {
t = append(t, interface{}(s))
}
_, err := repJobQs().Filter("status__in", t...).All(&res)
genTagListForJob(res...)
return res, err
}
func genTagListForJob(jobs ...*models.RepJob) {
for _, j := range jobs {
if len(j.Tags) > 0 {

View File

@ -124,3 +124,14 @@ func (r *RepJob) TableName() string {
func (r *RepPolicy) TableName() string {
return RepPolicyTable
}
// RepJobQuery holds query conditions for replication job
type RepJobQuery struct {
PolicyID int64
Repository string
Statuses []string
Operations []string
StartTime *time.Time
EndTime *time.Time
Pagination
}

View File

@ -79,7 +79,10 @@ func (st *ScheduleTrigger) Setup() error {
//Unset is the implementation of same method defined in Trigger interface
func (st *ScheduleTrigger) Unset() error {
jobs, _, err := dao.FilterRepJobs(st.params.PolicyID, "", nil, nil, nil, 0, 0, models.RepOpSchedule)
jobs, err := dao.GetRepJobs(&models.RepJobQuery{
PolicyID: st.params.PolicyID,
Operations: []string{models.RepOpSchedule},
})
if err != nil {
return err
}

View File

@ -63,9 +63,10 @@ func (r *ReplicationAPI) Post() {
return
}
_, count, err := dao.FilterRepJobs(replication.PolicyID, "",
[]string{models.JobRunning, models.JobPending}, nil, nil, 1, 0,
models.RepOpTransfer, models.RepOpDelete)
count, err := dao.GetTotalCountOfRepJobs(&models.RepJobQuery{
PolicyID: replication.PolicyID,
Statuses: []string{models.RepOpTransfer, models.RepOpDelete},
})
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to filter jobs of policy %d: %v",
replication.PolicyID, err))

View File

@ -81,10 +81,16 @@ func (ra *RepJobAPI) List() {
return
}
repository := ra.GetString("repository")
statuses := ra.GetStrings("status")
query := &models.RepJobQuery{
PolicyID: policyID,
// hide the schedule job, the schedule job is used to trigger replication
// for scheduled policy
Operations: []string{models.RepOpTransfer, models.RepOpDelete},
}
query.Repository = ra.GetString("repository")
query.Statuses = ra.GetStrings("status")
var startTime *time.Time
startTimeStr := ra.GetString("start_time")
if len(startTimeStr) != 0 {
i, err := strconv.ParseInt(startTimeStr, 10, 64)
@ -92,10 +98,9 @@ func (ra *RepJobAPI) List() {
ra.CustomAbort(http.StatusBadRequest, "invalid start_time")
}
t := time.Unix(i, 0)
startTime = &t
query.StartTime = &t
}
var endTime *time.Time
endTimeStr := ra.GetString("end_time")
if len(endTimeStr) != 0 {
i, err := strconv.ParseInt(endTimeStr, 10, 64)
@ -103,20 +108,23 @@ func (ra *RepJobAPI) List() {
ra.CustomAbort(http.StatusBadRequest, "invalid end_time")
}
t := time.Unix(i, 0)
endTime = &t
query.EndTime = &t
}
page, pageSize := ra.GetPaginationParams()
query.Page, query.Size = ra.GetPaginationParams()
jobs, total, err := dao.FilterRepJobs(policyID, repository, statuses,
startTime, endTime, pageSize, pageSize*(page-1))
total, err := dao.GetTotalCountOfRepJobs(query)
if err != nil {
log.Errorf("failed to filter jobs according policy ID %d, repository %s, status %v, start time %v, end time %v: %v",
policyID, repository, statuses, startTime, endTime, err)
ra.CustomAbort(http.StatusInternalServerError, "")
ra.HandleInternalServerError(fmt.Sprintf("failed to get total count of repository jobs of policy %d: %v", policyID, err))
return
}
jobs, err := dao.GetRepJobs(query)
if err != nil {
ra.HandleInternalServerError(fmt.Sprintf("failed to get repository jobs, query: %v :%v", query, err))
return
}
ra.SetPaginationHeader(total, page, pageSize)
ra.SetPaginationHeader(total, query.Page, query.Size)
ra.Data["json"] = jobs
ra.ServeJSON()
@ -206,7 +214,10 @@ func (ra *RepJobAPI) StopJobs() {
ra.CustomAbort(http.StatusNotFound, fmt.Sprintf("policy %d not found", req.PolicyID))
}
jobs, err := dao.GetRepJobByPolicy(policy.ID)
jobs, err := dao.GetRepJobs(&models.RepJobQuery{
PolicyID: policy.ID,
Operations: []string{models.RepOpTransfer, models.RepOpDelete},
})
if err != nil {
ra.HandleInternalServerError(fmt.Sprintf("failed to list jobs of policy %d: %v", policy.ID, err))
return

View File

@ -249,8 +249,10 @@ func (pa *RepPolicyAPI) Delete() {
pa.CustomAbort(http.StatusNotFound, http.StatusText(http.StatusNotFound))
}
_, count, err := dao.FilterRepJobs(id, "",
[]string{models.JobRunning, models.JobRetrying, models.JobPending}, nil, nil, 1, 0)
count, err := dao.GetTotalCountOfRepJobs(&models.RepJobQuery{
PolicyID: id,
Statuses: []string{models.JobRunning, models.JobRetrying, models.JobPending},
})
if err != nil {
log.Errorf("failed to filter jobs of policy %d: %v", id, err)
pa.CustomAbort(http.StatusInternalServerError, "")
@ -303,7 +305,10 @@ func convertFromRepPolicy(projectMgr promgr.ProjectManager, policy rep_models.Re
}
// TODO call the method from replication controller
_, errJobCount, err := dao.FilterRepJobs(policy.ID, "", []string{"error"}, nil, nil, 0, 0)
errJobCount, err := dao.GetTotalCountOfRepJobs(&models.RepJobQuery{
PolicyID: policy.ID,
Statuses: []string{models.JobError},
})
if err != nil {
return nil, err
}