Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Redis decorators #179

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions azure/functions/decorators/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@
EVENT_GRID_TRIGGER = "eventGridTrigger"
EVENT_GRID = "eventGrid"
TABLE = "table"
REDIS_PUBSUB_TRIGGER = "RedisPubSubTrigger"
REDIS_LIST_TRIGGER = "RedisListTrigger"
REDIS_STREAM_TRIGGER = "RedisStreamTrigger"
165 changes: 162 additions & 3 deletions azure/functions/decorators/function_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
ServiceBusTopicOutput
from azure.functions.decorators.table import TableInput, TableOutput
from azure.functions.decorators.timer import TimerTrigger
from azure.functions.decorators.redis import RedisPubSubTrigger, RedisListTrigger, \
RedisStreamTrigger
from azure.functions.decorators.utils import parse_singular_param_to_enum, \
parse_iterable_param_to_enums, StringifyEnumJsonEncoder
from azure.functions.http import HttpRequest
Expand Down Expand Up @@ -1028,9 +1030,9 @@ def event_grid_trigger(self,
:class:`EventGridTrigger`
to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining event grid trigger
in the function.json which enables function to be triggered to
respond to an event sent to an event grid topic.
indexing model. This is equivalent to defining RedisPubSubTrigger
in the function.json which enables function to be triggered when
messages are published to a redis pubsub channel.
All optional fields will be given default value by function host when
they are parsed by function host.

Expand Down Expand Up @@ -1058,6 +1060,163 @@ def decorator():

return wrap

def redis_pubsub_trigger(self,
arg_name: str,
connectionStringSetting: str,
channel: str,
data_type: Optional[Union[DataType, str]] = None,
**kwargs) -> Callable[..., Any]:
"""
The redis_pubsub_trigger decorator adds
:class:`RedisPubSubTrigger`
to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining RedisPubSubTrigger
in the function.json which enables function to be triggered when
messages are published to a redis pubsub channel.
All optional fields will be given default value by function host when
they are parsed by function host.

:param arg_name: the variable name used in function code for the
parameter that receives the event data.
:param connectionStringSetting: Redis connection string setting.
:param channel: Redis pubsub channel name.
:param data_type: Defines how Functions runtime should treat the
parameter value.
:return: Decorator function.
"""

@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_trigger(
trigger=RedisPubSubTrigger(
name=arg_name,
connectionStringSetting=connectionStringSetting,
channel=channel,
data_type=parse_singular_param_to_enum(data_type,
DataType),
**kwargs))
return fb

return decorator()

return wrap

def redis_list_trigger(self,
arg_name: str,
connectionStringSetting: str,
key: str,
pollingIntervalInMs: Optional[int] = 1000,
messagesPerWorker: Optional[int] = 100,
count: Optional[int] = 10,
listPopFromBeginning: Optional[bool] = True,
data_type: Optional[Union[DataType, str]] = None,
**kwargs) -> Callable[..., Any]:
"""
The redis_pubsub_trigger decorator adds
:class:`RedisListTrigger`
to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining RedisListTrigger
in the function.json which enables function to be triggered when
entries are added to a redis list.
All optional fields will be given default value by function host when
they are parsed by function host.

:param arg_name: the variable name used in function code for the
parameter that receives the event data.
:param connectionStringSetting: Redis connection string setting.
:param key: Key to read from.
:param pollingIntervalInMs: How often to poll Redis in ms.
:param messagesPerWorker: The number of messages each functions
instance is expected to handle.
:param count: Number of elements to pull from Redis at one time.
:param listPopFromBeginning: Decides if the function will pop elements
from the front or end of the list.
:param data_type: Defines how Functions runtime should treat the
parameter value.
:return: Decorator function.
"""

@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_trigger(
trigger=RedisListTrigger(
name=arg_name,
connectionStringSetting=connectionStringSetting,
key=key,
pollingIntervalInMs=pollingIntervalInMs,
messagesPerWorker=messagesPerWorker,
count=count,
listPopFromBeginning=listPopFromBeginning,
data_type=parse_singular_param_to_enum(data_type,
DataType),
**kwargs))
return fb

return decorator()

return wrap

def redis_stream_trigger(self,
arg_name: str,
connectionStringSetting: str,
key: str,
pollingIntervalInMs: Optional[int] = 1000,
messagesPerWorker: Optional[int] = 100,
count: Optional[int] = 10,
deleteAfterProcess: Optional[bool] = False,
data_type: Optional[Union[DataType, str]] = None,
**kwargs) -> Callable[..., Any]:
"""
The redis_pubsub_trigger decorator adds
:class:`RedisStreamTrigger`
to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining RedisStreamTrigger
in the function.json which enables function to be triggered when
messages are published to a redis pubsub channel.
All optional fields will be given default value by function host when
they are parsed by function host.

:param arg_name: the variable name used in function code for the
parameter that receives the event data.
:param connectionStringSetting: Redis connection string setting.
:param key: Key to read from.
:param pollingIntervalInMs: How often to poll Redis in ms.
:param messagesPerWorker: The number of messages each functions
instance is expected to handle.
:param count: Number of elements to pull from Redis at one time.
:param deleteAfterProcess: Decides if the function will delete the
stream entries after processing.
parameter value.
:return: Decorator function.
"""

@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_trigger(
trigger=RedisStreamTrigger(
name=arg_name,
connectionStringSetting=connectionStringSetting,
connectionStringSetting=connectionStringSetting,
key=key,
pollingIntervalInMs=pollingIntervalInMs,
messagesPerWorker=messagesPerWorker,
count=count,
deleteAfterProcess=deleteAfterProcess,
data_type=parse_singular_param_to_enum(data_type,
DataType),
**kwargs))
return fb

return decorator()

return wrap

def generic_trigger(self,
arg_name: str,
type: str,
Expand Down
67 changes: 67 additions & 0 deletions azure/functions/decorators/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from typing import Optional

from azure.functions.decorators.constants import REDIS_PUBSUB_TRIGGER, REDIS_LIST_TRIGGER, REDIS_STREAM_TRIGGER
from azure.functions.decorators.core import Trigger, DataType

class RedisPubSubTrigger(Trigger):
@staticmethod
def get_binding_name() -> str:
return REDIS_PUBSUB_TRIGGER

def __init__(self,
name: str,
connectionStringSetting: str,
channel: str,
data_type: Optional[DataType] = None,
**kwargs) -> None:
self.connectionStringSetting = connectionStringSetting
self.channel = channel
super().__init__(name=name, data_type=data_type)

class RedisListTrigger(Trigger):
@staticmethod
def get_binding_name() -> str:
return REDIS_LIST_TRIGGER

def __init__(self,
name: str,
connectionStringSetting: str,
key: str,
pollingIntervalInMs: Optional[int] = 1000,
messagesPerWorker: Optional[int] = 100,
count: Optional[int] = 10,
listPopFromBeginning: Optional[bool] = True,
data_type: Optional[DataType] = None,
**kwargs) -> None:
self.connectionStringSetting = connectionStringSetting
self.key = key
self.pollingIntervalInMs = pollingIntervalInMs
self.messagesPerWorker = messagesPerWorker
self.count = count
self.listPopFromBeginning = listPopFromBeginning
super().__init__(name=name, data_type=data_type)

class RedisStreamTrigger(Trigger):
@staticmethod
def get_binding_name() -> str:
return REDIS_STREAM_TRIGGER

def __init__(self,
name: str,
connectionStringSetting: str,
key: str,
pollingIntervalInMs: Optional[int] = 1000,
messagesPerWorker: Optional[int] = 100,
count: Optional[int] = 10,
deleteAfterProcess: Optional[bool] = True,
data_type: Optional[DataType] = None,
**kwargs) -> None:
self.connectionStringSetting = connectionStringSetting
self.key = key
self.pollingIntervalInMs = pollingIntervalInMs
self.messagesPerWorker = messagesPerWorker
self.count = count
self.deleteAfterProcess = deleteAfterProcess
super().__init__(name=name, data_type=data_type)
79 changes: 79 additions & 0 deletions tests/decorators/test_redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import unittest

from azure.functions.decorators.constants import REDIS_PUBSUB_TRIGGER, REDIS_LIST_TRIGGER, REDIS_STREAM_TRIGGER
from azure.functions.decorators.core import BindingDirection, DataType
from azure.functions.decorators.redis import RedisPubSubTrigger, RedisListTrigger, RedisStreamTrigger


class TestRedis(unittest.TestCase):
def test_pubsub_trigger_valid_creation(self):
trigger = RedisPubSubTrigger(name="req",
connectionStringSetting="dummy_connection",
channel="dummy_channel",
data_type=DataType.UNDEFINED,
dummy_field="dummy")

self.assertEqual(trigger.get_binding_name(), "redisPubSubTrigger")
self.assertEqual(trigger.get_dict_repr(), {
"type": REDIS_PUBSUB_TRIGGER,
"direction": BindingDirection.IN,
'dummyField': 'dummy',
"name": "req",
"dataType": DataType.UNDEFINED,
"connectionStringSetting": "dummy_connection",
"channel": "dummy_channel"
})

def test_list_trigger_valid_creation(self):
trigger = RedisListTrigger(name="req",
connectionStringSetting="dummy_connection",
key="dummy_key",
pollingIntervalInMs=1,
messagesPerWorker=2,
count=3,
listPopFromBeginning=False,
data_type=DataType.UNDEFINED,
dummy_field="dummy")

self.assertEqual(trigger.get_binding_name(), "redisListTrigger")
self.assertEqual(trigger.get_dict_repr(), {
"type": REDIS_LIST_TRIGGER,
"direction": BindingDirection.IN,
'dummyField': 'dummy',
"name": "req",
"dataType": DataType.UNDEFINED,
"connectionStringSetting": "dummy_connection",
"channel": "dummy_channel",
"pollingIntervalInMs": 1,
"messagesPerWorker": 2,
"count": 3,
"listPopFromBeginning": False,
})

def test_stream_trigger_valid_creation(self):
trigger = RedisStreamTrigger(name="req",
connectionStringSetting="dummy_connection",
key="dummy_key",
pollingIntervalInMs=1,
messagesPerWorker=2,
count=3,
deleteAfterProcess=True,
data_type=DataType.UNDEFINED,
dummy_field="dummy")

self.assertEqual(trigger.get_binding_name(), "redisStreamTrigger")
self.assertEqual(trigger.get_dict_repr(), {
"type": REDIS_STREAM_TRIGGER,
"direction": BindingDirection.IN,
'dummyField': 'dummy',
"name": "req",
"dataType": DataType.UNDEFINED,
"connectionStringSetting": "dummy_connection",
"channel": "dummy_channel",
"pollingIntervalInMs": 1,
"messagesPerWorker": 2,
"count": 3,
"deleteAfterProcess": True,
})