Add support for gRPC connection management (available when using optional grpc_gcp dependency) (#5553)

diff --git a/google/api_core/grpc_helpers.py b/google/api_core/grpc_helpers.py
index 6a2f052..b4ac9e0 100644
--- a/google/api_core/grpc_helpers.py
+++ b/google/api_core/grpc_helpers.py
@@ -26,6 +26,11 @@
 import google.auth.transport.grpc
 import google.auth.transport.requests
 
+try:
+    import grpc_gcp
+    HAS_GRPC_GCP = True
+except ImportError:
+    HAS_GRPC_GCP = False
 
 # The list of gRPC Callable interfaces that return iterators.
 _STREAM_WRAP_CLASSES = (
@@ -149,7 +154,11 @@
         return _wrap_unary_errors(callable_)
 
 
-def create_channel(target, credentials=None, scopes=None, **kwargs):
+def create_channel(target,
+                   credentials=None,
+                   scopes=None,
+                   ssl_credentials=None,
+                   **kwargs):
     """Create a secure channel with credentials.
 
     Args:
@@ -160,8 +169,10 @@
         scopes (Sequence[str]): A optional list of scopes needed for this
             service. These are only used when credentials are not specified and
             are passed to :func:`google.auth.default`.
+        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
+            credentials. This can be used to specify different certificates.
         kwargs: Additional key-word args passed to
-            :func:`google.auth.transport.grpc.secure_authorized_channel`.
+            :func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`.
 
     Returns:
         grpc.Channel: The created channel.
@@ -174,8 +185,26 @@
 
     request = google.auth.transport.requests.Request()
 
-    return google.auth.transport.grpc.secure_authorized_channel(
-        credentials, request, target, **kwargs)
+    # Create the metadata plugin for inserting the authorization header.
+    metadata_plugin = google.auth.transport.grpc.AuthMetadataPlugin(
+        credentials, request)
+
+    # Create a set of grpc.CallCredentials using the metadata plugin.
+    google_auth_credentials = grpc.metadata_call_credentials(metadata_plugin)
+
+    if ssl_credentials is None:
+        ssl_credentials = grpc.ssl_channel_credentials()
+
+    # Combine the ssl credentials and the authorization credentials.
+    composite_credentials = grpc.composite_channel_credentials(
+        ssl_credentials, google_auth_credentials)
+
+    if HAS_GRPC_GCP:
+        # If grpc_gcp module is available use grpc_gcp.secure_channel,
+        # otherwise, use grpc.secure_channel to create grpc channel.
+        return grpc_gcp.secure_channel(target, composite_credentials, **kwargs)
+    else:
+        return grpc.secure_channel(target, composite_credentials, **kwargs)
 
 
 _MethodCall = collections.namedtuple(
diff --git a/nox.py b/nox.py
index 7b40c68..dacfbb5 100644
--- a/nox.py
+++ b/nox.py
@@ -66,6 +66,23 @@
 
 
 @nox.session
[email protected]('py', ['2.7', '3.5', '3.6', '3.7'])
+def unit_grpc_gcp(session, py):
+    """Run the unit test suite with grpcio-gcp installed."""
+
+    # Run unit tests against all supported versions of Python.
+    session.interpreter = 'python{}'.format(py)
+
+    # Set the virtualenv dirname.
+    session.virtualenv_dirname = 'unit-grpc-gcp-' + py
+
+    # Install grpcio-gcp
+    session.install('grpcio-gcp')
+
+    default(session)
+
+
[email protected]
 def lint(session):
     """Run linters.
 
diff --git a/tests/unit/test_grpc_helpers.py b/tests/unit/test_grpc_helpers.py
index e5e4311..b91847c 100644
--- a/tests/unit/test_grpc_helpers.py
+++ b/tests/unit/test_grpc_helpers.py
@@ -176,60 +176,157 @@
     wrap_stream_errors.assert_called_once_with(callable_)
 
 
[email protected]('grpc.composite_channel_credentials')
 @mock.patch(
     'google.auth.default',
     return_value=(mock.sentinel.credentials, mock.sentinel.projet))
[email protected]('google.auth.transport.grpc.secure_authorized_channel')
-def test_create_channel_implicit(secure_authorized_channel, default):
[email protected]('grpc.secure_channel')
+def test_create_channel_implicit(
+        grpc_secure_channel, default, composite_creds_call):
     target = 'example.com:443'
+    composite_creds = composite_creds_call.return_value
 
     channel = grpc_helpers.create_channel(target)
 
-    assert channel is secure_authorized_channel.return_value
+    assert channel is grpc_secure_channel.return_value
     default.assert_called_once_with(scopes=None)
-    secure_authorized_channel.assert_called_once_with(
-        mock.sentinel.credentials, mock.ANY, target)
+    if (grpc_helpers.HAS_GRPC_GCP):
+        grpc_secure_channel.assert_called_once_with(
+            target, composite_creds, None)
+    else:
+        grpc_secure_channel.assert_called_once_with(
+            target, composite_creds)
 
 
[email protected]('grpc.composite_channel_credentials')
 @mock.patch(
     'google.auth.default',
     return_value=(mock.sentinel.credentials, mock.sentinel.projet))
[email protected]('google.auth.transport.grpc.secure_authorized_channel')
-def test_create_channel_implicit_with_scopes(
-        secure_authorized_channel, default):
[email protected]('grpc.secure_channel')
+def test_create_channel_implicit_with_ssl_creds(
+        grpc_secure_channel, default, composite_creds_call):
     target = 'example.com:443'
 
+    ssl_creds = grpc.ssl_channel_credentials()
+
+    grpc_helpers.create_channel(target, ssl_credentials=ssl_creds)
+
+    default.assert_called_once_with(scopes=None)
+    composite_creds_call.assert_called_once_with(ssl_creds, mock.ANY)
+    composite_creds = composite_creds_call.return_value
+    if (grpc_helpers.HAS_GRPC_GCP):
+        grpc_secure_channel.assert_called_once_with(
+            target, composite_creds, None)
+    else:
+        grpc_secure_channel.assert_called_once_with(
+            target, composite_creds)
+
+
[email protected]('grpc.composite_channel_credentials')
[email protected](
+    'google.auth.default',
+    return_value=(mock.sentinel.credentials, mock.sentinel.projet))
[email protected]('grpc.secure_channel')
+def test_create_channel_implicit_with_scopes(
+        grpc_secure_channel, default, composite_creds_call):
+    target = 'example.com:443'
+    composite_creds = composite_creds_call.return_value
+
     channel = grpc_helpers.create_channel(target, scopes=['one', 'two'])
 
-    assert channel is secure_authorized_channel.return_value
+    assert channel is grpc_secure_channel.return_value
     default.assert_called_once_with(scopes=['one', 'two'])
+    if (grpc_helpers.HAS_GRPC_GCP):
+        grpc_secure_channel.assert_called_once_with(
+            target, composite_creds, None)
+    else:
+        grpc_secure_channel.assert_called_once_with(
+            target, composite_creds)
 
 
[email protected]('google.auth.transport.grpc.secure_authorized_channel')
-def test_create_channel_explicit(secure_authorized_channel):
[email protected]('grpc.composite_channel_credentials')
[email protected]('google.auth.credentials.with_scopes_if_required')
[email protected]('grpc.secure_channel')
+def test_create_channel_explicit(
+        grpc_secure_channel, auth_creds, composite_creds_call):
     target = 'example.com:443'
+    composite_creds = composite_creds_call.return_value
 
     channel = grpc_helpers.create_channel(
         target, credentials=mock.sentinel.credentials)
 
-    assert channel is secure_authorized_channel.return_value
-    secure_authorized_channel.assert_called_once_with(
-        mock.sentinel.credentials, mock.ANY, target)
+    auth_creds.assert_called_once_with(mock.sentinel.credentials, None)
+    assert channel is grpc_secure_channel.return_value
+    if (grpc_helpers.HAS_GRPC_GCP):
+        grpc_secure_channel.assert_called_once_with(
+            target, composite_creds, None)
+    else:
+        grpc_secure_channel.assert_called_once_with(
+            target, composite_creds)
 
 
[email protected]('google.auth.transport.grpc.secure_authorized_channel')
-def test_create_channel_explicit_scoped(unused_secure_authorized_channel):
[email protected]('grpc.composite_channel_credentials')
[email protected]('grpc.secure_channel')
+def test_create_channel_explicit_scoped(
+        grpc_secure_channel, composite_creds_call):
+    target = 'example.com:443'
     scopes = ['1', '2']
+    composite_creds = composite_creds_call.return_value
+
+    credentials = mock.create_autospec(
+        google.auth.credentials.Scoped, instance=True)
+    credentials.requires_scopes = True
+
+    channel = grpc_helpers.create_channel(
+        target,
+        credentials=credentials,
+        scopes=scopes)
+
+    credentials.with_scopes.assert_called_once_with(scopes)
+    assert channel is grpc_secure_channel.return_value
+    if (grpc_helpers.HAS_GRPC_GCP):
+        grpc_secure_channel.assert_called_once_with(
+            target, composite_creds, None)
+    else:
+        grpc_secure_channel.assert_called_once_with(
+            target, composite_creds)
+
+
[email protected](not grpc_helpers.HAS_GRPC_GCP,
+                    reason='grpc_gcp module not available')
[email protected]('grpc_gcp.secure_channel')
+def test_create_channel_with_grpc_gcp(grpc_gcp_secure_channel):
+    target = 'example.com:443'
+    scopes = ['test_scope']
 
     credentials = mock.create_autospec(
         google.auth.credentials.Scoped, instance=True)
     credentials.requires_scopes = True
 
     grpc_helpers.create_channel(
-        mock.sentinel.target,
+        target,
         credentials=credentials,
         scopes=scopes)
+    grpc_gcp_secure_channel.assert_called()
+    credentials.with_scopes.assert_called_once_with(scopes)
 
+
[email protected](grpc_helpers.HAS_GRPC_GCP,
+                    reason='grpc_gcp module not available')
[email protected]('grpc.secure_channel')
+def test_create_channel_without_grpc_gcp(grpc_secure_channel):
+    target = 'example.com:443'
+    scopes = ['test_scope']
+
+    credentials = mock.create_autospec(
+        google.auth.credentials.Scoped, instance=True)
+    credentials.requires_scopes = True
+
+    grpc_helpers.create_channel(
+        target,
+        credentials=credentials,
+        scopes=scopes)
+    grpc_secure_channel.assert_called()
     credentials.with_scopes.assert_called_once_with(scopes)