Skip to content

Commit

Permalink
Add --deep to sync
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Dec 10, 2024
1 parent fd9f2a2 commit 1c9441b
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 9 deletions.
3 changes: 3 additions & 0 deletions cmd/crproxy/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
type flagpole struct {
StorageDriver string
StorageParameters map[string]string
Deep bool
List []string
ListFromFile string
Platform []string
Expand All @@ -45,6 +46,7 @@ func NewCommand() *cobra.Command {
}
cmd.Flags().StringVar(&flags.StorageDriver, "storage-driver", flags.StorageDriver, "Storage driver")
cmd.Flags().StringToStringVar(&flags.StorageParameters, "storage-parameters", flags.StorageParameters, "Storage parameters")
cmd.Flags().BoolVar(&flags.Deep, "deep", flags.Deep, "Deep sync")
cmd.Flags().StringSliceVar(&flags.List, "list", flags.List, "List")
cmd.Flags().StringVar(&flags.ListFromFile, "list-from-file", flags.ListFromFile, "List from file")
cmd.Flags().StringSliceVar(&flags.Platform, "platform", flags.Platform, "Platform")
Expand Down Expand Up @@ -101,6 +103,7 @@ func runE(ctx context.Context, flags *flagpole) error {
"docker.io": "registry-1.docker.io",
"ollama.ai": "registry.ollama.ai",
}),
csync.WithDeep(flags.Deep),
csync.WithClient(client),
csync.WithLogger(logger),
)
Expand Down
53 changes: 44 additions & 9 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type SyncManager struct {
cache *cache.Cache
logger *slog.Logger
domainAlias map[string]string
deep bool
}

func (c *SyncManager) getDomainAlias(host string) string {
Expand All @@ -61,6 +62,12 @@ func (c *SyncManager) getDomainAlias(host string) string {

type Option func(*SyncManager)

func WithDeep(deep bool) Option {
return func(c *SyncManager) {
c.deep = deep
}
}

func WithDomainAlias(domainAlias map[string]string) Option {
return func(c *SyncManager) {
c.domainAlias = domainAlias
Expand Down Expand Up @@ -243,15 +250,16 @@ func (c *SyncManager) Image(ctx context.Context, image string, filter func(pf ma
return nil
}

ts := repo.Tags(ctx)

switch ref.(type) {
case reference.Digested, reference.Tagged:
err = c.syncLayerFromManifestList(ctx, ms, ref, filter, blobCallback, manifestCallback, host+"/"+ref.String())
err = c.syncLayerFromManifestList(ctx, ms, ts, ref, filter, blobCallback, manifestCallback, host+"/"+ref.String())
if err != nil {
return fmt.Errorf("sync layer from manifest list failed: %w", err)
}
default:
t := repo.Tags(ctx)
tags, err := t.All(ctx)
tags, err := ts.All(ctx)
if err != nil {
return fmt.Errorf("get tags failed: %w", err)
}
Expand All @@ -261,7 +269,7 @@ func (c *SyncManager) Image(ctx context.Context, image string, filter func(pf ma
if err != nil {
return fmt.Errorf("with tag failed: %w", err)
}
err = c.syncLayerFromManifestList(ctx, ms, t, filter, blobCallback, manifestCallback, host+"/"+t.String())
err = c.syncLayerFromManifestList(ctx, ms, ts, t, filter, blobCallback, manifestCallback, host+"/"+t.String())
if err != nil {
return fmt.Errorf("sync layer from manifest list failed: %w", err)
}
Expand All @@ -271,28 +279,49 @@ func (c *SyncManager) Image(ctx context.Context, image string, filter func(pf ma
return nil
}

func (c *SyncManager) syncLayerFromManifestList(ctx context.Context, ms distribution.ManifestService, ref reference.Reference, filter func(pf manifestlist.PlatformSpec) bool,
func (c *SyncManager) syncLayerFromManifestList(ctx context.Context, ms distribution.ManifestService, ts distribution.TagService, ref reference.Reference, filter func(pf manifestlist.PlatformSpec) bool,
digestCallback func(dgst digest.Digest, size int64, pf *manifestlist.PlatformSpec, name string) error,
manifestCallback func(tagOrHash string, m distribution.Manifest) error, name string) error {

var (
m distribution.Manifest
err error
)

var hash digest.Digest
switch r := ref.(type) {
case reference.Digested:
m, err = ms.Get(ctx, r.Digest())
hash = r.Digest()
if !c.deep {
stat, err := c.cache.StatBlob(ctx, hash.String())
if err == nil && stat.Size() > 0 {
return nil
}
}
m, err = ms.Get(ctx, hash)
if err != nil {
return fmt.Errorf("get manifest digest failed: %w", err)
}
err = manifestCallback(r.Digest().String(), m)
err = manifestCallback(hash.String(), m)
if err != nil {
return fmt.Errorf("manifest callback failed: %w", err)
}
case reference.Tagged:
tag := r.Tag()
m, err = ms.Get(ctx, "", distribution.WithTag(tag))
desc, err := ts.Get(ctx, tag)
if err != nil {
return fmt.Errorf("get manifest tag failed: %w", err)
return fmt.Errorf("get tag failed: %w", err)
}
hash = desc.Digest
if !c.deep {
stat, err := c.cache.StatBlob(ctx, hash.String())
if err == nil && stat.Size() == desc.Size {
return nil
}
}
m, err = ms.Get(ctx, hash)
if err != nil {
return fmt.Errorf("get manifest digest failed: %w", err)
}
err = manifestCallback(tag, m)
if err != nil {
Expand All @@ -308,6 +337,12 @@ func (c *SyncManager) syncLayerFromManifestList(ctx context.Context, ms distribu
if filter != nil && !filter(mfest.Platform) {
continue
}
if !c.deep {
stat, err := c.cache.StatBlob(ctx, mfest.Digest.String())
if err == nil && stat.Size() == mfest.Size {
continue
}
}

m0, err := ms.Get(ctx, mfest.Digest)
if err != nil {
Expand Down

0 comments on commit 1c9441b

Please sign in to comment.