-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathinet.py
196 lines (153 loc) · 5.41 KB
/
inet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
###############################################################################
#
# Copyright (c) 2017-2020 Master AI, Inc.
# ALL RIGHTS RESERVED
#
# Use of this library, in source or binary form, is prohibited without written
# approval from Master AI, Inc.
#
###############################################################################
"""
This module provides utilities for querying and controlling the network
interfaces of your device.
This is a **synchronous** interface.
"""
import subprocess
import socket
import fcntl
import struct
import time
import requests
import re
import os
# Scheme used by `has_internet_access()`, both as the service name for the
# address lookup and as the URL scheme of the ping request.
# Override with the MAI_PING_PROTO environment variable.
PING_PROTO = os.getenv('MAI_PING_PROTO', 'https')

# Host probed by `has_internet_access()`.
# Override with the MAI_PING_HOST environment variable.
PING_HOST = os.getenv('MAI_PING_HOST', 'ws.autoauto.ai')
class Wireless:
    """
    Simple python interface to the `nmcli` utility.

    See: https://developer.gnome.org/NetworkManager/stable/nmcli.html
    """

    def __init__(self, interface=None):
        """
        Remember the network device that the other methods operate on.

        `interface` is passed to `nmcli` as `ifname` by `connect()` and is
        compared against the DEVICE column by `current()`. When it is left
        as `None`, `connect()` does not pass `ifname` at all and `current()`
        never finds a match.
        """
        self.interface = interface

    @staticmethod
    def _split_terse(line):
        """
        Split one line of `nmcli --terse` output into its fields.

        Fields are separated by ':'. In terse mode nmcli escapes any ':' or
        '\\' inside a value with a preceding backslash, so a plain
        `line.split(':')` miscounts fields whenever a connection name
        contains a colon. For lines without backslashes the result is
        identical to `line.split(':')`.
        """
        fields = []
        current = []
        chars = iter(line)
        for ch in chars:
            if ch == '\\':
                # Take the next character literally; a lone trailing
                # backslash is kept as-is.
                current.append(next(chars, '\\'))
            elif ch == ':':
                fields.append(''.join(current))
                current = []
            else:
                current.append(ch)
        fields.append(''.join(current))
        return fields

    def _error_in_response(self, response):
        """
        Return True if any line of the `nmcli` output starts with 'Error'.
        """
        return any(line.startswith('Error') for line in response.splitlines())

    def connect(self, ssid, password):
        """
        Connect to the WiFi network `ssid`, storing the connection under
        the name `ssid`.

        Existing wireless connections matching `ssid` are deleted first (see
        `delete_connection()`), and again afterwards if the attempt failed.
        `password` may be `None`, in which case no `password` argument is
        given to nmcli; likewise `ifname` is only given when an interface
        was configured on this object.

        Returns True when no line of nmcli's output starts with 'Error',
        otherwise False. Blocks for two extra seconds either way.
        """
        self.delete_connection(ssid)

        cmd = ['nmcli', 'dev', 'wifi', 'connect', ssid]
        if password is not None:
            cmd += ['password', password]
        if self.interface is not None:
            cmd += ['ifname', self.interface]
        cmd += ['name', ssid]  # , 'hidden', 'yes'
        response = _run_cmd(cmd)

        did_connect = not self._error_in_response(response)
        if not did_connect:
            # Do not leave a half-configured connection behind.
            self.delete_connection(ssid)

        time.sleep(2)  # Allow connection (and DHCP) to settle.
        os.sync()
        return did_connect

    def current(self):
        """
        Return the name of the connection active on this object's interface,
        or `None` if the interface is not listed or has no connection.
        """
        response = _run_cmd("nmcli --terse --field DEVICE,CONNECTION dev".split(' '))
        for line in response.splitlines():
            fields = self._split_terse(line)
            if len(fields) != 2:
                continue
            iface, name = fields
            if iface == self.interface:
                # An empty value or '--' means "no connection".
                return None if name in ('', '--') else name
        return None

    def delete_connection(self, ssid_to_delete):
        """
        Delete every stored '802-11-wireless' connection whose name contains
        `ssid_to_delete`.

        The match is a substring test, so an empty string matches every
        stored wireless connection.
        """
        response = _run_cmd("nmcli --terse --fields UUID,NAME,TYPE con show".split(' '))
        for line in response.splitlines():
            fields = self._split_terse(line)
            if len(fields) != 3:
                continue
            uuid, name, type_ = fields
            if type_ == '802-11-wireless' and ssid_to_delete in name:
                _run_cmd("nmcli con delete uuid".split(' ') + [uuid])

    def radio_power(self, on=None):
        """
        Switch the WiFi radio on or off, or query its state.

        `on=True` / `on=False` switch the radio and return `None`. Any other
        value (including the default `None`) queries the radio and returns
        True if nmcli's output contains 'enabled'.
        """
        if on is True:
            _run_cmd('nmcli radio wifi on'.split(' '))
        elif on is False:
            _run_cmd('nmcli radio wifi off'.split(' '))
        else:
            response = _run_cmd('nmcli radio wifi'.split(' '))
            return 'enabled' in response
def get_ip_address(ifname):
    """
    Return the IPv4 address of the interface `ifname` as a dotted string,
    or `None` if it cannot be determined.

    The address is obtained with the SIOCGIFADDR ioctl on a throw-away UDP
    socket. This is a best-effort lookup: any failure (unknown interface,
    no address assigned, invalid `ifname`, ...) yields `None`.
    """
    try:
        # Only the first 15 characters of the name are sent to the kernel.
        request = struct.pack('256s', ifname[:15].encode('utf-8'))
        # The context manager closes the socket even when the ioctl fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            reply = fcntl.ioctl(
                s.fileno(),
                0x8915,  # SIOCGIFADDR
                request,
            )
        # Bytes 20..23 of the reply are the packed IPv4 address.
        return socket.inet_ntoa(reply[20:24])
    except Exception:
        # Deliberately broad (but no longer swallowing KeyboardInterrupt /
        # SystemExit): callers treat `None` as "no address".
        return None
def get_mac_address(ifname):
    """
    Return the hardware address reported by `ip link show dev <ifname>`.

    The value is whatever follows the word 'ether' in the command output;
    `None` is returned when the output has no such entry.
    """
    link_info = _run_cmd(['ip', 'link', 'show', 'dev', ifname])
    found = re.search(r'ether ([^ ]+)', link_info)
    if found is None:
        return None
    return found.group(1)
def has_internet_access():
    """
    Return True if the ping endpoint on `PING_HOST` is reachable and answers
    as expected, otherwise False.

    Three steps, each of which must succeed:
      1. resolve `PING_HOST` (with `PING_PROTO` as the service name),
      2. open a TCP connection to the first resolved address
         (20 second timeout),
      3. GET `<PING_PROTO>://<PING_HOST>/ping` (80 second timeout) and check
         for status 200 with a JSON body whose 'text' is 'pong'.

    This function never raises for network problems; every failure maps to
    False.
    """
    try:
        # Consider instead: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.getaddrinfo
        params = socket.getaddrinfo(PING_HOST, PING_PROTO, proto=socket.IPPROTO_TCP)[0]
    except Exception:
        return False

    family, type_, proto = params[:3]
    sockaddr = params[4]

    try:
        # Creating the socket is inside the `try` as well, so a failure there
        # reports "no access" instead of propagating. The context manager
        # closes the socket on every path.
        with socket.socket(family, type_, proto) as sock:
            sock.settimeout(20.0)
            sock.connect(sockaddr)  # <-- blocking, but respects the `settimeout()` call above
    except OSError:
        # `socket.timeout` is a subclass of OSError.
        return False

    try:
        req = requests.get(f'{PING_PROTO}://{PING_HOST}/ping', timeout=80.0)
        data = req.json()
        return req.status_code == 200 and data['text'] == 'pong'
    except Exception:
        # Request errors, a non-JSON body, or an unexpected JSON shape.
        return False
def list_ifaces():
    """
    Return the names of the interfaces listed by `ip link show up`.

    A name is taken from each output line of the form '<number>: <name>',
    stopping at the first ':' or '@' after the name.
    """
    link_listing = _run_cmd(['ip', 'link', 'show', 'up'])
    header_matches = (
        re.search(r'^\d+: ([^:@]+)', row)
        for row in link_listing.splitlines()
    )
    # Lines that are not interface headers do not match and are skipped.
    return [found[1] for found in header_matches if found is not None]
def list_wifi_ifaces():
    """
    Return the names of the devices that `nmcli` reports with type 'wifi'.

    An empty list is returned when running `nmcli` raises
    `FileNotFoundError`.
    """
    try:
        listing = _run_cmd(['nmcli', '--terse', '--fields', 'DEVICE,TYPE', 'dev'])
    except FileNotFoundError:
        # nmcli not installed on this system -- assume no wifi in this case
        return []

    rows = (row.split(':') for row in listing.splitlines())
    # Keep only well-formed 'DEVICE:TYPE' rows whose type is wifi.
    return [
        parts[0]
        for parts in rows
        if len(parts) == 2 and parts[1] == 'wifi'
    ]
def _run_cmd(cmd):
    """
    Run `cmd` (an argument list) and return everything it printed.

    stderr is merged into stdout, so error messages appear in the returned
    text. The exit status is not checked. Output is decoded as UTF-8; bytes
    that are not valid UTF-8 are replaced with U+FFFD rather than raising
    `UnicodeDecodeError`, so unusual command output cannot crash callers.

    Raises `FileNotFoundError` (from `subprocess.run`) if the executable
    does not exist.
    """
    completed = subprocess.run(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               check=False)
    return completed.stdout.decode('utf-8', errors='replace')
if __name__ == '__main__':
    # Manual smoke test: print every "up" interface with its IPv4 address,
    # then the first WiFi interface and the connection active on it.
    all_ifaces = list_ifaces()
    wifi_ifaces = list_wifi_ifaces()

    for iface in all_ifaces:
        print(iface, get_ip_address(iface))

    if wifi_ifaces:
        wireless = Wireless(wifi_ifaces[0])
        print('WiFi interface', wireless.interface)
        print('WiFi SSID', wireless.current())
    else:
        # Previously this case raised IndexError on `wifi_ifaces[0]`.
        print('No WiFi interface found')