Skip to content

Commit f04f6d0

Browse files
committed
(events): Add re-subscribe mechanism
If we got HTTP error, likely we lost subscription due remote server restart. Instead of crashing, try to resubscribe several times, with delay, to allow remote server restart. Signed-off-by: Denys Fedoryshchenko <[email protected]>
1 parent 3ca3e35 commit f04f6d0

File tree

4 files changed

+68
-8
lines changed

4 files changed

+68
-8
lines changed

src/monitor.py

+15-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
import datetime
1010
import sys
11-
11+
import time
1212
import kernelci
1313
import kernelci.config
1414
from kernelci.legacy.cli import Args, Command, parse_opts
@@ -53,9 +53,21 @@ def _run(self, sub_id):
5353
self.log.info("Monitor: Listening for events... ")
5454
self.log.info("Press Ctrl-C to stop.")
5555
print(self._log_titles, flush=True)
56-
56+
subscribe_retries = 0
5757
while True:
58-
event = self._api.receive_event(sub_id)
58+
event = None
59+
try:
60+
event = self._api.receive_event(sub_id)
61+
except Exception as e:
62+
self.log.error(f"Error receiving event: {e}")
63+
time.sleep(10)
64+
sub_id = self._api.subscribe('node')
65+
subscribe_retries += 1
66+
if subscribe_retries > 3:
67+
self.log.error("Failed to subscribe to node events")
68+
return False
69+
continue
70+
subscribe_retries = 0
5971
obj = event.data
6072
dt = datetime.datetime.fromisoformat(event['time'])
6173
try:

src/scheduler.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import requests
1515
import re
1616
import datetime
17+
import time
1718

1819
import kernelci
1920
import kernelci.config
@@ -304,9 +305,22 @@ def _verify_architecture_filter(self, job, node):
304305
def _run(self, sub_id):
305306
self.log.info("Listening for available checkout events")
306307
self.log.info("Press Ctrl-C to stop.")
308+
subscribe_retries = 0
307309

308310
while True:
309-
event = self._api_helper.receive_event_data(sub_id)
311+
event = None
312+
try:
313+
event = self._api_helper.receive_event_data(sub_id)
314+
except Exception as e:
315+
self.log.error(f"Error receiving event: {e}")
316+
time.sleep(10)
317+
sub_id = self._api.subscribe('node')
318+
subscribe_retries += 1
319+
if subscribe_retries > 3:
320+
self.log.error("Failed to re-subscribe to node events")
321+
return False
322+
continue
323+
subscribe_retries = 0
310324
for job, runtime, platform, rules in self._sched.get_schedule(event):
311325
input_node = self._api.node.get(event['id'])
312326
jobfilter = event.get('jobfilter')

src/send_kcidb.py

+18-2
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,7 @@ def _run(self, context):
656656
self.log.info("Listening for events... Press Ctrl-C to stop.")
657657

658658
chunksize = 20
659-
659+
subscribe_retries = 0
660660
while True:
661661
is_hierarchy = False
662662

@@ -666,7 +666,23 @@ def _run(self, context):
666666
if not nodes:
667667
# Switch to event mode if no unprocessed nodes
668668
# Listen and wait for a node instead of processing the queue
669-
node, is_hierarchy = self._api_helper.receive_event_node(context['sub_id'])
669+
node = None
670+
try:
671+
node, is_hierarchy = self._api_helper.receive_event_node(context['sub_id'])
672+
except Exception as e:
673+
self.log.error(f"Error receiving event: {e}")
674+
time.sleep(10)
675+
context['sub_id'] = self._api_helper.subscribe_filters({
676+
'op': 'created',
677+
'kind': 'node',
678+
'state': 'done',
679+
})
680+
subscribe_retries += 1
681+
if subscribe_retries > 3:
682+
self.log.error("Failed to re-subscribe to node events")
683+
return False
684+
continue
685+
subscribe_retries = 0
670686
self.log.info(f"Processing event node: {node['id']}")
671687
nodes = [node]
672688
else:

src/tarball.py

+20-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import sys
1515
import json
1616
import requests
17-
17+
import time
1818
import kernelci
1919
import kernelci.build
2020
import kernelci.config
@@ -206,9 +206,27 @@ def _stop(self, sub_id):
206206
def _run(self, sub_id):
207207
self.log.info("Listening for new trigger events")
208208
self.log.info("Press Ctrl-C to stop.")
209+
subscribe_retries = 0
209210

210211
while True:
211-
checkout_node, _ = self._api_helper.receive_event_node(sub_id)
212+
checkout_node = None
213+
try:
214+
checkout_node, _ = self._api_helper.receive_event_node(sub_id)
215+
except Exception as e:
216+
self.log.error(f"Error receiving event: {e}")
217+
time.sleep(10)
218+
# try to resubscribe
219+
sub_id = self._api_helper.subscribe_filters({
220+
'op': 'created',
221+
'kind': 'checkout',
222+
'state': 'running',
223+
})
224+
subscribe_retries += 1
225+
if subscribe_retries > 3:
226+
self.log.error("Failed to re-subscribe to checkout events")
227+
return False
228+
continue
229+
subscribe_retries = 0
212230

213231
build_config = self._find_build_config(checkout_node)
214232
if build_config is None:

0 commit comments

Comments
 (0)