Skip to content

Commit 90af0a1

Browse files
authored
[python] Add observability instrumentation to asyncio stack (#33992)
This is already present in the grpc python sync stack and has been missing from aio stack. CC @gnossen <!-- If you know who should review your pull request, please assign it to that person, otherwise the pull request would get assigned randomly. If your pull request is for a specific language, please add the appropriate lang label. -->
1 parent e1cb290 commit 90af0a1

File tree

4 files changed

+35
-8
lines changed

4 files changed

+35
-8
lines changed

src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ cdef object _custom_op_on_c_call(int op, grpc_call *call):
1919
def install_context_from_request_call_event(RequestCallEvent event):
2020
maybe_save_server_trace_context(event)
2121

22+
def install_context_from_request_call_event_aio(GrpcCallWrapper event):
23+
pass
24+
2225
def uninstall_context():
2326
pass
2427

@@ -31,5 +34,8 @@ cdef class CensusContext:
3134
def set_census_context_on_call(_CallState call_state, CensusContext census_ctx):
3235
pass
3336

37+
def set_instrumentation_context_on_call_aio(GrpcCallWrapper call_state, CensusContext census_ctx):
38+
pass
39+
3440
def get_deadline_from_context():
3541
return None

src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,12 +295,14 @@ cdef class _AioCall(GrpcCallWrapper):
295295

296296
async def unary_unary(self,
297297
bytes request,
298-
tuple outbound_initial_metadata):
298+
tuple outbound_initial_metadata,
299+
object context = None):
299300
"""Performs a unary unary RPC.
300301
301302
Args:
302303
request: the serialized requests in bytes.
303304
outbound_initial_metadata: optional outbound metadata.
305+
context: instrumentation context.
304306
"""
305307
cdef tuple ops
306308

@@ -313,6 +315,8 @@ cdef class _AioCall(GrpcCallWrapper):
313315
cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
314316
cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
315317

318+
if context is not None:
319+
set_instrumentation_context_on_call_aio(self, context)
316320
ops = (initial_metadata_op, send_message_op, send_close_op,
317321
receive_initial_metadata_op, receive_message_op,
318322
receive_status_on_client_op)
@@ -390,7 +394,8 @@ cdef class _AioCall(GrpcCallWrapper):
390394

391395
async def initiate_unary_stream(self,
392396
bytes request,
393-
tuple outbound_initial_metadata):
397+
tuple outbound_initial_metadata,
398+
object context = None):
394399
"""Implementation of the start of a unary-stream call."""
395400
# Peer may prematurely end this RPC at any point. We need a corutine
396401
# that watches if the server sends the final status.
@@ -406,6 +411,8 @@ cdef class _AioCall(GrpcCallWrapper):
406411
cdef Operation send_close_op = SendCloseFromClientOperation(
407412
_EMPTY_FLAGS)
408413

414+
if context is not None:
415+
set_instrumentation_context_on_call_aio(self, context)
409416
outbound_ops = (
410417
initial_metadata_op,
411418
send_message_op,
@@ -429,7 +436,8 @@ cdef class _AioCall(GrpcCallWrapper):
429436

430437
async def stream_unary(self,
431438
tuple outbound_initial_metadata,
432-
object metadata_sent_observer):
439+
object metadata_sent_observer,
440+
object context = None):
433441
"""Actual implementation of the complete unary-stream call.
434442
435443
Needs to pay extra attention to the raise mechanism. If we want to
@@ -460,6 +468,9 @@ cdef class _AioCall(GrpcCallWrapper):
460468
cdef tuple inbound_ops
461469
cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
462470
cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
471+
472+
if context is not None:
473+
set_instrumentation_context_on_call_aio(self, context)
463474
inbound_ops = (receive_message_op, receive_status_on_client_op)
464475

465476
# Executes all operations in one batch.
@@ -484,7 +495,8 @@ cdef class _AioCall(GrpcCallWrapper):
484495

485496
async def initiate_stream_stream(self,
486497
tuple outbound_initial_metadata,
487-
object metadata_sent_observer):
498+
object metadata_sent_observer,
499+
object context = None):
488500
"""Actual implementation of the complete stream-stream call.
489501
490502
Needs to pay extra attention to the raise mechanism. If we want to
@@ -495,6 +507,9 @@ cdef class _AioCall(GrpcCallWrapper):
495507
# that watches if the server sends the final status.
496508
status_task = self._loop.create_task(self._handle_status_once_received())
497509

510+
if context is not None:
511+
set_instrumentation_context_on_call_aio(self, context)
512+
498513
try:
499514
# Sends out initial_metadata ASAP.
500515
await _send_initial_metadata(self,

src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ async def _finish_handler_with_unary_response(RPCState rpc_state,
401401
# Executes application logic
402402
cdef object response_message
403403
cdef _SyncServicerContext sync_servicer_context
404+
install_context_from_request_call_event_aio(rpc_state)
404405

405406
if _is_async_handler(unary_handler):
406407
# Run async method handlers in this coroutine
@@ -453,6 +454,7 @@ async def _finish_handler_with_unary_response(RPCState rpc_state,
453454
rpc_state.metadata_sent = True
454455
rpc_state.status_sent = True
455456
await execute_batch(rpc_state, finish_ops, loop)
457+
uninstall_context()
456458

457459

458460
async def _finish_handler_with_stream_responses(RPCState rpc_state,

src/python/grpcio/grpc/aio/_call.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,7 @@ def __init__(
561561
loop,
562562
)
563563
self._request = request
564+
self._context = cygrpc.build_census_context()
564565
self._invocation_task = loop.create_task(self._invoke())
565566
self._init_unary_response_mixin(self._invocation_task)
566567

@@ -574,7 +575,7 @@ async def _invoke(self) -> ResponseType:
574575
# https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785
575576
try:
576577
serialized_response = await self._cython_call.unary_unary(
577-
serialized_request, self._metadata
578+
serialized_request, self._metadata, self._context
578579
)
579580
except asyncio.CancelledError:
580581
if not self.cancelled():
@@ -624,6 +625,7 @@ def __init__(
624625
loop,
625626
)
626627
self._request = request
628+
self._context = cygrpc.build_census_context()
627629
self._send_unary_request_task = loop.create_task(
628630
self._send_unary_request()
629631
)
@@ -635,7 +637,7 @@ async def _send_unary_request(self) -> ResponseType:
635637
)
636638
try:
637639
await self._cython_call.initiate_unary_stream(
638-
serialized_request, self._metadata
640+
serialized_request, self._metadata, self._context
639641
)
640642
except asyncio.CancelledError:
641643
if not self.cancelled():
@@ -679,13 +681,14 @@ def __init__(
679681
loop,
680682
)
681683

684+
self._context = cygrpc.build_census_context()
682685
self._init_stream_request_mixin(request_iterator)
683686
self._init_unary_response_mixin(loop.create_task(self._conduct_rpc()))
684687

685688
async def _conduct_rpc(self) -> ResponseType:
686689
try:
687690
serialized_response = await self._cython_call.stream_unary(
688-
self._metadata, self._metadata_sent_observer
691+
self._metadata, self._metadata_sent_observer, self._context
689692
)
690693
except asyncio.CancelledError:
691694
if not self.cancelled():
@@ -731,6 +734,7 @@ def __init__(
731734
response_deserializer,
732735
loop,
733736
)
737+
self._context = cygrpc.build_census_context()
734738
self._initializer = self._loop.create_task(self._prepare_rpc())
735739
self._init_stream_request_mixin(request_iterator)
736740
self._init_stream_response_mixin(self._initializer)
@@ -743,7 +747,7 @@ async def _prepare_rpc(self):
743747
"""
744748
try:
745749
await self._cython_call.initiate_stream_stream(
746-
self._metadata, self._metadata_sent_observer
750+
self._metadata, self._metadata_sent_observer, self._context
747751
)
748752
except asyncio.CancelledError:
749753
if not self.cancelled():

0 commit comments

Comments
 (0)