Skip to content

Commit c7595a6

Browse files
authoredMar 4, 2025
Validate if subnet CIDRs are valid or overlap (#3985)
- Add rule E3059 to validate if subnet CIDRs are within the VPC networks - Add rule E3060 to validate if subnet CIDRs do not overlap with eachother
1 parent 85e1576 commit c7595a6

File tree

5 files changed

+864
-1
lines changed

5 files changed

+864
-1
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
"""
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
SPDX-License-Identifier: MIT-0
4+
"""
5+
6+
from __future__ import annotations
7+
8+
import logging
9+
from collections import deque
10+
from ipaddress import IPv4Network, IPv6Network, ip_network
11+
from typing import Any, Iterator
12+
13+
from cfnlint.context import Path
14+
from cfnlint.jsonschema import ValidationError, ValidationResult, Validator
15+
from cfnlint.rules.helpers import get_value_from_path
16+
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword
17+
18+
LOGGER = logging.getLogger(__name__)
19+
20+
21+
class VpcSubnetCidr(CfnLintKeyword):
22+
id = "E3059"
23+
shortdesc = "Validate subnet CIDRs are within the CIDRs of the VPC"
24+
description = (
25+
"When specifying subnet CIDRs for a VPC the subnet CIDRs "
26+
"most be within the VPC CIDRs"
27+
)
28+
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-subnet.html"
29+
tags = ["resources", "ec2", "vpc", "subnet"]
30+
31+
def __init__(self) -> None:
32+
super().__init__(
33+
keywords=[
34+
"Resources/AWS::EC2::VPC/Properties",
35+
],
36+
)
37+
38+
def _validate_subnets(
39+
self,
40+
source: IPv4Network | IPv6Network,
41+
destination: IPv4Network | IPv6Network,
42+
) -> bool:
43+
if isinstance(source, IPv4Network) and isinstance(destination, IPv4Network):
44+
if source.subnet_of(destination):
45+
return True
46+
return False
47+
elif isinstance(source, IPv6Network) and isinstance(destination, IPv6Network):
48+
if source.subnet_of(destination):
49+
return True
50+
return False
51+
return False
52+
53+
def _create_network(self, cidr: Any) -> IPv4Network | IPv6Network | None:
54+
if not isinstance(cidr, str):
55+
return None
56+
57+
try:
58+
return ip_network(cidr)
59+
except Exception as e:
60+
LOGGER.debug(f"Unable to create network from {cidr}", e)
61+
62+
return None
63+
64+
def _get_vpc_cidrs(
65+
self, validator: Validator, instance: dict[str, Any]
66+
) -> Iterator[tuple[IPv4Network | IPv6Network | None, Validator]]:
67+
for key in [
68+
"Ipv4IpamPoolId",
69+
"Ipv6IpamPoolId",
70+
"Ipv6Pool",
71+
"AmazonProvidedIpv6CidrBlock",
72+
]:
73+
for value, value_validator in get_value_from_path(
74+
validator,
75+
instance,
76+
deque([key]),
77+
):
78+
if value is None:
79+
continue
80+
yield None, value_validator
81+
82+
for key in ["CidrBlock", "Ipv6CidrBlock"]:
83+
for cidr, cidr_validator in get_value_from_path(
84+
validator,
85+
instance,
86+
deque([key]),
87+
):
88+
89+
if cidr is None:
90+
continue
91+
yield self._create_network(cidr), cidr_validator
92+
93+
def validate(
94+
self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any]
95+
) -> ValidationResult:
96+
97+
if not validator.cfn.graph:
98+
return
99+
100+
vpc_ipv4_networks: list[IPv4Network] = []
101+
vpc_ipv6_networks: list[IPv6Network] = []
102+
for vpc_network, _ in self._get_vpc_cidrs(validator, instance):
103+
if not vpc_network:
104+
return
105+
if isinstance(vpc_network, IPv4Network):
106+
vpc_ipv4_networks.append(vpc_network)
107+
# you can't specify IPV6 networks on a VPC
108+
109+
template_validator = validator.evolve(
110+
context=validator.context.evolve(path=Path())
111+
)
112+
113+
# dynamic vpc network (using IPAM or AWS provided)
114+
# allows to validate subnet overlapping even if using
115+
# dynamic networks
116+
has_dynamic_network = False
117+
118+
for source, _ in validator.cfn.graph.graph.in_edges(
119+
validator.context.path.path[1]
120+
):
121+
if (
122+
validator.cfn.graph.graph.nodes[source].get("resource_type")
123+
== "AWS::EC2::VPCCidrBlock"
124+
):
125+
for cidr_props, cidr_validator in get_value_from_path(
126+
template_validator,
127+
validator.cfn.template,
128+
deque(["Resources", source, "Properties"]),
129+
):
130+
for cidr_network, _ in self._get_vpc_cidrs(
131+
cidr_validator, cidr_props
132+
):
133+
if not cidr_network:
134+
has_dynamic_network = True
135+
continue
136+
if isinstance(cidr_network, IPv4Network):
137+
vpc_ipv4_networks.append(cidr_network)
138+
else:
139+
vpc_ipv6_networks.append(cidr_network)
140+
141+
subnets: list[tuple[IPv4Network | IPv6Network, deque]] = []
142+
for source, _ in validator.cfn.graph.graph.in_edges(
143+
validator.context.path.path[1]
144+
):
145+
if (
146+
validator.cfn.graph.graph.nodes[source].get("resource_type")
147+
== "AWS::EC2::Subnet"
148+
):
149+
for subnet_props, source_validator in get_value_from_path(
150+
template_validator,
151+
validator.cfn.template,
152+
deque(["Resources", source, "Properties"]),
153+
):
154+
for subnet_network, subnet_validator in self._get_vpc_cidrs(
155+
source_validator, subnet_props
156+
):
157+
if not subnet_network:
158+
continue
159+
160+
subnets.append(
161+
(subnet_network, subnet_validator.context.path.path)
162+
)
163+
if has_dynamic_network:
164+
continue
165+
if not any(
166+
self._validate_subnets(
167+
subnet_network,
168+
vpc_network,
169+
)
170+
for vpc_network in vpc_ipv4_networks + vpc_ipv6_networks
171+
):
172+
if isinstance(subnet_network, IPv4Network):
173+
# Every VPC has to have a ipv4 network
174+
# we continue if there isn't one
175+
if not vpc_ipv4_networks:
176+
continue
177+
reprs = (
178+
"is not a valid subnet of "
179+
f"{[f'{str(v)}' for v in vpc_ipv4_networks]!r}"
180+
)
181+
else:
182+
if not vpc_ipv6_networks:
183+
reprs = (
184+
"is specified on a VPC that has "
185+
"no ipv6 networks defined"
186+
)
187+
else:
188+
reprs = (
189+
"is not a valid subnet of "
190+
f"{[f'{str(v)}' for v in vpc_ipv6_networks]!r}"
191+
)
192+
yield ValidationError(
193+
(f"{str(subnet_network)!r} {reprs}"),
194+
rule=self,
195+
path_override=subnet_validator.context.path.path,
196+
)
197+
continue
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
"""
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
SPDX-License-Identifier: MIT-0
4+
"""
5+
6+
from __future__ import annotations
7+
8+
import logging
9+
from collections import deque
10+
from ipaddress import IPv4Network, IPv6Network, ip_network
11+
from typing import Any
12+
13+
from cfnlint.context.conditions import Unsatisfiable
14+
from cfnlint.helpers import ensure_list, is_function
15+
from cfnlint.jsonschema import ValidationError, ValidationResult, Validator
16+
from cfnlint.rules.helpers import get_value_from_path
17+
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword
18+
19+
LOGGER = logging.getLogger(__name__)
20+
21+
22+
class VpcSubnetOverlap(CfnLintKeyword):
23+
id = "E3060"
24+
shortdesc = "Validate subnet CIDRs do not overlap with other subnets"
25+
description = (
26+
"When specifying subnet CIDRs for a VPC the subnet CIDRs "
27+
"most not overlap with eachother"
28+
)
29+
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-subnet.html"
30+
tags = ["resources", "ec2", "vpc", "subnet"]
31+
32+
def __init__(self) -> None:
33+
super().__init__(
34+
keywords=[
35+
"Resources/AWS::EC2::Subnet/Properties",
36+
],
37+
)
38+
self._subnets: dict[str, list[tuple[IPv4Network | IPv6Network, dict]]] = {}
39+
40+
def initialize(self, cfn):
41+
self._subnets = {}
42+
return super().initialize(cfn)
43+
44+
def _validate_subnets(
45+
self,
46+
source: IPv4Network | IPv6Network,
47+
destination: IPv4Network | IPv6Network,
48+
) -> bool:
49+
if isinstance(source, IPv4Network) and isinstance(destination, IPv4Network):
50+
if source.overlaps(destination):
51+
return True
52+
return False
53+
elif isinstance(source, IPv6Network) and isinstance(destination, IPv6Network):
54+
if source.overlaps(destination):
55+
return True
56+
return False
57+
return False
58+
59+
def _create_network(self, cidr: Any) -> IPv4Network | IPv6Network | None:
60+
if not isinstance(cidr, str):
61+
return None
62+
63+
try:
64+
return ip_network(cidr)
65+
except Exception as e:
66+
LOGGER.debug(f"Unable to create network from {cidr}", e)
67+
68+
return None
69+
70+
def validate(
71+
self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any]
72+
) -> ValidationResult:
73+
74+
for vpc_id, vpc_validator in get_value_from_path(
75+
validator=validator, instance=instance, path=deque(["VpcId"])
76+
):
77+
78+
if not isinstance(vpc_id, (str, dict)):
79+
return
80+
81+
fn_k, fn_v = is_function(vpc_id)
82+
if fn_k == "Fn::GetAtt":
83+
vpc_id = ensure_list(fn_v)[0].split(".")[0]
84+
elif fn_k == "Ref":
85+
vpc_id = fn_v
86+
elif fn_k:
87+
# its a function that we can't resolve
88+
return
89+
90+
if not validator.is_type(vpc_id, "string"):
91+
return
92+
if vpc_id not in self._subnets:
93+
self._subnets[vpc_id] = []
94+
95+
for key in ["CidrBlock", "Ipv6CidrBlock"]:
96+
for cidr_block, cidr_block_validator in get_value_from_path(
97+
validator=vpc_validator, instance=instance, path=deque([key])
98+
):
99+
100+
cidr_network = self._create_network(cidr_block)
101+
if not cidr_network:
102+
continue
103+
104+
for saved_subnet, conditions in self._subnets[vpc_id]:
105+
# attempt to validate if the saved conditions comply
106+
# with these conditions
107+
try:
108+
cidr_block_validator.evolve(
109+
context=cidr_block_validator.context.evolve(
110+
conditions=cidr_block_validator.context.conditions.evolve(
111+
conditions
112+
)
113+
)
114+
)
115+
except Unsatisfiable:
116+
continue
117+
118+
# now we can evaluate if they overlap
119+
if self._validate_subnets(
120+
cidr_network,
121+
saved_subnet,
122+
):
123+
yield ValidationError(
124+
(
125+
f"{str(cidr_network)!r} overlaps "
126+
f"with {str(saved_subnet)!r}"
127+
),
128+
rule=self,
129+
path=deque(
130+
list(cidr_block_validator.context.path.path)[1:]
131+
),
132+
)
133+
134+
self._subnets[vpc_id].append(
135+
(cidr_network, cidr_block_validator.context.conditions.status)
136+
)

‎test/fixtures/templates/integration/aws-ec2-subnet.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,6 @@ Resources:
2727
Type: AWS::EC2::Subnet
2828
Properties:
2929
VpcId: !Ref Vpc
30-
CidrBlock: 10.0.1.0/24
30+
CidrBlock: 10.0.2.0/24
3131
Ipv4IpamPoolId: test
3232
Ipv4NetmaskLength: 10

0 commit comments

Comments
 (0)