| # Copyright 2016 gRPC authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Invocation-side implementation of gRPC Python.""" |
| |
| import copy |
| import functools |
| import logging |
| import os |
| import sys |
| import threading |
| import time |
| import types |
| from typing import ( |
| Any, |
| Callable, |
| Dict, |
| Iterator, |
| List, |
| Optional, |
| Sequence, |
| Set, |
| Tuple, |
| Union, |
| ) |
| |
| import grpc # pytype: disable=pyi-error |
| from grpc import _common # pytype: disable=pyi-error |
| from grpc import _compression # pytype: disable=pyi-error |
| from grpc import _grpcio_metadata # pytype: disable=pyi-error |
| from grpc import _observability # pytype: disable=pyi-error |
| from grpc._cython import cygrpc |
| from grpc._typing import ChannelArgumentType |
| from grpc._typing import DeserializingFunction |
| from grpc._typing import IntegratedCallFactory |
| from grpc._typing import MetadataType |
| from grpc._typing import NullaryCallbackType |
| from grpc._typing import ResponseType |
| from grpc._typing import SerializingFunction |
| from grpc._typing import UserTag |
| import grpc.experimental # pytype: disable=pyi-error |
| |
| _LOGGER = logging.getLogger(__name__) |
| |
| _USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__) |
| |
| _EMPTY_FLAGS = 0 |
| |
| # NOTE(rbellevi): No guarantees are given about the maintenance of this |
| # environment variable. |
| _DEFAULT_SINGLE_THREADED_UNARY_STREAM = ( |
| os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM") is not None |
| ) |
| |
| _UNARY_UNARY_INITIAL_DUE = ( |
| cygrpc.OperationType.send_initial_metadata, |
| cygrpc.OperationType.send_message, |
| cygrpc.OperationType.send_close_from_client, |
| cygrpc.OperationType.receive_initial_metadata, |
| cygrpc.OperationType.receive_message, |
| cygrpc.OperationType.receive_status_on_client, |
| ) |
| _UNARY_STREAM_INITIAL_DUE = ( |
| cygrpc.OperationType.send_initial_metadata, |
| cygrpc.OperationType.send_message, |
| cygrpc.OperationType.send_close_from_client, |
| cygrpc.OperationType.receive_initial_metadata, |
| cygrpc.OperationType.receive_status_on_client, |
| ) |
| _STREAM_UNARY_INITIAL_DUE = ( |
| cygrpc.OperationType.send_initial_metadata, |
| cygrpc.OperationType.receive_initial_metadata, |
| cygrpc.OperationType.receive_message, |
| cygrpc.OperationType.receive_status_on_client, |
| ) |
| _STREAM_STREAM_INITIAL_DUE = ( |
| cygrpc.OperationType.send_initial_metadata, |
| cygrpc.OperationType.receive_initial_metadata, |
| cygrpc.OperationType.receive_status_on_client, |
| ) |
| |
| _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( |
| "Exception calling channel subscription callback!" |
| ) |
| |
| _OK_RENDEZVOUS_REPR_FORMAT = ( |
| '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>' |
| ) |
| |
| _NON_OK_RENDEZVOUS_REPR_FORMAT = ( |
| "<{} of RPC that terminated with:\n" |
| "\tstatus = {}\n" |
| '\tdetails = "{}"\n' |
| '\tdebug_error_string = "{}"\n' |
| ">" |
| ) |
| |
| |
| def _deadline(timeout: Optional[float]) -> Optional[float]: |
| return None if timeout is None else time.time() + timeout |
| |
| |
| def _unknown_code_details( |
| unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str] |
| ) -> str: |
| return 'Server sent unknown code {} and details "{}"'.format( |
| unknown_cygrpc_code, details |
| ) |
| |
| |
| class _RPCState(object): |
| condition: threading.Condition |
| due: Set[cygrpc.OperationType] |
| initial_metadata: Optional[MetadataType] |
| response: Any |
| trailing_metadata: Optional[MetadataType] |
| code: Optional[grpc.StatusCode] |
| details: Optional[str] |
| debug_error_string: Optional[str] |
| cancelled: bool |
| callbacks: List[NullaryCallbackType] |
| fork_epoch: Optional[int] |
| rpc_start_time: Optional[float] # In relative seconds |
| rpc_end_time: Optional[float] # In relative seconds |
| method: Optional[str] |
| target: Optional[str] |
| |
| def __init__( |
| self, |
| due: Sequence[cygrpc.OperationType], |
| initial_metadata: Optional[MetadataType], |
| trailing_metadata: Optional[MetadataType], |
| code: Optional[grpc.StatusCode], |
| details: Optional[str], |
| ): |
| # `condition` guards all members of _RPCState. `notify_all` is called on |
| # `condition` when the state of the RPC has changed. |
| self.condition = threading.Condition() |
| |
| # The cygrpc.OperationType objects representing events due from the RPC's |
| # completion queue. If an operation is in `due`, it is guaranteed that |
| # `operate()` has been called on a corresponding operation. But the |
| # converse is not true. That is, in the case of failed `operate()` |
| # calls, there may briefly be events in `due` that do not correspond to |
| # operations submitted to Core. |
| self.due = set(due) |
| self.initial_metadata = initial_metadata |
| self.response = None |
| self.trailing_metadata = trailing_metadata |
| self.code = code |
| self.details = details |
| self.debug_error_string = None |
| # The following three fields are used for observability. |
| # Updates to those fields do not trigger self.condition. |
| self.rpc_start_time = None |
| self.rpc_end_time = None |
| self.method = None |
| self.target = None |
| |
| # The semantics of grpc.Future.cancel and grpc.Future.cancelled are |
| # slightly wonky, so they have to be tracked separately from the rest of the |
| # result of the RPC. This field tracks whether cancellation was requested |
| # prior to termination of the RPC. |
| self.cancelled = False |
| self.callbacks = [] |
| self.fork_epoch = cygrpc.get_fork_epoch() |
| |
| def reset_postfork_child(self): |
| self.condition = threading.Condition() |
| |
| |
| def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None: |
| if state.code is None: |
| state.code = code |
| state.details = details |
| if state.initial_metadata is None: |
| state.initial_metadata = () |
| state.trailing_metadata = () |
| |
| |
| def _handle_event( |
| event: cygrpc.BaseEvent, |
| state: _RPCState, |
| response_deserializer: Optional[DeserializingFunction], |
| ) -> List[NullaryCallbackType]: |
| callbacks = [] |
| for batch_operation in event.batch_operations: |
| operation_type = batch_operation.type() |
| state.due.remove(operation_type) |
| if operation_type == cygrpc.OperationType.receive_initial_metadata: |
| state.initial_metadata = batch_operation.initial_metadata() |
| elif operation_type == cygrpc.OperationType.receive_message: |
| serialized_response = batch_operation.message() |
| if serialized_response is not None: |
| response = _common.deserialize( |
| serialized_response, response_deserializer |
| ) |
| if response is None: |
| details = "Exception deserializing response!" |
| _abort(state, grpc.StatusCode.INTERNAL, details) |
| else: |
| state.response = response |
| elif operation_type == cygrpc.OperationType.receive_status_on_client: |
| state.trailing_metadata = batch_operation.trailing_metadata() |
| if state.code is None: |
| code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get( |
| batch_operation.code() |
| ) |
| if code is None: |
| state.code = grpc.StatusCode.UNKNOWN |
| state.details = _unknown_code_details( |
| code, batch_operation.details() |
| ) |
| else: |
| state.code = code |
| state.details = batch_operation.details() |
| state.debug_error_string = batch_operation.error_string() |
| state.rpc_end_time = time.perf_counter() |
| _observability.maybe_record_rpc_latency(state) |
| callbacks.extend(state.callbacks) |
| state.callbacks = None |
| return callbacks |
| |
| |
| def _event_handler( |
| state: _RPCState, response_deserializer: Optional[DeserializingFunction] |
| ) -> UserTag: |
| def handle_event(event): |
| with state.condition: |
| callbacks = _handle_event(event, state, response_deserializer) |
| state.condition.notify_all() |
| done = not state.due |
| for callback in callbacks: |
| try: |
| callback() |
| except Exception as e: # pylint: disable=broad-except |
| # NOTE(rbellevi): We suppress but log errors here so as not to |
| # kill the channel spin thread. |
| logging.error( |
| "Exception in callback %s: %s", repr(callback.func), repr(e) |
| ) |
| return done and state.fork_epoch >= cygrpc.get_fork_epoch() |
| |
| return handle_event |
| |
| |
| # TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall. |
| # pylint: disable=too-many-statements |
| def _consume_request_iterator( |
| request_iterator: Iterator, |
| state: _RPCState, |
| call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall], |
| request_serializer: SerializingFunction, |
| event_handler: Optional[UserTag], |
| ) -> None: |
| """Consume a request supplied by the user.""" |
| |
| def consume_request_iterator(): # pylint: disable=too-many-branches |
| # Iterate over the request iterator until it is exhausted or an error |
| # condition is encountered. |
| while True: |
| return_from_user_request_generator_invoked = False |
| try: |
| # The thread may die in user-code. Do not block fork for this. |
| cygrpc.enter_user_request_generator() |
| request = next(request_iterator) |
| except StopIteration: |
| break |
| except Exception: # pylint: disable=broad-except |
| cygrpc.return_from_user_request_generator() |
| return_from_user_request_generator_invoked = True |
| code = grpc.StatusCode.UNKNOWN |
| details = "Exception iterating requests!" |
| _LOGGER.exception(details) |
| call.cancel( |
| _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details |
| ) |
| _abort(state, code, details) |
| return |
| finally: |
| if not return_from_user_request_generator_invoked: |
| cygrpc.return_from_user_request_generator() |
| serialized_request = _common.serialize(request, request_serializer) |
| with state.condition: |
| if state.code is None and not state.cancelled: |
| if serialized_request is None: |
| code = grpc.StatusCode.INTERNAL |
| details = "Exception serializing request!" |
| call.cancel( |
| _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], |
| details, |
| ) |
| _abort(state, code, details) |
| return |
| else: |
| state.due.add(cygrpc.OperationType.send_message) |
| operations = ( |
| cygrpc.SendMessageOperation( |
| serialized_request, _EMPTY_FLAGS |
| ), |
| ) |
| operating = call.operate(operations, event_handler) |
| if not operating: |
| state.due.remove(cygrpc.OperationType.send_message) |
| return |
| |
| def _done(): |
| return ( |
| state.code is not None |
| or cygrpc.OperationType.send_message |
| not in state.due |
| ) |
| |
| _common.wait( |
| state.condition.wait, |
| _done, |
| spin_cb=functools.partial( |
| cygrpc.block_if_fork_in_progress, state |
| ), |
| ) |
| if state.code is not None: |
| return |
| else: |
| return |
| with state.condition: |
| if state.code is None: |
| state.due.add(cygrpc.OperationType.send_close_from_client) |
| operations = ( |
| cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), |
| ) |
| operating = call.operate(operations, event_handler) |
| if not operating: |
| state.due.remove( |
| cygrpc.OperationType.send_close_from_client |
| ) |
| |
| consumption_thread = cygrpc.ForkManagedThread( |
| target=consume_request_iterator |
| ) |
| consumption_thread.setDaemon(True) |
| consumption_thread.start() |
| |
| |
| def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str: |
| """Calculates error string for RPC.""" |
| with rpc_state.condition: |
| if rpc_state.code is None: |
| return "<{} object>".format(class_name) |
| elif rpc_state.code is grpc.StatusCode.OK: |
| return _OK_RENDEZVOUS_REPR_FORMAT.format( |
| class_name, rpc_state.code, rpc_state.details |
| ) |
| else: |
| return _NON_OK_RENDEZVOUS_REPR_FORMAT.format( |
| class_name, |
| rpc_state.code, |
| rpc_state.details, |
| rpc_state.debug_error_string, |
| ) |
| |
| |
| class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future): |
| """An RPC error not tied to the execution of a particular RPC. |
| |
| The RPC represented by the state object must not be in-progress or |
| cancelled. |
| |
| Attributes: |
| _state: An instance of _RPCState. |
| """ |
| |
| _state: _RPCState |
| |
| def __init__(self, state: _RPCState): |
| with state.condition: |
| self._state = _RPCState( |
| (), |
| copy.deepcopy(state.initial_metadata), |
| copy.deepcopy(state.trailing_metadata), |
| state.code, |
| copy.deepcopy(state.details), |
| ) |
| self._state.response = copy.copy(state.response) |
| self._state.debug_error_string = copy.copy(state.debug_error_string) |
| |
| def initial_metadata(self) -> Optional[MetadataType]: |
| return self._state.initial_metadata |
| |
| def trailing_metadata(self) -> Optional[MetadataType]: |
| return self._state.trailing_metadata |
| |
| def code(self) -> Optional[grpc.StatusCode]: |
| return self._state.code |
| |
| def details(self) -> Optional[str]: |
| return _common.decode(self._state.details) |
| |
| def debug_error_string(self) -> Optional[str]: |
| return _common.decode(self._state.debug_error_string) |
| |
| def _repr(self) -> str: |
| return _rpc_state_string(self.__class__.__name__, self._state) |
| |
| def __repr__(self) -> str: |
| return self._repr() |
| |
| def __str__(self) -> str: |
| return self._repr() |
| |
| def cancel(self) -> bool: |
| """See grpc.Future.cancel.""" |
| return False |
| |
| def cancelled(self) -> bool: |
| """See grpc.Future.cancelled.""" |
| return False |
| |
| def running(self) -> bool: |
| """See grpc.Future.running.""" |
| return False |
| |
| def done(self) -> bool: |
| """See grpc.Future.done.""" |
| return True |
| |
| def result( |
| self, timeout: Optional[float] = None |
| ) -> Any: # pylint: disable=unused-argument |
| """See grpc.Future.result.""" |
| raise self |
| |
| def exception( |
| self, timeout: Optional[float] = None # pylint: disable=unused-argument |
| ) -> Optional[Exception]: |
| """See grpc.Future.exception.""" |
| return self |
| |
| def traceback( |
| self, timeout: Optional[float] = None # pylint: disable=unused-argument |
| ) -> Optional[types.TracebackType]: |
| """See grpc.Future.traceback.""" |
| try: |
| raise self |
| except grpc.RpcError: |
| return sys.exc_info()[2] |
| |
| def add_done_callback( |
| self, |
| fn: Callable[[grpc.Future], None], |
| timeout: Optional[float] = None, # pylint: disable=unused-argument |
| ) -> None: |
| """See grpc.Future.add_done_callback.""" |
| fn(self) |
| |
| |
| class _Rendezvous(grpc.RpcError, grpc.RpcContext): |
| """An RPC iterator. |
| |
| Attributes: |
| _state: An instance of _RPCState. |
| _call: An instance of SegregatedCall or IntegratedCall. |
| In either case, the _call object is expected to have operate, cancel, |
| and next_event methods. |
| _response_deserializer: A callable taking bytes and return a Python |
| object. |
| _deadline: A float representing the deadline of the RPC in seconds. Or |
| possibly None, to represent an RPC with no deadline at all. |
| """ |
| |
| _state: _RPCState |
| _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall] |
| _response_deserializer: Optional[DeserializingFunction] |
| _deadline: Optional[float] |
| |
| def __init__( |
| self, |
| state: _RPCState, |
| call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall], |
| response_deserializer: Optional[DeserializingFunction], |
| deadline: Optional[float], |
| ): |
| super(_Rendezvous, self).__init__() |
| self._state = state |
| self._call = call |
| self._response_deserializer = response_deserializer |
| self._deadline = deadline |
| |
| def is_active(self) -> bool: |
| """See grpc.RpcContext.is_active""" |
| with self._state.condition: |
| return self._state.code is None |
| |
| def time_remaining(self) -> Optional[float]: |
| """See grpc.RpcContext.time_remaining""" |
| with self._state.condition: |
| if self._deadline is None: |
| return None |
| else: |
| return max(self._deadline - time.time(), 0) |
| |
| def cancel(self) -> bool: |
| """See grpc.RpcContext.cancel""" |
| with self._state.condition: |
| if self._state.code is None: |
| code = grpc.StatusCode.CANCELLED |
| details = "Locally cancelled by application!" |
| self._call.cancel( |
| _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details |
| ) |
| self._state.cancelled = True |
| _abort(self._state, code, details) |
| self._state.condition.notify_all() |
| return True |
| else: |
| return False |
| |
| def add_callback(self, callback: NullaryCallbackType) -> bool: |
| """See grpc.RpcContext.add_callback""" |
| with self._state.condition: |
| if self._state.callbacks is None: |
| return False |
| else: |
| self._state.callbacks.append(callback) |
| return True |
| |
| def __iter__(self): |
| return self |
| |
| def next(self): |
| return self._next() |
| |
| def __next__(self): |
| return self._next() |
| |
| def _next(self): |
| raise NotImplementedError() |
| |
| def debug_error_string(self) -> Optional[str]: |
| raise NotImplementedError() |
| |
| def _repr(self) -> str: |
| return _rpc_state_string(self.__class__.__name__, self._state) |
| |
| def __repr__(self) -> str: |
| return self._repr() |
| |
| def __str__(self) -> str: |
| return self._repr() |
| |
| def __del__(self) -> None: |
| with self._state.condition: |
| if self._state.code is None: |
| self._state.code = grpc.StatusCode.CANCELLED |
| self._state.details = "Cancelled upon garbage collection!" |
| self._state.cancelled = True |
| self._call.cancel( |
| _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code], |
| self._state.details, |
| ) |
| self._state.condition.notify_all() |
| |
| |
| class _SingleThreadedRendezvous( |
| _Rendezvous, grpc.Call, grpc.Future |
| ): # pylint: disable=too-many-ancestors |
| """An RPC iterator operating entirely on a single thread. |
| |
| The __next__ method of _SingleThreadedRendezvous does not depend on the |
| existence of any other thread, including the "channel spin thread". |
| However, this means that its interface is entirely synchronous. So this |
| class cannot completely fulfill the grpc.Future interface. The result, |
| exception, and traceback methods will never block and will instead raise |
| an exception if calling the method would result in blocking. |
| |
| This means that these methods are safe to call from add_done_callback |
| handlers. |
| """ |
| |
| _state: _RPCState |
| |
| def _is_complete(self) -> bool: |
| return self._state.code is not None |
| |
| def cancelled(self) -> bool: |
| with self._state.condition: |
| return self._state.cancelled |
| |
| def running(self) -> bool: |
| with self._state.condition: |
| return self._state.code is None |
| |
| def done(self) -> bool: |
| with self._state.condition: |
| return self._state.code is not None |
| |
| def result(self, timeout: Optional[float] = None) -> Any: |
| """Returns the result of the computation or raises its exception. |
| |
| This method will never block. Instead, it will raise an exception |
| if calling this method would otherwise result in blocking. |
| |
| Since this method will never block, any `timeout` argument passed will |
| be ignored. |
| """ |
| del timeout |
| with self._state.condition: |
| if not self._is_complete(): |
| raise grpc.experimental.UsageError( |
| "_SingleThreadedRendezvous only supports result() when the" |
| " RPC is complete." |
| ) |
| if self._state.code is grpc.StatusCode.OK: |
| return self._state.response |
| elif self._state.cancelled: |
| raise grpc.FutureCancelledError() |
| else: |
| raise self |
| |
| def exception(self, timeout: Optional[float] = None) -> Optional[Exception]: |
| """Return the exception raised by the computation. |
| |
| This method will never block. Instead, it will raise an exception |
| if calling this method would otherwise result in blocking. |
| |
| Since this method will never block, any `timeout` argument passed will |
| be ignored. |
| """ |
| del timeout |
| with self._state.condition: |
| if not self._is_complete(): |
| raise grpc.experimental.UsageError( |
| "_SingleThreadedRendezvous only supports exception() when" |
| " the RPC is complete." |
| ) |
| if self._state.code is grpc.StatusCode.OK: |
| return None |
| elif self._state.cancelled: |
| raise grpc.FutureCancelledError() |
| else: |
| return self |
| |
| def traceback( |
| self, timeout: Optional[float] = None |
| ) -> Optional[types.TracebackType]: |
| """Access the traceback of the exception raised by the computation. |
| |
| This method will never block. Instead, it will raise an exception |
| if calling this method would otherwise result in blocking. |
| |
| Since this method will never block, any `timeout` argument passed will |
| be ignored. |
| """ |
| del timeout |
| with self._state.condition: |
| if not self._is_complete(): |
| raise grpc.experimental.UsageError( |
| "_SingleThreadedRendezvous only supports traceback() when" |
| " the RPC is complete." |
| ) |
| if self._state.code is grpc.StatusCode.OK: |
| return None |
| elif self._state.cancelled: |
| raise grpc.FutureCancelledError() |
| else: |
| try: |
| raise self |
| except grpc.RpcError: |
| return sys.exc_info()[2] |
| |
| def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None: |
| with self._state.condition: |
| if self._state.code is None: |
| self._state.callbacks.append(functools.partial(fn, self)) |
| return |
| |
| fn(self) |
| |
| def initial_metadata(self) -> Optional[MetadataType]: |
| """See grpc.Call.initial_metadata""" |
| with self._state.condition: |
| # NOTE(gnossen): Based on our initial call batch, we are guaranteed |
| # to receive initial metadata before any messages. |
| while self._state.initial_metadata is None: |
| self._consume_next_event() |
| return self._state.initial_metadata |
| |
| def trailing_metadata(self) -> Optional[MetadataType]: |
| """See grpc.Call.trailing_metadata""" |
| with self._state.condition: |
| if self._state.trailing_metadata is None: |
| raise grpc.experimental.UsageError( |
| "Cannot get trailing metadata until RPC is completed." |
| ) |
| return self._state.trailing_metadata |
| |
| def code(self) -> Optional[grpc.StatusCode]: |
| """See grpc.Call.code""" |
| with self._state.condition: |
| if self._state.code is None: |
| raise grpc.experimental.UsageError( |
| "Cannot get code until RPC is completed." |
| ) |
| return self._state.code |
| |
| def details(self) -> Optional[str]: |
| """See grpc.Call.details""" |
| with self._state.condition: |
| if self._state.details is None: |
| raise grpc.experimental.UsageError( |
| "Cannot get details until RPC is completed." |
| ) |
| return _common.decode(self._state.details) |
| |
| def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]: |
| event = self._call.next_event() |
| with self._state.condition: |
| callbacks = _handle_event( |
| event, self._state, self._response_deserializer |
| ) |
| for callback in callbacks: |
| # NOTE(gnossen): We intentionally allow exceptions to bubble up |
| # to the user when running on a single thread. |
| callback() |
| return event |
| |
| def _next_response(self) -> Any: |
| while True: |
| self._consume_next_event() |
| with self._state.condition: |
| if self._state.response is not None: |
| response = self._state.response |
| self._state.response = None |
| return response |
| elif ( |
| cygrpc.OperationType.receive_message not in self._state.due |
| ): |
| if self._state.code is grpc.StatusCode.OK: |
| raise StopIteration() |
| elif self._state.code is not None: |
| raise self |
| |
| def _next(self) -> Any: |
| with self._state.condition: |
| if self._state.code is None: |
| # We tentatively add the operation as expected and remove |
| # it if the enqueue operation fails. This allows us to guarantee that |
| # if an event has been submitted to the core completion queue, |
| # it is in `due`. If we waited until after a successful |
| # enqueue operation then a signal could interrupt this |
| # thread between the enqueue operation and the addition of the |
| # operation to `due`. This would cause an exception on the |
| # channel spin thread when the operation completes and no |
| # corresponding operation would be present in state.due. |
| # Note that, since `condition` is held through this block, there is |
| # no data race on `due`. |
| self._state.due.add(cygrpc.OperationType.receive_message) |
| operating = self._call.operate( |
| (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None |
| ) |
| if not operating: |
| self._state.due.remove(cygrpc.OperationType.receive_message) |
| elif self._state.code is grpc.StatusCode.OK: |
| raise StopIteration() |
| else: |
| raise self |
| return self._next_response() |
| |
| def debug_error_string(self) -> Optional[str]: |
| with self._state.condition: |
| if self._state.debug_error_string is None: |
| raise grpc.experimental.UsageError( |
| "Cannot get debug error string until RPC is completed." |
| ) |
| return _common.decode(self._state.debug_error_string) |
| |
| |
| class _MultiThreadedRendezvous( |
| _Rendezvous, grpc.Call, grpc.Future |
| ): # pylint: disable=too-many-ancestors |
| """An RPC iterator that depends on a channel spin thread. |
| |
| This iterator relies upon a per-channel thread running in the background, |
| dequeueing events from the completion queue, and notifying threads waiting |
| on the threading.Condition object in the _RPCState object. |
| |
| This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface |
| and to mediate a bidirection streaming RPC. |
| """ |
| |
| _state: _RPCState |
| |
| def initial_metadata(self) -> Optional[MetadataType]: |
| """See grpc.Call.initial_metadata""" |
| with self._state.condition: |
| |
| def _done(): |
| return self._state.initial_metadata is not None |
| |
| _common.wait(self._state.condition.wait, _done) |
| return self._state.initial_metadata |
| |
| def trailing_metadata(self) -> Optional[MetadataType]: |
| """See grpc.Call.trailing_metadata""" |
| with self._state.condition: |
| |
| def _done(): |
| return self._state.trailing_metadata is not None |
| |
| _common.wait(self._state.condition.wait, _done) |
| return self._state.trailing_metadata |
| |
| def code(self) -> Optional[grpc.StatusCode]: |
| """See grpc.Call.code""" |
| with self._state.condition: |
| |
| def _done(): |
| return self._state.code is not None |
| |
| _common.wait(self._state.condition.wait, _done) |
| return self._state.code |
| |
| def details(self) -> Optional[str]: |
| """See grpc.Call.details""" |
| with self._state.condition: |
| |
| def _done(): |
| return self._state.details is not None |
| |
| _common.wait(self._state.condition.wait, _done) |
| return _common.decode(self._state.details) |
| |
| def debug_error_string(self) -> Optional[str]: |
| with self._state.condition: |
| |
| def _done(): |
| return self._state.debug_error_string is not None |
| |
| _common.wait(self._state.condition.wait, _done) |
| return _common.decode(self._state.debug_error_string) |
| |
| def cancelled(self) -> bool: |
| with self._state.condition: |
| return self._state.cancelled |
| |
| def running(self) -> bool: |
| with self._state.condition: |
| return self._state.code is None |
| |
| def done(self) -> bool: |
| with self._state.condition: |
| return self._state.code is not None |
| |
| def _is_complete(self) -> bool: |
| return self._state.code is not None |
| |
| def result(self, timeout: Optional[float] = None) -> Any: |
| """Returns the result of the computation or raises its exception. |
| |
| See grpc.Future.result for the full API contract. |
| """ |
| with self._state.condition: |
| timed_out = _common.wait( |
| self._state.condition.wait, self._is_complete, timeout=timeout |
| ) |
| if timed_out: |
| raise grpc.FutureTimeoutError() |
| else: |
| if self._state.code is grpc.StatusCode.OK: |
| return self._state.response |
| elif self._state.cancelled: |
| raise grpc.FutureCancelledError() |
| else: |
| raise self |
| |
| def exception(self, timeout: Optional[float] = None) -> Optional[Exception]: |
| """Return the exception raised by the computation. |
| |
| See grpc.Future.exception for the full API contract. |
| """ |
| with self._state.condition: |
| timed_out = _common.wait( |
| self._state.condition.wait, self._is_complete, timeout=timeout |
| ) |
| if timed_out: |
| raise grpc.FutureTimeoutError() |
| else: |
| if self._state.code is grpc.StatusCode.OK: |
| return None |
| elif self._state.cancelled: |
| raise grpc.FutureCancelledError() |
| else: |
| return self |
| |
| def traceback( |
| self, timeout: Optional[float] = None |
| ) -> Optional[types.TracebackType]: |
| """Access the traceback of the exception raised by the computation. |
| |
| See grpc.future.traceback for the full API contract. |
| """ |
| with self._state.condition: |
| timed_out = _common.wait( |
| self._state.condition.wait, self._is_complete, timeout=timeout |
| ) |
| if timed_out: |
| raise grpc.FutureTimeoutError() |
| else: |
| if self._state.code is grpc.StatusCode.OK: |
| return None |
| elif self._state.cancelled: |
| raise grpc.FutureCancelledError() |
| else: |
| try: |
| raise self |
| except grpc.RpcError: |
| return sys.exc_info()[2] |
| |
| def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None: |
| with self._state.condition: |
| if self._state.code is None: |
| self._state.callbacks.append(functools.partial(fn, self)) |
| return |
| |
| fn(self) |
| |
| def _next(self) -> Any: |
| with self._state.condition: |
| if self._state.code is None: |
| event_handler = _event_handler( |
| self._state, self._response_deserializer |
| ) |
| self._state.due.add(cygrpc.OperationType.receive_message) |
| operating = self._call.operate( |
| (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), |
| event_handler, |
| ) |
| if not operating: |
| self._state.due.remove(cygrpc.OperationType.receive_message) |
| elif self._state.code is grpc.StatusCode.OK: |
| raise StopIteration() |
| else: |
| raise self |
| |
| def _response_ready(): |
| return self._state.response is not None or ( |
| cygrpc.OperationType.receive_message not in self._state.due |
| and self._state.code is not None |
| ) |
| |
| _common.wait(self._state.condition.wait, _response_ready) |
| if self._state.response is not None: |
| response = self._state.response |
| self._state.response = None |
| return response |
| elif cygrpc.OperationType.receive_message not in self._state.due: |
| if self._state.code is grpc.StatusCode.OK: |
| raise StopIteration() |
| elif self._state.code is not None: |
| raise self |
| |
| |
| def _start_unary_request( |
| request: Any, |
| timeout: Optional[float], |
| request_serializer: SerializingFunction, |
| ) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]: |
| deadline = _deadline(timeout) |
| serialized_request = _common.serialize(request, request_serializer) |
| if serialized_request is None: |
| state = _RPCState( |
| (), |
| (), |
| (), |
| grpc.StatusCode.INTERNAL, |
| "Exception serializing request!", |
| ) |
| error = _InactiveRpcError(state) |
| return deadline, None, error |
| else: |
| return deadline, serialized_request, None |
| |
| |
| def _end_unary_response_blocking( |
| state: _RPCState, |
| call: cygrpc.SegregatedCall, |
| with_call: bool, |
| deadline: Optional[float], |
| ) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]: |
| if state.code is grpc.StatusCode.OK: |
| if with_call: |
| rendezvous = _MultiThreadedRendezvous(state, call, None, deadline) |
| return state.response, rendezvous |
| else: |
| return state.response |
| else: |
| raise _InactiveRpcError(state) # pytype: disable=not-instantiable |
| |
| |
| def _stream_unary_invocation_operations( |
| metadata: Optional[MetadataType], initial_metadata_flags: int |
| ) -> Sequence[Sequence[cygrpc.Operation]]: |
| return ( |
| ( |
| cygrpc.SendInitialMetadataOperation( |
| metadata, initial_metadata_flags |
| ), |
| cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), |
| cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), |
| ), |
| (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), |
| ) |
| |
| |
| def _stream_unary_invocation_operations_and_tags( |
| metadata: Optional[MetadataType], initial_metadata_flags: int |
| ) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]: |
| return tuple( |
| ( |
| operations, |
| None, |
| ) |
| for operations in _stream_unary_invocation_operations( |
| metadata, initial_metadata_flags |
| ) |
| ) |
| |
| |
| def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]: |
| parent_deadline = cygrpc.get_deadline_from_context() |
| if parent_deadline is None and user_deadline is None: |
| return None |
| elif parent_deadline is not None and user_deadline is None: |
| return parent_deadline |
| elif user_deadline is not None and parent_deadline is None: |
| return user_deadline |
| else: |
| return min(parent_deadline, user_deadline) |
| |
| |
| class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): |
| _channel: cygrpc.Channel |
| _managed_call: IntegratedCallFactory |
| _method: bytes |
| _target: bytes |
| _request_serializer: Optional[SerializingFunction] |
| _response_deserializer: Optional[DeserializingFunction] |
| _context: Any |
| _registered_call_handle: Optional[int] |
| |
| __slots__ = [ |
| "_channel", |
| "_managed_call", |
| "_method", |
| "_target", |
| "_request_serializer", |
| "_response_deserializer", |
| "_context", |
| ] |
| |
| # pylint: disable=too-many-arguments |
| def __init__( |
| self, |
| channel: cygrpc.Channel, |
| managed_call: IntegratedCallFactory, |
| method: bytes, |
| target: bytes, |
| request_serializer: Optional[SerializingFunction], |
| response_deserializer: Optional[DeserializingFunction], |
| _registered_call_handle: Optional[int], |
| ): |
| self._channel = channel |
| self._managed_call = managed_call |
| self._method = method |
| self._target = target |
| self._request_serializer = request_serializer |
| self._response_deserializer = response_deserializer |
| self._context = cygrpc.build_census_context() |
| self._registered_call_handle = _registered_call_handle |
| |
| def _prepare( |
| self, |
| request: Any, |
| timeout: Optional[float], |
| metadata: Optional[MetadataType], |
| wait_for_ready: Optional[bool], |
| compression: Optional[grpc.Compression], |
| ) -> Tuple[ |
| Optional[_RPCState], |
| Optional[Sequence[cygrpc.Operation]], |
| Optional[float], |
| Optional[grpc.RpcError], |
| ]: |
| deadline, serialized_request, rendezvous = _start_unary_request( |
| request, timeout, self._request_serializer |
| ) |
| initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( |
| wait_for_ready |
| ) |
| augmented_metadata = _compression.augment_metadata( |
| metadata, compression |
| ) |
| if serialized_request is None: |
| return None, None, None, rendezvous |
| else: |
| state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) |
| operations = ( |
| cygrpc.SendInitialMetadataOperation( |
| augmented_metadata, initial_metadata_flags |
| ), |
| cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), |
| cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), |
| cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), |
| cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), |
| cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), |
| ) |
| return state, operations, deadline, None |
| |
| def _blocking( |
| self, |
| request: Any, |
| timeout: Optional[float] = None, |
| metadata: Optional[MetadataType] = None, |
| credentials: Optional[grpc.CallCredentials] = None, |
| wait_for_ready: Optional[bool] = None, |
| compression: Optional[grpc.Compression] = None, |
| ) -> Tuple[_RPCState, cygrpc.SegregatedCall]: |
| state, operations, deadline, rendezvous = self._prepare( |
| request, timeout, metadata, wait_for_ready, compression |
| ) |
| if state is None: |
| raise rendezvous # pylint: disable-msg=raising-bad-type |
| else: |
| state.rpc_start_time = time.perf_counter() |
| state.method = _common.decode(self._method) |
| state.target = _common.decode(self._target) |
| call = self._channel.segregated_call( |
| cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, |
| self._method, |
| None, |
| _determine_deadline(deadline), |
| metadata, |
| None if credentials is None else credentials._credentials, |
| ( |
| ( |
| operations, |
| None, |
| ), |
| ), |
| self._context, |
| self._registered_call_handle, |
| ) |
| event = call.next_event() |
| _handle_event(event, state, self._response_deserializer) |
| return state, call |
| |
| def __call__( |
| self, |
| request: Any, |
| timeout: Optional[float] = None, |
| metadata: Optional[MetadataType] = None, |
| credentials: Optional[grpc.CallCredentials] = None, |
| wait_for_ready: Optional[bool] = None, |
| compression: Optional[grpc.Compression] = None, |
| ) -> Any: |
| ( |
| state, |
| call, |
| ) = self._blocking( |
| request, timeout, metadata, credentials, wait_for_ready, compression |
| ) |
| return _end_unary_response_blocking(state, call, False, None) |
| |
| def with_call( |
| self, |
| request: Any, |
| timeout: Optional[float] = None, |
| metadata: Optional[MetadataType] = None, |
| credentials: Optional[grpc.CallCredentials] = None, |
| wait_for_ready: Optional[bool] = None, |
| compression: Optional[grpc.Compression] = None, |
| ) -> Tuple[Any, grpc.Call]: |
| ( |
| state, |
| call, |
| ) = self._blocking( |
| request, timeout, metadata, credentials, wait_for_ready, compression |
| ) |
| return _end_unary_response_blocking(state, call, True, None) |
| |
| def future( |
| self, |
| request: Any, |
| timeout: Optional[float] = None, |
| metadata: Optional[MetadataType] = None, |
| credentials: Optional[grpc.CallCredentials] = None, |
| wait_for_ready: Optional[bool] = None, |
| compression: Optional[grpc.Compression] = None, |
| ) -> _MultiThreadedRendezvous: |
| state, operations, deadline, rendezvous = self._prepare( |
| request, timeout, metadata, wait_for_ready, compression |
| ) |
| if state is None: |
| raise rendezvous # pylint: disable-msg=raising-bad-type |
| else: |
| event_handler = _event_handler(state, self._response_deserializer) |
| state.rpc_start_time = time.perf_counter() |
| state.method = _common.decode(self._method) |
| state.target = _common.decode(self._target) |
| call = self._managed_call( |
| cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, |
| self._method, |
| None, |
| deadline, |
| metadata, |
| None if credentials is None else credentials._credentials, |
| (operations,), |
| event_handler, |
| self._context, |
| self._registered_call_handle, |
| ) |
| return _MultiThreadedRendezvous( |
| state, call, self._response_deserializer, deadline |
| ) |
| |
| |
| class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): |
| _channel: cygrpc.Channel |
| _method: bytes |
| _target: bytes |
| _request_serializer: Optional[SerializingFunction] |
| _response_deserializer: Optional[DeserializingFunction] |
| _context: Any |
| _registered_call_handle: Optional[int] |
| |
| __slots__ = [ |
| "_channel", |
| "_method", |
| "_target", |
| "_request_serializer", |
| "_response_deserializer", |
| "_context", |
| ] |
| |
| # pylint: disable=too-many-arguments |
| def __init__( |
| self, |
| channel: cygrpc.Channel, |
| method: bytes, |
| target: bytes, |
| request_serializer: SerializingFunction, |
| response_deserializer: DeserializingFunction, |
| _registered_call_handle: Optional[int], |
| ): |
| self._channel = channel |
| self._method = method |
| self._target = target |
| self._request_serializer = request_serializer |
| self._response_deserializer = response_deserializer |
| self._context = cygrpc.build_census_context() |
| self._registered_call_handle = _registered_call_handle |
| |
| def __call__( # pylint: disable=too-many-locals |
| self, |
| request: Any, |
| timeout: Optional[float] = None, |
| metadata: Optional[MetadataType] = None, |
| credentials: Optional[grpc.CallCredentials] = None, |
| wait_for_ready: Optional[bool] = None, |
| compression: Optional[grpc.Compression] = None, |
| ) -> _SingleThreadedRendezvous: |
| deadline = _deadline(timeout) |
| serialized_request = _common.serialize( |
| request, self._request_serializer |
| ) |
| if serialized_request is None: |
| state = _RPCState( |
| (), |
| (), |
| (), |
| grpc.StatusCode.INTERNAL, |
| "Exception serializing request!", |
| ) |
| raise _InactiveRpcError(state) |
| |
| state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) |
| call_credentials = ( |
| None if credentials is None else credentials._credentials |
| ) |
| initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( |
| wait_for_ready |
| ) |
| augmented_metadata = _compression.augment_metadata( |
| metadata, compression |
| ) |
| operations = ( |
| ( |
| cygrpc.SendInitialMetadataOperation( |
| augmented_metadata, initial_metadata_flags |
| ), |
| cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), |
| cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), |
| ), |
| (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),), |
| (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), |
| ) |
| operations_and_tags = tuple((ops, None) for ops in operations) |
| state.rpc_start_time = time.perf_counter() |
| state.method = _common.decode(self._method) |
| state.target = _common.decode(self._target) |
| call = self._channel.segregated_call( |
| cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, |
| self._method, |
| None, |
| _determine_deadline(deadline), |
| metadata, |
| call_credentials, |
| operations_and_tags, |
| self._context, |
| self._registered_call_handle, |
| ) |
| return _SingleThreadedRendezvous( |
| state, call, self._response_deserializer, deadline |
| ) |
| |
| |
| class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): |
| _channel: cygrpc.Channel |
| _managed_call: IntegratedCallFactory |
| _method: bytes |
| _target: bytes |
| _request_serializer: Optional[SerializingFunction] |
| _response_deserializer: Optional[DeserializingFunction] |
| _context: Any |
| _registered_call_handle: Optional[int] |
| |
| __slots__ = [ |
| "_channel", |
| "_managed_call", |
| "_method", |
| "_target", |
| "_request_serializer", |
| "_response_deserializer", |
| "_context", |
| ] |
| |
| # pylint: disable=too-many-arguments |
| def __init__( |
| self, |
| channel: cygrpc.Channel, |
| managed_call: IntegratedCallFactory, |
| method: bytes, |
| target: bytes, |
| request_serializer: SerializingFunction, |
| response_deserializer: DeserializingFunction, |
| _registered_call_handle: Optional[int], |
| ): |
| self._channel = channel |
| self._managed_call = managed_call |
| self._method = method |
| self._target = target |
| self._request_serializer = request_serializer |
| self._response_deserializer = response_deserializer |
| self._context = cygrpc.build_census_context() |
| self._registered_call_handle = _registered_call_handle |
| |
| def __call__( # pylint: disable=too-many-locals |
| self, |
| request: Any, |
| timeout: Optional[float] = None, |
| metadata: Optional[MetadataType] = None, |
| credentials: Optional[grpc.CallCredentials] = None, |
| wait_for_ready: Optional[bool] = None, |
| compression: Optional[grpc.Compression] = None, |
| ) -> _MultiThreadedRendezvous: |
| deadline, serialized_request, rendezvous = _start_unary_request( |
| request, timeout, self._request_serializer |
| ) |
| initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( |
| wait_for_ready |
| ) |
| if serialized_request is None: |
| raise rendezvous # pylint: disable-msg=raising-bad-type |
| else: |
| augmented_metadata = _compression.augment_metadata( |
| metadata, compression |
| ) |
| state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) |
| operations = ( |
| ( |
| cygrpc.SendInitialMetadataOperation( |
| augmented_metadata, initial_metadata_flags |
| ), |
| cygrpc.SendMessageOperation( |
| serialized_request, _EMPTY_FLAGS |
| ), |
| cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), |
| cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), |
| ), |
| (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), |
| ) |
| state.rpc_start_time = time.perf_counter() |
| state.method = _common.decode(self._method) |
| state.target = _common.decode(self._target) |
| call = self._managed_call( |
| cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, |
| self._method, |
| None, |
| _determine_deadline(deadline), |
| metadata, |
| None if credentials is None else credentials._credentials, |
| operations, |
| _event_handler(state, self._response_deserializer), |
| self._context, |
| self._registered_call_handle, |
| ) |
| return _MultiThreadedRendezvous( |
| state, call, self._response_deserializer, deadline |
| ) |
| |
| |
| class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): |
| _channel: cygrpc.Channel |
| _managed_call: IntegratedCallFactory |
| _method: bytes |
| _target: bytes |
| _request_serializer: Optional[SerializingFunction] |
| _response_deserializer: Optional[DeserializingFunction] |
| _context: Any |
| _registered_call_handle: Optional[int] |
| |
| __slots__ = [ |
| "_channel", |
| "_managed_call", |
| "_method", |
| "_target", |
| "_request_serializer", |
| "_response_deserializer", |
| "_context", |
| ] |
| |
| # pylint: disable=too-many-arguments |
| def __init__( |
| self, |
| channel: cygrpc.Channel, |
| managed_call: IntegratedCallFactory, |
| method: bytes, |
| target: bytes, |
| request_serializer: Optional[SerializingFunction], |
| response_deserializer: Optional[DeserializingFunction], |
| _registered_call_handle: Optional[int], |
| ): |
| self._channel = channel |
| self._managed_call = managed_call |
| self._method = method |
| self._target = target |
| self._request_serializer = request_serializer |
| self._response_deserializer = response_deserializer |
| self._context = cygrpc.build_census_context() |
| self._registered_call_handle = _registered_call_handle |
| |
| def _blocking( |
| self, |
| request_iterator: Iterator, |
| timeout: Optional[float], |
| metadata: Optional[MetadataType], |
| credentials: Optional[grpc.CallCredentials], |
| wait_for_ready: Optional[bool], |
| compression: Optional[grpc.Compression], |
| ) -> Tuple[_RPCState, cygrpc.SegregatedCall]: |
| deadline = _deadline(timeout) |
| state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) |
| initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( |
| wait_for_ready |
| ) |
| augmented_metadata = _compression.augment_metadata( |
| metadata, compression |
| ) |
| state.rpc_start_time = time.perf_counter() |
| state.method = _common.decode(self._method) |
| state.target = _common.decode(self._target) |
| call = self._channel.segregated_call( |
| cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, |
| self._method, |
| None, |
| _determine_deadline(deadline), |
| augmented_metadata, |
| None if credentials is None else credentials._credentials, |
| _stream_unary_invocation_operations_and_tags( |
| augmented_metadata, initial_metadata_flags |
| ), |
| self._context, |
| self._registered_call_handle, |
| ) |
| _consume_request_iterator( |
| request_iterator, state, call, self._request_serializer, None |
| ) |
| while True: |
| event = call.next_event() |
| with state.condition: |
| _handle_event(event, state, self._response_deserializer) |
| state.condition.notify_all() |
| if not state.due: |
| break |
| return state, call |
| |
| def __call__( |
| self, |
| request_iterator: Iterator, |
| timeout: Optional[float] = None, |
| metadata: Optional[MetadataType] = None, |
| credentials: Optional[grpc.CallCredentials] = None, |
| wait_for_ready: Optional[bool] = None, |
| compression: Optional[grpc.Compression] = None, |
| ) -> Any: |
| ( |
| state, |
| call, |
| ) = self._blocking( |
| request_iterator, |
| timeout, |
| metadata, |
| credentials, |
| wait_for_ready, |
| compression, |
| ) |
| return _end_unary_response_blocking(state, call, False, None) |
| |
| def with_call( |
| self, |
| request_iterator: Iterator, |
| timeout: Optional[float] = None, |
| metadata: Optional[MetadataType] = None, |
| credentials: Optional[grpc.CallCredentials] = None, |
| wait_for_ready: Optional[bool] = None, |
| compression: Optional[grpc.Compression] = None, |
| ) -> Tuple[Any, grpc.Call]: |
| ( |
| state, |
| call, |
| ) = self._blocking( |
| request_iterator, |
| timeout, |
| metadata, |
| credentials, |
| wait_for_ready, |
| compression, |
| ) |
| return _end_unary_response_blocking(state, call, True, None) |
| |
| def future( |
| self, |
| request_iterator: Iterator, |
| timeout: Optional[float] = None, |
| metadata: Optional[MetadataType] = None, |
| credentials: Optional[grpc.CallCredentials] = None, |
| wait_for_ready: Optional[bool] = None, |
| compression: Optional[grpc.Compression] = None, |
| ) -> _MultiThreadedRendezvous: |
| deadline = _deadline(timeout) |
| state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) |
| event_handler = _event_handler(state, self._response_deserializer) |
| initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( |
| wait_for_ready |
| ) |
| augmented_metadata = _compression.augment_metadata( |
| metadata, compression |
| ) |
| state.rpc_start_time = time.perf_counter() |
| state.method = _common.decode(self._method) |
| state.target = _common.decode(self._target) |
| call = self._managed_call( |
| cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, |
| self._method, |
| None, |
| deadline, |
| augmented_metadata, |
| None if credentials is None else credentials._credentials, |
| _stream_unary_invocation_operations( |
| metadata, initial_metadata_flags |
| ), |
| event_handler, |
| self._context, |
| self._registered_call_handle, |
| ) |
| _consume_request_iterator( |
| request_iterator, |
| state, |
| call, |
| self._request_serializer, |
| event_handler, |
| ) |
| return _MultiThreadedRendezvous( |
| state, call, self._response_deserializer, deadline |
| ) |
| |
| |
| class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): |
| _channel: cygrpc.Channel |
| _managed_call: IntegratedCallFactory |
| _method: bytes |
| _target: bytes |
| _request_serializer: Optional[SerializingFunction] |
| _response_deserializer: Optional[DeserializingFunction] |
| _context: Any |
| _registered_call_handle: Optional[int] |
| |
| __slots__ = [ |
| "_channel", |
| "_managed_call", |
| "_method", |
| "_target", |
| "_request_serializer", |
| "_response_deserializer", |
| "_context", |
| ] |
| |
| # pylint: disable=too-many-arguments |
| def __init__( |
| self, |
| channel: cygrpc.Channel, |
| managed_call: IntegratedCallFactory, |
| method: bytes, |
| target: bytes, |
| request_serializer: Optional[SerializingFunction], |
| response_deserializer: Optional[DeserializingFunction], |
| _registered_call_handle: Optional[int], |
| ): |
| self._channel = channel |
| self._managed_call = managed_call |
| self._method = method |
| self._target = target |
| self._request_serializer = request_serializer |
| self._response_deserializer = response_deserializer |
| self._context = cygrpc.build_census_context() |
| self._registered_call_handle = _registered_call_handle |
| |
| def __call__( |
| self, |
| request_iterator: Iterator, |
| timeout: Optional[float] = None, |
| metadata: Optional[MetadataType] = None, |
| credentials: Optional[grpc.CallCredentials] = None, |
| wait_for_ready: Optional[bool] = None, |
| compression: Optional[grpc.Compression] = None, |
| ) -> _MultiThreadedRendezvous: |
| deadline = _deadline(timeout) |
| state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) |
| initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( |
| wait_for_ready |
| ) |
| augmented_metadata = _compression.augment_metadata( |
| metadata, compression |
| ) |
| operations = ( |
| ( |
| cygrpc.SendInitialMetadataOperation( |
| augmented_metadata, initial_metadata_flags |
| ), |
| cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), |
| ), |
| (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), |
| ) |
| event_handler = _event_handler(state, self._response_deserializer) |
| state.rpc_start_time = time.perf_counter() |
| state.method = _common.decode(self._method) |
| state.target = _common.decode(self._target) |
| call = self._managed_call( |
| cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, |
| self._method, |
| None, |
| _determine_deadline(deadline), |
| augmented_metadata, |
| None if credentials is None else credentials._credentials, |
| operations, |
| event_handler, |
| self._context, |
| self._registered_call_handle, |
| ) |
| _consume_request_iterator( |
| request_iterator, |
| state, |
| call, |
| self._request_serializer, |
| event_handler, |
| ) |
| return _MultiThreadedRendezvous( |
| state, call, self._response_deserializer, deadline |
| ) |
| |
| |
| class _InitialMetadataFlags(int): |
| """Stores immutable initial metadata flags""" |
| |
| def __new__(cls, value: int = _EMPTY_FLAGS): |
| value &= cygrpc.InitialMetadataFlags.used_mask |
| return super(_InitialMetadataFlags, cls).__new__(cls, value) |
| |
| def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int: |
| if wait_for_ready is not None: |
| if wait_for_ready: |
| return self.__class__( |
| self |
| | cygrpc.InitialMetadataFlags.wait_for_ready |
| | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set |
| ) |
| elif not wait_for_ready: |
| return self.__class__( |
| self & ~cygrpc.InitialMetadataFlags.wait_for_ready |
| | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set |
| ) |
| return self |
| |
| |
| class _ChannelCallState(object): |
| channel: cygrpc.Channel |
| managed_calls: int |
| threading: bool |
| |
| def __init__(self, channel: cygrpc.Channel): |
| self.lock = threading.Lock() |
| self.channel = channel |
| self.managed_calls = 0 |
| self.threading = False |
| |
| def reset_postfork_child(self) -> None: |
| self.managed_calls = 0 |
| |
| def __del__(self): |
| try: |
| self.channel.close( |
| cygrpc.StatusCode.cancelled, "Channel deallocated!" |
| ) |
| except (TypeError, AttributeError): |
| pass |
| |
| |
| def _run_channel_spin_thread(state: _ChannelCallState) -> None: |
| def channel_spin(): |
| while True: |
| cygrpc.block_if_fork_in_progress(state) |
| event = state.channel.next_call_event() |
| if event.completion_type == cygrpc.CompletionType.queue_timeout: |
| continue |
| call_completed = event.tag(event) |
| if call_completed: |
| with state.lock: |
| state.managed_calls -= 1 |
| if state.managed_calls == 0: |
| return |
| |
| channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin) |
| channel_spin_thread.setDaemon(True) |
| channel_spin_thread.start() |
| |
| |
| def _channel_managed_call_management(state: _ChannelCallState): |
| # pylint: disable=too-many-arguments |
| def create( |
| flags: int, |
| method: bytes, |
| host: Optional[str], |
| deadline: Optional[float], |
| metadata: Optional[MetadataType], |
| credentials: Optional[cygrpc.CallCredentials], |
| operations: Sequence[Sequence[cygrpc.Operation]], |
| event_handler: UserTag, |
| context: Any, |
| _registered_call_handle: Optional[int], |
| ) -> cygrpc.IntegratedCall: |
| """Creates a cygrpc.IntegratedCall. |
| |
| Args: |
| flags: An integer bitfield of call flags. |
| method: The RPC method. |
| host: A host string for the created call. |
| deadline: A float to be the deadline of the created call or None if |
| the call is to have an infinite deadline. |
| metadata: The metadata for the call or None. |
| credentials: A cygrpc.CallCredentials or None. |
| operations: A sequence of sequences of cygrpc.Operations to be |
| started on the call. |
| event_handler: A behavior to call to handle the events resultant from |
| the operations on the call. |
| context: Context object for distributed tracing. |
| _registered_call_handle: An int representing the call handle of the |
| method, or None if the method is not registered. |
| Returns: |
| A cygrpc.IntegratedCall with which to conduct an RPC. |
| """ |
| operations_and_tags = tuple( |
| ( |
| operation, |
| event_handler, |
| ) |
| for operation in operations |
| ) |
| with state.lock: |
| call = state.channel.integrated_call( |
| flags, |
| method, |
| host, |
| deadline, |
| metadata, |
| credentials, |
| operations_and_tags, |
| context, |
| _registered_call_handle, |
| ) |
| if state.managed_calls == 0: |
| state.managed_calls = 1 |
| _run_channel_spin_thread(state) |
| else: |
| state.managed_calls += 1 |
| return call |
| |
| return create |
| |
| |
| class _ChannelConnectivityState(object): |
| lock: threading.RLock |
| channel: grpc.Channel |
| polling: bool |
| connectivity: grpc.ChannelConnectivity |
| try_to_connect: bool |
| # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704 |
| callbacks_and_connectivities: List[ |
| Sequence[ |
| Union[ |
| Callable[[grpc.ChannelConnectivity], None], |
| Optional[grpc.ChannelConnectivity], |
| ] |
| ] |
| ] |
| delivering: bool |
| |
| def __init__(self, channel: grpc.Channel): |
| self.lock = threading.RLock() |
| self.channel = channel |
| self.polling = False |
| self.connectivity = None |
| self.try_to_connect = False |
| self.callbacks_and_connectivities = [] |
| self.delivering = False |
| |
| def reset_postfork_child(self) -> None: |
| self.polling = False |
| self.connectivity = None |
| self.try_to_connect = False |
| self.callbacks_and_connectivities = [] |
| self.delivering = False |
| |
| |
| def _deliveries( |
| state: _ChannelConnectivityState, |
| ) -> List[Callable[[grpc.ChannelConnectivity], None]]: |
| callbacks_needing_update = [] |
| for callback_and_connectivity in state.callbacks_and_connectivities: |
| ( |
| callback, |
| callback_connectivity, |
| ) = callback_and_connectivity |
| if callback_connectivity is not state.connectivity: |
| callbacks_needing_update.append(callback) |
| callback_and_connectivity[1] = state.connectivity |
| return callbacks_needing_update |
| |
| |
| def _deliver( |
| state: _ChannelConnectivityState, |
| initial_connectivity: grpc.ChannelConnectivity, |
| initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]], |
| ) -> None: |
| connectivity = initial_connectivity |
| callbacks = initial_callbacks |
| while True: |
| for callback in callbacks: |
| cygrpc.block_if_fork_in_progress(state) |
| try: |
| callback(connectivity) |
| except Exception: # pylint: disable=broad-except |
| _LOGGER.exception( |
| _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE |
| ) |
| with state.lock: |
| callbacks = _deliveries(state) |
| if callbacks: |
| connectivity = state.connectivity |
| else: |
| state.delivering = False |
| return |
| |
| |
| def _spawn_delivery( |
| state: _ChannelConnectivityState, |
| callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]], |
| ) -> None: |
| delivering_thread = cygrpc.ForkManagedThread( |
| target=_deliver, |
| args=( |
| state, |
| state.connectivity, |
| callbacks, |
| ), |
| ) |
| delivering_thread.setDaemon(True) |
| delivering_thread.start() |
| state.delivering = True |
| |
| |
| # NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll. |
| def _poll_connectivity( |
| state: _ChannelConnectivityState, |
| channel: grpc.Channel, |
| initial_try_to_connect: bool, |
| ) -> None: |
| try_to_connect = initial_try_to_connect |
| connectivity = channel.check_connectivity_state(try_to_connect) |
| with state.lock: |
| state.connectivity = ( |
| _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ |
| connectivity |
| ] |
| ) |
| callbacks = tuple( |
| callback for callback, _ in state.callbacks_and_connectivities |
| ) |
| for callback_and_connectivity in state.callbacks_and_connectivities: |
| callback_and_connectivity[1] = state.connectivity |
| if callbacks: |
| _spawn_delivery(state, callbacks) |
| while True: |
| event = channel.watch_connectivity_state( |
| connectivity, time.time() + 0.2 |
| ) |
| cygrpc.block_if_fork_in_progress(state) |
| with state.lock: |
| if ( |
| not state.callbacks_and_connectivities |
| and not state.try_to_connect |
| ): |
| state.polling = False |
| state.connectivity = None |
| break |
| try_to_connect = state.try_to_connect |
| state.try_to_connect = False |
| if event.success or try_to_connect: |
| connectivity = channel.check_connectivity_state(try_to_connect) |
| with state.lock: |
| state.connectivity = ( |
| _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ |
| connectivity |
| ] |
| ) |
| if not state.delivering: |
| callbacks = _deliveries(state) |
| if callbacks: |
| _spawn_delivery(state, callbacks) |
| |
| |
| def _subscribe( |
| state: _ChannelConnectivityState, |
| callback: Callable[[grpc.ChannelConnectivity], None], |
| try_to_connect: bool, |
| ) -> None: |
| with state.lock: |
| if not state.callbacks_and_connectivities and not state.polling: |
| polling_thread = cygrpc.ForkManagedThread( |
| target=_poll_connectivity, |
| args=(state, state.channel, bool(try_to_connect)), |
| ) |
| polling_thread.setDaemon(True) |
| polling_thread.start() |
| state.polling = True |
| state.callbacks_and_connectivities.append([callback, None]) |
| elif not state.delivering and state.connectivity is not None: |
| _spawn_delivery(state, (callback,)) |
| state.try_to_connect |= bool(try_to_connect) |
| state.callbacks_and_connectivities.append( |
| [callback, state.connectivity] |
| ) |
| else: |
| state.try_to_connect |= bool(try_to_connect) |
| state.callbacks_and_connectivities.append([callback, None]) |
| |
| |
| def _unsubscribe( |
| state: _ChannelConnectivityState, |
| callback: Callable[[grpc.ChannelConnectivity], None], |
| ) -> None: |
| with state.lock: |
| for index, (subscribed_callback, unused_connectivity) in enumerate( |
| state.callbacks_and_connectivities |
| ): |
| if callback == subscribed_callback: |
| state.callbacks_and_connectivities.pop(index) |
| break |
| |
| |
| def _augment_options( |
| base_options: Sequence[ChannelArgumentType], |
| compression: Optional[grpc.Compression], |
| ) -> Sequence[ChannelArgumentType]: |
| compression_option = _compression.create_channel_option(compression) |
| return ( |
| tuple(base_options) |
| + compression_option |
| + ( |
| ( |
| cygrpc.ChannelArgKey.primary_user_agent_string, |
| _USER_AGENT, |
| ), |
| ) |
| ) |
| |
| |
| def _separate_channel_options( |
| options: Sequence[ChannelArgumentType], |
| ) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]: |
| """Separates core channel options from Python channel options.""" |
| core_options = [] |
| python_options = [] |
| for pair in options: |
| if ( |
| pair[0] |
| == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream |
| ): |
| python_options.append(pair) |
| else: |
| core_options.append(pair) |
| return python_options, core_options |
| |
| |
| class Channel(grpc.Channel): |
| """A cygrpc.Channel-backed implementation of grpc.Channel.""" |
| |
| _single_threaded_unary_stream: bool |
| _channel: cygrpc.Channel |
| _call_state: _ChannelCallState |
| _connectivity_state: _ChannelConnectivityState |
| _target: str |
| _registered_call_handles: Dict[str, int] |
| |
| def __init__( |
| self, |
| target: str, |
| options: Sequence[ChannelArgumentType], |
| credentials: Optional[grpc.ChannelCredentials], |
| compression: Optional[grpc.Compression], |
| ): |
| """Constructor. |
| |
| Args: |
| target: The target to which to connect. |
| options: Configuration options for the channel. |
| credentials: A cygrpc.ChannelCredentials or None. |
| compression: An optional value indicating the compression method to be |
| used over the lifetime of the channel. |
| """ |
| python_options, core_options = _separate_channel_options(options) |
| self._single_threaded_unary_stream = ( |
| _DEFAULT_SINGLE_THREADED_UNARY_STREAM |
| ) |
| self._process_python_options(python_options) |
| self._channel = cygrpc.Channel( |
| _common.encode(target), |
| _augment_options(core_options, compression), |
| credentials, |
| ) |
| self._target = target |
| self._call_state = _ChannelCallState(self._channel) |
| self._connectivity_state = _ChannelConnectivityState(self._channel) |
| cygrpc.fork_register_channel(self) |
| if cygrpc.g_gevent_activated: |
| cygrpc.gevent_increment_channel_count() |
| |
| def _get_registered_call_handle(self, method: str) -> int: |
| """ |
| Get the registered call handle for a method. |
| |
| This is a semi-private method. It is intended for use only by gRPC generated code. |
| |
| This method is not thread-safe. |
| |
| Args: |
| method: Required, the method name for the RPC. |
| |
| Returns: |
| The registered call handle pointer in the form of a Python Long. |
| """ |
| return self._channel.get_registered_call_handle(_common.encode(method)) |
| |
| def _process_python_options( |
| self, python_options: Sequence[ChannelArgumentType] |
| ) -> None: |
| """Sets channel attributes according to python-only channel options.""" |
| for pair in python_options: |
| if ( |
| pair[0] |
| == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream |
| ): |
| self._single_threaded_unary_stream = True |
| |
| def subscribe( |
| self, |
| callback: Callable[[grpc.ChannelConnectivity], None], |
| try_to_connect: Optional[bool] = None, |
| ) -> None: |
| _subscribe(self._connectivity_state, callback, try_to_connect) |
| |
| def unsubscribe( |
| self, callback: Callable[[grpc.ChannelConnectivity], None] |
| ) -> None: |
| _unsubscribe(self._connectivity_state, callback) |
| |
| # pylint: disable=arguments-differ |
| def unary_unary( |
| self, |
| method: str, |
| request_serializer: Optional[SerializingFunction] = None, |
| response_deserializer: Optional[DeserializingFunction] = None, |
| _registered_method: Optional[bool] = False, |
| ) -> grpc.UnaryUnaryMultiCallable: |
| _registered_call_handle = None |
| if _registered_method: |
| _registered_call_handle = self._get_registered_call_handle(method) |
| return _UnaryUnaryMultiCallable( |
| self._channel, |
| _channel_managed_call_management(self._call_state), |
| _common.encode(method), |
| _common.encode(self._target), |
| request_serializer, |
| response_deserializer, |
| _registered_call_handle, |
| ) |
| |
| # pylint: disable=arguments-differ |
| def unary_stream( |
| self, |
| method: str, |
| request_serializer: Optional[SerializingFunction] = None, |
| response_deserializer: Optional[DeserializingFunction] = None, |
| _registered_method: Optional[bool] = False, |
| ) -> grpc.UnaryStreamMultiCallable: |
| _registered_call_handle = None |
| if _registered_method: |
| _registered_call_handle = self._get_registered_call_handle(method) |
| # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC |
| # on a single Python thread results in an appreciable speed-up. However, |
| # due to slight differences in capability, the multi-threaded variant |
| # remains the default. |
| if self._single_threaded_unary_stream: |
| return _SingleThreadedUnaryStreamMultiCallable( |
| self._channel, |
| _common.encode(method), |
| _common.encode(self._target), |
| request_serializer, |
| response_deserializer, |
| _registered_call_handle, |
| ) |
| else: |
| return _UnaryStreamMultiCallable( |
| self._channel, |
| _channel_managed_call_management(self._call_state), |
| _common.encode(method), |
| _common.encode(self._target), |
| request_serializer, |
| response_deserializer, |
| _registered_call_handle, |
| ) |
| |
| # pylint: disable=arguments-differ |
| def stream_unary( |
| self, |
| method: str, |
| request_serializer: Optional[SerializingFunction] = None, |
| response_deserializer: Optional[DeserializingFunction] = None, |
| _registered_method: Optional[bool] = False, |
| ) -> grpc.StreamUnaryMultiCallable: |
| _registered_call_handle = None |
| if _registered_method: |
| _registered_call_handle = self._get_registered_call_handle(method) |
| return _StreamUnaryMultiCallable( |
| self._channel, |
| _channel_managed_call_management(self._call_state), |
| _common.encode(method), |
| _common.encode(self._target), |
| request_serializer, |
| response_deserializer, |
| _registered_call_handle, |
| ) |
| |
| # pylint: disable=arguments-differ |
| def stream_stream( |
| self, |
| method: str, |
| request_serializer: Optional[SerializingFunction] = None, |
| response_deserializer: Optional[DeserializingFunction] = None, |
| _registered_method: Optional[bool] = False, |
| ) -> grpc.StreamStreamMultiCallable: |
| _registered_call_handle = None |
| if _registered_method: |
| _registered_call_handle = self._get_registered_call_handle(method) |
| return _StreamStreamMultiCallable( |
| self._channel, |
| _channel_managed_call_management(self._call_state), |
| _common.encode(method), |
| _common.encode(self._target), |
| request_serializer, |
| response_deserializer, |
| _registered_call_handle, |
| ) |
| |
| def _unsubscribe_all(self) -> None: |
| state = self._connectivity_state |
| if state: |
| with state.lock: |
| del state.callbacks_and_connectivities[:] |
| |
| def _close(self) -> None: |
| self._unsubscribe_all() |
| self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!") |
| cygrpc.fork_unregister_channel(self) |
| if cygrpc.g_gevent_activated: |
| cygrpc.gevent_decrement_channel_count() |
| |
| def _close_on_fork(self) -> None: |
| self._unsubscribe_all() |
| self._channel.close_on_fork( |
| cygrpc.StatusCode.cancelled, "Channel closed due to fork" |
| ) |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| self._close() |
| return False |
| |
| def close(self) -> None: |
| self._close() |
| |
| def __del__(self): |
| # TODO(https://github.com/grpc/grpc/issues/12531): Several releases |
| # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call |
| # here (or more likely, call self._close() here). We don't do this today |
| # because many valid use cases today allow the channel to be deleted |
| # immediately after stubs are created. After a sufficient period of time |
| # has passed for all users to be trusted to freeze out to their channels |
| # for as long as they are in use and to close them after using them, |
| # then deletion of this grpc._channel.Channel instance can be made to |
| # effect closure of the underlying cygrpc.Channel instance. |
| try: |
| self._unsubscribe_all() |
| except: # pylint: disable=bare-except |
| # Exceptions in __del__ are ignored by Python anyway, but they can |
| # keep spamming logs. Just silence them. |
| pass |