Skip to content

Commit 432a7d4

Browse files
authored
Merge pull request #1526 from ambuj-1211/add-cwe-support-in-multiple-importers
Add CWE support in multiple importers
2 parents fabe035 + 9847b2e commit 432a7d4

9 files changed

+231
-5
lines changed

vulnerabilities/importers/apache_httpd.py

+100
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#
99

1010
import logging
11+
import re
1112
import urllib
1213

1314
import requests
@@ -23,6 +24,8 @@
2324
from vulnerabilities.importer import Reference
2425
from vulnerabilities.importer import VulnerabilitySeverity
2526
from vulnerabilities.severity_systems import APACHE_HTTPD
27+
from vulnerabilities.utils import create_weaknesses_list
28+
from vulnerabilities.utils import cwe_regex
2629
from vulnerabilities.utils import get_item
2730

2831
logger = logging.getLogger(__name__)
@@ -102,11 +105,14 @@ def to_advisory(self, data):
102105
)
103106
)
104107

108+
weaknesses = get_weaknesses(data)
109+
105110
return AdvisoryData(
106111
aliases=[alias],
107112
summary=description or "",
108113
affected_packages=affected_packages,
109114
references=[reference],
115+
weaknesses=weaknesses,
110116
url=reference.url,
111117
)
112118

@@ -152,3 +158,97 @@ def fetch_links(url):
152158
continue
153159
links.append(urllib.parse.urljoin(url, link))
154160
return links
161+
162+
163+
def get_weaknesses(cve_data):
164+
"""
165+
Extract CWE IDs from CVE data.
166+
167+
Args:
168+
cve_data (dict): The CVE data in a dictionary format.
169+
170+
Returns:
171+
List[int]: A list of unique CWE IDs.
172+
173+
Examples:
174+
>>> mock_cve_data1 = {
175+
... "containers": {
176+
... "cna": {
177+
... "providerMetadata": {
178+
... "orgId": "f0158376-9dc2-43b6-827c-5f631a4d8d09"
179+
... },
180+
... "title": "mod_macro buffer over-read",
181+
... "problemTypes": [
182+
... {
183+
... "descriptions": [
184+
... {
185+
... "description": "CWE-125 Out-of-bounds Read",
186+
... "lang": "en",
187+
... "cweId": "CWE-125",
188+
... "type": "CWE"
189+
... }
190+
... ]
191+
... }
192+
... ]
193+
... }
194+
... }
195+
... }
196+
>>> mock_cve_data2 = {
197+
... "data_type": "CVE",
198+
... "data_format": "MITRE",
199+
... "data_version": "4.0",
200+
... "generator": {
201+
... "engine": "Vulnogram 0.0.9"
202+
... },
203+
... "CVE_data_meta": {
204+
... "ID": "CVE-2022-28614",
205+
... "ASSIGNER": "[email protected]",
206+
... "TITLE": "read beyond bounds via ap_rwrite() ",
207+
... "STATE": "PUBLIC"
208+
... },
209+
... "problemtype": {
210+
... "problemtype_data": [
211+
... {
212+
... "description": [
213+
... {
214+
... "lang": "eng",
215+
... "value": "CWE-190 Integer Overflow or Wraparound"
216+
... }
217+
... ]
218+
... },
219+
... {
220+
... "description": [
221+
... {
222+
... "lang": "eng",
223+
... "value": "CWE-200 Exposure of Sensitive Information to an Unauthorized Actor"
224+
... }
225+
... ]
226+
... }
227+
... ]
228+
... }
229+
... }
230+
231+
>>> get_weaknesses(mock_cve_data1)
232+
[125]
233+
234+
>>> get_weaknesses(mock_cve_data2)
235+
[190, 200]
236+
"""
237+
alias = get_item(cve_data, "CVE_data_meta", "ID")
238+
cwe_strings = []
239+
if alias:
240+
problemtype_data = get_item(cve_data, "problemtype", "problemtype_data") or []
241+
for problem in problemtype_data:
242+
for desc in problem.get("description", []):
243+
value = desc.get("value", "")
244+
cwe_id_string_list = re.findall(cwe_regex, value)
245+
cwe_strings.extend(cwe_id_string_list)
246+
else:
247+
problemTypes = cve_data.get("containers", {}).get("cna", {}).get("problemTypes", [])
248+
descriptions = problemTypes[0].get("descriptions", []) if len(problemTypes) > 0 else []
249+
for description in descriptions:
250+
cwe_id_string = description.get("cweId", "")
251+
cwe_strings.append(cwe_id_string)
252+
253+
weaknesses = create_weaknesses_list(cwe_strings)
254+
return weaknesses

vulnerabilities/importers/debian.py

+23
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
#
99

1010
import logging
11+
import re
1112
from typing import Any
1213
from typing import Iterable
1314
from typing import List
1415
from typing import Mapping
1516

1617
import requests
18+
from cwe2.database import Database
1719
from packageurl import PackageURL
1820
from univers.version_range import DebianVersionRange
1921
from univers.versions import DebianVersion
@@ -22,6 +24,7 @@
2224
from vulnerabilities.importer import AffectedPackage
2325
from vulnerabilities.importer import Importer
2426
from vulnerabilities.importer import Reference
27+
from vulnerabilities.utils import create_weaknesses_list
2528
from vulnerabilities.utils import dedupe
2629
from vulnerabilities.utils import get_item
2730

@@ -93,6 +96,7 @@ def advisory_data(self) -> Iterable[AdvisoryData]:
9396
yield from self.parse(pkg_name, records)
9497

9598
def parse(self, pkg_name: str, records: Mapping[str, Any]) -> Iterable[AdvisoryData]:
99+
96100
for cve_id, record in records.items():
97101
affected_versions = []
98102
fixed_versions = []
@@ -150,10 +154,29 @@ def parse(self, pkg_name: str, records: Mapping[str, Any]) -> Iterable[AdvisoryD
150154
fixed_version=DebianVersion(fixed_version),
151155
)
152156
)
157+
weaknesses = get_cwe_from_debian_advisory(record)
158+
153159
yield AdvisoryData(
154160
aliases=[cve_id],
155161
summary=record.get("description", ""),
156162
affected_packages=affected_packages,
157163
references=references,
164+
weaknesses=weaknesses,
158165
url=self.api_url,
159166
)
167+
168+
169+
def get_cwe_from_debian_advisory(record):
170+
"""
171+
Extracts CWE ID strings from the given raw_data and returns a list of CWE IDs.
172+
173+
>>> get_cwe_from_debian_advisory({"description":"PEAR HTML_QuickForm version 3.2.14 contains an eval injection (CWE-95) vulnerability in HTML_QuickForm's getSubmitValue method, HTML_QuickForm's validate method, HTML_QuickForm_hierselect's _setOptions method, HTML_QuickForm_element's _findValue method, HTML_QuickForm_element's _prepareValue method. that can result in Possible information disclosure, possible impact on data integrity and execution of arbitrary code. This attack appear to be exploitable via A specially crafted query string could be utilised, e.g. http://www.example.com/admin/add_practice_type_id[1]=fubar%27])%20OR%20die(%27OOK!%27);%20//&mode=live. This vulnerability appears to have been fixed in 3.2.15."})
174+
[95]
175+
>>> get_cwe_from_debian_advisory({"description":"There is no WEAKNESS DATA"})
176+
[]
177+
"""
178+
description = record.get("description") or ""
179+
pattern = r"CWE-\d+"
180+
cwe_strings = re.findall(pattern, description)
181+
weaknesses = create_weaknesses_list(cwe_strings)
182+
return weaknesses

vulnerabilities/importers/fireeye.py

+24
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
from vulnerabilities.importer import Importer
1717
from vulnerabilities.importer import Reference
1818
from vulnerabilities.utils import build_description
19+
from vulnerabilities.utils import create_weaknesses_list
20+
from vulnerabilities.utils import cwe_regex
1921
from vulnerabilities.utils import dedupe
2022

2123
logger = logging.getLogger(__name__)
@@ -77,10 +79,13 @@ def parse_advisory_data(raw_data, file, base_path) -> AdvisoryData:
7779
disc_credits = md_dict.get("## Discovery Credits") # not used
7880
disc_timeline = md_dict.get("## Disclosure Timeline") # not used
7981
references = md_dict.get("## References") or []
82+
cwe_data = md_dict.get("## Common Weakness Enumeration") or []
83+
8084
return AdvisoryData(
8185
aliases=get_aliases(database_id, cve_ref),
8286
summary=build_description(" ".join(summary), " ".join(description)),
8387
references=get_references(references),
88+
weaknesses=get_weaknesses(cwe_data),
8489
url=advisory_url,
8590
)
8691

@@ -140,3 +145,22 @@ def md_list_to_dict(md_list):
140145
else:
141146
md_dict[md_key].append(md_line)
142147
return md_dict
148+
149+
150+
def get_weaknesses(cwe_data):
151+
"""
152+
Return the list of CWE IDs as integers from a list of weakness summaries, e.g., [379].
153+
154+
>>> get_weaknesses([
155+
... "CWE-379: Creation of Temporary File in Directory with Insecure Permissions",
156+
... "CWE-362: Concurrent Execution using Shared Resource with Improper Synchronization ('Race Condition')"
157+
... ])
158+
[379, 362]
159+
"""
160+
cwe_list = []
161+
for line in cwe_data:
162+
cwe_ids = re.findall(cwe_regex, line)
163+
cwe_list.extend(cwe_ids)
164+
165+
weaknesses = create_weaknesses_list(cwe_list)
166+
return weaknesses

vulnerabilities/tests/test_data/apache_httpd/CVE-2021-44224-apache-httpd-expected.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,6 @@
3232
}
3333
],
3434
"date_published": null,
35-
"weaknesses": [],
35+
"weaknesses": [476],
3636
"url": "https://httpd.apache.org/security/json/CVE-2021-44224.json"
3737
}

vulnerabilities/tests/test_data/apache_httpd/CVE-2022-28614-apache-httpd-expected.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,6 @@
3232
}
3333
],
3434
"date_published": null,
35-
"weaknesses": [],
35+
"weaknesses": [190, 200],
3636
"url": "https://httpd.apache.org/security/json/CVE-2022-28614.json"
3737
}

vulnerabilities/tests/test_data/apache_httpd/apache-httpd-improver-expected.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
]
5555
}
5656
],
57-
"weaknesses": []
57+
"weaknesses": [476]
5858
},
5959
{
6060
"vulnerability_id": null,
@@ -103,6 +103,6 @@
103103
]
104104
}
105105
],
106-
"weaknesses": []
106+
"weaknesses": [476]
107107
}
108108
]

vulnerabilities/tests/test_debian.py

+37-1
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
77
# See https://aboutcode.org for more information about nexB OSS projects.
88
#
9-
109
import json
1110
import os
11+
import re
1212
from unittest.mock import patch
1313

1414
from vulnerabilities.importer import AdvisoryData
1515
from vulnerabilities.importers.debian import DebianImporter
16+
from vulnerabilities.importers.debian import get_cwe_from_debian_advisory
1617
from vulnerabilities.improvers.default import DefaultImprover
1718
from vulnerabilities.improvers.valid_versions import DebianBasicImprover
1819
from vulnerabilities.tests import util_tests
@@ -55,3 +56,38 @@ def test_debian_improver(mock_response):
5556
result.extend(inference)
5657
expected_file = os.path.join(TEST_DATA, f"debian-improver-expected.json")
5758
util_tests.check_results_against_json(result, expected_file)
59+
60+
61+
def test_get_cwe_from_debian_advisories():
62+
record = {
63+
"description": "Legion of the Bouncy Castle Legion of the Bouncy Castle Java Cryptography APIs 1.58 up to but not including 1.60 contains a CWE-580: Use of Externally-Controlled Input to Select Classes or Code ('Unsafe Reflection') vulnerability in XMSS/XMSS^MT private key deserialization that can result in Deserializing an XMSS/XMSS^MT private key can result in the execution of unexpected code. This attack appear to be exploitable via A handcrafted private key can include references to unexpected classes which will be picked up from the class path for the executing application. This vulnerability appears to have been fixed in 1.60 and later.",
64+
"scope": "local",
65+
"releases": {
66+
"bookworm": {
67+
"status": "resolved",
68+
"repositories": {"bookworm": "1.72-2"},
69+
"fixed_version": "1.60-1",
70+
"urgency": "low",
71+
},
72+
"bullseye": {
73+
"status": "resolved",
74+
"repositories": {"bullseye": "1.68-2"},
75+
"fixed_version": "1.60-1",
76+
"urgency": "low",
77+
},
78+
"sid": {
79+
"status": "resolved",
80+
"repositories": {"sid": "1.77-1"},
81+
"fixed_version": "1.60-1",
82+
"urgency": "low",
83+
},
84+
"trixie": {
85+
"status": "resolved",
86+
"repositories": {"trixie": "1.77-1"},
87+
"fixed_version": "1.60-1",
88+
"urgency": "low",
89+
},
90+
},
91+
}
92+
result = get_cwe_from_debian_advisory(record)
93+
assert result == [580]

vulnerabilities/tests/test_fireeye.py

+17
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from vulnerabilities.importer import Reference
1414
from vulnerabilities.importers.fireeye import get_aliases
1515
from vulnerabilities.importers.fireeye import get_references
16+
from vulnerabilities.importers.fireeye import get_weaknesses
1617
from vulnerabilities.importers.fireeye import md_list_to_dict
1718
from vulnerabilities.importers.fireeye import parse_advisory_data
1819
from vulnerabilities.tests import util_tests
@@ -217,3 +218,19 @@ def test_md_list_to_dict_2(self):
217218
md_list = f.readlines()
218219
md_dict = md_list_to_dict(md_list)
219220
assert md_dict == expected_output
221+
222+
def test_get_weaknesses(self):
223+
assert get_weaknesses(
224+
[
225+
"CWE-379: Creation of Temporary File in Directory with Insecure Permissions",
226+
"CWE-362: Concurrent Execution using Shared Resource with Improper Synchronization ('Race Condition')",
227+
]
228+
) == [379, 362]
229+
assert (
230+
get_weaknesses(
231+
[
232+
"CWE-2345: This cwe id does not exist so it should generate Invalid CWE id error and return empty list."
233+
]
234+
)
235+
== []
236+
)

0 commit comments

Comments
 (0)