Skip to content

Commit

Permalink
Refactor sync
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Dec 10, 2024
1 parent cc660c4 commit 3e71a87
Show file tree
Hide file tree
Showing 7 changed files with 679 additions and 418 deletions.
24 changes: 24 additions & 0 deletions clientset/credentials.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clientset

import (
"fmt"
"net/http"
"net/url"
"strings"
Expand All @@ -13,6 +14,29 @@ type Userpass struct {
Password string
}

func ToUserAndPass(userpass []string) (map[string]Userpass, error) {
bc := map[string]Userpass{}
for _, up := range userpass {
s := strings.SplitN(up, "@", 3)
if len(s) != 2 {
return nil, fmt.Errorf("invalid userpass %q", up)
}

u := strings.SplitN(s[0], ":", 3)
if len(s) != 2 {
return nil, fmt.Errorf("invalid userpass %q", up)
}
host := s[1]
user := u[0]
pwd := u[1]
bc[host] = Userpass{
Username: user,
Password: pwd,
}
}
return bc, nil
}

type basicCredentials struct {
credentials map[string]Userpass
}
Expand Down
49 changes: 21 additions & 28 deletions cmd/crproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (

"github.com/daocloud/crproxy/cache"
"github.com/daocloud/crproxy/clientset"
csync "github.com/daocloud/crproxy/cmd/crproxy/sync"
"github.com/docker/distribution/registry/storage/driver/factory"
"github.com/gorilla/handlers"
"github.com/spf13/pflag"
"github.com/spf13/cobra"
"github.com/wzshiming/geario"
"github.com/wzshiming/hostmatcher"

Expand Down Expand Up @@ -139,34 +140,30 @@ func init() {

pflag.StringVar(&tokenPrivateKeyFile, "token-private-key-file", "", "private key file")
pflag.StringVar(&tokenPublicKeyFile, "token-public-key-file", "", "public key file")
pflag.Parse()

cmd.AddCommand(csync.NewCommand())
}

func toUserAndPass(userpass []string) (map[string]clientset.Userpass, error) {
bc := map[string]clientset.Userpass{}
for _, up := range userpass {
s := strings.SplitN(up, "@", 3)
if len(s) != 2 {
return nil, fmt.Errorf("invalid userpass %q", up)
}
var (
cmd = &cobra.Command{
Use: "crproxy",
Short: "crproxy",
Run: func(cmd *cobra.Command, args []string) {
run(cmd.Context())
},
}
pflag = cmd.Flags()
)

u := strings.SplitN(s[0], ":", 3)
if len(s) != 2 {
return nil, fmt.Errorf("invalid userpass %q", up)
}
host := s[1]
user := u[0]
pwd := u[1]
bc[host] = clientset.Userpass{
Username: user,
Password: pwd,
}
func main() {
err := cmd.Execute()
if err != nil {
slog.Error("execute failed", "error", err)
os.Exit(1)
}
return bc, nil
}

func main() {
ctx := context.Background()
func run(ctx context.Context) {
logger := slog.New(slog.NewJSONHandler(os.Stderr, nil))

mux := http.NewServeMux()
Expand Down Expand Up @@ -429,7 +426,7 @@ func main() {
}

if len(userpass) != 0 {
bc, err := toUserAndPass(userpass)
bc, err := clientset.ToUserAndPass(userpass)
if err != nil {
logger.Error("failed to toUserAndPass", "error", err)
os.Exit(1)
Expand Down Expand Up @@ -591,10 +588,6 @@ func main() {

mux.Handle("/v2/", crp)

if enableInternalAPI {
mux.HandleFunc("/internal/api/image/sync", crp.Sync)
}

if enablePprof {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand Down
170 changes: 170 additions & 0 deletions cmd/crproxy/sync/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package sync

import (
"context"
"fmt"
"log/slog"
"math/rand"
"os"
"strings"

"github.com/daocloud/crproxy/cache"
"github.com/daocloud/crproxy/clientset"
csync "github.com/daocloud/crproxy/sync"
"github.com/docker/distribution/manifest/manifestlist"
"github.com/docker/distribution/registry/storage/driver/factory"
"github.com/spf13/cobra"
)

type flagpole struct {
StorageDriver string
StorageParameters map[string]string
Deep bool
List []string
ListFromFile string
Platform []string
MaxWarn int
Userpass []string
}

func NewCommand() *cobra.Command {
flags := &flagpole{
Platform: []string{
"linux/amd64",
"linux/arm64",
},
MaxWarn: -1,
}

cmd := &cobra.Command{
Args: cobra.NoArgs,
Use: "sync",
Short: "Sync images",
RunE: func(cmd *cobra.Command, args []string) error {
return runE(cmd.Context(), flags)
},
}
cmd.Flags().StringVar(&flags.StorageDriver, "storage-driver", flags.StorageDriver, "Storage driver")
cmd.Flags().StringToStringVar(&flags.StorageParameters, "storage-parameters", flags.StorageParameters, "Storage parameters")
cmd.Flags().BoolVar(&flags.Deep, "deep", flags.Deep, "Deep sync")
cmd.Flags().StringSliceVar(&flags.List, "list", flags.List, "List")
cmd.Flags().StringVar(&flags.ListFromFile, "list-from-file", flags.ListFromFile, "List from file")
cmd.Flags().StringSliceVar(&flags.Platform, "platform", flags.Platform, "Platform")
cmd.Flags().IntVar(&flags.MaxWarn, "max-warn", flags.MaxWarn, "Max warn")
cmd.Flags().StringSliceVarP(&flags.Userpass, "user", "u", flags.Userpass, "host and username and password -u user:pwd@host")
return cmd
}

func runE(ctx context.Context, flags *flagpole) error {

opts := []csync.Option{}

logger := slog.New(slog.NewJSONHandler(os.Stderr, nil))

cacheOpts := []cache.Option{}

parameters := map[string]interface{}{}
for k, v := range flags.StorageParameters {
parameters[k] = v
}
sd, err := factory.Create(flags.StorageDriver, parameters)
if err != nil {
return fmt.Errorf("create storage driver failed: %w", err)
}
cacheOpts = append(cacheOpts, cache.WithStorageDriver(sd))

cache, err := cache.NewCache(cacheOpts...)
if err != nil {
return fmt.Errorf("create cache failed: %w", err)
}

clientOpts := []clientset.Option{
clientset.WithLogger(logger),
clientset.WithMaxClientSizeForEachRegistry(16),
}

if len(flags.Userpass) != 0 {
bc, err := clientset.ToUserAndPass(flags.Userpass)
if err != nil {
return fmt.Errorf("failed to toUserAndPass: %w", err)
}
clientOpts = append(clientOpts, clientset.WithUserAndPass(bc))
}

client, err := clientset.NewClientset(clientOpts...)
if err != nil {
return fmt.Errorf("create clientset failed: %w", err)
}

opts = append(opts,
csync.WithCache(cache),
csync.WithDomainAlias(map[string]string{
"docker.io": "registry-1.docker.io",
"ollama.ai": "registry.ollama.ai",
}),
csync.WithDeep(flags.Deep),
csync.WithClient(client),
csync.WithLogger(logger),
csync.WithFilterPlatform(filterPlatform(flags.Platform)),
)

sm, err := csync.NewSyncManager(opts...)
if err != nil {
return fmt.Errorf("create sync manager failed: %w", err)
}

warnCount := 0
for _, item := range flags.List {
err = sm.Image(ctx, item)
if err != nil {
logger.Warn("Sync failed", "error ", err)
warnCount++
if flags.MaxWarn > -1 && warnCount >= flags.MaxWarn {
return fmt.Errorf("max warn reached")
}
}
}

if flags.ListFromFile != "" {
f, err := os.ReadFile(flags.ListFromFile)
if err != nil {
return fmt.Errorf("read list from file failed: %w", err)
}
lines := strings.Split(string(f), "\n")

rand.Shuffle(len(lines), func(i, j int) {
lines[i], lines[j] = lines[j], lines[i]
})
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}

err = sm.Image(ctx, line)
if err != nil {
logger.Warn("Sync failed", "error ", err)
warnCount++
if flags.MaxWarn > -1 && warnCount >= flags.MaxWarn {
return fmt.Errorf("max warn reached")
}
}
}
}
return nil
}

func filterPlatform(ps []string) func(pf manifestlist.PlatformSpec) bool {
platforms := map[string]struct{}{}
for _, p := range ps {
platforms[p] = struct{}{}
}
return func(pf manifestlist.PlatformSpec) bool {
p := fmt.Sprintf("%s/%s", pf.OS, pf.Architecture)

if _, ok := platforms[p]; ok {
return true
}
return false
}
}
33 changes: 18 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/gorilla/handlers v1.5.2
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.24.6+incompatible
github.com/opencontainers/go-digest v1.0.0
github.com/spf13/pflag v1.0.5
github.com/spf13/cobra v1.8.1
github.com/wzshiming/cmux v0.4.2
github.com/wzshiming/geario v0.0.0-20240308093553-a996e3817533
github.com/wzshiming/hostmatcher v0.0.3
Expand All @@ -35,32 +35,35 @@ require (
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/aws/aws-sdk-go v1.48.10 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dnaeon/go-vcr v1.2.0 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gofrs/uuid v4.0.0+incompatible // indirect
github.com/golang-jwt/jwt/v4 v4.5.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/image-spec v1.1.0-rc3 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/prometheus/client_golang v1.17.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/wzshiming/trie v0.3.1 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/net v0.30.0 // indirect
Expand All @@ -70,7 +73,7 @@ require (
google.golang.org/api v0.126.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/cloud v0.0.0-20151119220103-975617b05ea8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/grpc v1.65.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
Loading

0 comments on commit 3e71a87

Please sign in to comment.