Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python 3.10 and Python 3.9 compatibility; distutils deprecation; asyncio method deprecation #517

Merged
merged 15 commits into from
Jan 28, 2022
Merged
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
dist: bionic
dist: bionic
language: python
python:
- "3.5"
- "3.6"
- "3.7"
- "3.8"
- "3.9"
- "3.10"
before_install:
- sudo apt-get update -qq
- sudo apt-get install -y tshark
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
py
pytest
mock
lxml
lxml
packaging
84 changes: 56 additions & 28 deletions src/pyshark/capture/capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import concurrent.futures
import sys
import logging
from distutils.version import LooseVersion
from packaging import version

from pyshark.tshark.tshark import get_process_path, get_tshark_display_filter_flag, \
tshark_supports_json, TSharkVersionException, get_tshark_version, tshark_supports_duplicate_keys
Expand All @@ -18,6 +18,7 @@
else:
asyncTimeoutError = asyncio.exceptions.TimeoutError


class TSharkCrashException(Exception):
pass

Expand Down Expand Up @@ -64,14 +65,16 @@ def __init__(self, display_filter=None, only_summaries=False, eventloop=None,
self._decode_as = decode_as
self._disable_protocol = disable_protocol
self._json_has_duplicate_keys = True
self._log = logging.Logger(self.__class__.__name__, level=self.DEFAULT_LOG_LEVEL)
self._log = logging.Logger(
self.__class__.__name__, level=self.DEFAULT_LOG_LEVEL)
self._closed = False
self._custom_parameters = custom_parameters
self._eof_reached = False
self.__tshark_version = None

if include_raw and not use_json:
raise RawMustUseJsonException("use_json must be True if include_raw")
raise RawMustUseJsonException(
"use_json must be True if include_raw")

if self.debug:
self.set_debug()
Expand Down Expand Up @@ -134,7 +137,8 @@ def keep_packet(pkt):
raise StopCapture()

try:
self.apply_on_packets(keep_packet, timeout=timeout, packet_count=packet_count)
self.apply_on_packets(
keep_packet, timeout=timeout, packet_count=packet_count)
self.loaded = True
except asyncTimeoutError:
pass
Expand All @@ -143,7 +147,8 @@ def set_debug(self, set_to=True, log_level=logging.DEBUG):
"""Sets the capture to debug mode (or turns it off if specified)."""
if set_to:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
handler.setFormatter(logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
self._log.addHandler(handler)
self._log.level = log_level
self.debug = set_to
Expand All @@ -154,16 +159,25 @@ def _setup_eventloop(self):
self.eventloop = asyncio.ProactorEventLoop()
else:
try:
self.eventloop = asyncio.get_event_loop()
self.eventloop = asyncio.get_event_loop_policy().get_event_loop()
except RuntimeError:
if threading.current_thread() != threading.main_thread():
# Not running in the main thread, so create a new event loop
self.eventloop = asyncio.new_event_loop()
asyncio.set_event_loop(self.eventloop)
else:
raise
if os.name == "posix" and isinstance(threading.current_thread(), threading._MainThread):
asyncio.get_child_watcher().attach_loop(self.eventloop)
if os.name == "posix" and isinstance(threading.current_thread(), threading._MainThread):
# The default child watcher's (ThreadedChildWatcher) attach_loop method is empty!
# While using pyshark with ThreadedChildWatcher, asyncio could raise a ChildProcessError
# "Unknown child process pid %d, will report returncode 255"
# This led to a TSharkCrashException in _cleanup_subprocess.
# Using the SafeChildWatcher fixes this issue, but it is slower.
# SafeChildWatcher O(n) -> large numbers of processes are slow
# ThreadedChildWatcher O(1) -> independent of process number
# asyncio.get_child_watcher().attach_loop(self.eventloop)
asyncio.set_child_watcher(asyncio.SafeChildWatcher())
asyncio.get_child_watcher().attach_loop(self.eventloop)

def _get_json_separators(self):
""""Returns the separators between packets in a JSON output
Expand All @@ -172,9 +186,9 @@ def _get_json_separators(self):
The latter variable being the number of characters to ignore in order to pass the packet (i.e. extra newlines,
commas, parenthesis).
"""
if self._get_tshark_version() >= LooseVersion("3.0.0"):
if self._get_tshark_version() >= version.parse("3.0.0"):
return ("%s },%s" % (os.linesep, os.linesep)).encode(), ("}%s]" % os.linesep).encode(), (
1 + len(os.linesep))
1 + len(os.linesep))
else:
return ("}%s%s ," % (os.linesep, os.linesep)).encode(), ("}%s%s]" % (os.linesep, os.linesep)).encode(), 1

Expand Down Expand Up @@ -229,8 +243,10 @@ def _packets_from_tshark_sync(self, packet_count=None, existing_process=None):
:param packet_count: If given, stops after this amount of packets is captured.
"""
# NOTE: This has code duplication with the async version, think about how to solve this
tshark_process = existing_process or self.eventloop.run_until_complete(self._get_tshark_process())
psml_structure, data = self.eventloop.run_until_complete(self._get_psml_struct(tshark_process.stdout))
tshark_process = existing_process or self.eventloop.run_until_complete(
self._get_tshark_process())
psml_structure, data = self.eventloop.run_until_complete(
self._get_psml_struct(tshark_process.stdout))
packets_captured = 0

data = b""
Expand All @@ -253,11 +269,12 @@ def _packets_from_tshark_sync(self, packet_count=None, existing_process=None):
break
finally:
if tshark_process in self._running_processes:
self.eventloop.run_until_complete(self._cleanup_subprocess(tshark_process))
self.eventloop.run_until_complete(
self._cleanup_subprocess(tshark_process))

def apply_on_packets(self, callback, timeout=None, packet_count=None):
"""Runs through all packets and calls the given callback (a function) with each one as it is read.

If the capture is infinite (i.e. a live capture), it will run forever, otherwise it will complete after all
packets have been read.

Expand Down Expand Up @@ -330,7 +347,8 @@ async def _get_psml_struct(self, fd):
while not psml_struct:
new_data = await fd.read(self.SUMMARIES_BATCH_SIZE)
data += new_data
psml_struct, data = self._extract_tag_from_data(data, b"structure")
psml_struct, data = self._extract_tag_from_data(
data, b"structure")
if psml_struct:
psml_struct = psml_structure_from_xml(psml_struct)
elif not new_data:
Expand All @@ -355,9 +373,11 @@ async def _get_packet_from_stream(self, stream, existing_data, got_first_packet=

if packet:
if self.use_json:
packet = packet_from_json_packet(packet, deduplicate_fields=self._json_has_duplicate_keys)
packet = packet_from_json_packet(
packet, deduplicate_fields=self._json_has_duplicate_keys)
else:
packet = packet_from_xml_packet(packet, psml_structure=psml_structure)
packet = packet_from_xml_packet(
packet, psml_structure=psml_structure)
return packet, existing_data

new_data = await stream.read(self.DEFAULT_BATCH_SIZE)
Expand Down Expand Up @@ -387,7 +407,8 @@ async def _get_tshark_process(self, packet_count=None, stdin=None):
if self.use_json:
output_type = "json"
if not tshark_supports_json(self._get_tshark_version()):
raise TSharkVersionException("JSON only supported on Wireshark >= 2.2.0")
raise TSharkVersionException(
"JSON only supported on Wireshark >= 2.2.0")
if tshark_supports_duplicate_keys(self._get_tshark_version()):
output_parameters.append("--no-duplicate-keys")
self._json_has_duplicate_keys = False
Expand All @@ -396,7 +417,8 @@ async def _get_tshark_process(self, packet_count=None, stdin=None):
parameters = [self._get_tshark_path(), "-l", "-n", "-T", output_type] + \
self.get_parameters(packet_count=packet_count) + output_parameters

self._log.debug("Creating TShark subprocess with parameters: " + " ".join(parameters))
self._log.debug(
"Creating TShark subprocess with parameters: " + " ".join(parameters))
self._log.debug("Executable: %s" % parameters[0])
tshark_process = await asyncio.create_subprocess_exec(*parameters,
stdout=subprocess.PIPE,
Expand All @@ -406,7 +428,8 @@ async def _get_tshark_process(self, packet_count=None, stdin=None):
return tshark_process

def _created_new_process(self, parameters, process, process_name="TShark"):
self._log.debug(process_name + " subprocess created")
self._log.debug(
process_name + f" subprocess (pid {process.pid}) created")
if process.returncode is not None and process.returncode != 0:
raise TSharkCrashException(
"%s seems to have crashed. Try updating it. (command ran: '%s')" % (
Expand All @@ -415,25 +438,26 @@ def _created_new_process(self, parameters, process, process_name="TShark"):

async def _cleanup_subprocess(self, process):
"""Kill the given process and properly closes any pipes connected to it."""
self._log.debug(f"Cleanup Subprocess (pid {process.pid})")
if process.returncode is None:
try:
process.kill()
return await asyncio.wait_for(process.wait(), 1)
except asyncTimeoutError:
self._log.debug("Waiting for process to close failed, may have zombie process.")
self._log.debug(
"Waiting for process to close failed, may have zombie process.")
except ProcessLookupError:
pass
except OSError:
if os.name != "nt":
raise
elif process.returncode > 0:
if process.returncode != 1 or self._eof_reached:
raise TSharkCrashException("TShark seems to have crashed (retcode: %d). "
"Try rerunning in debug mode [ capture_obj.set_debug() ] or try updating tshark."
% process.returncode)
raise TSharkCrashException(f"TShark (pid {process.pid}) seems to have crashed (retcode: {process.returncode}). "
"Try rerunning in debug mode [ capture_obj.set_debug() ] or try updating tshark.")

def close(self):
self.eventloop.run_until_complete(self.close_async())
self.eventloop.create_task(self.close_async())

async def close_async(self):
for process in self._running_processes.copy():
Expand All @@ -447,7 +471,9 @@ def __del__(self):
def __enter__(self): return self
async def __aenter__(self): return self
def __exit__(self, exc_type, exc_val, exc_tb): self.close()
async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close_async()

async def __aexit__(self, exc_type, exc_val,
exc_tb): await self.close_async()

def get_parameters(self, packet_count=None):
"""Returns the special tshark parameters to be used according to the configuration of this class."""
Expand Down Expand Up @@ -479,14 +505,16 @@ def get_parameters(self, packet_count=None):
for preference_name, preference_value in self._override_prefs.items():
if all(self.encryption) and preference_name in ("wlan.enable_decryption", "uat:80211_keys"):
continue # skip if override preferences also given via --encryption options
params += ["-o", "{0}:{1}".format(preference_name, preference_value)]
params += ["-o",
"{0}:{1}".format(preference_name, preference_value)]

if self._output_file:
params += ["-w", self._output_file]

if self._decode_as:
for criterion, decode_as_proto in self._decode_as.items():
params += ["-d", ",".join([criterion.strip(), decode_as_proto.strip()])]
params += ["-d",
",".join([criterion.strip(), decode_as_proto.strip()])]

if self._disable_protocol:
params += ["--disable-protocol", self._disable_protocol.strip()]
Expand Down
20 changes: 13 additions & 7 deletions src/pyshark/capture/inmem_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import struct
import time
import warnings
from distutils.version import LooseVersion
from packaging import version

from pyshark.capture.capture import Capture, StopCapture

Expand Down Expand Up @@ -59,7 +59,8 @@ def __init__(self, bpf_filter=None, display_filter=None, only_summaries=False,

def get_parameters(self, packet_count=None):
"""Returns the special tshark parameters to be used according to the configuration of this class."""
params = super(InMemCapture, self).get_parameters(packet_count=packet_count)
params = super(InMemCapture, self).get_parameters(
packet_count=packet_count)
params += ['-i', '-']
return params

Expand All @@ -70,7 +71,8 @@ async def _get_tshark_process(self, packet_count=None):
self._current_tshark = proc

# Create PCAP header
header = struct.pack("IHHIIII", 0xa1b2c3d4, 2, 4, 0, 0, 0x7fff, self._current_linktype)
header = struct.pack("IHHIIII", 0xa1b2c3d4, 2, 4,
0, 0, 0x7fff, self._current_linktype)
proc.stdin.write(header)

return proc
Expand All @@ -82,7 +84,7 @@ def _get_json_separators(self):
The latter variable being the number of characters to ignore in order to pass the packet (i.e. extra newlines,
commas, parenthesis).
"""
if self._get_tshark_version() >= LooseVersion("2.6.7"):
if self._get_tshark_version() >= version.parse("2.6.7"):
return ("%s }" % os.linesep).encode(), ("}%s]" % os.linesep).encode(), 0
else:
return ("}%s%s" % (os.linesep, os.linesep)).encode(), ("}%s%s]" % (os.linesep, os.linesep)).encode(), 1
Expand All @@ -97,7 +99,8 @@ def _write_packet(self, packet, sniff_time):
secs = int(now)
usecs = int((now * 1000000) % 1000000)
# Write packet header
self._current_tshark.stdin.write(struct.pack("IIII", secs, usecs, len(packet), len(packet)))
self._current_tshark.stdin.write(struct.pack(
"IIII", secs, usecs, len(packet), len(packet)))
self._current_tshark.stdin.write(packet)

def parse_packet(self, binary_packet, sniff_time=None, timeout=DEFAULT_TIMEOUT):
Expand All @@ -117,7 +120,9 @@ def parse_packets(self, binary_packets, sniff_times=None, timeout=DEFAULT_TIMEOU
DOES NOT CLOSE tshark. It must be closed manually by calling close() when you're done
working with it.
"""
return asyncio.get_event_loop().run_until_complete(self.parse_packets_async(binary_packets, sniff_times, timeout))
if self.eventloop is None:
self._setup_eventloop()
return self.eventloop.run_until_complete(self.parse_packets_async(binary_packets, sniff_times, timeout))

async def parse_packets_async(self, binary_packets, sniff_times=None, timeout=DEFAULT_TIMEOUT):
"""A coroutine which parses binary packets and return a list of parsed packets.
Expand Down Expand Up @@ -170,7 +175,8 @@ def feed_packet(self, binary_packet, linktype=LinkTypes.ETHERNET, timeout=DEFAUL
By default, assumes the packet is an ethernet packet. For another link type, supply the linktype argument (most
can be found in the class LinkTypes)
"""
warnings.warn("Deprecated method. Use InMemCapture.parse_packet() instead.")
warnings.warn(
"Deprecated method. Use InMemCapture.parse_packet() instead.")
self._current_linktype = linktype
pkt = self.parse_packet(binary_packet, timeout=timeout)
self.close()
Expand Down
4 changes: 2 additions & 2 deletions src/pyshark/capture/live_capture.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import asyncio
import sys
from distutils.version import LooseVersion
from packaging import version

from pyshark.capture.capture import Capture
from pyshark.tshark.tshark import get_tshark_interfaces, get_process_path
Expand Down Expand Up @@ -68,7 +68,7 @@ def get_parameters(self, packet_count=None):
def _get_dumpcap_parameters(self):
# Don't report packet counts.
params = ["-q"]
if self._get_tshark_version() < LooseVersion("2.5.0"):
if self._get_tshark_version() < version.parse("2.5.0"):
# Tshark versions older than 2.5 don't support pcapng. This flag forces dumpcap to output pcap.
params += ["-P"]
if self.bpf_filter:
Expand Down
10 changes: 5 additions & 5 deletions src/pyshark/tshark/tshark.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Module used for the actual running of TShark"""
from distutils.version import LooseVersion
from packaging import version
import os
import subprocess
import sys
Expand Down Expand Up @@ -73,20 +73,20 @@ def get_tshark_version(tshark_path=None):
raise TSharkVersionException("Unable to parse TShark version from: {}".format(version_line))
version_string = m.groups()[0] # Use first match found

return LooseVersion(version_string)
return version.parse(version_string)


def tshark_supports_duplicate_keys(tshark_version):
return tshark_version >= LooseVersion("2.6.7")
return tshark_version >= version.parse("2.6.7")


def tshark_supports_json(tshark_version):
return tshark_version >= LooseVersion("2.2.0")
return tshark_version >= version.parse("2.2.0")


def get_tshark_display_filter_flag(tshark_version):
"""Returns '-Y' for tshark versions >= 1.10.0 and '-R' for older versions."""
if tshark_version >= LooseVersion("1.10.0"):
if tshark_version >= version.parse("1.10.0"):
return "-Y"
else:
return "-R"
Expand Down
4 changes: 3 additions & 1 deletion src/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
version="0.4.3",
packages=find_packages(),
package_data={'': ['*.ini', '*.pcapng']},
install_requires=['lxml', 'py'],
install_requires=['lxml', 'py', 'packaging'],
tests_require=['pytest'],
url="https://github.com/KimiNewt/pyshark",
license="MIT",
Expand All @@ -24,5 +24,7 @@
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
],
)
Loading