From 1d9d553a212000955bd20f67d0f5e8ae70d9c4a3 Mon Sep 17 00:00:00 2001 From: Anthony Shaw Date: Wed, 1 Jun 2022 11:20:04 +1000 Subject: [PATCH 1/4] Move invocation logging into a TPE --- azure_functions_worker/dispatcher.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index eaf43a66..09a663c6 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -92,6 +92,10 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int, self._grpc_connected_fut = loop.create_future() self._grpc_thread: threading.Thread = threading.Thread( name='grpc-thread', target=self.__poll_grpc) + self._logging_executor: concurrent.futures.ThreadPoolExecutor \ + = concurrent.futures.ThreadPoolExecutor( + max_workers=1, + thread_name_prefix="dispatch_logging") def get_sync_tp_workers_set(self): """We don't know the exact value of the threadcount set for the Python @@ -372,7 +376,8 @@ async def _handle__function_load_request(self, request): func_request.metadata.directory ) - logger.info('Successfully processed FunctionLoadRequest, ' + self._logging_executor.submit(logger.info, + 'Successfully processed FunctionLoadRequest, ' f'request ID: {self.request_id}, ' f'function ID: {function_id},' f'function Name: {function_name}') @@ -422,7 +427,10 @@ async def _handle__invocation_request(self, request): f'sync threadpool max workers: ' f'{self.get_sync_tp_workers_set()}' ) - logger.info(', '.join(function_invocation_logs)) + + self._logging_executor.submit( + logger.info, + ', '.join(function_invocation_logs)) args = {} for pb in invoc_request.input_data: From 06dcc04a07745a480ed93e8e9e263babcc45cb12 Mon Sep 17 00:00:00 2001 From: Anthony Shaw Date: Wed, 1 Jun 2022 11:44:31 +1000 Subject: [PATCH 2/4] Fix padding and whitespace --- azure_functions_worker/dispatcher.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 09a663c6..90989d0d 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -94,7 +94,7 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int, name='grpc-thread', target=self.__poll_grpc) self._logging_executor: concurrent.futures.ThreadPoolExecutor \ = concurrent.futures.ThreadPoolExecutor( - max_workers=1, + max_workers=1, thread_name_prefix="dispatch_logging") def get_sync_tp_workers_set(self): @@ -376,11 +376,11 @@ async def _handle__function_load_request(self, request): func_request.metadata.directory ) - self._logging_executor.submit(logger.info, - 'Successfully processed FunctionLoadRequest, ' - f'request ID: {self.request_id}, ' - f'function ID: {function_id},' - f'function Name: {function_name}') + self._logging_executor.submit(logger.info, + 'Successfully processed FunctionLoadRequest, ' + f'request ID: {self.request_id}, ' + f'function ID: {function_id},' + f'function Name: {function_name}') return protos.StreamingMessage( request_id=self.request_id, @@ -429,7 +429,7 @@ async def _handle__invocation_request(self, request): ) self._logging_executor.submit( - logger.info, + logger.info, ', '.join(function_invocation_logs)) args = {} From b541720f9599750fd555ec2836b747f0954216ed Mon Sep 17 00:00:00 2001 From: Anthony Shaw Date: Thu, 2 Jun 2022 14:05:47 +1000 Subject: [PATCH 3/4] push padding for flake8 rule --- azure_functions_worker/dispatcher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 90989d0d..ea7890d2 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -376,7 +376,8 @@ async def _handle__function_load_request(self, request): func_request.metadata.directory ) - self._logging_executor.submit(logger.info, + self._logging_executor.submit( + logger.info, 'Successfully processed FunctionLoadRequest, ' f'request ID: {self.request_id}, ' f'function ID: {function_id},' From d6a9fb6d9853373c605831febe3ced5f8b64c09b Mon Sep 17 00:00:00 2001 From: Anthony Shaw Date: Fri, 3 Jun 2022 10:16:51 +1000 Subject: [PATCH 4/4] Change the logger handler to hold the thread pool instead of the dispatcher --- azure_functions_worker/dispatcher.py | 30 +++++++++++++++------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index ea7890d2..badc7615 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -92,10 +92,6 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int, self._grpc_connected_fut = loop.create_future() self._grpc_thread: threading.Thread = threading.Thread( name='grpc-thread', target=self.__poll_grpc) - self._logging_executor: concurrent.futures.ThreadPoolExecutor \ - = concurrent.futures.ThreadPoolExecutor( - max_workers=1, - thread_name_prefix="dispatch_logging") def get_sync_tp_workers_set(self): """We don't know the exact value of the threadcount set for the Python @@ -376,12 +372,10 @@ async def _handle__function_load_request(self, request): func_request.metadata.directory ) - self._logging_executor.submit( - logger.info, - 'Successfully processed FunctionLoadRequest, ' - f'request ID: {self.request_id}, ' - f'function ID: {function_id},' - f'function Name: {function_name}') + logger.info('Successfully processed FunctionLoadRequest, ' + f'request ID: {self.request_id}, ' + f'function ID: {function_id},' + f'function Name: {function_name}') return protos.StreamingMessage( request_id=self.request_id, @@ -429,9 +423,7 @@ async def _handle__invocation_request(self, request): f'{self.get_sync_tp_workers_set()}' ) - self._logging_executor.submit( - logger.info, - ', '.join(function_invocation_logs)) + logger.info(', '.join(function_invocation_logs)) args = {} for pb in invoc_request.input_data: @@ -760,6 +752,13 @@ def gen(resp_queue): class AsyncLoggingHandler(logging.Handler): + def __init__(self, *args, **kwargs): + self._logging_tp: concurrent.futures.ThreadPoolExecutor = \ + concurrent.futures.ThreadPoolExecutor( + max_workers=1, + thread_name_prefix="logging") + super().__init__(*args, **kwargs) + def emit(self, record: LogRecord) -> None: # Since we disable console log after gRPC channel is initiated, # we should redirect all the messages into dispatcher. @@ -770,7 +769,10 @@ def emit(self, record: LogRecord) -> None: # buffered in this handler, not calling the emit yet. msg = self.format(record) try: - Dispatcher.current.on_logging(record, msg) + self._logging_tp.submit( + Dispatcher.current.on_logging, + record, + msg) except RuntimeError as runtime_error: # This will cause 'Dispatcher not found' failure. # Logging such of an issue will cause infinite loop of gRPC logging