Skip to content

Commit

Permalink
Add message queue for gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Jan 17, 2025
1 parent e379701 commit f12d7ab
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 0 deletions.
10 changes: 10 additions & 0 deletions cache/cache_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ func (c *Cache) GetManifestContent(ctx context.Context, host, image, tagOrBlob s
return content, digest, mediaType, nil
}

func (c *Cache) DigestManifest(ctx context.Context, host, image, tag string) (string, error) {
manifestLinkPath := manifestTagCachePath(host, image, tag)

digestContent, err := c.GetContent(ctx, manifestLinkPath)
if err != nil {
return "", fmt.Errorf("get manifest path %s error: %w", manifestLinkPath, err)
}
return string(digestContent), nil
}

func (c *Cache) StatManifest(ctx context.Context, host, image, tagOrBlob string) (bool, error) {
var manifestLinkPath string
isHash := strings.HasPrefix(tagOrBlob, "sha256:")
Expand Down
13 changes: 13 additions & 0 deletions cmd/crproxy/cluster/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/daocloud/crproxy/internal/pki"
"github.com/daocloud/crproxy/internal/server"
"github.com/daocloud/crproxy/internal/utils"
"github.com/daocloud/crproxy/queue/client"
"github.com/daocloud/crproxy/signing"
"github.com/daocloud/crproxy/storage"
"github.com/daocloud/crproxy/token"
Expand Down Expand Up @@ -57,6 +58,9 @@ type flagpole struct {
RegistryAlias map[string]string

Concurrency int

QueueURL string
QueueToken string
}

func NewCommand() *cobra.Command {
Expand Down Expand Up @@ -105,6 +109,10 @@ func NewCommand() *cobra.Command {
cmd.Flags().StringToStringVar(&flags.RegistryAlias, "registry-alias", flags.RegistryAlias, "registry alias")

cmd.Flags().IntVar(&flags.Concurrency, "concurrency", flags.Concurrency, "Concurrency to source")

cmd.Flags().StringVar(&flags.QueueToken, "queue-token", flags.QueueToken, "Queue token")
cmd.Flags().StringVar(&flags.QueueURL, "queue-url", flags.QueueURL, "Queue URL")

return cmd
}

Expand Down Expand Up @@ -207,6 +215,11 @@ func runE(ctx context.Context, flags *flagpole) error {
})
}

if flags.QueueURL != "" {
queueClient := client.NewMessageClient(http.DefaultClient, flags.QueueURL, flags.QueueToken)
opts = append(opts, gateway.WithQueueClient(queueClient))
}

tp = transport.NewLogTransport(tp, logger, time.Second)

client := &http.Client{
Expand Down
9 changes: 9 additions & 0 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/daocloud/crproxy/cache"
"github.com/daocloud/crproxy/internal/queue"
"github.com/daocloud/crproxy/internal/utils"
"github.com/daocloud/crproxy/queue/client"
"github.com/daocloud/crproxy/token"
"github.com/docker/distribution/registry/api/errcode"
"github.com/wzshiming/geario"
Expand Down Expand Up @@ -55,6 +56,8 @@ type Gateway struct {
blobsLENoAgent int

agent *agent.Agent

queueClient *client.MessageClient
}

type Option func(c *Gateway)
Expand Down Expand Up @@ -137,6 +140,12 @@ func WithConcurrency(concurrency int) Option {
}
}

func WithQueueClient(queueClient *client.MessageClient) Option {
return func(c *Gateway) {
c.queueClient = queueClient
}
}

func NewGateway(opts ...Option) (*Gateway, error) {
c := &Gateway{
logger: slog.Default(),
Expand Down
21 changes: 21 additions & 0 deletions gateway/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,27 @@ func (c *Gateway) cacheManifest(info *PathInfo) (int, error) {
return 0, errcode.ErrorCodeDenied
}

if c.queueClient != nil {
cachedDigest, err := c.cache.DigestManifest(ctx, info.Host, info.Image, info.Manifests)
if err == nil {
if cachedDigest != digest {
msg := fmt.Sprintf("%s/%s:%s", info.Host, info.Image, digest)
_, err := c.queueClient.Create(context.Background(), msg, 0)
if err != nil {
c.logger.Warn("failed add message to queue", "msg", msg, "error", err)
} else {
c.logger.Info("Add message to queue", "msg", msg, "digest", digest)
}
digest = cachedDigest
}

c.manifestCache.Put(info, cacheValue{
Digest: digest,
})
return 0, nil
}
}

err = c.cache.RelinkManifest(ctx, info.Host, info.Image, info.Manifests, digest)
if err != nil {
c.logger.Warn("failed relink manifest", "url", u.String(), "error", err)
Expand Down

0 comments on commit f12d7ab

Please sign in to comment.