From b12dc3b5d8ef7d418efdeafd4c9063e045f68d87 Mon Sep 17 00:00:00 2001 From: Daniel Jiang Date: Wed, 26 Sep 2018 22:38:55 +0800 Subject: [PATCH] Schedule "scan all" via jobservice This commit leverage the jobservice to trigger "scan all" and gets rid of the local scheduler to make the harbor-core container stateless. It keeps using the notifer mechanism to handle the configuration change. Signed-off-by: Daniel Jiang --- .../scan_policy_notitification_handler.go | 108 ------------------ ...scan_policy_notitification_handler_test.go | 73 ------------ .../task/replication/replication_task.go | 2 +- src/core/api/replication.go | 2 +- src/core/api/repository.go | 50 +++----- src/core/api/utils.go | 2 +- src/core/main.go | 6 +- .../notifier/config_watcher.go | 0 .../notifier/config_watcher_test.go | 0 .../notifier/notification_handler.go | 0 src/{common => core}/notifier/notifier.go | 0 .../notifier/notifier_test.go | 45 -------- .../scan_policy_notitification_handler.go | 104 +++++++++++++++++ ...scan_policy_notitification_handler_test.go | 16 +++ src/{common => core}/notifier/topics.go | 0 .../service/notifications/registry/handler.go | 2 +- src/core/utils/job.go | 82 ++++++------- src/core/utils/job_test.go | 50 ++++++++ src/core/utils/utils_test.go | 10 ++ src/replication/event/init.go | 2 +- src/replication/event/on_push_handler.go | 2 +- src/testing/job/mock_client.go | 56 +++++++++ 22 files changed, 296 insertions(+), 316 deletions(-) delete mode 100644 src/common/notifier/scan_policy_notitification_handler.go delete mode 100644 src/common/notifier/scan_policy_notitification_handler_test.go rename src/{common => core}/notifier/config_watcher.go (100%) rename src/{common => core}/notifier/config_watcher_test.go (100%) rename src/{common => core}/notifier/notification_handler.go (100%) rename src/{common => core}/notifier/notifier.go (100%) rename src/{common => core}/notifier/notifier_test.go (73%) create mode 100644 src/core/notifier/scan_policy_notitification_handler.go create mode 100644 src/core/notifier/scan_policy_notitification_handler_test.go rename src/{common => core}/notifier/topics.go (100%) create mode 100644 src/core/utils/job_test.go create mode 100644 src/testing/job/mock_client.go diff --git a/src/common/notifier/scan_policy_notitification_handler.go b/src/common/notifier/scan_policy_notitification_handler.go deleted file mode 100644 index 214da2267..000000000 --- a/src/common/notifier/scan_policy_notitification_handler.go +++ /dev/null @@ -1,108 +0,0 @@ -package notifier - -import ( - "errors" - "reflect" - - "fmt" - "time" - - "github.com/goharbor/harbor/src/common/scheduler" - "github.com/goharbor/harbor/src/common/scheduler/policy" - "github.com/goharbor/harbor/src/common/scheduler/task" -) - -const ( - // PolicyTypeDaily specify the policy type is "daily" - PolicyTypeDaily = "daily" - - // PolicyTypeNone specify the policy type is "none" - PolicyTypeNone = "none" - - alternatePolicy = "Alternate Policy" -) - -// ScanPolicyNotification is defined for pass the policy change data. -type ScanPolicyNotification struct { - // Type is used to keep the scan policy type: "none","daily" and "refresh". - Type string - - // DailyTime is used when the type is 'daily', the offset with UTC time 00:00. - DailyTime int64 -} - -// ScanPolicyNotificationHandler is defined to handle the changes of scanning -// policy. -type ScanPolicyNotificationHandler struct{} - -// IsStateful to indicate this handler is stateful. -func (s *ScanPolicyNotificationHandler) IsStateful() bool { - // Policy change should be done one by one. - return true -} - -// Handle the policy change notification. -func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error { - if value == nil { - return errors.New("ScanPolicyNotificationHandler can not handle nil value") - } - - if reflect.TypeOf(value).Kind() != reflect.Struct || - reflect.TypeOf(value).String() != "notifier.ScanPolicyNotification" { - return errors.New("ScanPolicyNotificationHandler can not handle value with invalid type") - } - - notification := value.(ScanPolicyNotification) - - hasScheduled := scheduler.DefaultScheduler.HasScheduled(alternatePolicy) - if notification.Type == PolicyTypeDaily { - if !hasScheduled { - // Schedule a new policy. - return schedulePolicy(notification) - } - - // To check and compare if the related parameter is changed. - if pl := scheduler.DefaultScheduler.GetPolicy(alternatePolicy); pl != nil { - policyCandidate := policy.NewAlternatePolicy(alternatePolicy, &policy.AlternatePolicyConfiguration{ - Duration: 24 * time.Hour, - OffsetTime: notification.DailyTime, - }) - if !pl.Equal(policyCandidate) { - // Parameter changed. - // Unschedule policy. - if err := scheduler.DefaultScheduler.UnSchedule(alternatePolicy); err != nil { - return err - } - - // Schedule a new policy. - return schedulePolicy(notification) - } - // Same policy configuration, do nothing - return nil - } - - return errors.New("Inconsistent policy scheduling status") - } else if notification.Type == PolicyTypeNone { - if hasScheduled { - return scheduler.DefaultScheduler.UnSchedule(alternatePolicy) - } - } else { - return fmt.Errorf("Notification type %s is not supported", notification.Type) - } - - return nil -} - -// Schedule policy. -func schedulePolicy(notification ScanPolicyNotification) error { - schedulePolicy := policy.NewAlternatePolicy(alternatePolicy, &policy.AlternatePolicyConfiguration{ - Duration: 24 * time.Hour, - OffsetTime: notification.DailyTime, - }) - attachTask := task.NewScanAllTask() - if err := schedulePolicy.AttachTasks(attachTask); err != nil { - return err - } - - return scheduler.DefaultScheduler.Schedule(schedulePolicy) -} diff --git a/src/common/notifier/scan_policy_notitification_handler_test.go b/src/common/notifier/scan_policy_notitification_handler_test.go deleted file mode 100644 index 50a155858..000000000 --- a/src/common/notifier/scan_policy_notitification_handler_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package notifier - -import ( - "testing" - "time" - - "github.com/goharbor/harbor/src/common/scheduler" - "github.com/goharbor/harbor/src/common/scheduler/policy" -) - -var testingScheduler = scheduler.DefaultScheduler - -func TestScanPolicyNotificationHandler(t *testing.T) { - // Scheduler should be running. - testingScheduler.Start() - if !testingScheduler.IsRunning() { - t.Fatal("scheduler should be running") - } - - handler := &ScanPolicyNotificationHandler{} - if !handler.IsStateful() { - t.Fail() - } - - utcTime := time.Now().UTC().Unix() - notification := ScanPolicyNotification{"daily", utcTime + 3600} - if err := handler.Handle(notification); err != nil { - t.Fatal(err) - } - - if !testingScheduler.HasScheduled("Alternate Policy") { - t.Fatal("Handler does not work") - } - - // Policy parameter changed. - notification2 := ScanPolicyNotification{"daily", utcTime + 7200} - if err := handler.Handle(notification2); err != nil { - t.Fatal(err) - } - - if !testingScheduler.HasScheduled("Alternate Policy") { - t.Fatal("Handler does not work [2]") - } - pl := testingScheduler.GetPolicy("Alternate Policy") - if pl == nil { - t.Fail() - } - spl := pl.(*policy.AlternatePolicy) - cfg := spl.GetConfig() - if cfg == nil { - t.Fail() - } - if cfg.OffsetTime != utcTime+7200 { - t.Fatal("Policy is not updated") - } - - notification3 := ScanPolicyNotification{"none", 0} - if err := handler.Handle(notification3); err != nil { - t.Fatal(err) - } - - if testingScheduler.HasScheduled("Alternate Policy") { - t.Fail() - } - - // Clear - testingScheduler.Stop() - // Waiting for everything is ready. - <-time.After(1 * time.Second) - if testingScheduler.IsRunning() { - t.Fatal("scheduler should be stopped") - } -} diff --git a/src/common/scheduler/task/replication/replication_task.go b/src/common/scheduler/task/replication/replication_task.go index 7426e214b..d70c7d92a 100644 --- a/src/common/scheduler/task/replication/replication_task.go +++ b/src/common/scheduler/task/replication/replication_task.go @@ -1,7 +1,7 @@ package replication import ( - "github.com/goharbor/harbor/src/common/notifier" + "github.com/goharbor/harbor/src/core/notifier" "github.com/goharbor/harbor/src/replication/event/notification" "github.com/goharbor/harbor/src/replication/event/topic" ) diff --git a/src/core/api/replication.go b/src/core/api/replication.go index 2b757f8ab..f68c54119 100644 --- a/src/core/api/replication.go +++ b/src/core/api/replication.go @@ -20,9 +20,9 @@ import ( "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/models" - "github.com/goharbor/harbor/src/common/notifier" "github.com/goharbor/harbor/src/common/utils/log" api_models "github.com/goharbor/harbor/src/core/api/models" + "github.com/goharbor/harbor/src/core/notifier" "github.com/goharbor/harbor/src/replication/core" "github.com/goharbor/harbor/src/replication/event/notification" "github.com/goharbor/harbor/src/replication/event/topic" diff --git a/src/core/api/repository.go b/src/core/api/repository.go index d42255a01..6f1b46d3b 100644 --- a/src/core/api/repository.go +++ b/src/core/api/repository.go @@ -29,7 +29,6 @@ import ( "github.com/goharbor/harbor/src/common" "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/models" - "github.com/goharbor/harbor/src/common/notifier" "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/clair" registry_error "github.com/goharbor/harbor/src/common/utils/error" @@ -37,6 +36,7 @@ import ( "github.com/goharbor/harbor/src/common/utils/notary" "github.com/goharbor/harbor/src/common/utils/registry" "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/core/notifier" coreutils "github.com/goharbor/harbor/src/core/utils" "github.com/goharbor/harbor/src/replication/event/notification" "github.com/goharbor/harbor/src/replication/event/topic" @@ -932,40 +932,22 @@ func (ra *RepositoryAPI) ScanAll() { ra.HandleUnauthorized() return } - projectIDStr := ra.GetString("project_id") - if len(projectIDStr) > 0 { // scan images under the project only. - pid, err := strconv.ParseInt(projectIDStr, 10, 64) - if err != nil || pid <= 0 { - ra.HandleBadRequest(fmt.Sprintf("Invalid project_id %s", projectIDStr)) - return - } - if !ra.SecurityCtx.HasAllPerm(pid) { - ra.HandleForbidden(ra.SecurityCtx.GetUsername()) - return - } - if err := coreutils.ScanImagesByProjectID(pid); err != nil { - log.Errorf("Failed triggering scan images in project: %d, error: %v", pid, err) - ra.HandleInternalServerError(fmt.Sprintf("Error: %v", err)) - return - } - } else { // scan all images in Harbor - if !ra.SecurityCtx.IsSysAdmin() { - ra.HandleForbidden(ra.SecurityCtx.GetUsername()) - return - } - if !utils.ScanAllMarker().Check() { - log.Warningf("There is a scan all scheduled at: %v, the request will not be processed.", utils.ScanAllMarker().Next()) - ra.RenderError(http.StatusPreconditionFailed, "Unable handle frequent scan all requests") - return - } - - if err := coreutils.ScanAllImages(); err != nil { - log.Errorf("Failed triggering scan all images, error: %v", err) - ra.HandleInternalServerError(fmt.Sprintf("Error: %v", err)) - return - } - utils.ScanAllMarker().Mark() + if !ra.SecurityCtx.IsSysAdmin() { + ra.HandleForbidden(ra.SecurityCtx.GetUsername()) + return } + if !utils.ScanAllMarker().Check() { + log.Warningf("There is a scan all scheduled at: %v, the request will not be processed.", utils.ScanAllMarker().Next()) + ra.RenderError(http.StatusPreconditionFailed, "Unable handle frequent scan all requests") + return + } + + if err := coreutils.ScanAllImages(); err != nil { + log.Errorf("Failed triggering scan all images, error: %v", err) + ra.HandleInternalServerError(fmt.Sprintf("Error: %v", err)) + return + } + utils.ScanAllMarker().Mark() ra.Ctx.ResponseWriter.WriteHeader(http.StatusAccepted) } diff --git a/src/core/api/utils.go b/src/core/api/utils.go index 6d11dd1f0..46e384ed6 100644 --- a/src/core/api/utils.go +++ b/src/core/api/utils.go @@ -22,7 +22,6 @@ import ( "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/models" - "github.com/goharbor/harbor/src/common/notifier" "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/clair" registry_error "github.com/goharbor/harbor/src/common/utils/error" @@ -30,6 +29,7 @@ import ( "github.com/goharbor/harbor/src/common/utils/registry" "github.com/goharbor/harbor/src/common/utils/registry/auth" "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/core/notifier" "github.com/goharbor/harbor/src/core/promgr" "github.com/goharbor/harbor/src/core/service/token" coreutils "github.com/goharbor/harbor/src/core/utils" diff --git a/src/core/main.go b/src/core/main.go index 4b52a21bd..5e27975ea 100644 --- a/src/core/main.go +++ b/src/core/main.go @@ -26,8 +26,6 @@ import ( "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/models" - "github.com/goharbor/harbor/src/common/notifier" - "github.com/goharbor/harbor/src/common/scheduler" "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/api" @@ -36,6 +34,7 @@ import ( _ "github.com/goharbor/harbor/src/core/auth/uaa" "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/filter" + "github.com/goharbor/harbor/src/core/notifier" "github.com/goharbor/harbor/src/core/proxy" "github.com/goharbor/harbor/src/core/service/token" "github.com/goharbor/harbor/src/replication/core" @@ -110,9 +109,6 @@ func main() { log.Fatalf("Failed to initialize API handlers with error: %s", err.Error()) } - // Enable the policy scheduler here. - scheduler.DefaultScheduler.Start() - // Subscribe the policy change topic. if err = notifier.Subscribe(notifier.ScanAllPolicyTopic, ¬ifier.ScanPolicyNotificationHandler{}); err != nil { log.Errorf("failed to subscribe scan all policy change topic: %v", err) diff --git a/src/common/notifier/config_watcher.go b/src/core/notifier/config_watcher.go similarity index 100% rename from src/common/notifier/config_watcher.go rename to src/core/notifier/config_watcher.go diff --git a/src/common/notifier/config_watcher_test.go b/src/core/notifier/config_watcher_test.go similarity index 100% rename from src/common/notifier/config_watcher_test.go rename to src/core/notifier/config_watcher_test.go diff --git a/src/common/notifier/notification_handler.go b/src/core/notifier/notification_handler.go similarity index 100% rename from src/common/notifier/notification_handler.go rename to src/core/notifier/notification_handler.go diff --git a/src/common/notifier/notifier.go b/src/core/notifier/notifier.go similarity index 100% rename from src/common/notifier/notifier.go rename to src/core/notifier/notifier.go diff --git a/src/common/notifier/notifier_test.go b/src/core/notifier/notifier_test.go similarity index 73% rename from src/common/notifier/notifier_test.go rename to src/core/notifier/notifier_test.go index d2923c218..2ed7789ed 100644 --- a/src/common/notifier/notifier_test.go +++ b/src/core/notifier/notifier_test.go @@ -5,8 +5,6 @@ import ( "sync/atomic" "testing" "time" - - "github.com/goharbor/harbor/src/common/scheduler" ) var statefulData int32 @@ -182,46 +180,3 @@ func TestConcurrentPublish(t *testing.T) { // Clear stateful data. atomic.StoreInt32(&statefulData, 0) } - -func TestConcurrentPublishWithScanPolicyHandler(t *testing.T) { - scheduler.DefaultScheduler.Start() - if !scheduler.DefaultScheduler.IsRunning() { - t.Fatal("Policy scheduler is not started") - } - - count := len(notificationWatcher.handlers) - if err := Subscribe("testing_topic", &ScanPolicyNotificationHandler{}); err != nil { - t.Fatal(err.Error()) - } - if len(notificationWatcher.handlers) != (count + 1) { - t.Fatalf("Handler is not registered") - } - - utcTime := time.Now().UTC().Unix() - notification := ScanPolicyNotification{"daily", utcTime + 3600} - for i := 1; i <= 10; i++ { - notification.DailyTime += (int64)(i) - if err := Publish("testing_topic", notification); err != nil { - t.Fatalf("index=%d, error=%s", i, err.Error()) - } - } - - // Wating for everything is ready. - <-time.After(2 * time.Second) - - if err := UnSubscribe("testing_topic", ""); err != nil { - t.Fatal(err.Error()) - } - - if len(notificationWatcher.handlers) != count { - t.Fatal("Handler is not unregistered") - } - - scheduler.DefaultScheduler.Stop() - // Wating for everything is ready. - <-time.After(1 * time.Second) - if scheduler.DefaultScheduler.IsRunning() { - t.Fatal("Policy scheduler is not stopped") - } - -} diff --git a/src/core/notifier/scan_policy_notitification_handler.go b/src/core/notifier/scan_policy_notitification_handler.go new file mode 100644 index 000000000..af138f808 --- /dev/null +++ b/src/core/notifier/scan_policy_notitification_handler.go @@ -0,0 +1,104 @@ +package notifier + +import ( + "errors" + "fmt" + "net/http" + + "github.com/goharbor/harbor/src/common/dao" + common_http "github.com/goharbor/harbor/src/common/http" + "github.com/goharbor/harbor/src/common/job" + "github.com/goharbor/harbor/src/common/models" + common_utils "github.com/goharbor/harbor/src/common/utils" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/core/utils" +) + +const ( + // PolicyTypeDaily specify the policy type is "daily" + PolicyTypeDaily = "daily" + // PolicyTypeNone specify the policy type is "none" + PolicyTypeNone = "none" +) + +// ScanPolicyNotification is defined for pass the policy change data. +type ScanPolicyNotification struct { + // Type is used to keep the scan policy type: "none","daily" and "refresh". + Type string + + // DailyTime is used when the type is 'daily', the offset with UTC time 00:00. + DailyTime int64 +} + +// ScanPolicyNotificationHandler is defined to handle the changes of scanning +// policy. +type ScanPolicyNotificationHandler struct{} + +// IsStateful to indicate this handler is stateful. +func (s *ScanPolicyNotificationHandler) IsStateful() bool { + // Policy change should be done one by one. + return true +} + +// Handle the policy change notification. +func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error { + notification, ok := value.(ScanPolicyNotification) + if !ok { + return errors.New("ScanPolicyNotificationHandler can not handle value with invalid type") + } + + if notification.Type == PolicyTypeDaily { + if err := cancelScanAllJobs(); err != nil { + return fmt.Errorf("Failed to cancel scan_all jobs, error: %v", err) + } + h, m, s := common_utils.ParseOfftime(notification.DailyTime) + cron := fmt.Sprintf("%d %d %d * * *", s, m, h) + if err := utils.ScheduleScanAllImages(cron); err != nil { + return fmt.Errorf("Failed to schedule scan_all job, error: %v", err) + } + } else if notification.Type == PolicyTypeNone { + if err := cancelScanAllJobs(); err != nil { + return fmt.Errorf("Failed to cancel scan_all jobs, error: %v", err) + } + } else { + return fmt.Errorf("Notification type %s is not supported", notification.Type) + } + + return nil +} + +func cancelScanAllJobs(c ...job.Client) error { + var client job.Client + if c == nil || len(c) == 0 { + client = utils.GetJobServiceClient() + } else { + client = c[0] + } + q := &models.AdminJobQuery{ + Name: job.ImageScanAllJob, + Kind: job.JobKindPeriodic, + } + jobs, err := dao.GetAdminJobs(q) + if err != nil { + log.Errorf("Failed to query sheduled scan_all jobs, error: %v", err) + return err + } + if len(jobs) > 1 { + log.Warningf("Got more than one scheduled scan_all jobs: %+v", jobs) + } + for _, j := range jobs { + if err := dao.DeleteAdminJob(j.ID); err != nil { + log.Warningf("Failed to delete scan_all job from DB, job ID: %d, job UUID: %s, error: %v", j.ID, j.UUID, err) + } + if err := client.PostAction(j.UUID, job.JobActionStop); err != nil { + if e, ok := err.(*common_http.Error); ok && e.Code == http.StatusNotFound { + log.Warningf("scan_all job not found on jobservice, UUID: %s, skip", j.UUID) + } else { + log.Errorf("Failed to stop scan_all job, UUID: %s, error: %v", j.UUID, e) + return e + } + } + log.Infof("scan_all job canceled, uuid: %s, id: %d", j.UUID, j.ID) + } + return nil +} diff --git a/src/core/notifier/scan_policy_notitification_handler_test.go b/src/core/notifier/scan_policy_notitification_handler_test.go new file mode 100644 index 000000000..8c914c878 --- /dev/null +++ b/src/core/notifier/scan_policy_notitification_handler_test.go @@ -0,0 +1,16 @@ +package notifier + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestScanPolicyNotificationHandler(t *testing.T) { + assert := assert.New(t) + s := &ScanPolicyNotificationHandler{} + assert.True(s.IsStateful()) + err := s.Handle("") + if assert.NotNil(err) { + assert.Contains(err.Error(), "invalid type") + } +} diff --git a/src/common/notifier/topics.go b/src/core/notifier/topics.go similarity index 100% rename from src/common/notifier/topics.go rename to src/core/notifier/topics.go diff --git a/src/core/service/notifications/registry/handler.go b/src/core/service/notifications/registry/handler.go index b9efe922b..50fb4e6c5 100644 --- a/src/core/service/notifications/registry/handler.go +++ b/src/core/service/notifications/registry/handler.go @@ -23,11 +23,11 @@ import ( "github.com/goharbor/harbor/src/common/dao" clairdao "github.com/goharbor/harbor/src/common/dao/clair" "github.com/goharbor/harbor/src/common/models" - "github.com/goharbor/harbor/src/common/notifier" "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/api" "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/core/notifier" coreutils "github.com/goharbor/harbor/src/core/utils" rep_notification "github.com/goharbor/harbor/src/replication/event/notification" "github.com/goharbor/harbor/src/replication/event/topic" diff --git a/src/core/utils/job.go b/src/core/utils/job.go index d88c41497..8d20f82ec 100644 --- a/src/core/utils/job.go +++ b/src/core/utils/job.go @@ -21,7 +21,6 @@ import ( jobmodels "github.com/goharbor/harbor/src/common/job/models" "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/utils/log" - "github.com/goharbor/harbor/src/common/utils/registry" "github.com/goharbor/harbor/src/core/config" "encoding/json" @@ -34,56 +33,49 @@ var ( jobServiceClient job.Client ) -// ScanAllImages scans all images of Harbor by submiting jobs to jobservice, the whole process will move on if failed to submit any job of a single image. +// ScanAllImages scans all images of Harbor by submiting a scan all job to jobservice, and the job handler will call API +// on the "core" service func ScanAllImages() error { - repos, err := dao.GetRepositories() - if err != nil { - log.Errorf("Failed to list all repositories, error: %v", err) - return err - } - log.Infof("Scanning all images on Harbor.") - - go scanRepos(repos) - return nil + _, err := scanAll("") + return err } -// ScanImagesByProjectID scans all images under a projet, the whole process will move on if failed to submit any job of a single image. -func ScanImagesByProjectID(id int64) error { - repos, err := dao.GetRepositories(&models.RepositoryQuery{ - ProjectIDs: []int64{id}, +// ScheduleScanAllImages will schedule a scan all job based on the cron string, add append a record in admin job table. +func ScheduleScanAllImages(cron string) error { + _, err := scanAll(cron) + return err +} + +func scanAll(cron string, c ...job.Client) (string, error) { + var client job.Client + if c == nil || len(c) == 0 { + client = GetJobServiceClient() + } else { + client = c[0] + } + kind := job.JobKindGeneric + if len(cron) > 0 { + kind = job.JobKindPeriodic + } + meta := &jobmodels.JobMetadata{ + JobKind: kind, + IsUnique: true, + Cron: cron, + } + id, err := dao.AddAdminJob(&models.AdminJob{ + Name: job.ImageScanAllJob, + Kind: kind, }) if err != nil { - log.Errorf("Failed list repositories in project %d, error: %v", id, err) - return err + return "", err } - log.Infof("Scanning all images in project: %d ", id) - go scanRepos(repos) - return nil -} - -func scanRepos(repos []*models.RepoRecord) { - var repoClient *registry.Repository - var err error - var tags []string - for _, r := range repos { - repoClient, err = NewRepositoryClientForUI("harbor-core", r.Name) - if err != nil { - log.Errorf("Failed to initialize client for repository: %s, error: %v, skip scanning", r.Name, err) - continue - } - tags, err = repoClient.ListTag() - if err != nil { - log.Errorf("Failed to get tags for repository: %s, error: %v, skip scanning.", r.Name, err) - continue - } - for _, t := range tags { - if err = TriggerImageScan(r.Name, t); err != nil { - log.Errorf("Failed to scan image with repository: %s, tag: %s, error: %v.", r.Name, t, err) - } else { - log.Debugf("Triggered scan for image with repository: %s, tag: %s", r.Name, t) - } - } + data := &jobmodels.JobData{ + Name: job.ImageScanAllJob, + Metadata: meta, + StatusHook: fmt.Sprintf("%s/service/notifications/jobs/adminjob/%d", config.InternalCoreURL(), id), } + log.Infof("scan_all job scheduled/triggered, cron string: '%s'", cron) + return client.SubmitJob(data) } // GetJobServiceClient returns the job service client instance. @@ -134,7 +126,7 @@ func triggerImageScan(repository, tag, digest string, client job.Client) error { } err = dao.SetScanJobUUID(id, uuid) if err != nil { - log.Warningf("Failed to set UUID for scan job, ID: %d, repository: %s, tag: %s") + log.Warningf("Failed to set UUID for scan job, ID: %d, repository: %s, tag: %s", id, uuid, repository, tag) } return nil } diff --git a/src/core/utils/job_test.go b/src/core/utils/job_test.go new file mode 100644 index 000000000..a654200af --- /dev/null +++ b/src/core/utils/job_test.go @@ -0,0 +1,50 @@ +package utils + +import ( + "fmt" + "testing" + + "github.com/goharbor/harbor/src/common/job" + jobmodels "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/core/config" + "github.com/stretchr/testify/assert" +) + +type jobDataTestEntry struct { + input job.ScanJobParms + expect jobmodels.JobData +} + +func TestBuildScanJobData(t *testing.T) { + assert := assert.New(t) + testData := []jobDataTestEntry{ + {input: job.ScanJobParms{ + JobID: 123, + Digest: "sha256:abcde", + Repository: "library/ubuntu", + Tag: "latest", + }, + expect: jobmodels.JobData{ + Name: job.ImageScanJob, + Parameters: map[string]interface{}{ + "job_int_id": 123, + "repository": "library/ubuntu", + "tag": "latest", + "digest": "sha256:abcde", + }, + Metadata: &jobmodels.JobMetadata{ + JobKind: job.JobKindGeneric, + IsUnique: false, + }, + StatusHook: fmt.Sprintf("%s/service/notifications/jobs/scan/%d", config.InternalCoreURL(), 123), + }, + }, + } + for _, d := range testData { + r, err := buildScanJobData(d.input.JobID, d.input.Repository, d.input.Tag, d.input.Digest) + assert.Nil(err) + assert.Equal(d.expect.Name, r.Name) + // assert.Equal(d.expect.Parameters, r.Parameters) + assert.Equal(d.expect.StatusHook, r.StatusHook) + } +} diff --git a/src/core/utils/utils_test.go b/src/core/utils/utils_test.go index 340462c9d..4988722f7 100644 --- a/src/core/utils/utils_test.go +++ b/src/core/utils/utils_test.go @@ -14,8 +14,18 @@ package utils import ( + "os" "testing" + + "github.com/goharbor/harbor/src/core/config" ) func TestMain(m *testing.M) { + err := config.Init() + if err != nil { + panic(err) + } + rc := m.Run() + os.Exit(rc) + } diff --git a/src/replication/event/init.go b/src/replication/event/init.go index 5d597b659..a055177c8 100644 --- a/src/replication/event/init.go +++ b/src/replication/event/init.go @@ -15,8 +15,8 @@ package event import ( - "github.com/goharbor/harbor/src/common/notifier" "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/core/notifier" "github.com/goharbor/harbor/src/replication/event/topic" ) diff --git a/src/replication/event/on_push_handler.go b/src/replication/event/on_push_handler.go index c59fba4e3..2c002b7ad 100644 --- a/src/replication/event/on_push_handler.go +++ b/src/replication/event/on_push_handler.go @@ -20,9 +20,9 @@ import ( "reflect" common_models "github.com/goharbor/harbor/src/common/models" - "github.com/goharbor/harbor/src/common/notifier" "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/core/notifier" "github.com/goharbor/harbor/src/replication" "github.com/goharbor/harbor/src/replication/event/notification" "github.com/goharbor/harbor/src/replication/event/topic" diff --git a/src/testing/job/mock_client.go b/src/testing/job/mock_client.go new file mode 100644 index 000000000..c79eccaf4 --- /dev/null +++ b/src/testing/job/mock_client.go @@ -0,0 +1,56 @@ +package job + +import ( + "fmt" + "math/rand" + + "github.com/goharbor/harbor/src/common/http" + "github.com/goharbor/harbor/src/common/job" + "github.com/goharbor/harbor/src/common/job/models" +) + +// MockJobClient ... +type MockJobClient struct { + JobUUID []string +} + +// GetJobLog ... +func (mjc *MockJobClient) GetJobLog(uuid string) ([]byte, error) { + if uuid == "500" { + return nil, &http.Error{500, "Server side error"} + } + if mjc.validUUID(uuid) { + return []byte("some log"), nil + } + return nil, &http.Error{404, "Not Found"} +} + +// SubmitJob ... +func (mjc *MockJobClient) SubmitJob(data *models.JobData) (string, error) { + if data.Name == job.ImageScanAllJob || data.Name == job.ImageReplicate || data.Name == job.ImageGC || data.Name == job.ImageScanJob { + uuid := fmt.Sprintf("u-%d", rand.Int()) + mjc.JobUUID = append(mjc.JobUUID, uuid) + return uuid, nil + } + return "", fmt.Errorf("Unsupported job %s", data.Name) +} + +// PostAction ... +func (mjc *MockJobClient) PostAction(uuid, action string) error { + if "500" == uuid { + return &http.Error{500, "Server side error"} + } + if !mjc.validUUID(uuid) { + return &http.Error{404, "Not Found"} + } + return nil +} + +func (mjc *MockJobClient) validUUID(uuid string) bool { + for _, u := range mjc.JobUUID { + if uuid == u { + return true + } + } + return false +}