-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtask.go
83 lines (68 loc) · 1.52 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package sqlx
import (
"fmt"
"runtime/debug"
"github.com/go-courier/logr"
"github.com/pkg/errors"
)
// Task is a unit of work executed against a DBExecutor; it reports failure
// through the returned error.
type Task func(db DBExecutor) error
// Run invokes the task with db and returns its error.
//
// A panic raised while the task runs is recovered and reported as an error
// whose message has the form "panic: <value>; calltrace:<stack>". When the
// panic value is itself an error it is wrapped with %w, so callers can still
// reach the original cause through errors.Is / errors.As; the message text is
// the same as for non-error panic values.
func (task Task) Run(db DBExecutor) (err error) {
	defer func() {
		e := recover()
		if e == nil {
			return
		}
		stack := string(debug.Stack())
		if cause, ok := e.(error); ok {
			// Keep the error chain intact instead of flattening it to text.
			err = fmt.Errorf("panic: %w; calltrace:%s", cause, stack)
			return
		}
		err = fmt.Errorf("panic: %s; calltrace:%s", fmt.Sprint(e), stack)
	}()
	return task(db)
}
// NewTasks returns an empty task list bound to db.
func NewTasks(db DBExecutor) *Tasks {
	list := new(Tasks)
	list.db = db
	return list
}
// Tasks is an ordered list of Task values together with the DBExecutor they
// are run against.
type Tasks struct {
// db is the executor handed to each task when Do runs.
db DBExecutor
// tasks holds the queued tasks in the order they were appended by With.
tasks []Task
}
// With returns a new *Tasks holding the receiver's tasks followed by task.
//
// The receiver is taken by value, so the original Tasks is left unchanged.
// The existing slice is capped at its length before appending: that forces
// append to allocate fresh storage instead of writing into spare capacity
// shared with the receiver, so two lists derived from the same base can never
// overwrite each other's entries.
func (tasks Tasks) With(task ...Task) *Tasks {
	n := len(tasks.tasks)
	tasks.tasks = append(tasks.tasks[:n:n], task...)
	return &tasks
}
// Do runs the queued tasks in order against the executor held by tasks.
//
// With no queued tasks it returns nil without touching the executor. When the
// executor implements MaybeTxExecutor and is not already inside a transaction,
// Do begins one, commits it after every task has succeeded, and rolls it back
// as soon as a task fails. When the executor is already in a transaction, the
// first task error is returned as-is and neither commit nor rollback is
// attempted here.
//
// The returned error is one of: the Begin error, the first task error, the
// Rollback error (which replaces the task error when the rollback itself
// fails), or the Commit error.
func (tasks *Tasks) Do() (err error) {
	if len(tasks.tasks) == 0 {
		return nil
	}

	db := tasks.db
	log := logr.FromContext(db.Context())

	// NOTE(review): when db does not implement MaybeTxExecutor no task is run
	// and nil is returned — confirm every DBExecutor in use implements it.
	maybeTx, ok := db.(MaybeTxExecutor)
	if !ok {
		return nil
	}

	inTxScope := false
	if !maybeTx.IsTx() {
		db, err = maybeTx.Begin()
		if err != nil {
			return err
		}
		// Commit/Rollback below must target the executor returned by Begin.
		maybeTx = db.(MaybeTxExecutor)
		inTxScope = true
	}

	for _, task := range tasks.tasks {
		runErr := task.Run(db)
		if runErr == nil {
			continue
		}
		if inTxScope {
			// The error bubbles up to the caller; logging and rollback happen
			// only in the layer that opened the transaction. Wrap runErr (not
			// the named result, which is still nil here) so the real failure
			// is what gets logged.
			log.Error(errors.Wrap(runErr, "SQL FAILED"))
			if rollBackErr := maybeTx.Rollback(); rollBackErr != nil {
				log.Warn(errors.Wrap(rollBackErr, "ROLLBACK FAILED"))
				return rollBackErr
			}
		}
		return runErr
	}

	if inTxScope {
		if commitErr := maybeTx.Commit(); commitErr != nil {
			log.Warn(errors.Wrap(commitErr, "TRANSACTION COMMIT FAILED"))
			return commitErr
		}
	}
	return nil
}