Skip to content

Commit

Permalink
Refactor sync
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Dec 9, 2024
1 parent cc660c4 commit 8a3447c
Show file tree
Hide file tree
Showing 5 changed files with 380 additions and 159 deletions.
28 changes: 20 additions & 8 deletions cmd/crproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (

"github.com/daocloud/crproxy/cache"
"github.com/daocloud/crproxy/clientset"
csync "github.com/daocloud/crproxy/cmd/crproxy/sync"
"github.com/docker/distribution/registry/storage/driver/factory"
"github.com/gorilla/handlers"
"github.com/spf13/pflag"
"github.com/spf13/cobra"
"github.com/wzshiming/geario"
"github.com/wzshiming/hostmatcher"

Expand Down Expand Up @@ -139,7 +140,23 @@ func init() {

pflag.StringVar(&tokenPrivateKeyFile, "token-private-key-file", "", "private key file")
pflag.StringVar(&tokenPublicKeyFile, "token-public-key-file", "", "public key file")
pflag.Parse()

cmd.AddCommand(csync.NewCommand())
}

var (
cmd = &cobra.Command{
Use: "crproxy",
Short: "crproxy",
Run: func(cmd *cobra.Command, args []string) {
run(cmd.Context())
},
}
pflag = cmd.Flags()
)

func main() {
cmd.Execute()
}

func toUserAndPass(userpass []string) (map[string]clientset.Userpass, error) {
Expand All @@ -165,8 +182,7 @@ func toUserAndPass(userpass []string) (map[string]clientset.Userpass, error) {
return bc, nil
}

func main() {
ctx := context.Background()
func run(ctx context.Context) {
logger := slog.New(slog.NewJSONHandler(os.Stderr, nil))

mux := http.NewServeMux()
Expand Down Expand Up @@ -591,10 +607,6 @@ func main() {

mux.Handle("/v2/", crp)

if enableInternalAPI {
mux.HandleFunc("/internal/api/image/sync", crp.Sync)
}

if enablePprof {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand Down
162 changes: 162 additions & 0 deletions cmd/crproxy/sync/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package sync

import (
"bufio"
"context"
"fmt"
"log/slog"
"os"
"strings"

"github.com/daocloud/crproxy/cache"
"github.com/daocloud/crproxy/clientset"
csync "github.com/daocloud/crproxy/sync"
"github.com/docker/distribution/manifest/manifestlist"
"github.com/docker/distribution/registry/storage/driver/factory"
"github.com/spf13/cobra"
)

type flagpole struct {
StorageDriver string
StorageParameters map[string]string
List []string
ListFromFile string
Platform []string
MaxWarn int
}

func NewCommand() *cobra.Command {
flags := &flagpole{
Platform: []string{
"linux/amd64",
"linux/arm64",
},
MaxWarn: -1,
}

cmd := &cobra.Command{
Args: cobra.NoArgs,
Use: "sync",
Short: "Sync images",
RunE: func(cmd *cobra.Command, args []string) error {
return runE(cmd.Context(), flags)
},
}
cmd.Flags().StringVar(&flags.StorageDriver, "storage-driver", flags.StorageDriver, "Storage driver")
cmd.Flags().StringToStringVar(&flags.StorageParameters, "storage-parameters", flags.StorageParameters, "Storage parameters")
cmd.Flags().StringSliceVar(&flags.List, "list", flags.List, "List")
cmd.Flags().StringVar(&flags.ListFromFile, "list-from-file", flags.ListFromFile, "List from file")
cmd.Flags().StringSliceVar(&flags.Platform, "platform", flags.Platform, "Platform")
cmd.Flags().IntVar(&flags.MaxWarn, "max-warn", flags.MaxWarn, "Max warn")
return cmd
}

func runE(ctx context.Context, flags *flagpole) error {

opts := []csync.Option{}

logger := slog.New(slog.NewJSONHandler(os.Stderr, nil))

cacheOpts := []cache.Option{}

parameters := map[string]interface{}{}
for k, v := range flags.StorageParameters {
parameters[k] = v
}
sd, err := factory.Create(flags.StorageDriver, parameters)
if err != nil {
return fmt.Errorf("create storage driver failed: %w", err)
}
cacheOpts = append(cacheOpts, cache.WithStorageDriver(sd))

cache, err := cache.NewCache(cacheOpts...)
if err != nil {
return fmt.Errorf("create cache failed: %w", err)
}

clientOpts := []clientset.Option{
clientset.WithLogger(logger),
clientset.WithMaxClientSizeForEachRegistry(16),
}

client, err := clientset.NewClientset(clientOpts...)
if err != nil {
return fmt.Errorf("create clientset failed: %w", err)
}

opts = append(opts,
csync.WithCache(cache),
csync.WithDomainAlias(map[string]string{
"docker.io": "registry-1.docker.io",
"ollama.ai": "registry.ollama.ai",
}),
csync.WithClient(client),
csync.WithLogger(logger),
)

sm, err := csync.NewSyncManager(opts...)
if err != nil {
return fmt.Errorf("create sync manager failed: %w", err)
}

warnCount := 0
for _, item := range flags.List {
err = sm.Image(ctx, item, platformFilter(flags.Platform), func(sp csync.Progress) error {
logger.Info("Sync", "progress", sp)
return nil
})
if err != nil {
logger.Warn("Sync failed", "error ", err)
warnCount++
if flags.MaxWarn > -1 && warnCount >= flags.MaxWarn {
return fmt.Errorf("max warn reached")
}
}
}

if flags.ListFromFile != "" {
f, err := os.Open(flags.ListFromFile)
if err != nil {
return fmt.Errorf("read list from file failed: %w", err)
}
defer f.Close()
reader := bufio.NewReader(f)
for {
line, err := reader.ReadString('\n')
if err != nil {
break
}
line = strings.TrimSpace(line)
if line == "" {
continue
}
err = sm.Image(ctx, line, platformFilter(flags.Platform), func(sp csync.Progress) error {
logger.Info("Sync", "progress", sp)
return nil
})
if err != nil {
logger.Warn("Sync failed", "error ", err)
warnCount++
if flags.MaxWarn > -1 && warnCount >= flags.MaxWarn {
return fmt.Errorf("max warn reached")
}
}
}
}
return nil
}

func platformFilter(ps []string) func(pf manifestlist.PlatformSpec) bool {
platforms := map[string]struct{}{}
for _, p := range ps {
platforms[p] = struct{}{}
}
return func(pf manifestlist.PlatformSpec) bool {
p := fmt.Sprintf("%s/%s", pf.OS, pf.Architecture)

if _, ok := platforms[p]; ok {
return true
}
return false
}
}
33 changes: 18 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/gorilla/handlers v1.5.2
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.24.6+incompatible
github.com/opencontainers/go-digest v1.0.0
github.com/spf13/pflag v1.0.5
github.com/spf13/cobra v1.8.1
github.com/wzshiming/cmux v0.4.2
github.com/wzshiming/geario v0.0.0-20240308093553-a996e3817533
github.com/wzshiming/hostmatcher v0.0.3
Expand All @@ -35,32 +35,35 @@ require (
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/aws/aws-sdk-go v1.48.10 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dnaeon/go-vcr v1.2.0 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gofrs/uuid v4.0.0+incompatible // indirect
github.com/golang-jwt/jwt/v4 v4.5.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/image-spec v1.1.0-rc3 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/prometheus/client_golang v1.17.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/wzshiming/trie v0.3.1 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/net v0.30.0 // indirect
Expand All @@ -70,7 +73,7 @@ require (
google.golang.org/api v0.126.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/cloud v0.0.0-20151119220103-975617b05ea8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/grpc v1.65.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
Loading

0 comments on commit 8a3447c

Please sign in to comment.