|
| 1 | +from jupyter_server.base.zmqhandlers import AuthenticatedZMQStreamHandler |
| 2 | +from jupyter_server.services.kernels.handlers import _kernel_id_regex |
| 3 | +from jupyter_server.services.kernels.handlers import ZMQChannelsHandler |
| 4 | + |
| 5 | +from data_studio_jupyter_extensions.hubble.metrics import HUBBLE_METRICS |
| 6 | + |
| 7 | + |
| 8 | +def hubble_execution_time_and_count(msg): |
| 9 | + """Emit the time it takes to run a cell and the execution cound to hubble.""" |
| 10 | + msg_type = msg.get("msg_type", None) |
| 11 | + if msg_type == "execute_reply": |
| 12 | + tdelta = msg["header"]["date"] - msg["parent_header"]["date"] |
| 13 | + seconds = tdelta.total_seconds() |
| 14 | + HUBBLE_METRICS.KERNEL_EXECUTION_LATENCY.emit(value=seconds) |
| 15 | + count = msg["content"]["execution_count"] |
| 16 | + HUBBLE_METRICS.KERNEL_EXECUTION_COUNT.emit(value=count) |
| 17 | + |
| 18 | + |
| 19 | +class OverrideZMQChannelsHandler(ZMQChannelsHandler): |
| 20 | + """Adds hubble metrics to Jupyter websocket handler.""" |
| 21 | + |
| 22 | + # This is a complete fork of the Jupyter Websocket handler |
| 23 | + # method. We don't want to reply on this long time, but should |
| 24 | + # work on getting this into Jupyter Telemetry. |
| 25 | + def _on_zmq_reply(self, stream, msg_list): |
| 26 | + idents, fed_msg_list = self.session.feed_identities(msg_list) |
| 27 | + |
| 28 | + if self.subprotocol == "v1.kernel.websocket.jupyter.org": |
| 29 | + msg = {"header": None, "parent_header": None, "content": None} |
| 30 | + else: |
| 31 | + msg = self.session.deserialize(fed_msg_list) |
| 32 | + |
| 33 | + # This chunk is (the only) DataStudio customization for collecting hubble metrics. |
| 34 | + # if this chunk fails, don't stop messages from going through. just log a warning. |
| 35 | + try: |
| 36 | + hubble_execution_time_and_count(msg) |
| 37 | + except Exception as err: |
| 38 | + self.log.warning(err) |
| 39 | + |
| 40 | + channel = getattr(stream, "channel", None) |
| 41 | + parts = fed_msg_list[1:] |
| 42 | + |
| 43 | + self._on_error(channel, msg, parts) |
| 44 | + |
| 45 | + if self._limit_rate(channel, msg, parts): |
| 46 | + return |
| 47 | + |
| 48 | + if self.subprotocol == "v1.kernel.websocket.jupyter.org": |
| 49 | + AuthenticatedZMQStreamHandler._on_zmq_reply(self, stream, parts) |
| 50 | + else: |
| 51 | + AuthenticatedZMQStreamHandler._on_zmq_reply(self, stream, msg) |
| 52 | + |
| 53 | + |
| 54 | +handlers = [ |
| 55 | + ( |
| 56 | + rf"/api/kernels/{_kernel_id_regex}/channels", |
| 57 | + OverrideZMQChannelsHandler, |
| 58 | + ), |
| 59 | +] |
0 commit comments