-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathexecutor_test.go
133 lines (102 loc) · 3.27 KB
/
executor_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package async
import (
"context"
"errors"
"runtime"
"testing"
"time"
"github.com/reugn/async/internal/assert"
)
// TestExecutor walks an executor created with NewExecutorConfig(2, 2) through
// its lifecycle: accepted submissions, a submission rejected with
// ErrExecutorQueueFull, shutdown, and the final outcome of every future.
//
// The test is timing-sensitive: the sleeps pace it against the job durations,
// so the order of the statements below matters.
func TestExecutor(t *testing.T) {
	const tick = time.Millisecond

	pool := NewExecutor[int](context.Background(), NewExecutorConfig(2, 2))

	quick := func(_ context.Context) (int, error) {
		time.Sleep(tick)
		return 1, nil
	}
	slow := func(_ context.Context) (int, error) {
		time.Sleep(10 * tick)
		return 1, nil
	}

	first := submitJob[int](t, pool, quick)
	second := submitJob[int](t, pool, quick)

	// Give the two quick jobs time to finish before loading the executor.
	time.Sleep(3 * tick)

	// Four slow jobs are accepted.
	third := submitJob[int](t, pool, slow)
	fourth := submitJob[int](t, pool, slow)
	fifth := submitJob[int](t, pool, slow)
	sixth := submitJob[int](t, pool, slow)

	// The queue is now at capacity, so one more submission is rejected while
	// the executor itself keeps running.
	rejected, err := pool.Submit(quick)
	assert.ErrorIs(t, err, ErrExecutorQueueFull)
	assert.IsNil(t, rejected)
	assert.Equal(t, pool.Status(), ExecutorStatusRunning)

	goroutinesBefore := runtime.NumGoroutine()

	// Start the shutdown; its returned error is intentionally discarded.
	_ = pool.Shutdown()
	time.Sleep(tick)

	// New submissions are refused once shutdown has begun.
	_, err = pool.Submit(quick)
	assert.ErrorIs(t, err, ErrExecutorShutDown)

	// The status is expected to read Terminating first and ShutDown after a
	// further pause.
	assert.Equal(t, pool.Status(), ExecutorStatusTerminating)
	time.Sleep(10 * tick)
	assert.Equal(t, pool.Status(), ExecutorStatusShutDown)

	// Four fewer goroutines are expected than were counted before shutdown.
	assert.Equal(t, goroutinesBefore, runtime.NumGoroutine()+4)

	// The first four futures yield 1; the last two fail with
	// ErrExecutorShutDown.
	assertFutureResult(t, 1, first, second, third, fourth)
	assertFutureError(t, ErrExecutorShutDown, fifth, sixth)
}
// TestExecutor_context checks two things: an error returned by a job is
// delivered through its future, and after the context passed to NewExecutor
// is cancelled the executor rejects submissions and reports
// ExecutorStatusShutDown.
func TestExecutor_context(t *testing.T) {
	parent, stop := context.WithCancel(context.Background())
	pool := NewExecutor[int](parent, NewExecutorConfig(2, 2))

	failing := func(_ context.Context) (int, error) {
		return 0, errors.New("error")
	}

	// The submission itself succeeds; the job's error shows up on Join
	// together with a zero result.
	pending, err := pool.Submit(failing)
	assert.IsNil(t, err)

	value, err := pending.Join()
	assert.Equal(t, value, 0)
	assert.ErrorContains(t, err, "error")

	// Cancel the parent context and pause briefly before checking the
	// executor again.
	stop()
	time.Sleep(5 * time.Millisecond)

	_, err = pool.Submit(failing)
	assert.ErrorIs(t, err, ErrExecutorShutDown)
	assert.Equal(t, pool.Status(), ExecutorStatusShutDown)
}
// TestExecutor_jobPanic submits a job that panics and expects the panic to be
// reported as the future's error, with a zero result, instead of crashing the
// test process.
func TestExecutor_jobPanic(t *testing.T) {
	pool := NewExecutor[int](context.Background(), NewExecutorConfig(2, 2))

	// Dividing by a zero-valued variable panics at run time with
	// "integer divide by zero".
	panicking := func(_ context.Context) (int, error) {
		var divisor int
		return 1 / divisor, nil
	}

	pending, err := pool.Submit(panicking)
	assert.IsNil(t, err)

	value, err := pending.Join()
	assert.Equal(t, value, 0)
	assert.ErrorContains(t, err, "integer divide by zero")

	// Clean up; the shutdown error is intentionally discarded.
	_ = pool.Shutdown()
}
// submitJob submits f to executor, asserts that the submission returned no
// error, and returns the resulting future.
//
// After submitting it calls runtime.Gosched to yield the processor, allowing
// other goroutines to run before the caller continues.
func submitJob[T any](t *testing.T, executor ExecutorService[T],
	f func(context.Context) (T, error)) Future[T] {
	// Mark this function as a test helper so that file/line information for
	// a failure skips this frame and points at the calling test.
	t.Helper()

	future, err := executor.Submit(f)
	assert.IsNil(t, err)
	runtime.Gosched()
	return future
}
// assertFutureResult joins each of the given futures and asserts that it
// completed without an error and produced the expected value.
func assertFutureResult[T any](t *testing.T, expected T, futures ...Future[T]) {
	// Mark this function as a test helper so that file/line information for
	// a failure skips this frame and points at the calling test.
	t.Helper()

	for _, future := range futures {
		result, err := future.Join()
		assert.IsNil(t, err)
		assert.Equal(t, expected, result)
	}
}
// assertFutureError joins each of the given futures and asserts that it
// produced the zero value of T together with an error matching expected
// (checked with assert.ErrorIs).
func assertFutureError[T any](t *testing.T, expected error, futures ...Future[T]) {
	// Mark this function as a test helper so that file/line information for
	// a failure skips this frame and points at the calling test.
	t.Helper()

	for _, future := range futures {
		result, err := future.Join()
		var zero T
		assert.Equal(t, zero, result)
		assert.ErrorIs(t, err, expected)
	}
}