Skip to content

Commit 6dd5845

Browse files
authored
Make monitoring radio send() signature consistent with radio.send (#3672)
This is an API tidyup that works towards all monitoring message sends taking a TaggedMonitoringMessage: callers of send are responsible for forming a tagged tuple. ## Type of change - Code maintenance/cleanup
1 parent 7fe576e commit 6dd5845

File tree

2 files changed

+9
-11
lines changed

2 files changed

+9
-11
lines changed

parsl/dataflow/dflow.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ def __init__(self, config: Config) -> None:
162162
}
163163

164164
if self.monitoring:
165-
self.monitoring.send(MessageType.WORKFLOW_INFO,
166-
workflow_info)
165+
self.monitoring.send((MessageType.WORKFLOW_INFO,
166+
workflow_info))
167167

168168
if config.checkpoint_files is not None:
169169
checkpoints = self.load_checkpoints(config.checkpoint_files)
@@ -238,7 +238,7 @@ def __exit__(self, exc_type, exc_value, traceback) -> None:
238238
def _send_task_log_info(self, task_record: TaskRecord) -> None:
239239
if self.monitoring:
240240
task_log_info = self._create_task_log_info(task_record)
241-
self.monitoring.send(MessageType.TASK_INFO, task_log_info)
241+
self.monitoring.send((MessageType.TASK_INFO, task_log_info))
242242

243243
def _create_task_log_info(self, task_record: TaskRecord) -> Dict[str, Any]:
244244
"""
@@ -1295,12 +1295,12 @@ def cleanup(self) -> None:
12951295

12961296
if self.monitoring:
12971297
logger.info("Sending final monitoring message")
1298-
self.monitoring.send(MessageType.WORKFLOW_INFO,
1298+
self.monitoring.send((MessageType.WORKFLOW_INFO,
12991299
{'tasks_failed_count': self.task_state_counts[States.failed],
13001300
'tasks_completed_count': self.task_state_counts[States.exec_done],
13011301
"time_began": self.time_began,
13021302
'time_completed': self.time_completed,
1303-
'run_id': self.run_id, 'rundir': self.run_dir})
1303+
'run_id': self.run_id, 'rundir': self.run_dir}))
13041304

13051305
logger.info("Terminating monitoring")
13061306
self.monitoring.close()

parsl/monitoring/monitoring.py

+4-6
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,12 @@
77
import time
88
from multiprocessing import Event, Process
99
from multiprocessing.queues import Queue
10-
from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast
10+
from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union, cast
1111

1212
import typeguard
1313

1414
from parsl.log_utils import set_file_logger
1515
from parsl.monitoring.errors import MonitoringHubStartError
16-
from parsl.monitoring.message_type import MessageType
1716
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
1817
from parsl.monitoring.router import router_starter
1918
from parsl.monitoring.types import TaggedMonitoringMessage
@@ -202,10 +201,9 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
202201

203202
self.hub_zmq_port = zmq_port
204203

205-
# TODO: tighten the Any message format
206-
def send(self, mtype: MessageType, message: Any) -> None:
207-
logger.debug("Sending message type {}".format(mtype))
208-
self.radio.send((mtype, message))
204+
def send(self, message: TaggedMonitoringMessage) -> None:
205+
logger.debug("Sending message type {}".format(message[0]))
206+
self.radio.send(message)
209207

210208
def close(self) -> None:
211209
logger.info("Terminating Monitoring Hub")

0 commit comments

Comments
 (0)