Skip to content

Commit 0f52efc

Browse files
gavin-aguiarGavin Aguiar
and
Gavin Aguiar
authoredJul 6, 2023
Retry policy support for v2 programming model (#1268)
* Retry policies * Additional changes for retry policy * Added retry policy support for v2 function * Minor updates and added e2e tests * Reverted change to worker config * Added e2e tests * Updated according to new libary * Added tests * Fixed existing tests * Added unit tests * Updated time format * Updated time format --------- Co-authored-by: Gavin Aguiar <gavin@GavinPC>
1 parent 6872fc0 commit 0f52efc

File tree

12 files changed

+270
-31
lines changed

12 files changed

+270
-31
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,28 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
33

4+
from dataclasses import dataclass
5+
from enum import Enum
6+
47
from . import rpcexception
58

69

10+
class RetryPolicy(Enum):
11+
"""Retry policy for the function invocation"""
12+
13+
MAX_RETRY_COUNT = "max_retry_count"
14+
STRATEGY = "strategy"
15+
DELAY_INTERVAL = "delay_interval"
16+
MINIMUM_INTERVAL = "minimum_interval"
17+
MAXIMUM_INTERVAL = "maximum_interval"
18+
19+
20+
@dataclass
721
class RetryContext:
8-
"""Check https://docs.microsoft.com/en-us/azure/azure-functions/
9-
functions-bindings-error-pages?tabs=python#retry-policies-preview"""
10-
11-
def __init__(self,
12-
retry_count: int,
13-
max_retry_count: int,
14-
rpc_exception: rpcexception.RpcException) -> None:
15-
self.__retry_count = retry_count
16-
self.__max_retry_count = max_retry_count
17-
self.__rpc_exception = rpc_exception
18-
19-
@property
20-
def retry_count(self) -> int:
21-
"""Gets the current retry count from retry-context"""
22-
return self.__retry_count
23-
24-
@property
25-
def max_retry_count(self) -> int:
26-
"""Gets the max retry count from retry-context"""
27-
return self.__max_retry_count
28-
29-
@property
30-
def exception(self) -> rpcexception.RpcException:
31-
return self.__rpc_exception
22+
"""Gets the current retry count from retry-context"""
23+
retry_count: int
24+
25+
"""Gets the max retry count from retry-context"""
26+
max_retry_count: int
27+
28+
rpc_exception: rpcexception.RpcException

‎azure_functions_worker/constants.py

+3
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,6 @@
5151
SCRIPT_FILE_NAME = "function_app.py"
5252

5353
PYTHON_LANGUAGE_RUNTIME = "python"
54+
55+
# Settings for V2 programming model
56+
RETRY_POLICY = "retry_policy"

‎azure_functions_worker/dispatcher.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,10 @@ def index_functions(self, function_path: str):
609609
len(indexed_functions))
610610

611611
if indexed_functions:
612+
fx_metadata_results = loader.process_indexed_function(
613+
self._functions,
614+
indexed_functions)
615+
612616
indexed_function_logs: List[str] = []
613617
for func in indexed_functions:
614618
function_log = "Function Name: {}, Function Binding: {}" \
@@ -621,10 +625,6 @@ def index_functions(self, function_path: str):
621625
'Successfully processed FunctionMetadataRequest for '
622626
'functions: %s', " ".join(indexed_function_logs))
623627

624-
fx_metadata_results = loader.process_indexed_function(
625-
self._functions,
626-
indexed_functions)
627-
628628
return fx_metadata_results
629629

630630
async def _handle__close_shared_memory_resources_request(self, request):

‎azure_functions_worker/loader.py

+47-1
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,17 @@
88
import os.path
99
import pathlib
1010
import sys
11+
import time
12+
from datetime import timedelta
1113
from os import PathLike, fspath
1214
from typing import Optional, Dict
1315

16+
from google.protobuf.duration_pb2 import Duration
17+
1418
from . import protos, functions
19+
from .bindings.retrycontext import RetryPolicy
1520
from .constants import MODULE_NOT_FOUND_TS_URL, SCRIPT_FILE_NAME, \
16-
PYTHON_LANGUAGE_RUNTIME
21+
PYTHON_LANGUAGE_RUNTIME, RETRY_POLICY
1722
from .utils.wrappers import attach_message_to_exception
1823

1924
_AZURE_NAMESPACE = '__app__'
@@ -45,6 +50,12 @@ def install() -> None:
4550
sys.modules[_AZURE_NAMESPACE] = ns_pkg
4651

4752

53+
def convert_to_seconds(timestr: str):
54+
x = time.strptime(timestr, '%H:%M:%S')
55+
return int(timedelta(hours=x.tm_hour, minutes=x.tm_min,
56+
seconds=x.tm_sec).total_seconds())
57+
58+
4859
def uninstall() -> None:
4960
pass
5061

@@ -60,6 +71,39 @@ def build_binding_protos(indexed_function) -> Dict:
6071
return binding_protos
6172

6273

74+
def build_retry_protos(indexed_function) -> Dict:
75+
retry = indexed_function.get_settings_dict(RETRY_POLICY)
76+
if not retry:
77+
return None
78+
79+
strategy = retry.get(RetryPolicy.STRATEGY.value)
80+
if strategy == "fixed_delay":
81+
delay_interval = Duration(
82+
seconds=convert_to_seconds(
83+
retry.get(RetryPolicy.DELAY_INTERVAL.value)))
84+
retry_protos = protos.RpcRetryOptions(
85+
max_retry_count=int(retry.get(RetryPolicy.MAX_RETRY_COUNT.value)),
86+
retry_strategy=retry.get(RetryPolicy.STRATEGY.value),
87+
delay_interval=delay_interval,
88+
)
89+
else:
90+
minimum_interval = Duration(
91+
seconds=convert_to_seconds(
92+
retry.get(RetryPolicy.MINIMUM_INTERVAL.value)))
93+
maximum_interval = Duration(
94+
seconds=convert_to_seconds(
95+
retry.get(RetryPolicy.MAXIMUM_INTERVAL.value)))
96+
97+
retry_protos = protos.RpcRetryOptions(
98+
max_retry_count=int(retry.get(RetryPolicy.MAX_RETRY_COUNT.value)),
99+
retry_strategy=retry.get(RetryPolicy.STRATEGY.value),
100+
minimum_interval=minimum_interval,
101+
maximum_interval=maximum_interval
102+
)
103+
104+
return retry_protos
105+
106+
63107
def process_indexed_function(functions_registry: functions.Registry,
64108
indexed_functions):
65109
fx_metadata_results = []
@@ -68,6 +112,7 @@ def process_indexed_function(functions_registry: functions.Registry,
68112
function=indexed_function)
69113

70114
binding_protos = build_binding_protos(indexed_function)
115+
retry_protos = build_retry_protos(indexed_function)
71116

72117
function_metadata = protos.RpcFunctionMetadata(
73118
name=function_info.name,
@@ -80,6 +125,7 @@ def process_indexed_function(functions_registry: functions.Registry,
80125
language=PYTHON_LANGUAGE_RUNTIME,
81126
bindings=binding_protos,
82127
raw_bindings=indexed_function.get_raw_bindings(),
128+
retry_options=retry_protos,
83129
properties={"worker_indexed": "True"})
84130

85131
fx_metadata_results.append(function_metadata)

‎azure_functions_worker/protos/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
CloseSharedMemoryResourcesResponse,
3333
FunctionsMetadataRequest,
3434
FunctionMetadataResponse,
35-
WorkerMetadata)
35+
WorkerMetadata,
36+
RpcRetryOptions)
3637

3738
from .shared.NullableTypes_pb2 import (
3839
NullableString,

‎azure_functions_worker/protos/_src/src/proto/FunctionRpc.proto

+46
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,13 @@ message StreamingMessage {
8686

8787
// Host gets the list of function load responses
8888
FunctionLoadResponseCollection function_load_response_collection = 32;
89+
90+
// Host sends required metadata to worker to warmup the worker
91+
WorkerWarmupRequest worker_warmup_request = 33;
92+
93+
// Worker responds after warming up with the warmup result
94+
WorkerWarmupResponse worker_warmup_response = 34;
95+
8996
}
9097
}
9198

@@ -330,6 +337,9 @@ message RpcFunctionMetadata {
330337
// A flag indicating if managed dependency is enabled or not
331338
bool managed_dependency_enabled = 14;
332339

340+
// The optional function execution retry strategy to use on invocation failures.
341+
RpcRetryOptions retry_options = 15;
342+
333343
// Properties for function metadata
334344
// They're usually specific to a worker and largely passed along to the controller API for use
335345
// outside the host
@@ -423,6 +433,15 @@ message InvocationResponse {
423433
StatusResult result = 3;
424434
}
425435

436+
message WorkerWarmupRequest {
437+
// Full path of worker.config.json location
438+
string worker_directory = 1;
439+
}
440+
441+
message WorkerWarmupResponse {
442+
StatusResult result = 1;
443+
}
444+
426445
// Used to encapsulate data which could be a variety of types
427446
message TypedData {
428447
oneof data {
@@ -681,4 +700,31 @@ message ModelBindingData
681700
// Used to encapsulate collection model_binding_data
682701
message CollectionModelBindingData {
683702
repeated ModelBindingData model_binding_data = 1;
703+
}
704+
705+
// Retry policy which the worker sends the host when the worker indexes
706+
// a function.
707+
message RpcRetryOptions
708+
{
709+
// The retry strategy to use. Valid values are fixed delay or exponential backoff.
710+
enum RetryStrategy
711+
{
712+
exponential_backoff = 0;
713+
fixed_delay = 1;
714+
}
715+
716+
// The maximum number of retries allowed per function execution.
717+
// -1 means to retry indefinitely.
718+
int32 max_retry_count = 2;
719+
720+
// The delay that's used between retries when you're using a fixed delay strategy.
721+
google.protobuf.Duration delay_interval = 3;
722+
723+
// The minimum retry delay when you're using an exponential backoff strategy
724+
google.protobuf.Duration minimum_interval = 4;
725+
726+
// The maximum retry delay when you're using an exponential backoff strategy
727+
google.protobuf.Duration maximum_interval = 5;
728+
729+
RetryStrategy retry_strategy = 6;
684730
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from azure.functions import FunctionApp, TimerRequest, Context, AuthLevel
2+
import logging
3+
4+
app = FunctionApp(http_auth_level=AuthLevel.ANONYMOUS)
5+
6+
7+
@app.timer_trigger(schedule="*/1 * * * * *", arg_name="mytimer",
8+
run_on_startup=False,
9+
use_monitor=False)
10+
@app.retry(strategy="exponential_backoff", max_retry_count="3",
11+
minimum_interval="00:00:01",
12+
maximum_interval="00:00:02")
13+
def mytimer(mytimer: TimerRequest, context: Context) -> None:
14+
logging.info(f'Current retry count: {context.retry_context.retry_count}')
15+
16+
if context.retry_context.retry_count == \
17+
context.retry_context.max_retry_count:
18+
logging.info(
19+
f"Max retries of {context.retry_context.max_retry_count} for "
20+
f"function {context.function_name} has been reached")
21+
else:
22+
raise Exception("This is a retryable exception")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from azure.functions import FunctionApp, TimerRequest, Context, AuthLevel
2+
import logging
3+
4+
app = FunctionApp(http_auth_level=AuthLevel.ANONYMOUS)
5+
6+
7+
@app.timer_trigger(schedule="*/1 * * * * *", arg_name="mytimer",
8+
run_on_startup=False,
9+
use_monitor=False)
10+
@app.retry(strategy="fixed_delay", max_retry_count="3",
11+
delay_interval="00:00:01")
12+
def mytimer(mytimer: TimerRequest, context: Context) -> None:
13+
logging.info(f'Current retry count: {context.retry_context.retry_count}')
14+
15+
if context.retry_context.retry_count == \
16+
context.retry_context.max_retry_count:
17+
logging.info(
18+
f"Max retries of {context.retry_context.max_retry_count} for "
19+
f"function {context.function_name} has been reached")
20+
else:
21+
raise Exception("This is a retryable exception")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import time
4+
import typing
5+
6+
from tests.utils import testutils
7+
8+
9+
class TestFixedRetryPolicyFunctions(testutils.WebHostTestCase):
10+
11+
@classmethod
12+
def get_script_dir(cls):
13+
return testutils.E2E_TESTS_FOLDER / 'retry_policy_functions' / \
14+
'fixed_strategy'
15+
16+
def test_fixed_retry_policy(self):
17+
# Checking webhost status.
18+
time.sleep(5)
19+
r = self.webhost.request('GET', '', no_prefix=True)
20+
self.assertTrue(r.ok)
21+
22+
def check_log_fixed_retry_policy(self, host_out: typing.List[str]):
23+
self.assertIn('Current retry count: 0', host_out)
24+
self.assertIn('Current retry count: 1', host_out)
25+
self.assertIn("Max retries of 3 for function mytimer"
26+
" has been reached", host_out)
27+
28+
29+
class TestExponentialRetryPolicyFunctions(testutils.WebHostTestCase):
30+
31+
@classmethod
32+
def get_script_dir(cls):
33+
return testutils.E2E_TESTS_FOLDER / 'retry_policy_functions' / \
34+
'exponential_strategy'
35+
36+
def test_retry_policy(self):
37+
# Checking webhost status.
38+
r = self.webhost.request('GET', '', no_prefix=True,
39+
timeout=5)
40+
time.sleep(5)
41+
self.assertTrue(r.ok)
42+
43+
def check_log_retry_policy(self, host_out: typing.List[str]):
44+
self.assertIn('Current retry count: 1', host_out)
45+
self.assertIn('Current retry count: 2', host_out)
46+
self.assertIn('Current retry count: 3', host_out)
47+
self.assertIn("Max retries of 3 for function mytimer"
48+
" has been reached", host_out)

‎tests/unittests/test_dispatcher.py

+11
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,17 @@ async def test_dispatcher_functions_metadata_request(self):
572572
self.assertEqual(r.response.result.status,
573573
protos.StatusResult.Success)
574574

575+
async def test_dispatcher_functions_metadata_request_with_retry(self):
576+
"""Test if the functions metadata response will be sent correctly
577+
when a functions metadata request is received
578+
"""
579+
async with self._ctrl as host:
580+
r = await host.get_functions_metadata()
581+
self.assertIsInstance(r.response, protos.FunctionMetadataResponse)
582+
self.assertFalse(r.response.use_default_metadata_indexing)
583+
self.assertEqual(r.response.result.status,
584+
protos.StatusResult.Success)
585+
575586

576587
class TestDispatcherSteinLegacyFallback(testutils.AsyncTestCase):
577588

‎tests/unittests/test_loader.py

+44
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,55 @@
66
import sys
77
import textwrap
88

9+
from azure.functions import Function
10+
from azure.functions.decorators.retry_policy import RetryPolicy
11+
from azure.functions.decorators.timer import TimerTrigger
12+
13+
from azure_functions_worker import functions
14+
from azure_functions_worker.loader import build_retry_protos
915
from tests.utils import testutils
1016

1117

1218
class TestLoader(testutils.WebHostTestCase):
1319

20+
def setUp(self) -> None:
21+
def test_function():
22+
return "Test"
23+
24+
self.test_function = test_function
25+
self.func = Function(self.test_function, script_file="test.py")
26+
self.function_registry = functions.Registry()
27+
28+
def test_building_fixed_retry_protos(self):
29+
trigger = TimerTrigger(schedule="*/1 * * * * *", arg_name="mytimer",
30+
name="mytimer")
31+
self.func.add_trigger(trigger=trigger)
32+
setting = RetryPolicy(strategy="fixed_delay", max_retry_count="1",
33+
delay_interval="00:02:00")
34+
self.func.add_setting(setting=setting)
35+
36+
protos = build_retry_protos(self.func)
37+
self.assertEqual(protos.max_retry_count, 1)
38+
self.assertEqual(protos.retry_strategy, 1) # 1 enum for fixed delay
39+
self.assertEqual(protos.delay_interval.seconds, 120)
40+
41+
def test_building_exponential_retry_protos(self):
42+
trigger = TimerTrigger(schedule="*/1 * * * * *", arg_name="mytimer",
43+
name="mytimer")
44+
self.func.add_trigger(trigger=trigger)
45+
setting = RetryPolicy(strategy="exponential_backoff",
46+
max_retry_count="1",
47+
minimum_interval="00:01:00",
48+
maximum_interval="00:02:00")
49+
self.func.add_setting(setting=setting)
50+
51+
protos = build_retry_protos(self.func)
52+
self.assertEqual(protos.max_retry_count, 1)
53+
self.assertEqual(protos.retry_strategy,
54+
0) # 0 enum for exponential backoff
55+
self.assertEqual(protos.minimum_interval.seconds, 60)
56+
self.assertEqual(protos.maximum_interval.seconds, 120)
57+
1458
@classmethod
1559
def get_script_dir(cls):
1660
return testutils.UNIT_TESTS_FOLDER / 'load_functions'

‎tests/utils/testutils.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -815,7 +815,7 @@ def popen_webhost(*, stdout, stderr, script_root=FUNCS_PATH, port=None):
815815
if coretools_exe:
816816
coretools_exe = coretools_exe.strip()
817817
if pathlib.Path(coretools_exe).exists():
818-
hostexe_args = [str(coretools_exe), 'host', 'start']
818+
hostexe_args = [str(coretools_exe), 'host', 'start', '--verbose']
819819
if port is not None:
820820
hostexe_args.extend(['--port', str(port)])
821821

0 commit comments

Comments
 (0)
Please sign in to comment.