-
Notifications
You must be signed in to change notification settings - Fork 68
/
Copy pathfunction_app.py
3818 lines (3335 loc) · 167 KB
/
function_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import abc
import asyncio
import json
import logging
from abc import ABC
from datetime import time
from typing import Any, Callable, Dict, List, Optional, Union, \
Iterable
from azure.functions.decorators.blob import BlobTrigger, BlobInput, BlobOutput
from azure.functions.decorators.core import Binding, Trigger, DataType, \
AuthLevel, SCRIPT_FILE_NAME, Cardinality, AccessRights, Setting, BlobSource
from azure.functions.decorators.cosmosdb import CosmosDBTrigger, \
CosmosDBOutput, CosmosDBInput, CosmosDBTriggerV3, CosmosDBInputV3, \
CosmosDBOutputV3
from azure.functions.decorators.dapr import DaprBindingOutput, \
DaprBindingTrigger, DaprInvokeOutput, DaprPublishOutput, \
DaprSecretInput, DaprServiceInvocationTrigger, DaprStateInput, \
DaprStateOutput, DaprTopicTrigger
from azure.functions.decorators.eventgrid import EventGridTrigger, \
EventGridOutput
from azure.functions.decorators.eventhub import EventHubTrigger, EventHubOutput
from azure.functions.decorators.http import HttpTrigger, HttpOutput, \
HttpMethod
from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \
BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod
from azure.functions.decorators.queue import QueueTrigger, QueueOutput
from azure.functions.decorators.servicebus import ServiceBusQueueTrigger, \
ServiceBusQueueOutput, ServiceBusTopicTrigger, \
ServiceBusTopicOutput
from azure.functions.decorators.sql import SqlTrigger, SqlInput, SqlOutput
from azure.functions.decorators.table import TableInput, TableOutput
from azure.functions.decorators.timer import TimerTrigger
from azure.functions.decorators.utils import parse_singular_param_to_enum, \
parse_iterable_param_to_enums, StringifyEnumJsonEncoder
from azure.functions.http import HttpRequest
from .generic import GenericInputBinding, GenericTrigger, GenericOutputBinding
from .openai import AssistantSkillTrigger, OpenAIModels, TextCompletionInput, \
AssistantCreateOutput, \
AssistantQueryInput, AssistantPostInput, InputType, EmbeddingsInput, \
semantic_search_system_prompt, \
SemanticSearchInput, EmbeddingsStoreOutput
from .retry_policy import RetryPolicy
from .function_name import FunctionName
from .warmup import WarmUpTrigger
from .._http_asgi import AsgiMiddleware
from .._http_wsgi import WsgiMiddleware, Context
class Function(object):
"""The function object represents a function in Function App. It
encapsulates function metadata and callable and used in the worker
function indexing model. Ref: https://aka.ms/azure-function-ref
"""
def __init__(self, func: Callable[..., Any], script_file: str):
"""Constructor of :class:`FunctionBuilder` object.
:param func: User defined python function instance.
:param script_file: File name indexed by worker to find function.
:param trigger: The trigger object of the function.
:param bindings: The list of binding objects of a function.
:param settings: The list of setting objects of a function.
:param http_type: Http function type.
:param is_http_function: Whether the function is a http function.
"""
self._name: str = func.__name__
self._func = func
self._trigger: Optional[Trigger] = None
self._bindings: List[Binding] = []
self._settings: List[Setting] = []
self.function_script_file = script_file
self.http_type = 'function'
self._is_http_function = False
def __str__(self):
"""Return the function.json representation of the function"""
return self.get_function_json()
def __call__(self, *args, **kwargs):
"""This would allow the Function object to be directly
callable and runnable directly using the interpreter
locally.
Example:
@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
return "Hello, World!"
print(http_trigger(None))
➜ python function_app.py
Hello, World!
"""
return self._func(*args, **kwargs)
def add_binding(self, binding: Binding) -> None:
"""Add a binding instance to the function.
:param binding: The binding object to add.
"""
self._bindings.append(binding)
def add_trigger(self, trigger: Trigger) -> None:
"""Add a trigger instance to the function.
:param trigger: The trigger object to add.
:raises ValueError: Raises trigger already exists error if a trigger is
being added to a function which has trigger attached.
"""
if self._trigger:
raise ValueError("A trigger was already registered to this "
"function. Adding another trigger is not the "
"correct behavior as a function can only have one"
" trigger. Existing registered trigger "
f"is {self._trigger.get_dict_repr()} and New "
f"trigger "
f"being added is {trigger.get_dict_repr()}")
self._trigger = trigger
# We still add the trigger info to the bindings to ensure that
# function.json is complete
self._bindings.append(trigger)
def add_setting(self, setting: Setting) -> None:
"""Add a setting instance to the function.
:param setting: The setting object to add
"""
self._settings.append(setting)
def set_http_type(self, http_type: str) -> None:
"""Set or update the http type for the function if :param:`http_type`
.
:param http_type: Http function type.
"""
self.http_type = http_type
def is_http_function(self) -> bool:
return self._is_http_function
def get_trigger(self) -> Optional[Trigger]:
"""Get attached trigger instance of the function.
:return: Trigger instance or None.
"""
return self._trigger
def get_bindings(self) -> List[Binding]:
"""Get all the bindings attached to the function.
:return: Bindings attached to the function.
"""
return self._bindings
def get_setting(self, setting_name: str) -> Optional[Setting]:
"""Get a specific setting attached to the function.
:param setting_name: The name of the setting to search for.
:return: The setting attached to the function (or None if not found).
"""
for setting in self._settings:
if setting.setting_name == setting_name:
return setting
return None
def get_settings_dict(self, setting_name) -> Optional[Dict]:
"""Get a dictionary representation of a setting.
:param: setting_name: The name of the setting to search for.
:return: The dictionary representation of the setting (or None if not
found).
"""
setting = self.get_setting(setting_name)
return setting.get_dict_repr() if setting else None
def get_function_name(self) -> Optional[str]:
"""Get the name of the function.
:return: The name of the function.
"""
function_name_setting = \
self.get_setting("function_name")
return function_name_setting.get_settings_value("function_name") \
if function_name_setting else self._name
def get_raw_bindings(self) -> List[str]:
return [json.dumps(b.get_dict_repr(), cls=StringifyEnumJsonEncoder)
for b in self._bindings]
def get_bindings_dict(self) -> Dict:
"""Get dictionary representation of the bindings of the function.
:return: Dictionary representation of the bindings.
"""
return {"bindings": [b.get_dict_repr() for b in self._bindings]}
def get_dict_repr(self) -> Dict:
"""Get the dictionary representation of the function.
:return: The dictionary representation of the function.
"""
stub_f_json = {
"scriptFile": self.function_script_file
}
stub_f_json.update(self.get_bindings_dict()) # NoQA
return stub_f_json
def get_user_function(self) -> Callable[..., Any]:
"""Get the python function customer defined.
:return: The python function customer defined.
"""
return self._func
def get_function_json(self) -> str:
"""Get the json stringified form of function.
:return: The json stringified form of function.
"""
return json.dumps(self.get_dict_repr(), cls=StringifyEnumJsonEncoder)
class FunctionBuilder(object):
def __init__(self, func, function_script_file):
self._function = Function(func, function_script_file)
def __call__(self, *args, **kwargs):
"""Call the Function object directly"""
return self._function(*args, **kwargs)
def configure_http_type(self, http_type: str) -> 'FunctionBuilder':
self._function.set_http_type(http_type)
return self
def add_trigger(self, trigger: Trigger) -> 'FunctionBuilder':
self._function.add_trigger(trigger=trigger)
return self
def add_binding(self, binding: Binding) -> 'FunctionBuilder':
self._function.add_binding(binding=binding)
return self
def add_setting(self, setting: Setting) -> 'FunctionBuilder':
self._function.add_setting(setting=setting)
return self
def _validate_function(self,
auth_level: Optional[AuthLevel] = None) -> None:
"""
Validates the function information before building the function.
Functions with the same function name are not supported and should
fail indexing. If a function name is not defined, the default is the
method name. This also means that two functions with the same
method name will also fail indexing.
https://github.com/Azure/azure-functions-python-worker/issues/1489
:param auth_level: Http auth level that will be set if http
trigger function auth level is None.
"""
function_name = self._function.get_function_name()
trigger = self._function.get_trigger()
if trigger is None:
raise ValueError(
f"Function {function_name} does not have a trigger. A valid "
f"function must have one and only one trigger registered.")
bindings = self._function.get_bindings()
if trigger not in bindings:
raise ValueError(
f"Function {function_name} trigger {trigger} not present"
f" in bindings {bindings}")
# Set route to function name if unspecified in the http trigger
# Set auth level to function app auth level if unspecified in the
# http trigger
if Trigger.is_supported_trigger_type(trigger, HttpTrigger):
if getattr(trigger, 'route', None) is None:
getattr(trigger, 'init_params').append('route')
setattr(trigger, 'route', function_name)
if getattr(trigger, 'auth_level',
None) is None and auth_level is not None:
getattr(trigger, 'init_params').append('auth_level')
setattr(trigger, 'auth_level',
parse_singular_param_to_enum(auth_level, AuthLevel))
self._function._is_http_function = True
def build(self, auth_level: Optional[AuthLevel] = None) -> Function:
"""
Validates and builds the function object.
:param auth_level: Http auth level that will be set if http
trigger function auth level is None.
"""
self._validate_function(auth_level)
return self._function
class DecoratorApi(ABC):
"""Interface which contains essential decorator function building blocks
to extend for creating new function app or blueprint classes.
"""
def __init__(self, *args, **kwargs):
self._function_builders: List[FunctionBuilder] = []
self._app_script_file: str = SCRIPT_FILE_NAME
def _invoke_df_decorator(self, df_decorator):
"""
Invoke a Durable Functions decorator from the DF SDK, and store the
resulting :class:`FunctionBuilder` object within the `DecoratorApi`.
"""
@self._configure_function_builder
def wrap(fb):
def decorator():
function_builder = df_decorator(fb._function._func)
# remove old function builder from `self` and replace
# it with the result of the DF decorator
self._function_builders.pop()
self._function_builders.append(function_builder)
return function_builder
return decorator()
return wrap
def _get_durable_blueprint(self):
"""Attempt to import the Durable Functions SDK from which DF
decorators are implemented.
"""
try:
import azure.durable_functions as df
df_bp = df.Blueprint()
return df_bp
except ImportError:
error_message = \
"Attempted to use a Durable Functions decorator, " \
"but the `azure-functions-durable` SDK package could not be " \
"found. Please install `azure-functions-durable` to use " \
"Durable Functions."
raise Exception(error_message)
@property
def app_script_file(self) -> str:
"""Name of function app script file in which all the functions
are defined. \n
Script file defined here is for placeholder purpose, please refer to
worker defined script file path as the single point of truth.
:return: Script file name.
"""
return self._app_script_file
def function_name(self, name: str,
setting_extra_fields: Optional[Dict[str, Any]] = None,
) -> Callable[..., Any]:
"""Optional: Sets name of the :class:`Function` object. If not set,
it will default to the name of the method name.
:param name: Name of the function.
:param setting_extra_fields: Keyword arguments for specifying
additional setting fields
:return: Decorator function.
"""
if setting_extra_fields is None:
setting_extra_fields = {}
@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_setting(setting=FunctionName(
function_name=name,
**setting_extra_fields))
return fb
return decorator()
return wrap
def _validate_type(self,
func: Union[Callable[..., Any], FunctionBuilder]) \
-> FunctionBuilder:
"""Validate the type of the function object and return the created
:class:`FunctionBuilder` object.
:param func: Function object passed to
:meth:`_configure_function_builder`
:raises ValueError: Raise error when func param is neither
:class:`Callable` nor :class:`FunctionBuilder`.
:return: :class:`FunctionBuilder` object.
"""
if isinstance(func, FunctionBuilder):
fb = self._function_builders.pop()
elif callable(func):
fb = FunctionBuilder(func, self._app_script_file)
else:
raise ValueError(
"Unsupported type for function app decorator found.")
return fb
def _configure_function_builder(self, wrap) -> Callable[..., Any]:
"""Decorator function on user defined function to create and return
:class:`FunctionBuilder` object from :class:`Callable` func.
"""
def decorator(func):
fb = self._validate_type(func)
self._function_builders.append(fb)
return wrap(fb)
return decorator
def http_type(self, http_type: str) -> Callable[..., Any]:
"""Set http type of the :class:`Function` object.
:param http_type: Http type of the function.
:return: Decorator function.
"""
@self._configure_function_builder
def wrap(fb):
def decorator():
fb.configure_http_type(http_type)
return fb
return decorator()
return wrap
class HttpFunctionsAuthLevelMixin(ABC):
"""Interface to extend for enabling function app level http
authorization level setting"""
def __init__(self, auth_level: Union[AuthLevel, str], *args, **kwargs):
self._auth_level = AuthLevel[auth_level] \
if isinstance(auth_level, str) else auth_level
@property
def auth_level(self) -> AuthLevel:
"""Authorization level of the function app. Will be applied to the http
trigger functions which do not have authorization level specified.
:return: Authorization level of the function app.
"""
return self._auth_level
class TriggerApi(DecoratorApi, ABC):
"""Interface to extend for using existing trigger decorator functions."""
def route(self,
route: Optional[str] = None,
trigger_arg_name: str = 'req',
binding_arg_name: str = '$return',
methods: Optional[
Union[Iterable[str], Iterable[HttpMethod]]] = None,
auth_level: Optional[Union[AuthLevel, str]] = None,
trigger_extra_fields: Optional[Dict[str, Any]] = None,
binding_extra_fields: Optional[Dict[str, Any]] = None
) -> Callable[..., Any]:
"""The route decorator adds :class:`HttpTrigger` and
:class:`HttpOutput` binding to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining HttpTrigger
and HttpOutput binding in the function.json which enables your
function be triggered when http requests hit the specified route.
All optional fields will be given default value by function host when
they are parsed by function host.
Ref: https://aka.ms/azure-function-binding-http
:param route: Route for the http endpoint, if None, it will be set
to function name if present or user defined python function name.
:param trigger_arg_name: Argument name for :class:`HttpRequest`,
defaults to 'req'.
:param binding_arg_name: Argument name for :class:`HttpResponse`,
defaults to '$return'.
:param methods: A tuple of the HTTP methods to which the function
responds.
:param auth_level: Determines what keys, if any, need to be present
on the request in order to invoke the function.
:return: Decorator function.
:param trigger_extra_fields: Additional fields to include in trigger
json. For example,
>>> data_type='STRING' # 'dataType': 'STRING' in trigger json
:param binding_extra_fields: Additional fields to include in binding
json. For example,
>>> data_type='STRING' # 'dataType': 'STRING' in binding json
"""
if trigger_extra_fields is None:
trigger_extra_fields = {}
if binding_extra_fields is None:
binding_extra_fields = {}
@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_trigger(trigger=HttpTrigger(
name=trigger_arg_name,
methods=parse_iterable_param_to_enums(methods, HttpMethod),
auth_level=parse_singular_param_to_enum(auth_level,
AuthLevel),
route=route, **trigger_extra_fields))
fb.add_binding(binding=HttpOutput(
name=binding_arg_name, **binding_extra_fields))
return fb
return decorator()
return wrap
def orchestration_trigger(self, context_name: str,
orchestration: Optional[str] = None):
"""Register an Orchestrator Function.
Parameters
----------
context_name: str
Parameter name of the DurableOrchestrationContext object.
orchestration: Optional[str]
Name of Orchestrator Function.
By default, the name of the method is used.
"""
df_bp = self._get_durable_blueprint()
df_decorator = df_bp.orchestration_trigger(context_name,
orchestration)
result = self._invoke_df_decorator(df_decorator)
return result
def entity_trigger(self, context_name: str,
entity_name: Optional[str] = None):
"""Register an Entity Function.
Parameters
----------
context_name: str
Parameter name of the Entity input.
entity_name: Optional[str]
Name of Entity Function.
"""
df_bp = self._get_durable_blueprint()
df_decorator = df_bp.entity_trigger(context_name,
entity_name)
result = self._invoke_df_decorator(df_decorator)
return result
def activity_trigger(self, input_name: str,
activity: Optional[str] = None):
"""Register an Activity Function.
Parameters
----------
input_name: str
Parameter name of the Activity input.
activity: Optional[str]
Name of Activity Function.
"""
df_bp = self._get_durable_blueprint()
df_decorator = df_bp.activity_trigger(input_name, activity)
result = self._invoke_df_decorator(df_decorator)
return result
def timer_trigger(self,
arg_name: str,
schedule: str,
run_on_startup: Optional[bool] = None,
use_monitor: Optional[bool] = None,
data_type: Optional[Union[DataType, str]] = None,
**kwargs: Any) -> Callable[..., Any]:
"""The schedule or timer decorator adds :class:`TimerTrigger` to the
:class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining TimerTrigger
in the function.json which enables your function be triggered on the
specified schedule.
All optional fields will be given default value by function host when
they are parsed by function host.
Ref: https://aka.ms/azure-function-binding-timer
:param arg_name: The name of the variable that represents the
:class:`TimerRequest` object in function code.
:param schedule: A string representing a CRON expression that will
be used to schedule a function to run.
:param run_on_startup: If true, the function is invoked when the
runtime starts.
:param use_monitor: Set to true or false to indicate whether the
schedule should be monitored.
:param data_type: Defines how Functions runtime should treat the
parameter value.
:return: Decorator function.
"""
@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_trigger(
trigger=TimerTrigger(
name=arg_name,
schedule=schedule,
run_on_startup=run_on_startup,
use_monitor=use_monitor,
data_type=parse_singular_param_to_enum(data_type,
DataType),
**kwargs))
return fb
return decorator()
return wrap
schedule = timer_trigger
def warm_up_trigger(self,
arg_name: str,
data_type: Optional[Union[DataType, str]] = None,
**kwargs) -> Callable:
"""The warm up decorator adds :class:`WarmUpTrigger` to the
:class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining WarmUpTrigger
in the function.json which enables your function be triggered on the
specified schedule.
All optional fields will be given default value by function host when
they are parsed by function host.
Ref: https://aka.ms/azure-function-binding-warmup
:param arg_name: The name of the variable that represents the
:class:`TimerRequest` object in function code.
:param data_type: Defines how Functions runtime should treat the
parameter value.
:return: Decorator function.
"""
@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_trigger(
trigger=WarmUpTrigger(
name=arg_name,
data_type=parse_singular_param_to_enum(data_type,
DataType),
**kwargs))
return fb
return decorator()
return wrap
def service_bus_queue_trigger(
self,
arg_name: str,
connection: str,
queue_name: str,
data_type: Optional[Union[DataType, str]] = None,
access_rights: Optional[Union[AccessRights, str]] = None,
is_sessions_enabled: Optional[bool] = None,
cardinality: Optional[Union[Cardinality, str]] = None,
**kwargs: Any) -> Callable[..., Any]:
"""The on_service_bus_queue_change decorator adds
:class:`ServiceBusQueueTrigger` to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining ServiceBusQueueTrigger
in the function.json which enables your function be triggered when
new message(s) are sent to the service bus queue.
All optional fields will be given default value by function host when
they are parsed by function host.
Ref: https://aka.ms/azure-function-binding-service-bus
:param arg_name: The name of the variable that represents the
:class:`ServiceBusMessage` object in function code.
:param connection: The name of an app setting or setting collection
that specifies how to connect to Service Bus.
:param queue_name: Name of the queue to monitor.
:param data_type: Defines how Functions runtime should treat the
parameter value.
:param access_rights: Access rights for the connection string.
:param is_sessions_enabled: True if connecting to a session-aware
queue or subscription.
:param cardinality: Set to many in order to enable batching.
:return: Decorator function.
"""
@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_trigger(
trigger=ServiceBusQueueTrigger(
name=arg_name,
connection=connection,
queue_name=queue_name,
data_type=parse_singular_param_to_enum(data_type,
DataType),
access_rights=parse_singular_param_to_enum(
access_rights,
AccessRights),
is_sessions_enabled=is_sessions_enabled,
cardinality=parse_singular_param_to_enum(cardinality,
Cardinality),
**kwargs))
return fb
return decorator()
return wrap
def service_bus_topic_trigger(
self,
arg_name: str,
connection: str,
topic_name: str,
subscription_name: str,
data_type: Optional[Union[DataType, str]] = None,
access_rights: Optional[Union[AccessRights, str]] = None,
is_sessions_enabled: Optional[bool] = None,
cardinality: Optional[Union[Cardinality, str]] = None,
**kwargs: Any) -> Callable[..., Any]:
"""The on_service_bus_topic_change decorator adds
:class:`ServiceBusTopicTrigger` to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining ServiceBusTopicTrigger
in the function.json which enables function to be triggered when new
message(s) are sent to the service bus topic.
All optional fields will be given default value by function host when
they are parsed by function host.
Ref: https://aka.ms/azure-function-binding-service-bus
:param arg_name: The name of the variable that represents the
:class:`ServiceBusMessage` object in function code.
:param connection: The name of an app setting or setting collection
that specifies how to connect to Service Bus.
:param topic_name: Name of the topic to monitor.
:param subscription_name: Name of the subscription to monitor.
:param data_type: Defines how Functions runtime should treat the
parameter value.
:param access_rights: Access rights for the connection string.
:param is_sessions_enabled: True if connecting to a session-aware
queue or subscription.
:param cardinality: Set to many in order to enable batching.
:return: Decorator function.
"""
@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_trigger(
trigger=ServiceBusTopicTrigger(
name=arg_name,
connection=connection,
topic_name=topic_name,
subscription_name=subscription_name,
data_type=parse_singular_param_to_enum(data_type,
DataType),
access_rights=parse_singular_param_to_enum(
access_rights,
AccessRights),
is_sessions_enabled=is_sessions_enabled,
cardinality=parse_singular_param_to_enum(cardinality,
Cardinality),
**kwargs))
return fb
return decorator()
return wrap
def queue_trigger(self,
arg_name: str,
queue_name: str,
connection: str,
data_type: Optional[DataType] = None,
**kwargs) -> Callable[..., Any]:
"""The queue_trigger decorator adds :class:`QueueTrigger` to the
:class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining QueueTrigger
in the function.json which enables function to be triggered when new
message(s) are sent to the storage queue.
All optional fields will be given default value by function host when
they are parsed by function host.
Ref: https://aka.ms/azure-function-binding-queue
:param arg_name: The name of the variable that represents the
:class:`QueueMessage` object in function code.
:param queue_name: The name of the queue to poll.
:param connection: The name of an app setting or setting collection
that specifies how to connect to Azure Queues.
:param data_type: Defines how Functions runtime should treat the
parameter value.
:param kwargs: Keyword arguments for specifying additional binding
fields to include in the binding json.
:return: Decorator function.
"""
@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_trigger(
trigger=QueueTrigger(
name=arg_name,
queue_name=queue_name,
connection=connection,
data_type=parse_singular_param_to_enum(data_type,
DataType),
**kwargs))
return fb
return decorator()
return wrap
def event_hub_message_trigger(self,
arg_name: str,
connection: str,
event_hub_name: str,
data_type: Optional[
Union[DataType, str]] = None,
cardinality: Optional[
Union[Cardinality, str]] = None,
consumer_group: Optional[
str] = None,
**kwargs: Any) -> Callable[..., Any]:
"""The event_hub_message_trigger decorator adds
:class:`EventHubTrigger`
to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining EventHubTrigger
in the function.json which enables function to be triggered when new
message(s) are sent to the event hub.
All optional fields will be given default value by function host when
they are parsed by function host.
Ref: https://aka.ms/azure-function-binding-event-hubs
:param arg_name: The name of the variable that represents
:class:`EventHubEvent` object in function code.
:param connection: The name of an app setting or setting collection
that specifies how to connect to Event Hubs.
:param event_hub_name: The name of the event hub.
:param data_type: Defines how Functions runtime should treat the
parameter value.
:param cardinality: Set to many in order to enable batching.
:param consumer_group: An optional property that sets the consumer
group used to subscribe to events in the hub.
:param kwargs: Keyword arguments for specifying additional binding
fields to include in the binding json.
:return: Decorator function.
"""
@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_trigger(
trigger=EventHubTrigger(
name=arg_name,
connection=connection,
event_hub_name=event_hub_name,
data_type=parse_singular_param_to_enum(data_type,
DataType),
cardinality=parse_singular_param_to_enum(cardinality,
Cardinality),
consumer_group=consumer_group,
**kwargs))
return fb
return decorator()
return wrap
def cosmos_db_trigger_v3(self,
arg_name: str,
database_name: str,
collection_name: str,
connection_string_setting: str,
lease_collection_name: Optional[str] = None,
lease_connection_string_setting: Optional[
str] = None,
lease_database_name: Optional[str] = None,
create_lease_collection_if_not_exists: Optional[
bool] = None,
leases_collection_throughput: Optional[int] =
None,
lease_collection_prefix: Optional[str] = None,
checkpoint_interval: Optional[int] = None,
checkpoint_document_count: Optional[int] = None,
feed_poll_delay: Optional[int] = None,
lease_renew_interval: Optional[int] = None,
lease_acquire_interval: Optional[int] = None,
lease_expiration_interval: Optional[int] = None,
max_items_per_invocation: Optional[int] = None,
start_from_beginning: Optional[bool] = None,
preferred_locations: Optional[str] = None,
data_type: Optional[
Union[DataType, str]] = None,
**kwargs: Any) -> \
Callable[..., Any]:
"""The cosmos_db_trigger_v3 decorator adds :class:`CosmosDBTrigger`
to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This decorator will work only with extension bundle 2.x
or 3.x. For additional details, please refer
https://aka.ms/cosmosdb-v4-update.
This is equivalent to defining CosmosDBTrigger in the function.json
which enables function to be triggered when CosmosDB data is changed.
All optional fields will be given default value by function host when
they are parsed by function host.
Ref: https://aka.ms/azure-function-binding-cosmosdb-v2
:param arg_name: The name of the variable that represents
:class:`DocumentList` object in function code.
:param database_name: The name of the Azure Cosmos DB database with
the collection being monitored.
:param collection_name: The name of the collection being monitored.
:param connection_string_setting: The name of an app setting or
setting collection that specifies how to connect to the Azure Cosmos
DB account being monitored.
:param lease_collection_name: The name of the collection used to
store leases.
:param lease_connection_string_setting: The name of an app setting
or setting collection that specifies how to connect to the Azure
Cosmos DB account that holds the lease collection.
:param lease_database_name: The name of the database that holds the
collection used to store leases.
:param create_lease_collection_if_not_exists: When set to true,
the leases collection is automatically created when it doesn't
already exist.
:param leases_collection_throughput: Defines the number of Request
Units to assign when the leases collection is created.
:param lease_collection_prefix: When set, the value is added as a
prefix to the leases created in the Lease collection for this
Function.
:param checkpoint_interval: When set, it defines, in milliseconds,
the interval between lease checkpoints. Default is always after a
Function call.
:param checkpoint_document_count: Customizes the amount of documents
between lease checkpoints. Default is always after a Function call.
:param feed_poll_delay: The time (in milliseconds) for the delay
between polling a partition for new changes on the feed, after all
current changes are drained.
:param lease_renew_interval: When set, it defines, in milliseconds,
the renew interval for all leases for partitions currently held by
an instance.
:param lease_acquire_interval: When set, it defines,
in milliseconds, the interval to kick off a task to compute if
partitions are distributed evenly among known host instances.
:param lease_expiration_interval: When set, it defines,
in milliseconds, the interval for which the lease is taken on a
lease representing a partition.
:param max_items_per_invocation: When set, this property sets the
maximum number of items received per Function call.
:param start_from_beginning: This option tells the Trigger to read
changes from the beginning of the collection's change history
instead of starting at the current time.
:param preferred_locations: Defines preferred locations (regions)
for geo-replicated database accounts in the Azure Cosmos DB service.
:param data_type: Defines how Functions runtime should treat the
parameter value.
:param kwargs: Keyword arguments for specifying additional binding
fields to include in the binding json.
:return: Decorator function.
"""
trigger = CosmosDBTriggerV3(
name=arg_name,
database_name=database_name,
collection_name=collection_name,
connection_string_setting=connection_string_setting,
lease_collection_name=lease_collection_name,
lease_connection_string_setting=lease_connection_string_setting,
lease_database_name=lease_database_name,
create_lease_collection_if_not_exists=create_lease_collection_if_not_exists, # NoQA
leases_collection_throughput=leases_collection_throughput,
lease_collection_prefix=lease_collection_prefix,
checkpoint_interval=checkpoint_interval,
checkpoint_document_count=checkpoint_document_count,
feed_poll_delay=feed_poll_delay,
lease_renew_interval=lease_renew_interval,
lease_acquire_interval=lease_acquire_interval,
lease_expiration_interval=lease_expiration_interval,