-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathlabs.py
67 lines (52 loc) · 1.62 KB
/
labs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
###############################################################################
#
# Copyright (c) 2017-2020 Master AI, Inc.
# ALL RIGHTS RESERVED
#
# Use of this library, in source or binary form, is prohibited without written
# approval from Master AI, Inc.
#
###############################################################################
"""
Module to interface with your AutoAuto Labs account.
This is a **synchronous** interface.
"""
import os
from threading import Lock
from auto.asyncio_tools import get_loop, thread_safe
from auto.services.labs.rpc.client_sync import LabsService
def send_message_to_labs(msg):
"""
Send a message `msg` to your AutoAuto Labs account.
Return True if the message was sent, else return False.
"""
client = _global_client()
if 'to_vin' not in msg:
to_user_session = os.environ.get('TO_USER_SESSION', None)
to_username = os.environ.get('TO_USERNAME', None)
if to_user_session:
msg['to_user_session'] = to_user_session
elif to_username:
msg['to_username'] = to_username
did_send = client.send(msg)
return did_send
def receive_message_from_labs():
"""
Wait for the next message from the Labs server, then return it.
"""
client = _global_client()
return client.receive()
# Alias
send = send_message_to_labs
receive = receive_message_from_labs
_GLOBAL_LOCK = Lock()
@thread_safe
def _global_client():
global _CLIENT
with _GLOBAL_LOCK:
try:
_CLIENT
except NameError:
_CLIENT = LabsService(get_loop())
_CLIENT.connect()
return _CLIENT