Skip to content

Commit 4940236

Browse files
felipoubasepi
andauthored
Add support for grpc aio server interceptor (#1870)
* add support for grpc aio server instrumentation * Add conditional instrumentation for grpc aio server * fix grpc async issues detected on CI * add tests with async grpc server * fix and move async grpc server interceptor * Move grpc async registration to python 3.7 section Async tracing is only supported from python 3.7. * Add try-except when checking grpc lib instrumentation * Remove explicit grpc version check Just let the instrumentation system attempt to import it, and abort if it fails. * CHANGELOG --------- Co-authored-by: Colton Myers <[email protected]>
1 parent bf7534f commit 4940236

File tree

7 files changed

+183
-19
lines changed

7 files changed

+183
-19
lines changed

CHANGELOG.asciidoc

+13
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,19 @@ endif::[]
2929
//===== Bug fixes
3030
//
3131
32+
=== Unreleased
33+
34+
// Unreleased changes go here
35+
// When the next release happens, nest these changes under the "Python Agent version 6.x" heading
36+
[float]
37+
===== Features
38+
39+
* Add support for grpc aio server interceptor {pull}1870[#1870]
40+
41+
//[float]
42+
//===== Bug fixes
43+
//
44+
3245
3346
[[release-notes-6.x]]
3447
=== Python Agent version 6.x
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# BSD 3-Clause License
2+
#
3+
# Copyright (c) 2022, Elasticsearch BV
4+
# All rights reserved.
5+
#
6+
# Redistribution and use in source and binary forms, with or without
7+
# modification, are permitted provided that the following conditions are met:
8+
#
9+
# * Redistributions of source code must retain the above copyright notice, this
10+
# list of conditions and the following disclaimer.
11+
#
12+
# * Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# * Neither the name of the copyright holder nor the names of its
17+
# contributors may be used to endorse or promote products derived from
18+
# this software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
31+
import inspect
32+
33+
import grpc
34+
35+
import elasticapm
36+
from elasticapm.contrib.grpc.server_interceptor import _ServicerContextWrapper, _wrap_rpc_behavior, get_trace_parent
37+
38+
39+
class _AsyncServerInterceptor(grpc.aio.ServerInterceptor):
40+
async def intercept_service(self, continuation, handler_call_details):
41+
def transaction_wrapper(behavior, request_streaming, response_streaming):
42+
async def _interceptor(request_or_iterator, context):
43+
if request_streaming or response_streaming: # only unary-unary is supported
44+
return behavior(request_or_iterator, context)
45+
tp = get_trace_parent(handler_call_details)
46+
client = elasticapm.get_client()
47+
transaction = client.begin_transaction("request", trace_parent=tp)
48+
try:
49+
result = behavior(request_or_iterator, _ServicerContextWrapper(context, transaction))
50+
51+
# This is so we can support both sync and async rpc functions
52+
if inspect.isawaitable(result):
53+
result = await result
54+
55+
if transaction and not transaction.outcome:
56+
transaction.set_success()
57+
return result
58+
except Exception:
59+
if transaction:
60+
transaction.set_failure()
61+
client.capture_exception(handled=False)
62+
raise
63+
finally:
64+
client.end_transaction(name=handler_call_details.method)
65+
66+
return _interceptor
67+
68+
return _wrap_rpc_behavior(await continuation(handler_call_details), transaction_wrapper)

elasticapm/contrib/grpc/server_interceptor.py

+14-10
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,19 @@ def _wrap_rpc_behavior(handler, continuation):
6262
)
6363

6464

65+
def get_trace_parent(handler_call_details):
66+
traceparent, tracestate = None, None
67+
for metadata in handler_call_details.invocation_metadata:
68+
if metadata.key == "traceparent":
69+
traceparent = metadata.value
70+
elif metadata.key == "tracestate":
71+
tracestate = metadata.key
72+
if traceparent:
73+
return TraceParent.from_string(traceparent, tracestate)
74+
else:
75+
return None
76+
77+
6578
class _ServicerContextWrapper(wrapt.ObjectProxy):
6679
def __init__(self, wrapped, transaction):
6780
self._self_transaction = transaction
@@ -87,16 +100,7 @@ def transaction_wrapper(behavior, request_streaming, response_streaming):
87100
def _interceptor(request_or_iterator, context):
88101
if request_streaming or response_streaming: # only unary-unary is supported
89102
return behavior(request_or_iterator, context)
90-
traceparent, tracestate = None, None
91-
for metadata in handler_call_details.invocation_metadata:
92-
if metadata.key == "traceparent":
93-
traceparent = metadata.value
94-
elif metadata.key == "tracestate":
95-
tracestate = metadata.key
96-
if traceparent:
97-
tp = TraceParent.from_string(traceparent, tracestate)
98-
else:
99-
tp = None
103+
tp = get_trace_parent(handler_call_details)
100104
client = elasticapm.get_client()
101105
transaction = client.begin_transaction("request", trace_parent=tp)
102106
try:

elasticapm/instrumentation/packages/grpc.py

+18
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,21 @@ def call(self, module, method, wrapped, instance, args, kwargs):
7474
else:
7575
kwargs["interceptors"] = interceptors
7676
return wrapped(*args, **kwargs)
77+
78+
79+
class GRPCAsyncServerInstrumentation(AbstractInstrumentedModule):
80+
name = "grpc_async_server_instrumentation"
81+
creates_transactions = True
82+
instrument_list = [("grpc.aio", "server")]
83+
84+
def call(self, module, method, wrapped, instance, args, kwargs):
85+
from elasticapm.contrib.grpc.async_server_interceptor import _AsyncServerInterceptor
86+
87+
interceptors = kwargs.get("interceptors") or (args[2] if len(args) > 2 else [])
88+
interceptors.insert(0, _AsyncServerInterceptor())
89+
if len(args) > 2:
90+
args = list(args)
91+
args[2] = interceptors
92+
else:
93+
kwargs["interceptors"] = interceptors
94+
return wrapped(*args, **kwargs)

elasticapm/instrumentation/register.py

+1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
"elasticapm.instrumentation.packages.asyncio.starlette.StarletteServerErrorMiddlewareInstrumentation",
9595
"elasticapm.instrumentation.packages.asyncio.redis_asyncio.RedisAsyncioInstrumentation",
9696
"elasticapm.instrumentation.packages.asyncio.redis_asyncio.RedisPipelineInstrumentation",
97+
"elasticapm.instrumentation.packages.grpc.GRPCAsyncServerInstrumentation",
9798
]
9899
)
99100

tests/contrib/grpc/grpc_app/server.py

+41-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
2929
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3030

31+
import asyncio
3132
import logging
33+
import os
3234
import sys
3335
from concurrent import futures
3436

@@ -67,6 +69,30 @@ def GetServerResponseException(self, request, context):
6769
raise Exception("oh no")
6870

6971

72+
class TestServiceAsync(pb2_grpc.TestServiceServicer):
73+
def __init__(self, *args, **kwargs):
74+
pass
75+
76+
async def GetServerResponse(self, request, context):
77+
message = request.message
78+
result = f'Hello I am up and running received "{message}" message from you'
79+
result = {"message": result, "received": True}
80+
81+
return pb2.MessageResponse(**result)
82+
83+
async def GetServerResponseAbort(self, request, context):
84+
await context.abort(grpc.StatusCode.INTERNAL, "foo")
85+
86+
async def GetServerResponseUnavailable(self, request, context):
87+
"""Missing associated documentation comment in .proto file."""
88+
context.set_code(grpc.StatusCode.UNAVAILABLE)
89+
context.set_details("Method not available")
90+
return pb2.MessageResponse(message="foo", received=True)
91+
92+
async def GetServerResponseException(self, request, context):
93+
raise Exception("oh no")
94+
95+
7096
def serve(port):
7197
apm_client = GRPCApmClient(
7298
service_name="grpc-server", disable_metrics="*", api_request_time="100ms", central_config="False"
@@ -78,10 +104,24 @@ def serve(port):
78104
server.wait_for_termination()
79105

80106

107+
async def serve_async(port):
108+
apm_client = GRPCApmClient(
109+
service_name="grpc-server", disable_metrics="*", api_request_time="100ms", central_config="False"
110+
)
111+
server = grpc.aio.server()
112+
pb2_grpc.add_TestServiceServicer_to_server(TestServiceAsync(), server)
113+
server.add_insecure_port(f"[::]:{port}")
114+
await server.start()
115+
await server.wait_for_termination()
116+
117+
81118
if __name__ == "__main__":
82119
if len(sys.argv) > 1:
83120
port = sys.argv[1]
84121
else:
85122
port = "50051"
86123
logging.basicConfig()
87-
serve(port)
124+
if os.environ.get("GRPC_SERVER_ASYNC") == "1":
125+
asyncio.run(serve_async(port))
126+
else:
127+
serve(port)

tests/contrib/grpc/grpc_client_tests.py

+28-8
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,14 @@
4949
from .grpc_app.testgrpc_pb2_grpc import TestServiceStub
5050

5151

52-
@pytest.fixture()
53-
def grpc_server(validating_httpserver, request):
52+
def setup_env(request, validating_httpserver):
5453
config = getattr(request, "param", {})
5554
env = {f"ELASTIC_APM_{k.upper()}": str(v) for k, v in config.items()}
5655
env.setdefault("ELASTIC_APM_SERVER_URL", validating_httpserver.url)
56+
return env
57+
58+
59+
def setup_grpc_server(env):
5760
free_port = get_free_port()
5861
server_proc = subprocess.Popen(
5962
[os.path.join(sys.prefix, "bin", "python"), "-m", "tests.contrib.grpc.grpc_app.server", str(free_port)],
@@ -62,15 +65,32 @@ def grpc_server(validating_httpserver, request):
6265
env=env,
6366
)
6467
wait_for_open_port(free_port)
65-
yield f"localhost:{free_port}"
66-
server_proc.terminate()
68+
return server_proc, free_port
6769

6870

6971
@pytest.fixture()
70-
def grpc_client_and_server_url(grpc_server):
71-
test_channel = grpc.insecure_channel(grpc_server)
72+
def env_fixture(validating_httpserver, request):
73+
env = setup_env(request, validating_httpserver)
74+
return env
75+
76+
77+
if hasattr(grpc, "aio"):
78+
grpc_server_fixture_params = ["async", "sync"]
79+
else:
80+
grpc_server_fixture_params = ["sync"]
81+
82+
83+
@pytest.fixture(params=grpc_server_fixture_params)
84+
def grpc_client_and_server_url(env_fixture, request):
85+
env = {k: v for k, v in env_fixture.items()}
86+
if request.param == "async":
87+
env["GRPC_SERVER_ASYNC"] = "1"
88+
server_proc, free_port = setup_grpc_server(env)
89+
server_addr = f"localhost:{free_port}"
90+
test_channel = grpc.insecure_channel(server_addr)
7291
test_client = TestServiceStub(test_channel)
73-
yield test_client, grpc_server
92+
yield test_client, server_addr
93+
server_proc.terminate()
7494

7595

7696
def test_grpc_client_server_instrumentation(instrument, sending_elasticapm_client, grpc_client_and_server_url):
@@ -200,7 +220,7 @@ def test_grpc_client_unsampled_span(instrument, sending_elasticapm_client, grpc_
200220

201221

202222
@pytest.mark.parametrize(
203-
"grpc_server",
223+
"env_fixture",
204224
[
205225
{
206226
"recording": "False",

0 commit comments

Comments
 (0)