retry several times when failed to copy blob during the replication

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2020-08-23 21:31:17 +08:00
parent 7bcfb502d2
commit 65d658a059

View File

@ -17,7 +17,10 @@ package image
import ( import (
"errors" "errors"
"net/http" "net/http"
"os"
"strconv"
"strings" "strings"
"time"
"github.com/docker/distribution/manifest/manifestlist" "github.com/docker/distribution/manifest/manifestlist"
"github.com/docker/distribution/manifest/schema1" "github.com/docker/distribution/manifest/schema1"
@ -32,7 +35,15 @@ import (
trans "github.com/goharbor/harbor/src/replication/transfer" trans "github.com/goharbor/harbor/src/replication/transfer"
) )
var (
retry int
)
func init() { func init() {
retry, _ = strconv.Atoi(os.Getenv("COPY_BLOB_RETRY_COUNT"))
if retry <= 0 {
retry = 5
}
if err := trans.RegisterFactory(model.ResourceTypeImage, factory); err != nil { if err := trans.RegisterFactory(model.ResourceTypeImage, factory); err != nil {
log.Errorf("failed to register transfer factory: %v", err) log.Errorf("failed to register transfer factory: %v", err)
} }
@ -244,17 +255,33 @@ func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo
// the media type of the layer or config can be "application/octet-stream", // the media type of the layer or config can be "application/octet-stream",
// schema1.MediaTypeManifestLayer, schema2.MediaTypeLayer, schema2.MediaTypeImageConfig // schema1.MediaTypeManifestLayer, schema2.MediaTypeLayer, schema2.MediaTypeImageConfig
default: default:
return t.copyBlob(srcRepo, dstRepo, digest, content.Size) return t.copyBlobWithRetry(srcRepo, dstRepo, digest, content.Size)
} }
} }
func (t *transfer) copyBlobWithRetry(srcRepo, dstRepo, digest string, sizeFromDescriptor int64) error {
var err error
for i, backoff := 1, 2*time.Second; i <= retry; i, backoff = i+1, backoff*2 {
t.logger.Infof("copying the blob %s(the %dth running)...", digest, i)
if err = t.copyBlob(srcRepo, dstRepo, digest, sizeFromDescriptor); err == nil {
t.logger.Infof("copy the blob %s completed", digest)
return nil
}
if i == retry {
break
}
t.logger.Infof("will retry %v later", backoff)
time.Sleep(backoff)
}
return err
}
// copy the layer or artifact config from the source registry to destination // copy the layer or artifact config from the source registry to destination
// the size parameter is taken from manifests. // the size parameter is taken from manifests.
func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor int64) error { func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor int64) error {
if t.shouldStop() { if t.shouldStop() {
return nil return nil
} }
t.logger.Infof("copying the blob %s...", digest)
exist, err := t.dst.BlobExist(dstRepo, digest) exist, err := t.dst.BlobExist(dstRepo, digest)
if err != nil { if err != nil {
t.logger.Errorf("failed to check the existence of blob %s on the destination registry: %v", digest, err) t.logger.Errorf("failed to check the existence of blob %s on the destination registry: %v", digest, err)
@ -278,10 +305,9 @@ func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor
} }
if err = t.dst.PushBlob(dstRepo, digest, size, data); err != nil { if err = t.dst.PushBlob(dstRepo, digest, size, data); err != nil {
t.logger.Errorf("failed to pushing the blob %s: %v", digest, err) t.logger.Errorf("failed to pushing the blob %s, size %d: %v", digest, size, err)
return err return err
} }
t.logger.Infof("copy the blob %s completed", digest)
return nil return nil
} }