-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaction.go
127 lines (113 loc) · 2.85 KB
/
action.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package workflow
import (
"sync"
)
// A simple function definition with a single input and output.
type Action func(any) (any, error)
// Contains an action output and associated error, if any.
type Result struct {
Out any
Err error
}
// Encapsulate a function with types into an action.
func Do[T1 any, T2 any](action func(T1) (T2, error)) Action {
return func(in any) (any, error) {
input := in.(T1)
return action(input)
}
}
// Combines multiple actions into a single action that will execute based on the order the actions were passed.
func Sequential(actions ...Action) Action {
if len(actions) == 0 {
return NoOp()
}
sequential := actions[0]
for i := 1; i < len(actions); i++ {
sequential = wrap(sequential, actions[i])
}
return sequential
}
// Execute multiple actions in parallel. The reduce function should combine all parallel results into a single result.
func Parallel[T any](reduce func(in []Result) (T, error), actions ...Action) Action {
return func(in any) (any, error) {
var outputs []Result
var lock sync.Mutex
var wg sync.WaitGroup
for _, v := range actions {
wg.Add(1)
go func(in any) {
defer wg.Done()
out, err := v(in)
lock.Lock()
defer lock.Unlock()
outputs = append(outputs, Result{
Out: out,
Err: err,
})
}(in)
}
wg.Wait()
return reduce(outputs)
}
}
// Conditionally execute another action. Only one action will be executed.
func If[T any](condition func(in T) (bool, error), ifTrue Action, ifFalse Action) Action {
// all functions must be valid
if condition == nil || ifTrue == nil || ifFalse == nil {
return NoOp()
}
return func(in any) (any, error) {
input := in.(T)
condition, err := condition(input)
if err != nil {
return nil, err
}
if condition {
return ifTrue(in)
} else {
return ifFalse(in)
}
}
}
// Executes an action and calls the handle function if an error occurs.
func Catch(action Action, handle func(any, error) (any, error)) Action {
return func(in any) (any, error) {
out, err := action(in)
if err != nil {
return handle(out, err)
}
return out, nil
}
}
// Executes an action and then, regardless of whether an error occurred, calls the finally function.
func Finally(action Action, finally func(any, error) (any, error)) Action {
return func(in any) (any, error) {
out, err := action(in)
return finally(out, err)
}
}
// Returns an action that does nothing and returns nil.
func NoOp() Action {
return func(in any) (any, error) {
return nil, nil
}
}
// Wraps provided actions so that "action" is called first and then "next" is called.
func wrap(action Action, next Action) Action {
if action == nil && next == nil {
return NoOp()
}
if action == nil {
return next
}
if next == nil {
return action
}
return func(in any) (any, error) {
out, err := action(in)
if err != nil {
return out, err
}
return next(out)
}
}