Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ver. 0.0.3 release. #99

Merged
merged 19 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
# Changelog
## v0.0.3
- Update documentation
- Fix evaluation logic in operator check
- Fix operator checks to follow other check_type logic #85
- Add support for multiple ref key #92
- Update gt and lt logic in operator check #64
- Fix ref key in dict of dicts data type #91
- Support single value in expression #94
- Minor fixes

## v0.0.2

- Update operator logic for returned result
- Update docs

## v0.0.1

- Initial release

## v0.0.1-beta.1

- First beta release
7 changes: 6 additions & 1 deletion docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ Let's have a look at a couple of examples:
```
We will define a JMESPath expression for the values we want to test and extract from the reference and comparison objects.
```python
>>> my_jmspath = "global.$peers$.*.*.ipv4.[accepted_prefixes,received_prefixes,sent_prefixes]"
>>> my_jmspath = "global.peers.$*$.*.ipv4.[accepted_prefixes,received_prefixes,sent_prefixes]"
>>> reference_value = extract_data_from_json(reference_data, my_jmspath)
>>> reference_value
[{'10.1.0.0': {'accepted_prefixes': 900,
Expand Down Expand Up @@ -478,7 +478,12 @@ The `operator` check is a collection of more specific checks divided into catego
2. `is-lt`: Check if the value of a specified element is lesser than a given numeric value.
- `is-lt: 55`: checks if value is lower than 55 or not.

3. `is-ge`: Check if the value of a specified element is greater than or equal to a given numeric value.
- `is-ge: 2`: checks if value should be greater or equal than 2.

4. `is-le`: Check if the value of a specified element is lesser than or equal a given numeric value.
- `is-le: 55`: checks if value is lower or equal than 55 or not.

Examples:

```python
Expand Down
8 changes: 4 additions & 4 deletions jdiff/check_types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""CheckType Implementation."""
from typing import Mapping, Tuple, Dict, Any, Union
from typing import List, Tuple, Dict, Any, Union
from abc import ABC, abstractmethod
from .evaluators import diff_generator, parameter_evaluator, regex_evaluator, operator_evaluator

Expand Down Expand Up @@ -136,7 +136,7 @@ def _validate(params, mode) -> None: # type: ignore[override]
f"'mode' argument should be one of the following: {', '.join(mode_options)}. You have: {mode}"
)

def evaluate(self, params: Dict, value_to_compare: Mapping, mode: str) -> Tuple[Dict, bool]: # type: ignore[override]
def evaluate(self, params: Dict, value_to_compare: List[Dict], mode: str) -> Tuple[Dict, bool]: # type: ignore[override]
"""Parameter Match evaluator implementation."""
self._validate(params=params, mode=mode)
# TODO: we don't use the mode?
Expand All @@ -161,7 +161,7 @@ def _validate(regex, mode) -> None: # type: ignore[override]
if mode not in mode_options:
raise ValueError(f"'mode' argument should be {mode_options}. You have: {mode}")

def evaluate(self, regex: str, value_to_compare: Mapping, mode: str) -> Tuple[Dict, bool]: # type: ignore[override]
def evaluate(self, regex: str, value_to_compare: List[Dict[Any, Dict]], mode: str) -> Tuple[Dict, bool]: # type: ignore[override]
"""Regex Match evaluator implementation."""
self._validate(regex=regex, mode=mode)
evaluation_result = regex_evaluator(value_to_compare, regex, mode)
Expand All @@ -176,7 +176,7 @@ def _validate(params) -> None: # type: ignore[override]
"""Validate operator parameters."""
in_operators = ("is-in", "not-in", "in-range", "not-in-range")
bool_operators = ("all-same",)
number_operators = ("is-gt", "is-lt")
number_operators = ("is-gt", "is-lt", "is-ge", "is-le")
string_operators = ("contains", "not-contains")
valid_options = (
in_operators,
Expand Down
42 changes: 21 additions & 21 deletions jdiff/evaluators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Evaluators."""
import re
from typing import Any, Mapping, Dict, Tuple
from typing import Any, Mapping, Dict, Tuple, List
from deepdiff import DeepDiff
from .utils.diff_helpers import get_diff_iterables_items, fix_deepdiff_key_names
from .operator import Operator
Expand Down Expand Up @@ -36,7 +36,7 @@ def diff_generator(pre_result: Any, post_result: Any) -> Dict:
return fix_deepdiff_key_names(result)


def parameter_evaluator(values: Mapping, parameters: Mapping, mode: str) -> Dict:
def parameter_evaluator(values: List[Dict], parameters: Mapping, mode: str) -> Dict:
"""Parameter Match evaluator engine.

Args:
Expand All @@ -51,41 +51,41 @@ def parameter_evaluator(values: Mapping, parameters: Mapping, mode: str) -> Dict
Dictionary with all the items that have some value not matching the expectations from parameters
"""
if not isinstance(values, list):
raise TypeError("Something went wrong during jmespath parsing. 'values' must be of type List.")
raise TypeError("'values' must be of type List.")

result = {}
for value in values:
for index, value in enumerate(values):
# value: {'7.7.7.7': {'peerAddress': '7.7.7.7', 'localAsn': '65130.1101', 'linkType': 'externals
if not isinstance(value, dict):
raise TypeError(
"Something went wrong during jmespath parsing. ",
f"'value' ({value}) must be of type Dict, and it's {type(value)}",
)
raise TypeError(f"'value' ({value}) must be of type Dict, and it's {type(value)}")

result_item = {}

# TODO: Why the 'value' dict has always ONE single element? we have to explain
# inner_key: '7.7.7.7'
inner_key = list(value.keys())[0]
# inner_value: [{'peerAddress': '7.7.7.7', 'localAsn': '65130.1101', 'linkType': 'externals'}]
inner_value = list(value.values())[0]
# When data has been normalized with $key$, get inner key and value
if len(value) == 1:
# inner_key: '7.7.7.7'
inner_key = list(value.keys())[0]
# inner_value: [{'peerAddress': '7.7.7.7', 'localAsn': '65130.1101', 'linkType': 'externals'}]
value = list(value.values())[0]
else:
inner_key = index

for parameter_key, parameter_value in parameters.items():
if mode == "match" and inner_value[parameter_key] != parameter_value:
result_item[parameter_key] = inner_value[parameter_key]
elif mode == "no-match" and inner_value[parameter_key] == parameter_value:
result_item[parameter_key] = inner_value[parameter_key]
if mode == "match" and value[parameter_key] != parameter_value:
result_item[parameter_key] = value[parameter_key]
elif mode == "no-match" and value[parameter_key] == parameter_value:
result_item[parameter_key] = value[parameter_key]

if result_item:
result[inner_key] = result_item

return result


def regex_evaluator(values: Mapping, regex_expression: str, mode: str) -> Dict:
def regex_evaluator(values: List[Dict[Any, Dict]], regex_expression: str, mode: str) -> Dict:
"""Regex Match evaluator engine."""
# values: [{'7.7.7.7': {'peerGroup': 'EVPN-OVERLAY-SPINE'}}]
# parameter: {'regex': '.*UNDERLAY.*', 'mode': 'include'}
# parameter: {'regex': '.*UNDERLAY.*', 'mode': 'match'}
result = {}
if not isinstance(values, list):
raise TypeError("Something went wrong during JMSPath parsing. 'values' must be of type List.")
Expand All @@ -94,10 +94,10 @@ def regex_evaluator(values: Mapping, regex_expression: str, mode: str) -> Dict:
for founded_value in item.values():
for value in founded_value.values():
match_result = re.search(regex_expression, value)
# Fail if there is not regex match
# Fail if there is no regex match for "match" mode
if mode == "match" and not match_result:
result.update(item)
# Fail if there is regex match
# Fail if there is regex match for "no-match" mode.
elif mode == "no-match" and match_result:
result.update(item)

Expand Down
42 changes: 23 additions & 19 deletions jdiff/extract_data.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
"""Extract data from JSON. Based on custom JMSPath implementation."""
import re
import warnings
from typing import Mapping, List, Dict, Any, Union
from typing import Mapping, List, Dict, Any, Union, Optional
import jmespath
from .utils.data_normalization import exclude_filter, flatten_list
from .utils.jmespath_parsers import (
jmespath_value_parser,
jmespath_refkey_parser,
associate_key_of_my_value,
keys_values_zipper,
multi_reference_keys,
)


def extract_data_from_json(data: Union[Mapping, List], path: str = "*", exclude: List = None) -> Any:
def extract_data_from_json(data: Union[Mapping, List], path: str = "*", exclude: Optional[List] = None) -> Any:
"""Return wanted data from outpdevice data based on the check path. See unit test for complete example.

Get the wanted values to be evaluated if JMESPath expression is defined,
Expand Down Expand Up @@ -43,33 +44,36 @@ def extract_data_from_json(data: Union[Mapping, List], path: str = "*", exclude:
# return if path is not specified
return data

# Multi ref_key
if len(re.findall(r"\$.*?\$", path)) > 1:
clean_path = path.replace("$", "")
values = jmespath.search(f"{clean_path}{' | []' * (path.count('*') - 1)}", data)
return keys_values_zipper(multi_reference_keys(path, data), associate_key_of_my_value(clean_path, values))

values = jmespath.search(jmespath_value_parser(path), data)

if values is None:
raise TypeError("JMSPath returned 'None'. Please, verify your JMSPath regex.")

# check for multi-nested lists if not found return here
if not any(isinstance(i, list) for i in values):
return values

# process elements to check if lists should be flattened
for element in values:
for item in element:
# raise if there is a dict, path must be more specific to extract data
if isinstance(item, dict):
raise TypeError(
f'Must be list of lists i.e. [["Idle", 75759616], ["Idle", 75759620]]. You have "{values}".'
)
if isinstance(item, list):
values = flatten_list(values) # flatten list and rewrite values
break # items are the same, need to check only first to see if this is a nested list

paired_key_value = associate_key_of_my_value(jmespath_value_parser(path), values)
# check for multi-nested lists
if any(isinstance(i, list) for i in values):
# process elements to check if lists should be flattened
for element in values:
for item in element:
# raise if there is a dict, path must be more specific to extract data
if isinstance(item, dict):
raise TypeError(
f'Must be list of lists i.e. [["Idle", 75759616], ["Idle", 75759620]]. You have "{values}".'
)
if isinstance(item, list):
values = flatten_list(values) # flatten list and rewrite values
break # items are the same, need to check only first to see if this is a nested list

# We need to get a list of reference keys - list of strings.
# Based on the expression or data we might have different data types
# therefore we need to normalize.
if re.search(r"\$.*\$", path):
paired_key_value = associate_key_of_my_value(jmespath_value_parser(path), values)
wanted_reference_keys = jmespath.search(jmespath_refkey_parser(path), data)

if isinstance(wanted_reference_keys, dict): # when wanted_reference_keys is dict() type
Expand Down
10 changes: 10 additions & 0 deletions jdiff/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ def call_evaluation_logic():

ops = {
">": operator.gt,
">=": operator.ge,
"<": operator.lt,
"<=": operator.le,
"is_in": operator.contains,
"not_in": operator.contains,
"contains": operator.contains,
Expand Down Expand Up @@ -99,10 +101,18 @@ def is_gt(self) -> Tuple[List, bool]:
"""Is greather than operator caller."""
return self._loop_through_wrapper(">")

def is_ge(self) -> Tuple[List, bool]:
"""Is greather or equal than operator caller."""
return self._loop_through_wrapper(">=")

def is_lt(self) -> Tuple[List, bool]:
"""Is lower than operator caller."""
return self._loop_through_wrapper("<")

def is_le(self) -> Tuple[List, bool]:
"""Is lower or equal than operator caller."""
return self._loop_through_wrapper("<=")

def is_in(self) -> Tuple[List, bool]:
"""Is in operator caller."""
return self._loop_through_wrapper("is_in")
Expand Down
65 changes: 55 additions & 10 deletions jdiff/utils/jmespath_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import re
from typing import Mapping, List, Union

import jmespath


def jmespath_value_parser(path: str):
"""
Expand All @@ -31,15 +33,15 @@ def jmespath_value_parser(path: str):
path_suffix = path.split(".")[-1]

if regex_match_ref_key:
reference_key = regex_match_ref_key.group()
if regex_ref_key.search(path_suffix):
# [$peerAddress$,prefixesReceived] --> [prefixesReceived]
reference_key = regex_match_ref_key.group()
return path.replace(reference_key, "")

# result[0].$vrfs$.default... --> result[0].vrfs.default....
regex_normalized_value = re.search(r"\$.*\$", regex_match_ref_key.group())
regex_normalized_value = re.search(r"\$.*\$", reference_key)
if regex_normalized_value:
normalized_value = regex_match_ref_key.group().split("$")[1]
normalized_value = reference_key.split("$")[1]
return path.replace(regex_normalized_value.group(), normalized_value)
return path

Expand All @@ -65,9 +67,9 @@ def jmespath_refkey_parser(path: str):
splitted_jmespath[number] = regex_match_anchor.group().replace("$", "")

if regex_match_anchor and not element.startswith("[") and not element.endswith("]"):
splitted_jmespath = splitted_jmespath[: number + 1]
splitted_jmespath = splitted_jmespath[:number]

return ".".join(splitted_jmespath)
return ".".join(splitted_jmespath) or "@"


def associate_key_of_my_value(paths: str, wanted_value: List) -> List:
Expand All @@ -85,13 +87,19 @@ def associate_key_of_my_value(paths: str, wanted_value: List) -> List:

final_list = []

for items in wanted_value:
if len(items) != len(my_key_value_list):
raise ValueError("Key's value len != from value len")
if not all(isinstance(item, list) for item in wanted_value) and len(my_key_value_list) == 1:
for item in wanted_value:
temp_dict = {my_key_value_list[0]: item}
final_list.append(temp_dict)

else:
for items in wanted_value:
if len(items) != len(my_key_value_list):
raise ValueError("Key's value len != from value len")

temp_dict = {my_key_value_list[my_index]: my_value for my_index, my_value in enumerate(items)}
temp_dict = {my_key_value_list[my_index]: my_value for my_index, my_value in enumerate(items)}

final_list.append(temp_dict)
final_list.append(temp_dict)

return final_list

Expand Down Expand Up @@ -120,3 +128,40 @@ def keys_values_zipper(list_of_reference_keys: List, wanted_value_with_key: List
final_result.append({my_key: wanted_value_with_key[my_index]})

return final_result


def multi_reference_keys(jmspath, data):
"""Build a list of concatenated reference keys.

Args:
jmspath: "$*$.peers.$*$.*.ipv4.[accepted_prefixes]"
data: tests/mock/napalm_get_bgp_neighbors/multi_vrf.json

Returns:
["global.10.1.0.0", "global.10.2.0.0", "global.10.64.207.255", "global.7.7.7.7", "vpn.10.1.0.0", "vpn.10.2.0.0"]
"""
ref_key_regex = re.compile(r"\$.*?\$")
mapping = []
split_path = jmspath.split(".")

ref_key_index = -1 # -1 as the starting value, so it will match split path list indexes
for index, element in enumerate(split_path):
if ref_key_regex.search(element):
ref_key_index += 1
key_path = (
".".join(split_path[:index]).replace("$", "") or "@"
) # @ is for top keys, as they are stripped with "*"
flat_path = f"{key_path}{' | []' * key_path.count('*')}" # | [] to flatten the data, nesting level is eq to "*" count
sub_data = jmespath.search(flat_path, data) # extract sub-data with up to the ref key
if isinstance(sub_data, dict):
keys = list(sub_data.keys())
elif isinstance(sub_data, list):
keys = []
for parent, children in zip(
mapping[ref_key_index - 1], sub_data
): # refer to previous keys as they are already present in mapping
keys.extend(f"{parent}.{child}" for child in children.keys()) # concatenate keys
else:
raise ValueError("Ref key anchor must return either a dict or a list.")
mapping.append(keys)
return mapping[-1] # return last element as it has all previous ref_keys concatenated.
Loading