From e4cb015d34ce593f78b18a36043ad530c1b78885 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Tue, 17 May 2016 18:23:45 +0800 Subject: [PATCH 1/3] stream the transmission of blobs --- utils/registry/repository.go | 33 ++++++++++++--------------------- 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/utils/registry/repository.go b/utils/registry/repository.go index ac49e04f8..642b076aa 100644 --- a/utils/registry/repository.go +++ b/utils/registry/repository.go @@ -19,6 +19,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/url" @@ -385,7 +386,7 @@ func (r *Repository) BlobExist(digest string) (bool, error) { } // PullBlob ... -func (r *Repository) PullBlob(digest string) (size int64, data []byte, err error) { +func (r *Repository) PullBlob(digest string) (size int64, data io.ReadCloser, err error) { req, err := http.NewRequest("GET", buildBlobURL(r.Endpoint.String(), r.Name, digest), nil) if err != nil { return @@ -401,19 +402,19 @@ func (r *Repository) PullBlob(digest string) (size int64, data []byte, err error return } - defer resp.Body.Close() - b, err := ioutil.ReadAll(resp.Body) - if err != nil { - return - } - if resp.StatusCode == http.StatusOK { contengLength := resp.Header.Get(http.CanonicalHeaderKey("Content-Length")) size, err = strconv.ParseInt(contengLength, 10, 64) if err != nil { return } - data = b + data = resp.Body + return + } + + defer resp.Body.Close() + b, err := ioutil.ReadAll(resp.Body) + if err != nil { return } @@ -462,8 +463,8 @@ func (r *Repository) initiateBlobUpload(name string) (location, uploadUUID strin return } -func (r *Repository) monolithicBlobUpload(location, digest string, size int64, data []byte) error { - req, err := http.NewRequest("PUT", buildMonolithicBlobUploadURL(location, digest), bytes.NewReader(data)) +func (r *Repository) monolithicBlobUpload(location, digest string, size int64, data io.Reader) error { + req, err := http.NewRequest("PUT", 
buildMonolithicBlobUploadURL(location, digest), data) if err != nil { return err } @@ -496,17 +497,7 @@ func (r *Repository) monolithicBlobUpload(location, digest string, size int64, d } // PushBlob ... -func (r *Repository) PushBlob(digest string, size int64, data []byte) error { - exist, err := r.BlobExist(digest) - if err != nil { - return err - } - - if exist { - log.Infof("blob already exists, skip pushing: %s %s", r.Name, digest) - return nil - } - +func (r *Repository) PushBlob(digest string, size int64, data io.Reader) error { location, _, err := r.initiateBlobUpload(r.Name) if err != nil { return err From 4ee50bc8b6af6c8faab785a84cd1a38ac456f997 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Wed, 18 May 2016 16:37:11 +0800 Subject: [PATCH 2/3] caching the existence of blobs to avoid duplicate checking --- job/imgout/statehandlers.go | 48 +++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/job/imgout/statehandlers.go b/job/imgout/statehandlers.go index 36c3bd2cf..220fee1a4 100644 --- a/job/imgout/statehandlers.go +++ b/job/imgout/statehandlers.go @@ -43,6 +43,7 @@ const ( StatePushManifest = "push_manifest" ) +// BaseHandler holds informations shared by other state handlers type BaseHandler struct { project string // project_name repository string // prject_name/repo_name @@ -61,9 +62,13 @@ type BaseHandler struct { manifest distribution.Manifest // manifest of tags[0] blobs []string // blobs need to be transferred for tags[0] + blobsExistence map[string]bool //key: digest of blob, value: existence + logger *utils.Logger } +// InitBaseHandler initializes a BaseHandler: creating clients for source and destination registry, +// listing tags of the repository if parameter tags is nil. 
func InitBaseHandler(repository, srcURL, srcSecretKey, dstURL, dstUsr, dstPwd string, tags []string, logger *utils.Logger) (*BaseHandler, error) { @@ -71,14 +76,15 @@ func InitBaseHandler(repository, srcURL, srcSecretKey, repository, tags, srcURL, dstURL, dstUsr) base := &BaseHandler{ - repository: repository, - tags: tags, - srcURL: srcURL, - srcSecretKey: srcSecretKey, - dstURL: dstURL, - dstUsr: dstUsr, - dstPwd: dstPwd, - logger: logger, + repository: repository, + tags: tags, + srcURL: srcURL, + srcSecretKey: srcSecretKey, + dstURL: dstURL, + dstUsr: dstUsr, + dstPwd: dstPwd, + blobsExistence: make(map[string]bool, 10), + logger: logger, } base.project = getProjectName(base.repository) @@ -112,6 +118,7 @@ func InitBaseHandler(repository, srcURL, srcSecretKey, return base, nil } +// Exit ... func (b *BaseHandler) Exit() error { return nil } @@ -122,11 +129,12 @@ func getProjectName(repository string) string { return repository[:strings.LastIndex(repository, "/")] } +// Checker checks the existence of project and the user's privilege to the project type Checker struct { *BaseHandler } -// check existence of project, if it does not exist, create it, +// Enter checks existence of project, if it does not exist, create it, // if it exists, check whether the user has write privilege to it. func (c *Checker) Enter() (string, error) { exist, canWrite, err := c.projectExist() @@ -247,10 +255,13 @@ func (c *Checker) createProject() error { return nil } +// ManifestPuller pulls the manifest of a tag. And if no tag needs to be pulled, +// the next state that state machine should enter is "finished".
type ManifestPuller struct { *BaseHandler } +// Enter pulls manifest of a tag and checks if all blobs exist in the destination registry func (m *ManifestPuller) Enter() (string, error) { if len(m.tags) == 0 { m.logger.Infof("no tag needs to be replicated, entering finish state") @@ -296,11 +307,16 @@ func (m *ManifestPuller) Enter() (string, error) { m.logger.Infof("all blobs of %s:%s from %s: %v", name, tag, m.srcURL, blobs) for _, blob := range blobs { - exist, err := m.dstClient.BlobExist(blob) - if err != nil { - m.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on %s: %v", blob, name, tag, m.dstURL, err) - return "", err + exist, ok := m.blobsExistence[blob] + if !ok { + exist, err = m.dstClient.BlobExist(blob) + if err != nil { + m.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on %s: %v", blob, name, tag, m.dstURL, err) + return "", err + } + m.blobsExistence[blob] = exist } + if !exist { m.blobs = append(m.blobs, blob) } @@ -312,10 +328,12 @@ func (m *ManifestPuller) Enter() (string, error) { return StateTransferBlob, nil } +// BlobTransfer transfers blobs of a tag type BlobTransfer struct { *BaseHandler } +// Enter pulls blobs and then pushes them to destination registry. func (b *BlobTransfer) Enter() (string, error) { name := b.repository tag := b.tags[0] @@ -335,10 +353,14 @@ func (b *BlobTransfer) Enter() (string, error) { return StatePushManifest, nil } +// ManifestPusher pushes the manifest to destination registry type ManifestPusher struct { *BaseHandler } +// Enter checks the existence of manifest in the source registry first, and if it +// exists, pushes it to destination registry.
The checking operation is to avoid +// the situation that the tag is deleted during the blob transfer func (m *ManifestPusher) Enter() (string, error) { name := m.repository tag := m.tags[0] From 9dc931d7c9cc1095e746aa7dfc96749c3fb4c24b Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Wed, 18 May 2016 18:17:40 +0800 Subject: [PATCH 3/3] rename package name --- job/{imgout => replication}/parm.go | 2 +- job/{imgout => replication}/runner.go | 2 +- job/{imgout => replication}/statehandlers.go | 17 ++++++++++------- job/statemachine.go | 16 ++++++++-------- 4 files changed, 20 insertions(+), 17 deletions(-) rename job/{imgout => replication}/parm.go (93%) rename job/{imgout => replication}/runner.go (98%) rename job/{imgout => replication}/statehandlers.go (94%) diff --git a/job/imgout/parm.go b/job/replication/parm.go similarity index 93% rename from job/imgout/parm.go rename to job/replication/parm.go index d1ff6bc8d..68deb4f2d 100644 --- a/job/imgout/parm.go +++ b/job/replication/parm.go @@ -1,4 +1,4 @@ -package imgout +package replication type ImgOutParm struct { Secret string `json:"secret"` diff --git a/job/imgout/runner.go b/job/replication/runner.go similarity index 98% rename from job/imgout/runner.go rename to job/replication/runner.go index d4311fb4a..075a1ee99 100644 --- a/job/imgout/runner.go +++ b/job/replication/runner.go @@ -1,4 +1,4 @@ -package imgout +package replication /* import ( diff --git a/job/imgout/statehandlers.go b/job/replication/statehandlers.go similarity index 94% rename from job/imgout/statehandlers.go rename to job/replication/statehandlers.go index 220fee1a4..6d789dcc5 100644 --- a/job/imgout/statehandlers.go +++ b/job/replication/statehandlers.go @@ -13,7 +13,7 @@ limitations under the License.
*/ -package imgout +package replication import ( "bytes" @@ -72,7 +72,7 @@ type BaseHandler struct { func InitBaseHandler(repository, srcURL, srcSecretKey, dstURL, dstUsr, dstPwd string, tags []string, logger *utils.Logger) (*BaseHandler, error) { - logger.Infof("initializing base handler: repository: %s, tags: %v, source URL: %s, destination URL: %s, destination user: %s", + logger.Infof("initializing: repository: %s, tags: %v, source URL: %s, destination URL: %s, destination user: %s", repository, tags, srcURL, dstURL, dstUsr) base := &BaseHandler{ @@ -112,7 +112,7 @@ func InitBaseHandler(repository, srcURL, srcSecretKey, base.tags = tags } - base.logger.Infof("initialization of base handler completed: project: %s, repository: %s, tags: %v, source URL: %s, destination URL: %s, destination user: %s", + base.logger.Infof("initialization completed: project: %s, repository: %s, tags: %v, source URL: %s, destination URL: %s, destination user: %s", base.project, base.repository, base.tags, base.srcURL, base.dstURL, base.dstUsr) return base, nil @@ -264,7 +264,7 @@ type ManifestPuller struct { // Enter pulls manifest of a tag and checks if all blobs exist in the destination registry func (m *ManifestPuller) Enter() (string, error) { if len(m.tags) == 0 { - m.logger.Infof("no tag needs to be replicated, entering finish state") + m.logger.Infof("no tag needs to be replicated, next state is \"finished\"") return models.JobFinished, nil } @@ -319,12 +319,12 @@ func (m *ManifestPuller) Enter() (string, error) { if !exist { m.blobs = append(m.blobs, blob) + } else { + m.logger.Infof("blob %s of %s:%s already exists in %s", blob, name, tag, m.dstURL) } } m.logger.Infof("blobs of %s:%s need to be transferred to %s: %v", name, tag, m.dstURL, m.blobs) - m.blobs = blobs - return StateTransferBlob, nil } @@ -338,6 +338,7 @@ func (b *BlobTransfer) Enter() (string, error) { name := b.repository tag := b.tags[0] for _, blob := range b.blobs { + b.logger.Infof("transferring blob %s of 
%s:%s to %s ...", blob, name, tag, b.dstURL) size, data, err := b.srcClient.PullBlob(blob) if err != nil { b.logger.Errorf("an error occurred while pulling blob %s of %s:%s from %s: %v", blob, name, tag, b.srcURL, err) @@ -347,7 +348,7 @@ func (b *BlobTransfer) Enter() (string, error) { b.logger.Errorf("an error occurred while pushing blob %s of %s:%s to %s : %v", blob, name, tag, b.dstURL, err) return "", err } - b.logger.Infof("blob %s of %s:%s tranferred to %s completed", blob, name, tag, b.dstURL) + b.logger.Infof("blob %s of %s:%s transferred to %s completed", blob, name, tag, b.dstURL) } return StatePushManifest, nil @@ -387,6 +388,8 @@ func (m *ManifestPusher) Enter() (string, error) { } m.tags = m.tags[1:] + m.manifest = nil + m.blobs = nil return StatePullManifest, nil } diff --git a/job/statemachine.go b/job/statemachine.go index e1b4059cf..2595e8556 100644 --- a/job/statemachine.go +++ b/job/statemachine.go @@ -6,7 +6,7 @@ import ( "github.com/vmware/harbor/dao" "github.com/vmware/harbor/job/config" - "github.com/vmware/harbor/job/imgout" + "github.com/vmware/harbor/job/replication" "github.com/vmware/harbor/job/utils" "github.com/vmware/harbor/models" "github.com/vmware/harbor/utils/log" @@ -203,17 +203,17 @@ func (sm *JobSM) Reset(jid int64) error { } func addImgOutTransition(sm *JobSM) error { - base, err := imgout.InitBaseHandler(sm.Parms.Repository, sm.Parms.LocalRegURL, "", + base, err := replication.InitBaseHandler(sm.Parms.Repository, sm.Parms.LocalRegURL, "", sm.Parms.TargetURL, sm.Parms.TargetUsername, sm.Parms.TargetPassword, nil, &sm.Logger) if err != nil { return err } - sm.AddTransition(models.JobRunning, imgout.StateCheck, &imgout.Checker{BaseHandler: base}) - sm.AddTransition(imgout.StateCheck, imgout.StatePullManifest, &imgout.ManifestPuller{BaseHandler: base}) - sm.AddTransition(imgout.StatePullManifest, imgout.StateTransferBlob, &imgout.BlobTransfer{BaseHandler: base}) - sm.AddTransition(imgout.StatePullManifest, models.JobFinished, 
&StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished}) - sm.AddTransition(imgout.StateTransferBlob, imgout.StatePushManifest, &imgout.ManifestPusher{BaseHandler: base}) - sm.AddTransition(imgout.StatePushManifest, imgout.StatePullManifest, &imgout.ManifestPuller{BaseHandler: base}) + sm.AddTransition(models.JobRunning, replication.StateCheck, &replication.Checker{BaseHandler: base}) + sm.AddTransition(replication.StateCheck, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base}) + sm.AddTransition(replication.StatePullManifest, replication.StateTransferBlob, &replication.BlobTransfer{BaseHandler: base}) + sm.AddTransition(replication.StatePullManifest, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished}) + sm.AddTransition(replication.StateTransferBlob, replication.StatePushManifest, &replication.ManifestPusher{BaseHandler: base}) + sm.AddTransition(replication.StatePushManifest, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base}) return nil }