diff --git a/api/jobs/replication.go b/api/jobs/replication.go index 4d955b7f3..3cd63206f 100644 --- a/api/jobs/replication.go +++ b/api/jobs/replication.go @@ -64,6 +64,33 @@ func (rj *ReplicationJob) Post() { } } +type RepActionReq struct { + PolicyID int64 `json:"policy_id"` + Action string `json:"action"` +} + +func (rj *ReplicationJob) HandleAction() { + var data RepActionReq + rj.DecodeJSONReq(&data) + //Currently only support stop action + if data.Action != "stop" { + log.Errorf("Unrecognized action: %s", data.Action) + rj.RenderError(http.StatusBadRequest, fmt.Sprintf("Unrecongized action: %s", data.Action)) + return + } + jobs, err := dao.GetRepJobToStop(data.PolicyID) + if err != nil { + log.Errorf("Failed to get jobs to stop, error: %v", err) + rj.RenderError(http.StatusInternalServerError, "Faild to get jobs to stop") + return + } + var jobIDList []int64 + for _, j := range jobs { + jobIDList = append(jobIDList, j.ID) + } + job.WorkerPool.StopJobs(jobIDList) +} + // calls the api from UI to get repo list func getRepoList(projectID int64) ([]string, error) { uiURL := os.Getenv("UI_URL") diff --git a/dao/base.go b/dao/base.go index fe1c5a0ed..57530ba0d 100644 --- a/dao/base.go +++ b/dao/base.go @@ -64,6 +64,7 @@ func GenerateRandomString() (string, error) { //InitDB initializes the database func InitDB() { + // orm.Debug = true orm.RegisterDriver("mysql", orm.DRMySQL) addr := os.Getenv("MYSQL_HOST") port := os.Getenv("MYSQL_PORT") diff --git a/dao/dao_test.go b/dao/dao_test.go index de7259a60..1019593c6 100644 --- a/dao/dao_test.go +++ b/dao/dao_test.go @@ -25,13 +25,13 @@ import ( "github.com/vmware/harbor/utils/log" ) -func execUpdate(o orm.Ormer, sql string, params interface{}) error { +func execUpdate(o orm.Ormer, sql string, params ...interface{}) error { p, err := o.Raw(sql).Prepare() if err != nil { return err } defer p.Close() - _, err = p.Exec(params) + _, err = p.Exec(params...) if err != nil { return err } @@ -95,6 +95,19 @@ func clearUp(username string) { o.Rollback() log.Error(err) } + + err = execUpdate(o, `delete from replication_job where id < 99`) + if err != nil { + log.Error(err) + } + err = execUpdate(o, `delete from replication_policy where id < 99`) + if err != nil { + log.Error(err) + } + err = execUpdate(o, `delete from replication_target where id < 99`) + if err != nil { + log.Error(err) + } o.Commit() } @@ -876,19 +889,23 @@ func TestAddRepJob(t *testing.T) { id, err := AddRepJob(job) if err != nil { t.Errorf("Error occurred in AddRepJob: %v", err) + return } else { jobID = id } j, err := GetRepJob(id) if err != nil { t.Errorf("Error occurred in GetRepJob: %v, id: %d", err, id) + return } if j == nil { t.Errorf("Unable to find a job with id: %d", id) + return } if j.Status != models.JobPending || j.Repository != "library/ubuntu" || j.PolicyID != policyID || j.Operation != "transfer" { t.Errorf("Expected data of job, id: %d, Status: %s, Repository: library/ubuntu, PolicyID: %d, Operation: transfer, "+ "but in returned data:, Status: %s, Repository: %s, Operation: %s, PolicyID: %d", id, models.JobPending, policyID, j.Status, j.Repository, j.Operation, j.PolicyID) + return } } @@ -908,6 +925,11 @@ func TestUpdateRepJobStatus(t *testing.T) { if j.Status != models.JobFinished { t.Errorf("Job's status: %s, expected: %s, id: %d", j.Status, models.JobFinished, jobID) } + err = UpdateRepJobStatus(jobID, models.JobPending) + if err != nil { + t.Errorf("Error occured in UpdateRepJobStatus when update it back to status pending, error: %v, id: %d", err, jobID) + return + } } func TestGetRepPolicyByProject(t *testing.T) { @@ -961,6 +983,47 @@ func TestGetRepJobByPolicy(t *testing.T) { } } +func TestGetRepoJobToStop(t *testing.T) { + jobs := [...]models.RepJob{ + models.RepJob{ + Repository: "library/ubuntu", + PolicyID: policyID, + Operation: "transfer", + Status: models.JobRunning, + }, + models.RepJob{ + Repository: "library/ubuntu", + PolicyID: policyID, + Operation: "transfer", + Status: models.JobFinished, + }, + models.RepJob{ + Repository: "library/ubuntu", + PolicyID: policyID, + Operation: "transfer", + Status: models.JobCanceled, + }, + } + var err error + for _, j := range jobs { + _, err = AddRepJob(j) + if err != nil { + log.Errorf("Failed to add Job: %+v, error: %v", j, err) + return + } + } + res, err := GetRepJobToStop(policyID) + if err != nil { + log.Errorf("Failed to Get Jobs, error: %v", err) + return + } + //time.Sleep(15 * time.Second) + if len(res) != 2 { + log.Errorf("Expected length of stoppable jobs, expected:2, in fact: %d", len(res)) + return + } +} + func TestDeleteRepTarget(t *testing.T) { err := DeleteRepTarget(targetID) if err != nil { diff --git a/dao/replication_job.go b/dao/replication_job.go index 235ed03a3..7e58aaf27 100644 --- a/dao/replication_job.go +++ b/dao/replication_job.go @@ -102,11 +102,23 @@ func GetRepJob(id int64) (*models.RepJob, error) { return &j, err } func GetRepJobByPolicy(policyID int64) ([]*models.RepJob, error) { - o := orm.NewOrm() var res []*models.RepJob - _, err := o.QueryTable("replication_job").Filter("policy_id", policyID).All(&res) + _, err := repJobPolicyIDQs(policyID).All(&res) return res, err } + +// GetRepJobToStop get jobs that are possibly being handled by workers of a certain policy. +func GetRepJobToStop(policyID int64) ([]*models.RepJob, error) { + var res []*models.RepJob + _, err := repJobPolicyIDQs(policyID).Filter("status__in", models.JobPending, models.JobRunning).All(&res) + return res, err +} + +func repJobPolicyIDQs(policyID int64) orm.QuerySeter { + o := orm.NewOrm() + return o.QueryTable("replication_job").Filter("policy_id", policyID) +} + func DeleteRepJob(id int64) error { o := orm.NewOrm() _, err := o.Delete(&models.RepJob{ID: id}) diff --git a/job/statemachine.go b/job/statemachine.go index e1b4059cf..cd08be32b 100644 --- a/job/statemachine.go +++ b/job/statemachine.go @@ -39,12 +39,12 @@ type JobSM struct { // EnsterState transit the statemachine from the current state to the state in parameter. // It returns the next state the statemachine should tranit to. func (sm *JobSM) EnterState(s string) (string, error) { - log.Debugf("Trying to transit from State: %s, to State: %s", sm.CurrentState, s) + log.Debugf("Job id: %d, transiting from State: %s, to State: %s", sm.JobID, sm.CurrentState, s) targets, ok := sm.Transitions[sm.CurrentState] _, exist := targets[s] _, isForced := sm.ForcedStates[s] if !exist && !isForced { - return "", fmt.Errorf("Transition from %s to %s does not exist!", sm.CurrentState, s) + return "", fmt.Errorf("Job id: %d, transition from %s to %s does not exist!", sm.JobID, sm.CurrentState, s) } exitHandler, ok := sm.Handlers[sm.CurrentState] if ok { @@ -52,7 +52,7 @@ func (sm *JobSM) EnterState(s string) (string, error) { return "", err } } else { - log.Debugf("No handler found for state:%s, skip", sm.CurrentState) + log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, sm.CurrentState) } enterHandler, ok := sm.Handlers[s] var next string = models.JobContinue @@ -62,11 +62,11 @@ func (sm *JobSM) EnterState(s string) (string, error) { return "", err } } else { - log.Debugf("No handler found for state:%s, skip", s) + log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, s) } sm.PreviousState = sm.CurrentState sm.CurrentState = s - log.Debugf("Transition succeeded, current state: %s", s) + log.Debugf("Job id: %d, transition succeeded, current state: %s", sm.JobID, s) return next, nil } @@ -75,10 +75,10 @@ func (sm *JobSM) EnterState(s string) (string, error) { // will enter error state if there's more than one possible path when next state is "_continue" func (sm *JobSM) Start(s string) { n, err := sm.EnterState(s) - log.Debugf("next state from handler: %s", n) + log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n) for len(n) > 0 && err == nil { if d := sm.getDesiredState(); len(d) > 0 { - log.Debugf("Desired state: %s, will ignore the next state from handler") + log.Debugf("Job id: %d. Desired state: %s, will ignore the next state from handler", sm.JobID, d) n = d sm.setDesiredState("") continue @@ -87,19 +87,19 @@ func (sm *JobSM) Start(s string) { for n = range sm.Transitions[sm.CurrentState] { break } - log.Debugf("Continue to state: %s", n) + log.Debugf("Job id: %d, Continue to state: %s", sm.JobID, n) continue } if n == models.JobContinue && len(sm.Transitions[sm.CurrentState]) != 1 { - log.Errorf("Next state is continue but there are %d possible next states in transition table", len(sm.Transitions[sm.CurrentState])) + log.Errorf("Job id: %d, next state is continue but there are %d possible next states in transition table", sm.JobID, len(sm.Transitions[sm.CurrentState])) err = fmt.Errorf("Unable to continue") break } n, err = sm.EnterState(n) - log.Debugf("next state from handler: %s", n) + log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n) } if err != nil { - log.Warningf("The statemachin will enter error state due to error: %v", err) + log.Warningf("Job id: %d, the statemachin will enter error state due to error: %v", sm.JobID, err) sm.EnterState(models.JobError) } } @@ -121,8 +121,17 @@ func (sm *JobSM) RemoveTransition(from string, to string) { delete(sm.Transitions[from], to) } -func (sm *JobSM) Stop() { - sm.setDesiredState(models.JobStopped) +func (sm *JobSM) Stop(id int64) { + log.Debugf("Trying to stop the job: %d", id) + sm.lock.Lock() + defer sm.lock.Unlock() + //need to check if the sm switched to other job + if id == sm.JobID { + sm.desiredState = models.JobStopped + log.Debugf("Desired state of job %d is set to stopped", id) + } else { + log.Debugf("State machine has switched to job %d, so the action to stop job %d will be ignored", sm.JobID, id) + } } func (sm *JobSM) getDesiredState() string { @@ -141,14 +150,19 @@ func (sm *JobSM) Init() { sm.lock = &sync.Mutex{} sm.Handlers = make(map[string]StateHandler) sm.Transitions = make(map[string]map[string]struct{}) - + sm.ForcedStates = map[string]struct{}{ + models.JobError: struct{}{}, + models.JobStopped: struct{}{}, + models.JobCanceled: struct{}{}, + } } -func (sm *JobSM) Reset(jid int64) error { - sm.JobID = jid - sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning}) - sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError} - sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped} +func (sm *JobSM) Reset(jid int64) error { + //To ensure the new jobID is visible to the thread to stop the SM + sm.lock.Lock() + sm.JobID = jid + sm.desiredState = "" + sm.lock.Unlock() //init parms job, err := dao.GetRepJob(sm.JobID) @@ -172,7 +186,7 @@ func (sm *JobSM) Reset(jid int64) error { Operation: job.Operation, } if policy.Enabled == 0 { - //handler will cancel this job + //worker will cancel this job return nil } target, err := dao.GetRepTarget(policy.TargetID) @@ -188,16 +202,23 @@ func (sm *JobSM) Reset(jid int64) error { //init states handlers sm.Logger = utils.Logger{sm.JobID} sm.CurrentState = models.JobPending + + sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning}) + sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError} + sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped} + if sm.Parms.Operation == models.RepOpTransfer { /* sm.AddTransition(models.JobRunning, "pull-img", ImgPuller{DummyHandler: DummyHandler{JobID: sm.JobID}, img: sm.Parms.Repository, logger: sm.Logger}) - //only handle on target for now + //only handle one target for now sm.AddTransition("pull-img", "push-img", ImgPusher{DummyHandler: DummyHandler{JobID: sm.JobID}, targetURL: sm.Parms.TargetURL, logger: sm.Logger}) sm.AddTransition("push-img", models.JobFinished, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished}) */ + if err = addImgOutTransition(sm); err != nil { return err } + } return nil } diff --git a/jobservice/router.go b/jobservice/router.go index d224d44ea..c173020b4 100644 --- a/jobservice/router.go +++ b/jobservice/router.go @@ -8,4 +8,5 @@ import ( func initRouters() { beego.Router("/api/jobs/replication", &api.ReplicationJob{}) + beego.Router("/api/jobs/replication/actions", &api.ReplicationJob{}, "post:HandleAction") } diff --git a/jobservice/stop.json b/jobservice/stop.json new file mode 100644 index 000000000..3f43e37da --- /dev/null +++ b/jobservice/stop.json @@ -0,0 +1,4 @@ +{ + "policy_id":1, + "action":"stop" +}