-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsignals_test.go
156 lines (121 loc) · 3.92 KB
/
signals_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// SPDX-FileCopyrightText: © 2024 Kevin Conway <[email protected]>
// SPDX-License-Identifier: Apache-2.0
package cdc
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFSNotifySignal(t *testing.T) {
t.Parallel()
// Create a test database
db := testDB(t)
t.Cleanup(func() { db.Close() })
_, err := db.Exec("CREATE TABLE test (col INT)")
require.NoError(t, err)
// Create our signal
signal, err := NewFSNotifySignal(db)
require.NoError(t, err)
t.Cleanup(func() { signal.Close() })
// Start watching
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
require.NoError(t, signal.Start(ctx))
// Get the waker channel for receiving signals
waker := signal.Waker()
_, err = db.Exec("INSERT INTO test (col) VALUES (1)")
require.NoError(t, err)
assert.True(t, wait(t, waker), "should receive wake signal for db file change")
}
func TestTimeSignal(t *testing.T) {
t.Parallel()
interval := 10 * time.Millisecond
signal, err := NewTimeSignal(interval)
require.NoError(t, err)
t.Cleanup(func() { signal.Close() })
// Start watching
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
require.NoError(t, signal.Start(ctx))
// Get the waker channel for receiving signals
waker := signal.Waker()
// Should receive multiple signals
for x := 0; x < 3; x = x + 1 {
assert.True(t, wait(t, waker), "should receive wake signal on interval")
}
// Test cancellation
cancel()
time.Sleep(interval * 2)
require.False(t, wait(t, waker), "should not receive wake signal after cancellation")
}
func TestChannelSignal(t *testing.T) {
t.Parallel()
input := make(chan SignalEvent, 1)
signal, err := NewChannelSignal(input)
require.NoError(t, err)
t.Cleanup(func() { signal.Close() })
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
require.NoError(t, signal.Start(ctx))
waker := signal.Waker()
// Should receive all signals
for x := 0; x < 3; x = x + 1 {
input <- SignalEvent{Wake: true}
assert.True(t, wait(t, waker), "should receive wake signal from input channel")
}
// Test cancellation
cancel()
require.False(t, wait(t, waker), "should not receive wake signal after cancellation")
}
func TestMultiSignal(t *testing.T) {
t.Parallel()
// Create multiple input channels
input1 := make(chan SignalEvent, 1)
signal1, err := NewChannelSignal(input1)
require.NoError(t, err)
input2 := make(chan SignalEvent, 1)
signal2, err := NewChannelSignal(input2)
require.NoError(t, err)
input3 := make(chan SignalEvent, 1)
signal3, err := NewChannelSignal(input3)
require.NoError(t, err)
// Create multi signal
multi, err := NewMultiSignal(signal1, signal2, signal3)
require.NoError(t, err)
t.Cleanup(func() { multi.Close() })
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
require.NoError(t, multi.Start(ctx))
waker := multi.Waker()
// Test signals from first channel
input1 <- SignalEvent{Wake: true}
assert.True(t, wait(t, waker), "should receive wake signal from first channel")
// Test signals from second channel
input2 <- SignalEvent{Wake: true}
assert.True(t, wait(t, waker), "should receive wake signal from second channel")
// Test signals from third channel
input3 <- SignalEvent{Wake: true}
assert.True(t, wait(t, waker), "should receive wake signal from third channel")
// Test multiple concurrent signals
input1 <- SignalEvent{Wake: true}
input2 <- SignalEvent{Wake: true}
input3 <- SignalEvent{Wake: true}
for x := 0; x < 3; x = x + 1 {
assert.True(t, wait(t, waker), "should receive wake signal from concurrent inputs")
}
// Test cancellation
cancel()
require.False(t, wait(t, waker), "should not receive wake signal after cancellation")
}
func wait(tb tOrB, ch <-chan SignalEvent) bool {
tb.Helper()
select {
case event := <-ch:
require.NoError(tb, event.Err)
return event.Wake
case <-time.After(time.Second):
return false
}
}