Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Jan 17, 2025
1 parent f12d7ab commit bf6f98f
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 19 deletions.
4 changes: 2 additions & 2 deletions gateway/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ func (c *Gateway) cacheManifest(info *PathInfo) (int, error) {
cachedDigest, err := c.cache.DigestManifest(ctx, info.Host, info.Image, info.Manifests)
if err == nil {
if cachedDigest != digest {
msg := fmt.Sprintf("%s/%s:%s", info.Host, info.Image, digest)
msg := fmt.Sprintf("%s/%s:%s", info.Host, info.Image, info.Manifests)
_, err := c.queueClient.Create(context.Background(), msg, 0)
if err != nil {
c.logger.Warn("failed add message to queue", "msg", msg, "error", err)
c.logger.Warn("failed add message to queue", "msg", msg, "digest", digest, "error", err)
} else {
c.logger.Info("Add message to queue", "msg", msg, "digest", digest)
}
Expand Down
2 changes: 1 addition & 1 deletion queue/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *MessageClient) Create(ctx context.Context, content string, priority int
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusCreated {
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
return MessageResponse{}, handleErrorResponse(resp)
}

Expand Down
9 changes: 7 additions & 2 deletions queue/controller/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (mc *MessageController) Schedule(ctx context.Context, logger *slog.Logger)
case <-ctx.Done():
return
case <-ticker.C:
staleList, err := mc.messageService.GetStale(ctx, time.Now().Add(-time.Minute))
staleList, err := mc.messageService.GetStale(ctx)
if err != nil {
logger.Error("ReleaseStale", "error", err)
} else {
Expand All @@ -246,7 +246,7 @@ func (mc *MessageController) Schedule(ctx context.Context, logger *slog.Logger)
}
}

cleanList, err := mc.messageService.GetCompletedAndFailed(ctx, time.Now().Add(-time.Hour))
cleanList, err := mc.messageService.GetCompletedAndFailed(ctx)
if err != nil {
logger.Error("DeleteCompletedAndFailed", "error", err)
} else {
Expand All @@ -257,6 +257,11 @@ func (mc *MessageController) Schedule(ctx context.Context, logger *slog.Logger)
}
}
}

err = mc.messageService.CleanUp(ctx)
if err != nil {
logger.Error("CleanUp", "error", err)
}
}
}
}
Expand Down
30 changes: 21 additions & 9 deletions queue/dao/messgae.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"fmt"
"time"

"github.com/daocloud/crproxy/queue/model"
)
Expand Down Expand Up @@ -113,12 +112,12 @@ func (m *Message) UpdatePriorityByID(ctx context.Context, id int64, priority int
}

const deleteMessageByIDSQL = `
UPDATE messages SET delete_at = NOW() WHERE id = ? AND delete_at IS NULL
UPDATE messages SET delete_at = NOW(), data = ? WHERE id = ? AND delete_at IS NULL
`

func (m *Message) DeleteByID(ctx context.Context, id int64) error {
db := GetDB(ctx)
_, err := db.ExecContext(ctx, deleteMessageByIDSQL, id)
_, err := db.ExecContext(ctx, deleteMessageByIDSQL, model.MessageAttr{}, id)
if err != nil {
return fmt.Errorf("failed to delete message: %w", err)
}
Expand Down Expand Up @@ -153,16 +152,29 @@ func (m *Message) List(ctx context.Context) ([]model.Message, error) {
return messages, nil
}

const cleanUpSQL = `
DELETE FROM messages WHERE delete_at IS NOT NULL
`

func (m *Message) CleanUp(ctx context.Context) error {
db := GetDB(ctx)
_, err := db.ExecContext(ctx, cleanUpSQL)
if err != nil {
return fmt.Errorf("failed to clean up messages: %w", err)
}
return nil
}

const getCompletedAndFailedMessagesSQL = `
SELECT id, content, priority, status, data, last_heartbeat
FROM messages
WHERE (status = ? OR status = ?)
AND last_heartbeat < ? AND delete_at IS NULL
AND last_heartbeat < NOW() - INTERVAL 1 HOUR AND delete_at IS NULL
`

func (m *Message) GetCompletedAndFailed(ctx context.Context, threshold time.Time) ([]model.Message, error) {
func (m *Message) GetCompletedAndFailed(ctx context.Context) ([]model.Message, error) {
db := GetDB(ctx)
rows, err := db.QueryContext(ctx, getCompletedAndFailedMessagesSQL, model.StatusCompleted, model.StatusFailed, threshold)
rows, err := db.QueryContext(ctx, getCompletedAndFailedMessagesSQL, model.StatusCompleted, model.StatusFailed)
if err != nil {
return nil, fmt.Errorf("failed to get completed and failed messages: %w", err)
}
Expand All @@ -188,12 +200,12 @@ const getStaleMessagesSQL = `
SELECT id, content, priority, status, data, last_heartbeat
FROM messages
WHERE status = ?
AND last_heartbeat < ? AND delete_at IS NULL
AND last_heartbeat < NOW() - INTERVAL 1 MINUTE AND delete_at IS NULL
`

func (m *Message) GetStale(ctx context.Context, threshold time.Time) ([]model.Message, error) {
func (m *Message) GetStale(ctx context.Context) ([]model.Message, error) {
db := GetDB(ctx)
rows, err := db.QueryContext(ctx, getStaleMessagesSQL, model.StatusProcessing, threshold)
rows, err := db.QueryContext(ctx, getStaleMessagesSQL, model.StatusProcessing)
if err != nil {
return nil, fmt.Errorf("failed to get stale messages: %w", err)
}
Expand Down
14 changes: 9 additions & 5 deletions queue/service/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"fmt"
"time"

"github.com/daocloud/crproxy/queue/dao"
"github.com/daocloud/crproxy/queue/model"
Expand Down Expand Up @@ -124,14 +123,19 @@ func (s *MessageService) Cancel(ctx context.Context, id int64, lease string) err
return nil
}

func (s *MessageService) GetCompletedAndFailed(ctx context.Context, threshold time.Time) ([]model.Message, error) {
func (s *MessageService) CleanUp(ctx context.Context) error {
ctx = dao.WithDB(ctx, s.db)
return s.messageDao.GetCompletedAndFailed(ctx, threshold)
return s.messageDao.CleanUp(ctx)
}

func (s *MessageService) GetStale(ctx context.Context, threshold time.Time) ([]model.Message, error) {
func (s *MessageService) GetCompletedAndFailed(ctx context.Context) ([]model.Message, error) {
ctx = dao.WithDB(ctx, s.db)
return s.messageDao.GetStale(ctx, threshold)
return s.messageDao.GetCompletedAndFailed(ctx)
}

func (s *MessageService) GetStale(ctx context.Context) ([]model.Message, error) {
ctx = dao.WithDB(ctx, s.db)
return s.messageDao.GetStale(ctx)
}

func (s *MessageService) ResetToPending(ctx context.Context, id int64) error {
Expand Down

0 comments on commit bf6f98f

Please sign in to comment.