17
17
import traceback
18
18
19
19
from types import CodeType , FrameType
20
- from typing import Any , Callable , Dict , List , Optional , Set , cast
20
+ from typing import (
21
+ Any ,
22
+ Callable ,
23
+ Dict ,
24
+ List ,
25
+ Optional ,
26
+ Set ,
27
+ TYPE_CHECKING ,
28
+ cast ,
29
+ )
21
30
22
31
from coverage .debug import short_filename , short_stack
23
32
from coverage .types import (
27
36
TLineNo ,
28
37
TTraceData ,
29
38
TTraceFileData ,
30
- TTraceFn ,
31
- TTracer ,
39
+ TracerCore ,
32
40
TWarnFn ,
33
41
)
34
42
35
43
# pylint: disable=unused-argument
36
- # As of mypy 1.7.1, sys.monitoring isn't in typeshed stubs.
37
- # mypy: ignore-errors
38
44
39
45
LOG = False
40
46
41
47
# This module will be imported in all versions of Python, but only used in 3.12+
48
+ # It will be type-checked for 3.12, but not for earlier versions.
42
49
sys_monitoring = getattr (sys , "monitoring" , None )
43
50
44
- if LOG : # pragma: debugging
51
+ if TYPE_CHECKING :
52
+ assert sys_monitoring is not None
53
+ # I want to say this but it's not allowed:
54
+ # MonitorReturn = Literal[sys.monitoring.DISABLE] | None
55
+ MonitorReturn = Any
56
+
57
+
58
+ if LOG : # pragma: debugging
45
59
46
60
class LoggingWrapper :
47
61
"""Wrap a namespace to log all its functions."""
@@ -58,6 +72,7 @@ def _wrapped(*args: Any, **kwargs: Any) -> Any:
58
72
return _wrapped
59
73
60
74
sys_monitoring = LoggingWrapper (sys_monitoring , "sys.monitoring" )
75
+ assert sys_monitoring is not None
61
76
62
77
short_stack = functools .partial (
63
78
short_stack , full = True , short_filenames = True , frame_ids = True
@@ -114,8 +129,9 @@ def _wrapped(self: Any, *args: Any) -> Any:
114
129
return ret
115
130
except Exception as exc :
116
131
log (f"!!{ exc .__class__ .__name__ } : { exc } " )
117
- log ("" .join (traceback .format_exception (exc ))) # pylint: disable=no-value-for-parameter
132
+ log ("" .join (traceback .format_exception (exc ))) # pylint: disable=[ no-value-for-parameter]
118
133
try :
134
+ assert sys_monitoring is not None
119
135
sys_monitoring .set_events (sys .monitoring .COVERAGE_ID , 0 )
120
136
except ValueError :
121
137
# We might have already shut off monitoring.
@@ -146,13 +162,14 @@ class CodeInfo:
146
162
147
163
tracing : bool
148
164
file_data : Optional [TTraceFileData ]
149
- byte_to_line : Dict [int , int ]
165
+ # TODO: what is byte_to_line for?
166
+ byte_to_line : Dict [int , int ] | None
150
167
151
168
152
169
def bytes_to_lines (code : CodeType ) -> Dict [int , int ]:
153
170
"""Make a dict mapping byte code offsets to line numbers."""
154
171
b2l = {}
155
- cur_line = None
172
+ cur_line = 0
156
173
for inst in dis .get_instructions (code ):
157
174
if inst .starts_line is not None :
158
175
cur_line = inst .starts_line
@@ -161,7 +178,7 @@ def bytes_to_lines(code: CodeType) -> Dict[int, int]:
161
178
return b2l
162
179
163
180
164
- class SysMonitor (TTracer ):
181
+ class SysMonitor (TracerCore ):
165
182
"""Python implementation of the raw data tracer for PEP669 implementations."""
166
183
167
184
# One of these will be used across threads. Be careful.
@@ -185,7 +202,7 @@ def __init__(self) -> None:
185
202
self .code_infos : Dict [int , CodeInfo ] = {}
186
203
# A list of code_objects, just to keep them alive so that id's are
187
204
# useful as identity.
188
- self .code_objects : List [CodeInfo ] = []
205
+ self .code_objects : List [CodeType ] = []
189
206
self .last_lines : Dict [FrameType , int ] = {}
190
207
# Map id(code_object) -> code_object
191
208
self .local_event_codes : Dict [int , CodeType ] = {}
@@ -205,15 +222,14 @@ def __init__(self) -> None:
205
222
def __repr__ (self ) -> str :
206
223
points = sum (len (v ) for v in self .data .values ())
207
224
files = len (self .data )
208
- return (
209
- f"<SysMonitor at { id (self ):#x} : { points } data points in { files } files>"
210
- )
225
+ return f"<SysMonitor at { id (self ):#x} : { points } data points in { files } files>"
211
226
212
227
@panopticon ()
213
- def start (self ) -> TTraceFn :
228
+ def start (self ) -> None :
214
229
"""Start this Tracer."""
215
230
self .stopped = False
216
231
232
+ assert sys_monitoring is not None
217
233
sys_monitoring .use_tool_id (self .myid , "coverage.py" )
218
234
register = functools .partial (sys_monitoring .register_callback , self .myid )
219
235
events = sys .monitoring .events
@@ -237,6 +253,7 @@ def start(self) -> TTraceFn:
237
253
@panopticon ()
238
254
def stop (self ) -> None :
239
255
"""Stop this Tracer."""
256
+ assert sys_monitoring is not None
240
257
sys_monitoring .set_events (self .myid , 0 )
241
258
for code in self .local_event_codes .values ():
242
259
sys_monitoring .set_local_events (self .myid , code , 0 )
@@ -266,36 +283,39 @@ def get_stats(self) -> Optional[Dict[str, int]]:
266
283
267
284
def callers_frame (self ) -> FrameType :
268
285
"""Get the frame of the Python code we're monitoring."""
269
- return inspect .currentframe ().f_back .f_back .f_back
286
+ return (
287
+ inspect .currentframe ().f_back .f_back .f_back # type: ignore[union-attr,return-value]
288
+ )
289
+
270
290
else :
271
291
272
292
def callers_frame (self ) -> FrameType :
273
293
"""Get the frame of the Python code we're monitoring."""
274
- return inspect .currentframe ().f_back .f_back
294
+ return inspect .currentframe ().f_back .f_back # type: ignore[union-attr,return-value]
275
295
276
296
@panopticon ("code" , "@" )
277
- def sysmon_py_start (self , code : CodeType , instruction_offset : int ):
297
+ def sysmon_py_start (self , code : CodeType , instruction_offset : int ) -> MonitorReturn :
278
298
"""Handle sys.monitoring.events.PY_START events."""
279
299
# Entering a new frame. Decide if we should trace in this file.
280
300
self ._activity = True
281
301
self .stats ["starts" ] += 1
282
302
283
303
code_info = self .code_infos .get (id (code ))
304
+ tracing_code : bool | None = None
305
+ file_data : TTraceFileData | None = None
284
306
if code_info is not None :
285
307
tracing_code = code_info .tracing
286
308
file_data = code_info .file_data
287
- else :
288
- tracing_code = file_data = None
289
309
290
310
if tracing_code is None :
291
311
filename = code .co_filename
292
312
disp = self .should_trace_cache .get (filename )
293
313
if disp is None :
294
- frame = inspect .currentframe ().f_back
314
+ frame = inspect .currentframe ().f_back # type: ignore[union-attr]
295
315
if LOG :
296
316
# @panopticon adds a frame.
297
- frame = frame .f_back
298
- disp = self .should_trace (filename , frame )
317
+ frame = frame .f_back # type: ignore[union-attr]
318
+ disp = self .should_trace (filename , frame ) # type: ignore[arg-type]
299
319
self .should_trace_cache [filename ] = disp
300
320
301
321
tracing_code = disp .trace
@@ -320,10 +340,12 @@ def sysmon_py_start(self, code: CodeType, instruction_offset: int):
320
340
if tracing_code :
321
341
events = sys .monitoring .events
322
342
if self .sysmon_on :
343
+ assert sys_monitoring is not None
323
344
sys_monitoring .set_local_events (
324
345
self .myid ,
325
346
code ,
326
347
events .PY_RETURN
348
+ #
327
349
| events .PY_RESUME
328
350
# | events.PY_YIELD
329
351
| events .LINE ,
@@ -340,15 +362,17 @@ def sysmon_py_start(self, code: CodeType, instruction_offset: int):
340
362
return sys .monitoring .DISABLE
341
363
342
364
@panopticon ("code" , "@" )
343
- def sysmon_py_resume_arcs (self , code : CodeType , instruction_offset : int ):
365
+ def sysmon_py_resume_arcs (
366
+ self , code : CodeType , instruction_offset : int
367
+ ) -> MonitorReturn :
344
368
"""Handle sys.monitoring.events.PY_RESUME events for branch coverage."""
345
369
frame = self .callers_frame ()
346
370
self .last_lines [frame ] = frame .f_lineno
347
371
348
372
@panopticon ("code" , "@" , None )
349
373
def sysmon_py_return_arcs (
350
374
self , code : CodeType , instruction_offset : int , retval : object
351
- ):
375
+ ) -> MonitorReturn :
352
376
"""Handle sys.monitoring.events.PY_RETURN events for branch coverage."""
353
377
frame = self .callers_frame ()
354
378
code_info = self .code_infos .get (id (code ))
@@ -360,7 +384,9 @@ def sysmon_py_return_arcs(
360
384
self .last_lines .pop (frame , None )
361
385
362
386
@panopticon ("code" , "@" , None )
363
- def sysmon_py_unwind_arcs (self , code : CodeType , instruction_offset : int , exception ):
387
+ def sysmon_py_unwind_arcs (
388
+ self , code : CodeType , instruction_offset : int , exception : BaseException
389
+ ) -> MonitorReturn :
364
390
"""Handle sys.monitoring.events.PY_UNWIND events for branch coverage."""
365
391
frame = self .callers_frame ()
366
392
code_info = self .code_infos .get (id (code ))
@@ -372,7 +398,7 @@ def sysmon_py_unwind_arcs(self, code: CodeType, instruction_offset: int, excepti
372
398
self .last_lines .pop (frame , None )
373
399
374
400
@panopticon ("code" , "line" )
375
- def sysmon_line_lines (self , code : CodeType , line_number : int ):
401
+ def sysmon_line_lines (self , code : CodeType , line_number : int ) -> MonitorReturn :
376
402
"""Handle sys.monitoring.events.LINE events for line coverage."""
377
403
code_info = self .code_infos [id (code )]
378
404
if code_info .file_data is not None :
@@ -381,7 +407,7 @@ def sysmon_line_lines(self, code: CodeType, line_number: int):
381
407
return sys .monitoring .DISABLE
382
408
383
409
@panopticon ("code" , "line" )
384
- def sysmon_line_arcs (self , code : CodeType , line_number : int ):
410
+ def sysmon_line_arcs (self , code : CodeType , line_number : int ) -> MonitorReturn :
385
411
"""Handle sys.monitoring.events.LINE events for branch coverage."""
386
412
code_info = self .code_infos [id (code )]
387
413
ret = None
0 commit comments