Skip to content

Commit 5505338

Browse files
committed
Add support for ServiceBus bindings
Fixes: #133
1 parent e0dab63 commit 5505338

File tree

18 files changed

+424
-1
lines changed

18 files changed

+424
-1
lines changed

azure/functions_worker/bindings/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from . import eventhub # NoQA
1414
from . import http # NoQA
1515
from . import queue # NoQA
16+
from . import servicebus # NoQA
1617
from . import timer # NoQA
1718

1819

azure/functions_worker/bindings/meta.py

+18
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,19 @@ def _parse_datetime_metadata(
145145
else:
146146
return cls._parse_datetime(datetime_str)
147147

148+
@classmethod
149+
def _parse_timedelta_metadata(
150+
cls, trigger_metadata: typing.Mapping[str, protos.TypedData],
151+
field: str) -> typing.Optional[datetime.timedelta]:
152+
153+
timedelta_str = cls._decode_trigger_metadata_field(
154+
trigger_metadata, field, python_type=str)
155+
156+
if timedelta_str is None:
157+
return None
158+
else:
159+
return cls._parse_timedelta(timedelta_str)
160+
148161
@classmethod
149162
def _parse_datetime(
150163
cls, datetime_str: str) -> datetime.datetime:
@@ -167,6 +180,11 @@ def _parse_datetime(
167180

168181
return dt.replace(tzinfo=datetime.timezone.utc)
169182

183+
@classmethod
184+
def _parse_timedelta(
185+
cls, timedelta_str: str) -> datetime.timedelta:
186+
raise NotImplementedError
187+
170188

171189
class InConverter(_BaseConverter, binding=None):
172190

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
import datetime
2+
import typing
3+
4+
from azure.functions import _servicebus as azf_sbus
5+
6+
from . import meta
7+
from .. import protos
8+
9+
10+
class ServiceBusMessage(azf_sbus.ServiceBusMessage):
11+
"""An HTTP response object."""
12+
13+
def __init__(
14+
self, *,
15+
body: bytes,
16+
content_type: typing.Optional[str]=None,
17+
correlation_id: typing.Optional[str]=None,
18+
expiration_time: typing.Optional[datetime.datetime]=None,
19+
label: typing.Optional[str]=None,
20+
message_id: str,
21+
partition_key: typing.Optional[str]=None,
22+
reply_to: typing.Optional[str]=None,
23+
reply_to_session_id: typing.Optional[str]=None,
24+
scheduled_enqueue_time: typing.Optional[datetime.datetime]=None,
25+
session_id: typing.Optional[str]=None,
26+
time_to_live: typing.Optional[datetime.timedelta]=None,
27+
to: typing.Optional[str]=None,
28+
user_properties: typing.Dict[str, object]) -> None:
29+
30+
self.__body = body
31+
self.__content_type = content_type
32+
self.__correlation_id = correlation_id
33+
self.__expiration_time = expiration_time
34+
self.__label = label
35+
self.__message_id = message_id
36+
self.__partition_key = partition_key
37+
self.__reply_to = reply_to
38+
self.__reply_to_session_id = reply_to_session_id
39+
self.__scheduled_enqueue_time = scheduled_enqueue_time
40+
self.__session_id = session_id
41+
self.__time_to_live = time_to_live
42+
self.__to = to
43+
self.__user_properties = user_properties
44+
45+
def get_body(self) -> bytes:
46+
return self.__body
47+
48+
@property
49+
def content_type(self) -> typing.Optional[str]:
50+
return self.__content_type
51+
52+
@property
53+
def correlation_id(self) -> typing.Optional[str]:
54+
return self.__correlation_id
55+
56+
@property
57+
def expiration_time(self) -> typing.Optional[datetime.datetime]:
58+
return self.__expiration_time
59+
60+
@property
61+
def label(self) -> typing.Optional[str]:
62+
return self.__label
63+
64+
@property
65+
def message_id(self) -> str:
66+
return self.__message_id
67+
68+
@property
69+
def partition_key(self) -> typing.Optional[str]:
70+
return self.__partition_key
71+
72+
@property
73+
def reply_to(self) -> typing.Optional[str]:
74+
return self.__reply_to
75+
76+
@property
77+
def reply_to_session_id(self) -> typing.Optional[str]:
78+
return self.__reply_to_session_id
79+
80+
@property
81+
def scheduled_enqueue_time(self) -> typing.Optional[datetime.datetime]:
82+
return self.__scheduled_enqueue_time
83+
84+
@property
85+
def session_id(self) -> typing.Optional[str]:
86+
return self.__session_id
87+
88+
@property
89+
def time_to_live(self) -> typing.Optional[datetime.timedelta]:
90+
return self.__time_to_live
91+
92+
@property
93+
def to(self) -> typing.Optional[str]:
94+
return self.__to
95+
96+
@property
97+
def user_properties(self) -> typing.Dict[str, object]:
98+
return self.__user_properties
99+
100+
def __repr__(self) -> str:
101+
return (
102+
f'<azure.functions.ServiceBusMessage '
103+
f'message_id={self.message_id} '
104+
f'at 0x{id(self):0x}>'
105+
)
106+
107+
108+
class ServiceBusMessageInConverter(meta.InConverter,
109+
binding='serviceBusTrigger', trigger=True):
110+
111+
@classmethod
112+
def check_input_type_annotation(cls, pytype: type) -> bool:
113+
return issubclass(pytype, azf_sbus.ServiceBusMessage)
114+
115+
@classmethod
116+
def from_proto(cls, data: protos.TypedData, *,
117+
pytype: typing.Optional[type],
118+
trigger_metadata) -> typing.Any:
119+
data_type = data.WhichOneof('data')
120+
121+
if data_type == 'string':
122+
body = data.string.encode('utf-8')
123+
124+
elif data_type == 'bytes':
125+
body = data.bytes
126+
127+
elif data_type == 'json':
128+
body = data.json.encode('utf-8')
129+
130+
else:
131+
raise NotImplementedError(
132+
f'unsupported queue payload type: {data_type}')
133+
134+
if trigger_metadata is None:
135+
raise NotImplementedError(
136+
f'missing trigger metadata for ServiceBus message input')
137+
138+
return ServiceBusMessage(
139+
body=body,
140+
content_type=cls._decode_trigger_metadata_field(
141+
trigger_metadata, 'ContentType', python_type=str),
142+
correlation_id=cls._decode_trigger_metadata_field(
143+
trigger_metadata, 'CorrelationId', python_type=str),
144+
expiration_time=cls._parse_datetime_metadata(
145+
trigger_metadata, 'ExpirationTime'),
146+
label=cls._decode_trigger_metadata_field(
147+
trigger_metadata, 'Label', python_type=str),
148+
message_id=cls._decode_trigger_metadata_field(
149+
trigger_metadata, 'MessageId', python_type=str),
150+
partition_key=cls._decode_trigger_metadata_field(
151+
trigger_metadata, 'PartitionKey', python_type=str),
152+
reply_to=cls._decode_trigger_metadata_field(
153+
trigger_metadata, 'ReplyTo', python_type=str),
154+
reply_to_session_id=cls._decode_trigger_metadata_field(
155+
trigger_metadata, 'ReplyToSessionId', python_type=str),
156+
scheduled_enqueue_time=cls._parse_datetime_metadata(
157+
trigger_metadata, 'ScheduledEnqueueTime'),
158+
session_id=cls._decode_trigger_metadata_field(
159+
trigger_metadata, 'SessionId', python_type=str),
160+
time_to_live=cls._parse_timedelta_metadata(
161+
trigger_metadata, 'TimeToLive'),
162+
to=cls._decode_trigger_metadata_field(
163+
trigger_metadata, 'To', python_type=str),
164+
user_properties=cls._decode_trigger_metadata_field(
165+
trigger_metadata, 'UserProperties', python_type=dict),
166+
)
167+
168+
169+
class ServiceBusMessageOutConverter(meta.OutConverter, binding='serviceBus'):
170+
171+
@classmethod
172+
def check_output_type_annotation(cls, pytype: type) -> bool:
173+
return issubclass(pytype, (str, bytes))
174+
175+
@classmethod
176+
def to_proto(cls, obj: typing.Any, *,
177+
pytype: typing.Optional[type]) -> protos.TypedData:
178+
if isinstance(obj, str):
179+
return protos.TypedData(string=obj)
180+
181+
elif isinstance(obj, bytes):
182+
return protos.TypedData(bytes=obj)
183+
184+
raise NotImplementedError

azure/functions_worker/testutils.py

+4
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,10 @@ def popen_webhost(*, stdout, stderr, script_root=FUNCS_PATH, port=None):
547547
if eventhub:
548548
extra_env['AzureWebJobsEventHubConnectionString'] = eventhub
549549

550+
servicebus = testconfig['azure'].get('servicebus_key')
551+
if servicebus:
552+
extra_env['AzureWebJobsServiceBusConnectionString'] = servicebus
553+
550554
if port is not None:
551555
extra_env['ASPNETCORE_URLS'] = f'http://*:{port}'
552556

setup.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232
"id": "Microsoft.Azure.WebJobs.Extensions.EventGrid",
3333
"version": "2.0.0-beta1"
3434
},
35+
{
36+
"id": "Microsoft.Azure.WebJobs.ServiceBus",
37+
"version": "3.0.0-beta5"
38+
},
3539
]
3640

3741

@@ -192,7 +196,7 @@ def run(self):
192196
install_requires=[
193197
'grpcio~=1.14.0',
194198
'grpcio-tools~=1.14.0',
195-
'azure-functions==1.0.0a1',
199+
'azure-functions==1.0.0a3',
196200
],
197201
extras_require={
198202
'dev': [
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import azure.functions as func
2+
3+
4+
def main(req: func.HttpRequest, file: func.InputStream) -> str:
5+
return func.HttpResponse(
6+
file.read().decode('utf-8'), mimetype='application/json')
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"scriptFile": "__init__.py",
3+
"disabled": false,
4+
"bindings": [
5+
{
6+
"type": "httpTrigger",
7+
"direction": "in",
8+
"name": "req"
9+
},
10+
{
11+
"type": "blob",
12+
"direction": "in",
13+
"name": "file",
14+
"connection": "AzureWebJobsStorage",
15+
"path": "python-worker-tests/test-servicebus-triggered.txt"
16+
},
17+
{
18+
"type": "http",
19+
"direction": "out",
20+
"name": "$return",
21+
}
22+
]
23+
}

tests/servicebus_functions/host.json

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
{
2+
"http": {
3+
"routePrefix": "api",
4+
"maxConcurrentRequests": 5,
5+
"maxOutstandingRequests": 30
6+
},
7+
"logger": {
8+
"categoryFilter": {
9+
"defaultLevel": "Info",
10+
},
11+
"categoryLevels": {
12+
"Worker": "Trace"
13+
}
14+
},
15+
"queues": {
16+
"visibilityTimeout": "00:00:10"
17+
},
18+
"swagger": {
19+
"enabled": true
20+
},
21+
"eventHub": {
22+
"maxBatchSize": 1000,
23+
"prefetchCount": 1000,
24+
"batchCheckpointFrequency": 1
25+
},
26+
"healthMonitor": {
27+
"enabled": true,
28+
"healthCheckInterval": "00:00:10",
29+
"healthCheckWindow": "00:02:00",
30+
"healthCheckThreshold": 6,
31+
"counterThreshold": 0.80
32+
},
33+
"functionTimeout": "00:05:00",
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
This function is used to check the host availability in tests.
2+
Please do not remove.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"scriptFile": "main.py",
3+
"disabled": false,
4+
"bindings": [
5+
{
6+
"type": "httpTrigger",
7+
"direction": "in",
8+
"name": "req"
9+
}
10+
]
11+
}
+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
def main(req):
2+
return
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import azure.functions as azf
2+
3+
4+
def main(req: azf.HttpRequest, msg: azf.Out[str]):
5+
msg.set(req.get_body().decode('utf-8'))
6+
7+
return 'OK'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"scriptFile": "__init__.py",
3+
"disabled": false,
4+
5+
"bindings": [
6+
{
7+
"type": "httpTrigger",
8+
"direction": "in",
9+
"name": "req"
10+
},
11+
{
12+
"direction": "out",
13+
"name": "msg",
14+
"queueName": "testqueue",
15+
"connection": "AzureWebJobsServiceBusConnectionString",
16+
"type": "serviceBus"
17+
},
18+
{
19+
"direction": "out",
20+
"name": "$return",
21+
"type": "http"
22+
}
23+
]
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import azure.functions as azf
2+
3+
4+
def main(req: azf.HttpRequest) -> bytes:
5+
return req.get_body()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"scriptFile": "__init__.py",
3+
"disabled": false,
4+
5+
"bindings": [
6+
{
7+
"type": "httpTrigger",
8+
"direction": "in",
9+
"name": "req"
10+
},
11+
{
12+
"direction": "out",
13+
"name": "$return",
14+
"queueName": "testqueue-return",
15+
"connection": "AzureWebJobsServiceBusConnectionString",
16+
"type": "serviceBus"
17+
}
18+
]
19+
}

0 commit comments

Comments
 (0)