1
- from typing import Any , Dict , List , Tuple
1
+ from typing import Any , Dict , List , cast
2
2
3
- import click
4
3
from pygitguardian .client import VERSIONS
5
4
from pygitguardian .models import Match , PolicyBreak
6
5
13
12
from ggshield .utils import Filemode , find_match_indices , get_lines_from_content
14
13
15
14
16
- class JSONHandler (OutputHandler ):
17
- def process_scan (
15
+ class JSONOutputHandler (OutputHandler ):
16
+ def _process_scan_impl (self , scan : ScanCollection ) -> str :
17
+ scan_dict = self .create_scan_dict (scan , top = True )
18
+ text = JSONScanCollectionSchema ().dumps (scan_dict )
19
+ # dumps() return type is not defined, so cast `text`, otherwise mypy complains
20
+ return cast (str , text )
21
+
22
+ def create_scan_dict (
18
23
self , scan : ScanCollection , top : bool = True
19
- ) -> Tuple [ Dict [str , Any ], int ]:
24
+ ) -> Dict [str , Any ]:
20
25
scan_dict : Dict [str , Any ] = {
21
26
"id" : scan .id ,
22
27
"type" : scan .type ,
23
28
"total_incidents" : 0 ,
24
29
"total_occurrences" : 0 ,
25
30
}
26
- return_code = 0
27
-
28
31
if scan .extra_info :
29
32
scan_dict ["extra_info" ] = scan .extra_info
30
33
31
34
if top and scan .results :
32
35
scan_dict ["secrets_engine_version" ] = VERSIONS .secrets_engine_version
33
36
34
37
if scan .results :
35
- return_code = 1
36
38
for result in scan .results :
37
39
result_dict = self .process_result (result )
38
40
scan_dict .setdefault ("results" , []).append (result_dict )
@@ -41,22 +43,12 @@ def process_scan(
41
43
42
44
if scan .scans :
43
45
for inner_scan in scan .scans_with_results :
44
- inner_scan_dict , inner_return_code = self .process_scan (
45
- inner_scan , top = False
46
- )
46
+ inner_scan_dict = self .create_scan_dict (inner_scan , top = False )
47
47
scan_dict .setdefault ("scans" , []).append (inner_scan_dict )
48
48
scan_dict ["total_incidents" ] += inner_scan_dict ["total_incidents" ]
49
49
scan_dict ["total_occurrences" ] += inner_scan_dict ["total_occurrences" ]
50
- return_code = max (return_code , inner_return_code )
51
-
52
- if top :
53
- if self .output :
54
- with open (self .output , "w+" ) as f :
55
- f .write (JSONScanCollectionSchema ().dumps (scan_dict ))
56
- else :
57
- click .echo (JSONScanCollectionSchema ().dumps (scan_dict ))
58
50
59
- return scan_dict , return_code
51
+ return scan_dict
60
52
61
53
def process_result (self , result : Result ) -> Dict [str , Any ]:
62
54
result_dict : Dict [str , Any ] = {
@@ -110,7 +102,9 @@ def flattened_policy_break(
110
102
flattened_dict ["validity" ] = policy_breaks [0 ].validity
111
103
112
104
for policy_break in policy_breaks :
113
- matches = JSONHandler .make_matches (policy_break .matches , lines , is_patch )
105
+ matches = JSONOutputHandler .make_matches (
106
+ policy_break .matches , lines , is_patch
107
+ )
114
108
flattened_dict ["occurrences" ].extend (matches )
115
109
116
110
return flattened_dict
0 commit comments