1
1
import json
2
2
import pathlib
3
3
from pathlib import Path
4
- from typing import List , Union , Callable
4
+ from typing import Any , List , Union , Callable
5
5
6
- import jsonschema
6
+ from jsonschema import Draft202012Validator , Validator , ValidationError
7
+
8
+ from referencing import Registry , Resource
7
9
8
10
import yaml
9
11
@@ -36,37 +38,60 @@ def backtrack_yaml_location(path: List[Union[int, str]], ast: YamlAST) -> yaml.e
36
38
assert 0 , "unreachable switch branch"
37
39
38
40
39
- def validate_schema (instance_object : Path , schema_entrypoint : Path , warn : Callable [[str ], None ]) -> list :
40
- subschemas_path = pathlib .Path (schema_entrypoint ).parent
41
+ def remove_unsupported_regex_patterns (data : Any ) -> dict :
42
+ if isinstance (data , dict ):
43
+ cleanedup_dict = {
44
+ k : remove_unsupported_regex_patterns (v ) for k , v in data .items ()
45
+ # if not (k == "pattern" and isinstance(v, str) and r"\\p" in v)
46
+ if not (k == "pattern" and isinstance (v , str ) and r"\p" in v )
47
+ }
48
+ return cleanedup_dict
49
+ elif isinstance (data , list ):
50
+ cleanedup_list = [
51
+ remove_unsupported_regex_patterns (i ) for i in data
52
+ ]
53
+ return cleanedup_list
54
+ else :
55
+ return data
56
+
57
+
58
+ def validate_schema (extension_yaml_path : Path , extension_schema_path : Path , warn : Callable [[str ], None ]) -> list :
59
+ schema_dir_path = pathlib .Path (extension_schema_path ).parent
41
60
42
- with open (instance_object , "r" ) as yaml_in :
61
+ with open (extension_yaml_path , "r" ) as yaml_in :
43
62
instance = yaml .safe_load (yaml_in )
44
63
45
- with open (schema_entrypoint ) as f :
46
- s = json .load (f )
64
+ with open (extension_schema_path ) as f :
65
+ schema_data = json .load (f )
66
+ cleanedup_schema_data = remove_unsupported_regex_patterns (schema_data )
47
67
48
- schema_store = {}
49
- for subschema_candidate in filter (lambda f : f .is_file , pathlib .Path (subschemas_path ).iterdir ()):
68
+ subschema_paths = [
69
+ p for p in schema_dir_path .iterdir () if p .is_file () and extension_schema_path .absolute () != p .absolute ()
70
+ ]
71
+
72
+ resources : list [tuple [str , dict ]] = []
73
+ for c in subschema_paths :
50
74
try :
51
- with open (subschema_candidate ) as f :
52
- sub = json .load (f )
75
+ with open (c ) as f :
76
+ subschema_data = json .load (f )
53
77
except json .decoder .JSONDecodeError :
54
- warn (f"skipping subschema { subschema_candidate } , malformed json" )
55
- # print(f"skipping subschema {subschema_candidate}, malformed json", file=sys.stderr)
78
+ warn (f"skipping subschema { c } , malformed json" )
56
79
else :
57
- sub_id = sub ["$id" ]
58
- schema_store [sub_id ] = sub
59
-
60
- resolver = jsonschema .validators .RefResolver .from_schema (s , store = schema_store )
61
-
62
- validator_cls = jsonschema .validators .validator_for (s )
63
- validator_cls .check_schema (s ) # against META_SCHEMA
64
- validator = validator_cls (s , resolver = resolver )
65
-
66
- with open (instance_object , "r" ) as f :
80
+ cleanedup_subschema_data = remove_unsupported_regex_patterns (subschema_data )
81
+ subschema_uri = cleanedup_subschema_data ["$id" ]
82
+ subschema = Resource .from_contents (cleanedup_subschema_data )
83
+ resources .append ((subschema_uri , subschema ))
84
+
85
+ registry = Registry ().with_resources (resources )
86
+ validator : Validator = Draft202012Validator (
87
+ cleanedup_schema_data ,
88
+ registry = registry ,
89
+ )
90
+
91
+ with open (extension_yaml_path , "r" ) as f :
67
92
file = yaml .compose (f )
68
93
69
- def process_validation_error (error ):
94
+ def process_validation_error (error : ValidationError ):
70
95
err_loc = backtrack_yaml_location (error .absolute_path , file )
71
96
72
97
# TODO: add typo finder for some cases - like enum mismatch
@@ -77,4 +102,10 @@ def process_validation_error(error):
77
102
"cause" : error .message
78
103
}
79
104
80
- return list (map (process_validation_error , validator .iter_errors (instance )))
105
+ detected_errors = list (validator .iter_errors (instance ))
106
+ errors_info : list [dict ] = []
107
+ for err in detected_errors :
108
+ error_info = process_validation_error (err )
109
+ errors_info .append (error_info )
110
+
111
+ return errors_info
0 commit comments