forked from hibiken/asynq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservemux.go
158 lines (138 loc) · 4.56 KB
/
servemux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"context"
"fmt"
"sort"
"strings"
"sync"
)
// ServeMux is a multiplexer for asynchronous tasks.
// It matches the type of each task against a list of registered patterns
// and calls the handler for the pattern that most closely matches the
// task's type name.
//
// A pattern matches a task when it equals the task's type name or is a
// prefix of it. Longer patterns take precedence over shorter ones, so that
// if there are handlers registered for both "images" and "images:thumbnails",
// the latter handler will be called for tasks with a type name beginning with
// "images:thumbnails" and the former will receive the remaining tasks whose
// type name begins with "images".
type ServeMux struct {
// mu guards m, es and mws: Handle and Use take the write lock,
// Handler takes the read lock.
mu sync.RWMutex
// m maps each registered pattern to its entry; used for exact matches.
// It stays nil until the first call to Handle.
m map[string]muxEntry
es []muxEntry // the same entries sorted from longest to shortest pattern; scanned for prefix matches.
// mws is the middleware chain, in the order it was passed to Use.
mws []MiddlewareFunc
}
// muxEntry pairs a registered pattern with the handler that serves it.
type muxEntry struct {
// h is the handler registered for pattern.
h Handler
// pattern is the string exactly as it was passed to Handle.
pattern string
}
// MiddlewareFunc is a function which receives an asynq.Handler and returns another asynq.Handler.
// Typically, the returned handler is a closure which does something with the context and task passed
// to it, and then calls the handler that was passed as a parameter to the MiddlewareFunc.
//
// Middleware is registered with ServeMux.Use and is wrapped around the matched
// handler by ServeMux.Handler.
type MiddlewareFunc func(Handler) Handler
// NewServeMux returns a pointer to a newly allocated, empty ServeMux.
// No patterns or middleware are registered on it; the pattern map is
// created lazily by the first call to Handle.
func NewServeMux() *ServeMux {
	return &ServeMux{}
}
// ProcessTask looks up the handler whose pattern most closely matches the
// task's type and runs it, returning whatever error that handler returns.
// The matched pattern itself is not needed here and is discarded.
func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error {
	handler, _ := mux.Handler(task)
	return handler.ProcessTask(ctx, task)
}
// Handler returns the handler to use for the given task, together with
// the registered pattern that matched the task's type.
//
// The lookup never yields a nil handler: if no registered pattern applies
// to the task, a 'not found' handler (which returns an error) is used and
// the returned pattern is empty.
//
// Every middleware registered through Use is wrapped around the handler
// before it is returned.
func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string) {
	mux.mu.RLock()
	defer mux.mu.RUnlock()

	if matched, pat := mux.match(t.Type()); matched != nil {
		h, pattern = matched, pat
	} else {
		// pattern keeps its zero value, the empty string.
		h = NotFoundHandler()
	}

	// Wrap from the last registered middleware to the first, so that the
	// first one registered ends up as the outermost layer.
	for n := len(mux.mws); n > 0; n-- {
		h = mux.mws[n-1](h)
	}
	return h, pattern
}
// match finds the handler registered for the given task type name.
// An exact pattern match is preferred; otherwise the longest registered
// pattern that is a prefix of typename wins.
// It returns a nil handler and an empty pattern when nothing matches.
// match takes no lock itself.
func (mux *ServeMux) match(typename string) (h Handler, pattern string) {
	// Fast path: the type name is itself a registered pattern.
	if entry, found := mux.m[typename]; found {
		return entry.h, entry.pattern
	}

	// mux.es is ordered from longest to shortest pattern, so the first
	// prefix hit is also the most specific one.
	for i := range mux.es {
		if strings.HasPrefix(typename, mux.es[i].pattern) {
			return mux.es[i].h, mux.es[i].pattern
		}
	}
	return nil, ""
}
// Handle registers the handler for the given pattern.
//
// Handle panics if pattern is empty or consists only of white space, if
// handler is nil, or if a handler has already been registered for pattern.
func (mux *ServeMux) Handle(pattern string, handler Handler) {
	mux.mu.Lock()
	defer mux.mu.Unlock()

	// Looking up a key in a nil map is safe and simply reports "absent".
	_, registered := mux.m[pattern]
	switch {
	case strings.TrimSpace(pattern) == "":
		panic("asynq: invalid pattern")
	case handler == nil:
		panic("asynq: nil handler")
	case registered:
		panic("asynq: multiple registrations for " + pattern)
	}

	// The map is created lazily so that a zero-value ServeMux is usable.
	if mux.m == nil {
		mux.m = make(map[string]muxEntry)
	}
	entry := muxEntry{h: handler, pattern: pattern}
	mux.m[pattern] = entry
	mux.es = appendSorted(mux.es, entry)
}
// appendSorted inserts e into es, which must already be ordered from the
// longest pattern to the shortest, and returns the updated slice with that
// ordering preserved. Among patterns of equal length, e is placed after
// the entries that were already present.
func appendSorted(es []muxEntry, e muxEntry) []muxEntry {
	// pos is the index of the first entry whose pattern is strictly
	// shorter than e's, or len(es) when there is no such entry.
	pos := sort.Search(len(es), func(k int) bool {
		return len(e.pattern) > len(es[k].pattern)
	})

	// Grow by one slot (the placeholder value is irrelevant), slide the
	// shorter entries one position to the right, then drop e into the gap.
	// When pos is the old length the copy moves nothing and this reduces
	// to a plain append.
	es = append(es, muxEntry{})
	copy(es[pos+1:], es[pos:])
	es[pos] = e
	return es
}
// HandleFunc registers the handler function for the given pattern.
// It panics if handler is nil; the function is otherwise adapted with
// HandlerFunc and registered through Handle, so Handle's panics on an
// invalid or duplicate pattern apply here as well.
func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error) {
	// NOTE(review): checked up front because, once adapted, a nil function
	// presumably no longer compares equal to nil inside Handle — confirm
	// against the definition of HandlerFunc.
	if handler == nil {
		panic("asynq: nil handler")
	}
	var adapted Handler = HandlerFunc(handler)
	mux.Handle(pattern, adapted)
}
// Use appends the given MiddlewareFuncs to the mux's middleware chain,
// keeping the order in which they are passed. Calling Use with no
// arguments leaves the chain unchanged.
//
// Handler wraps the matched handler with the chain so that middleware
// registered earlier sits outside middleware registered later.
func (mux *ServeMux) Use(mws ...MiddlewareFunc) {
	mux.mu.Lock()
	defer mux.mu.Unlock()
	// A single variadic append replaces the element-by-element loop.
	mux.mws = append(mux.mws, mws...)
}
// NotFound reports that no handler is registered for the given task.
// It always returns a non-nil error naming the task's type; ctx is unused.
func NotFound(ctx context.Context, task *Task) error {
	typename := task.Type()
	return fmt.Errorf("handler not found for task %q", typename)
}
// NotFoundHandler returns a simple task handler that fails every task
// with the "handler not found" error produced by NotFound.
func NotFoundHandler() Handler {
	return HandlerFunc(NotFound)
}