-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patharduinoCommsModule.py
executable file
·68 lines (55 loc) · 2.03 KB
/
arduinoCommsModule.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python3
import os
import robomodules as rm
import serial
from messages import *
ADDRESS = os.environ.get("BIND_ADDRESS","localhost")
PORT = os.environ.get("BIND_PORT", 11297)
FREQUENCY = 10
class ArduinoCommsModule(rm.ProtoModule):
def __init__(self, addr, port):
self.subscriptions = [MsgType.CTRL_MSG]
super().__init__(addr, port, message_buffers, MsgType, FREQUENCY, self.subscriptions)
try:
self.serialConnection = serial.Serial('/dev/ttyACM0', 9600, timeout=0)
except Exception:
raise RuntimeError("serial connection to Arduino failed")
def msg_received(self, msg, msg_type):
if msg_type == MsgType.CTRL_MSG:
arduino_msg = self._stringToBinary(self._messageToString(msg))
print(arduino_msg)
self.serialConnection.write(arduino_msg)
def tick(self):
line = self.serialConnection.readline()
if len(line) > 0:
msg = self._stringToMessage(self._binaryToString(line))
if msg:
self.write(msg.SerializeToString(), MsgType.ORIENTATION_MSG)
def _messageToString(self, m):
ans = "$"
for prop in ["x", "y", "z", "roll", "pitch", "yaw", "cameraTilt", "cameraPan"]:
ans += (str(getattr(m, prop)) + ";")
ans += "\0"
return ans
def _stringToBinary(self, s):
return bytes(s, "ascii")
def _binaryToString(self, b):
return b.decode('ascii')
def _stringToMessage(self, s):
ans = OrientationMsg()
if s[0] == '$':
s = s[1:]
else:
return None
numbers = (s.split(";"))[0:-1]
if len(numbers) != 3:
return None
# assumes the values are coming in in that order
for (number, name) in zip(numbers, ["roll", "pitch", "yaw"]):
setattr(ans, name, float(number))
return ans
def main():
module = ArduinoCommsModule(ADDRESS, PORT)
module.run()
if __name__ == "__main__":
main()