Load-balancing ManagedChannelImpl.
- Add NameResolver and LoadBalancer interfaces.
- ManagedChannelImpl now uses NameResolver and LoadBalancer for
transport selection, which may return transports to multiple
addresses.
- Transports are still owned by ManagedChannelImpl, which implements
TransportManager interface that is provided to LoadBalancer, so that
LoadBalancer doesn't worry about Transport lifecycles.
- Channel builders can be created by forTarget() that accepts the fully
qualified target name, which is an URI. (TODO) it's not tested.
- The old address-based construction pattern is supported by using
AbstractManagedChannelImplBuilder.DirectAddressNameResolver.
- (TODO) DnsNameResolver and SimpleLoadBalancer are currently
incomplete. They merely work for the single-address scenario.
diff --git a/core/src/main/java/io/grpc/Attributes.java b/core/src/main/java/io/grpc/Attributes.java
new file mode 100644
index 0000000..9508ecf
--- /dev/null
+++ b/core/src/main/java/io/grpc/Attributes.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import com.google.common.base.Preconditions;
+
+import java.util.HashMap;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * An immutable type-safe container of attributes.
+ */
+@ExperimentalApi
+@Immutable
+public final class Attributes {
+
+ private final HashMap<String, Object> data = new HashMap<String, Object>();
+
+ public static final Attributes EMPTY = new Attributes();
+
+ private Attributes() {
+ }
+
+ /**
+ * Gets the value for the key, or {@code null} if it's not present.
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable
+ public <T> T get(Key<T> key) {
+ return (T) data.get(key.name);
+ }
+
+ /**
+ * Create a new builder.
+ */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static final class Key<T> {
+ private final String name;
+
+ /**
+ * Construct the key.
+ *
+ * @param name the name, which should be namespaced like com.foo.BarAttribute to avoid
+ * collision.
+ */
+ public Key(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+
+ public static final class Builder {
+ private Attributes product;
+
+ private Builder() {
+ this.product = new Attributes();
+ }
+
+ public <T> void set(Key<T> key, T value) {
+ product.data.put(key.name, value);
+ }
+
+ /**
+ * Build the attributes. Can only be called once.
+ */
+ public Attributes build() {
+ Preconditions.checkState(product != null, "Already built");
+ Attributes result = product;
+ product = null;
+ return result;
+ }
+ }
+}
diff --git a/core/src/main/java/io/grpc/CallOptions.java b/core/src/main/java/io/grpc/CallOptions.java
index f73c2ed..5d7f2e2 100644
--- a/core/src/main/java/io/grpc/CallOptions.java
+++ b/core/src/main/java/io/grpc/CallOptions.java
@@ -62,6 +62,9 @@
@Nullable
private String authority;
+ @Nullable
+ private RequestKey requestKey;
+
/**
* Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
* generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
@@ -131,6 +134,25 @@
}
/**
+ * Returns a new {@code CallOptions} with a request key for affinity-based routing.
+ */
+ @ExperimentalApi
+ public CallOptions withRequestKey(@Nullable RequestKey requestKey) {
+ CallOptions newOptions = new CallOptions(this);
+ newOptions.requestKey = requestKey;
+ return newOptions;
+ }
+
+ /**
+ * Returns the request key for affinity-based routing.
+ */
+ @ExperimentalApi
+ @Nullable
+ public RequestKey getRequestKey() {
+ return requestKey;
+ }
+
+ /**
* Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
* generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
* services, even if those services are hosted on different domain names. That assumes the
@@ -155,6 +177,7 @@
deadlineNanoTime = other.deadlineNanoTime;
compressor = other.compressor;
authority = other.authority;
+ requestKey = other.requestKey;
}
@SuppressWarnings("deprecation") // guava 14.0
diff --git a/core/src/main/java/io/grpc/DnsNameResolverFactory.java b/core/src/main/java/io/grpc/DnsNameResolverFactory.java
new file mode 100644
index 0000000..e436747
--- /dev/null
+++ b/core/src/main/java/io/grpc/DnsNameResolverFactory.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import com.google.common.base.Preconditions;
+
+import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.SharedResourceHolder;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory for DNS-based {@link NameResolver}s.
+ *
+ * <p>The format of the target URI is {@code "[dns://[<DNS_server_address>]/]<name>"}.
+ */
+@ExperimentalApi
+public final class DnsNameResolverFactory extends NameResolver.Factory {
+
+ private static final DnsNameResolverFactory instance = new DnsNameResolverFactory();
+
+ @Override
+ public NameResolver newNameResolver(URI targetUri) {
+ String scheme = targetUri.getScheme();
+ if (scheme == null) {
+ return new DnsNameResolver(null, targetUri.toString());
+ } else if (scheme.equals("dns")) {
+ String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
+ Preconditions.checkArgument(targetPath.startsWith("/"),
+ "the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
+ String name = targetPath.substring(1);
+ return new DnsNameResolver(targetUri.getAuthority(), name);
+ } else {
+ return null;
+ }
+ }
+
+ private DnsNameResolverFactory() {
+ }
+
+ public static DnsNameResolverFactory getInstance() {
+ return instance;
+ }
+
+ private static class DnsNameResolver extends NameResolver {
+ private final String authority;
+ private final String host;
+ private final int port;
+ private ExecutorService executor;
+
+ DnsNameResolver(@Nullable String nsAuthority, String name) {
+ // TODO: if a DNS server is provided as nsAuthority, use it.
+ // https://www.captechconsulting.com/blogs/accessing-the-dusty-corners-of-dns-with-java
+
+ // Must prepend a "//" to the name when constructing a URI, otherwise
+ // the authority and host of the resulted URI would be null.
+ URI nameUri = URI.create("//" + name);
+ authority = Preconditions.checkNotNull(nameUri.getAuthority(),
+ "nameUri (%s) doesn't have an authority", nameUri);
+ host = Preconditions.checkNotNull(nameUri.getHost(), "host");
+ port = nameUri.getPort();
+ Preconditions.checkArgument(port > 0, "port (%s) must be positive", port);
+ }
+
+ @Override
+ public String getServiceAuthority() {
+ return authority;
+ }
+
+ @Override
+ public synchronized void start(final Listener listener) {
+ Preconditions.checkState(executor == null, "already started");
+ executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ InetAddress[] inetAddrs;
+ try {
+ inetAddrs = InetAddress.getAllByName(host);
+ } catch (Exception e) {
+ listener.onError(Status.UNAVAILABLE.withCause(e));
+ return;
+ }
+ ArrayList<ResolvedServerInfo> servers
+ = new ArrayList<ResolvedServerInfo>(inetAddrs.length);
+ for (int i = 0; i < inetAddrs.length; i++) {
+ InetAddress inetAddr = inetAddrs[i];
+ servers.add(
+ new ResolvedServerInfo(new InetSocketAddress(inetAddr, port), Attributes.EMPTY));
+ }
+ listener.onUpdate(servers, Attributes.EMPTY);
+ }
+ });
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ if (executor != null) {
+ executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java
new file mode 100644
index 0000000..3449021
--- /dev/null
+++ b/core/src/main/java/io/grpc/LoadBalancer.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import io.grpc.internal.ClientTransport;
+
+import java.net.SocketAddress;
+import java.util.List;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A pluggable component that receives resolved addresses from {@link NameResolver} and provides the
+ * channel a usable transport when asked.
+ *
+ * <p>Note to implementations: all methods are expected to return quickly. Any work that may block
+ * should be done asynchronously.
+ */
+// TODO(zhangkun83): since it's also used for non-loadbalancing cases like pick-first,
+// "RequestRouter" might be a better name.
+@ExperimentalApi
+@ThreadSafe
+public abstract class LoadBalancer {
+ /**
+ * Pick a transport that Channel will use for next RPC.
+ *
+ * @param requestKey for affinity-based routing
+ */
+ public abstract ListenableFuture<ClientTransport> pickTransport(
+ @Nullable RequestKey requestKey);
+
+ /**
+ * Shuts down this {@code LoadBalancer}.
+ */
+ public void shutdown() { }
+
+ /**
+ * Handles newly resolved addresses and service config from name resolution system.
+ *
+ * <p>Implementations should not modify the given {@code servers}.
+ */
+ public void handleResolvedAddresses(List<ResolvedServerInfo> servers, Attributes config) { }
+
+ /**
+ * Handles an error from the name resolution system.
+ *
+ * @param error a non-OK status
+ */
+ public void handleNameResolutionError(Status error) { }
+
+ /**
+ * Called when a transport is fully connected and ready to accept traffic.
+ */
+ public void transportReady(SocketAddress addr, ClientTransport transport) { }
+
+ /**
+ * Called when a transport is shutting down.
+ */
+ public void transportShutdown(SocketAddress addr, ClientTransport transport, Status s) { }
+
+ public abstract static class Factory {
+ /**
+ * Creates a {@link LoadBalancer} that will be used inside a channel.
+ *
+ * @param serviceName the DNS-style service name, which is also the authority
+ * @param tm the interface where an {@code LoadBalancer} implementation gets connected
+ * transports from
+ */
+ public abstract LoadBalancer newLoadBalancer(String serviceName, TransportManager tm);
+ }
+}
diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
index 0e5e33b..17c3091 100644
--- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java
+++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
@@ -44,6 +44,11 @@
return ManagedChannelProvider.provider().builderForAddress(name, port);
}
+ @ExperimentalApi
+ public static ManagedChannelBuilder<?> forTarget(String target) {
+ return ManagedChannelProvider.provider().builderForTarget(target);
+ }
+
/**
* Provides a custom executor.
*
@@ -99,6 +104,24 @@
@ExperimentalApi("primarily for testing")
public abstract T usePlaintext(boolean skipNegotiation);
+ /*
+ * Provides a custom {@link NameResolver.Factory} for the channel.
+ *
+ * <p>If this method is not called, the builder will look up in the global resolver registry for
+ * a factory for the provided target.
+ */
+ @ExperimentalApi
+ public abstract T nameResolverFactory(NameResolver.Factory resolverFactory);
+
+ /**
+ * Provides a custom {@link LoadBalancer.Factory} for the channel.
+ *
+ * <p>If this method is not called, the builder will use {@link SimpleLoadBalancerFactory} for the
+ * channel.
+ */
+ @ExperimentalApi
+ public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory);
+
/**
* Builds a channel using the given parameters.
*/
diff --git a/core/src/main/java/io/grpc/ManagedChannelProvider.java b/core/src/main/java/io/grpc/ManagedChannelProvider.java
index 991b8e1..434a007 100644
--- a/core/src/main/java/io/grpc/ManagedChannelProvider.java
+++ b/core/src/main/java/io/grpc/ManagedChannelProvider.java
@@ -106,6 +106,12 @@
*/
protected abstract ManagedChannelBuilder<?> builderForAddress(String name, int port);
+ /**
+ * Creates a new builder with the given target URI.
+ */
+ @ExperimentalApi
+ protected abstract ManagedChannelBuilder<?> builderForTarget(String target);
+
public static final class ProviderNotFoundException extends RuntimeException {
public ProviderNotFoundException(String msg) {
super(msg);
diff --git a/core/src/main/java/io/grpc/NameResolver.java b/core/src/main/java/io/grpc/NameResolver.java
new file mode 100644
index 0000000..22a5e61
--- /dev/null
+++ b/core/src/main/java/io/grpc/NameResolver.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import java.net.URI;
+import java.util.List;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A pluggable component that resolves a target URI (which is broken down into 3 parts as described
+ * below) and return addresses to the caller.
+ *
+ * <p>The format of the target URI is {@code "[<scheme>:]<scheme-specific-string>"}
+ *
+ * <p>{@code NameResolver} has no knowledge of load-balancing. The addresses of a target may be
+ * changed over time, thus the caller registers a {@link Listener} to receive continuous updates.
+ */
+@ExperimentalApi
+@ThreadSafe
+public abstract class NameResolver {
+ /**
+ * Returns the authority, which is also the name of the service.
+ *
+ * <p>An implementation must generate it locally and must keep it unchanged.
+ */
+ public abstract String getServiceAuthority();
+
+ /**
+ * Starts the resolution.
+ *
+ * @param listener used to receive updates on the target
+ */
+ public abstract void start(Listener listener);
+
+ /**
+ * Stops the resolution. Updates to the Listener will stop.
+ */
+ public abstract void shutdown();
+
+ public abstract static class Factory {
+ /**
+ * Creates a {@link NameResolver} for the given target URI, or {@code null} if the given URI
+ * cannot be resolved by this factory.
+ */
+ @Nullable
+ public abstract NameResolver newNameResolver(URI targetUri);
+ }
+
+ /**
+ * Receives address updates.
+ *
+ * <p>All methods are expected to return quickly.
+ */
+ @ThreadSafe
+ public interface Listener {
+ /**
+ * Handles updates on resolved addresses and config.
+ *
+ * <p>Implementations will not modify the given {@code servers}.
+ */
+ void onUpdate(List<ResolvedServerInfo> servers, Attributes config);
+
+ /**
+ * Handles an error from the resolver.
+ *
+ * @param error a non-OK status
+ */
+ void onError(Status error);
+ }
+}
diff --git a/core/src/main/java/io/grpc/RequestKey.java b/core/src/main/java/io/grpc/RequestKey.java
new file mode 100644
index 0000000..6ba6bcf
--- /dev/null
+++ b/core/src/main/java/io/grpc/RequestKey.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+/**
+ * A key generated from an RPC request, and to be used for affinity-based
+ * routing.
+ */
+@ExperimentalApi
+public final class RequestKey {
+
+ // TODO(zhangkun83): materialize this class once we decide the form of the affinity key.
+ private RequestKey() {
+ }
+}
diff --git a/core/src/main/java/io/grpc/ResolvedServerInfo.java b/core/src/main/java/io/grpc/ResolvedServerInfo.java
new file mode 100644
index 0000000..2e914de
--- /dev/null
+++ b/core/src/main/java/io/grpc/ResolvedServerInfo.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import java.net.SocketAddress;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * The information about a server from a {@link NameResolver}.
+ */
+@ExperimentalApi
+@Immutable
+public final class ResolvedServerInfo {
+ private final SocketAddress address;
+ private final Attributes attributes;
+
+ /**
+ * Constructor.
+ *
+ * @param address the address object
+ * @param attributes attributes associated with this address.
+ */
+ public ResolvedServerInfo(SocketAddress address, Attributes attributes) {
+ this.address = address;
+ this.attributes = attributes;
+ }
+
+ /**
+ * Returns the address.
+ */
+ public SocketAddress getAddress() {
+ return address;
+ }
+
+ /**
+ * Returns the associated attributes.
+ */
+ public Attributes getAttributes() {
+ return attributes;
+ }
+
+ @Override
+ public String toString() {
+ return "[address=" + address + ", attrs=" + attributes + "]";
+ }
+}
diff --git a/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java b/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java
new file mode 100644
index 0000000..b8d07f0
--- /dev/null
+++ b/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2014, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import io.grpc.internal.ClientTransport;
+
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * A {@link LoadBalancer} that provides simple round-robin and pick-first routing mechanism over the
+ * addresses from the {@link NameResolver}.
+ */
+// TODO(zhangkun83): Only pick-first is implemented. We need to implement round-robin.
+public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory {
+
+ private static final SimpleLoadBalancerFactory instance = new SimpleLoadBalancerFactory();
+
+ private SimpleLoadBalancerFactory() {
+ }
+
+ public static SimpleLoadBalancerFactory getInstance() {
+ return instance;
+ }
+
+ @Override
+ public LoadBalancer newLoadBalancer(String serviceName, TransportManager tm) {
+ return new SimpleLoadBalancer(tm);
+ }
+
+ private static class SimpleLoadBalancer extends LoadBalancer {
+ @GuardedBy("servers")
+ private final List<ResolvedServerInfo> servers = new ArrayList<ResolvedServerInfo>();
+ @GuardedBy("servers")
+ private int currentServerIndex;
+ // TODO(zhangkun83): virtually any LoadBalancer would need to handle picks before name
+ // resolution is done, we may want to move the related logic into ManagedChannelImpl.
+ @GuardedBy("servers")
+ private List<SettableFuture<ClientTransport>> pendingPicks;
+ @GuardedBy("servers")
+ private StatusException nameResolutionError;
+
+ private final TransportManager tm;
+
+ private SimpleLoadBalancer(TransportManager tm) {
+ this.tm = tm;
+ }
+
+ @Override
+ public ListenableFuture<ClientTransport> pickTransport(@Nullable RequestKey requestKey) {
+ ResolvedServerInfo currentServer;
+ synchronized (servers) {
+ if (servers.isEmpty()) {
+ if (nameResolutionError != null) {
+ return Futures.immediateFailedFuture(nameResolutionError);
+ }
+ SettableFuture<ClientTransport> future = SettableFuture.create();
+ if (pendingPicks == null) {
+ pendingPicks = new ArrayList<SettableFuture<ClientTransport>>();
+ }
+ pendingPicks.add(future);
+ return future;
+ }
+ currentServer = servers.get(currentServerIndex);
+ }
+ return tm.getTransport(currentServer.getAddress());
+ }
+
+ @Override
+ public void handleResolvedAddresses(
+ List<ResolvedServerInfo> updatedServers, Attributes config) {
+ List<SettableFuture<ClientTransport>> pendingPicksCopy = null;
+ ResolvedServerInfo currentServer = null;
+ synchronized (servers) {
+ nameResolutionError = null;
+ servers.clear();
+ for (ResolvedServerInfo addr : updatedServers) {
+ servers.add(addr);
+ }
+ if (!servers.isEmpty()) {
+ pendingPicksCopy = pendingPicks;
+ pendingPicks = null;
+ if (currentServerIndex >= servers.size()) {
+ currentServerIndex = 0;
+ }
+ currentServer = servers.get(currentServerIndex);
+ }
+ }
+ if (pendingPicksCopy != null) {
+ // If pendingPicksCopy != null, then servers.isEmpty() == false, then
+ // currentServer must have been assigned.
+ Preconditions.checkState(currentServer != null, "currentServer is null");
+ for (final SettableFuture<ClientTransport> pendingPick : pendingPicksCopy) {
+ ListenableFuture<ClientTransport> future = tm.getTransport(currentServer.getAddress());
+ Futures.addCallback(future, new FutureCallback<ClientTransport>() {
+ @Override public void onSuccess(ClientTransport result) {
+ pendingPick.set(result);
+ }
+
+ @Override public void onFailure(Throwable t) {
+ pendingPick.setException(t);
+ }
+ });
+ }
+ }
+ }
+
+ @Override
+ public void handleNameResolutionError(Status error) {
+ List<SettableFuture<ClientTransport>> pendingPicksCopy = null;
+ StatusException statusException = error.asException();
+ synchronized (servers) {
+ pendingPicksCopy = pendingPicks;
+ pendingPicks = null;
+ nameResolutionError = statusException;
+ }
+ if (pendingPicksCopy != null) {
+ for (SettableFuture<ClientTransport> pendingPick : pendingPicksCopy) {
+ pendingPick.setException(statusException);
+ }
+ }
+ }
+
+ @Override
+ public void transportShutdown(SocketAddress addr, ClientTransport transport, Status s) {
+ if (!s.isOk()) {
+ // If the current transport is shut down due to error, move on to the next address in the
+ // list
+ synchronized (servers) {
+ if (addr.equals(servers.get(currentServerIndex).getAddress())) {
+ currentServerIndex++;
+ if (currentServerIndex >= servers.size()) {
+ currentServerIndex = 0;
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/io/grpc/TransportManager.java b/core/src/main/java/io/grpc/TransportManager.java
new file mode 100644
index 0000000..1848bc7
--- /dev/null
+++ b/core/src/main/java/io/grpc/TransportManager.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import io.grpc.internal.ClientTransport;
+
+import java.net.SocketAddress;
+
+/**
+ * Manages transport life-cycles and provide ready-to-use transports.
+ */
+@ExperimentalApi
+public abstract class TransportManager {
+ /**
+ * Advises this {@code TransportManager} to retain transports only to these servers, for warming
+ * up connections and discarding unused connections.
+ */
+ public abstract void updateRetainedTransports(SocketAddress[] addrs);
+
+ /**
+ * Returns the future of a transport for the given server.
+ *
+ * <p>If the channel has been shut down, the value of the future will be {@code null}.
+ */
+ // TODO(zhangkun83): GrpcLoadBalancer will use this to get transport to connect to LB servers,
+ // which would have a different authority than the primary servers. We need to figure out how to
+ // do it.
+ public abstract ListenableFuture<ClientTransport> getTransport(SocketAddress addr);
+}
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
index 4fb9038..4eeb1c9 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
@@ -39,6 +39,8 @@
import io.grpc.internal.ClientTransport;
import io.grpc.internal.ClientTransportFactory;
+import java.net.SocketAddress;
+
/**
* Builder for a channel that issues in-process requests. Clients identify the in-process server by
* its name.
@@ -59,9 +61,9 @@
}
private final String name;
- private String authority = "localhost";
private InProcessChannelBuilder(String name) {
+ super(new InProcessSocketAddress(name), "localhost");
this.name = Preconditions.checkNotNull(name);
}
@@ -74,39 +76,39 @@
}
@Override
- public InProcessChannelBuilder overrideAuthority(String authority) {
- this.authority = authority;
- return this;
- }
-
- @Override
protected ClientTransportFactory buildTransportFactory() {
- return new InProcessClientTransportFactory(name, authority);
+ return new InProcessClientTransportFactory(name);
}
private static class InProcessClientTransportFactory extends AbstractReferenceCounted
implements ClientTransportFactory {
private final String name;
- private final String authority;
- private InProcessClientTransportFactory(String name, String authority) {
+ private InProcessClientTransportFactory(String name) {
this.name = name;
- this.authority = authority;
}
@Override
- public ClientTransport newClientTransport() {
+ public ClientTransport newClientTransport(SocketAddress addr, String authority) {
return new InProcessTransport(name);
}
@Override
- public String authority() {
- return authority;
- }
-
- @Override
protected void deallocate() {
// Do nothing.
}
}
+
+ private static class InProcessSocketAddress extends SocketAddress {
+ final String name;
+
+ InProcessSocketAddress(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
}
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index 5097c9f..b2d4766 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -31,12 +31,23 @@
package io.grpc.internal;
-import io.grpc.ClientInterceptor;
-import io.grpc.Internal;
-import io.grpc.ManagedChannelBuilder;
+import com.google.common.base.Preconditions;
+import io.grpc.Attributes;
+import io.grpc.ClientInterceptor;
+import io.grpc.DnsNameResolverFactory;
+import io.grpc.Internal;
+import io.grpc.LoadBalancer;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.NameResolver;
+import io.grpc.ResolvedServerInfo;
+import io.grpc.SimpleLoadBalancerFactory;
+
+import java.net.SocketAddress;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
@@ -54,9 +65,34 @@
private Executor executor;
private final List<ClientInterceptor> interceptors = new ArrayList<ClientInterceptor>();
+ private final URI target;
+
+ @Nullable
+ private final SocketAddress directServerAddress;
+
@Nullable
private String userAgent;
+ @Nullable
+ private String authorityOverride;
+
+ @Nullable
+ private NameResolver.Factory nameResolverFactory;
+
+ @Nullable
+ private LoadBalancer.Factory loadBalancerFactory;
+
+ protected AbstractManagedChannelImplBuilder(URI target) {
+ this.target = Preconditions.checkNotNull(target);
+ this.directServerAddress = null;
+ }
+
+ protected AbstractManagedChannelImplBuilder(SocketAddress directServerAddress, String authority) {
+ this.target = URI.create("direct-address:///" + directServerAddress);
+ this.directServerAddress = directServerAddress;
+ this.nameResolverFactory = new DirectAddressNameResolverFactory(directServerAddress, authority);
+ }
+
@Override
public final T executor(Executor executor) {
this.executor = executor;
@@ -74,6 +110,24 @@
return intercept(Arrays.asList(interceptors));
}
+ @Override
+ public final T nameResolverFactory(NameResolver.Factory resolverFactory) {
+ Preconditions.checkState(directServerAddress == null,
+ "directServerAddress is set (%s), which forbids the use of NameResolverFactory",
+ directServerAddress);
+ this.nameResolverFactory = resolverFactory;
+ return thisT();
+ }
+
+ @Override
+ public final T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory) {
+ Preconditions.checkState(directServerAddress == null,
+ "directServerAddress is set (%s), which forbids the use of LoadBalancerFactory",
+ directServerAddress);
+ this.loadBalancerFactory = loadBalancerFactory;
+ return thisT();
+ }
+
private T thisT() {
@SuppressWarnings("unchecked")
T thisT = (T) this;
@@ -87,9 +141,32 @@
}
@Override
+ public final T overrideAuthority(String authority) {
+ this.authorityOverride = checkAuthority(authority);
+ return thisT();
+ }
+
+ /**
+ * Verifies the authority is valid. This method exists as an escape hatch for putting in an
+ * authority that is valid, but would fail the default validation provided by this
+ * implementation.
+ */
+ protected String checkAuthority(String authority) {
+ return GrpcUtil.checkAuthority(authority);
+ }
+
+ @Override
public ManagedChannelImpl build() {
- ClientTransportFactory transportFactory = buildTransportFactory();
- return new ManagedChannelImpl(transportFactory, executor, userAgent, interceptors);
+ ClientTransportFactory transportFactory = new AuthorityOverridingTransportFactory(
+ buildTransportFactory(), authorityOverride);
+ return new ManagedChannelImpl(
+ target,
+ // TODO(carl-mastrangelo): Allow clients to pass this in
+ new ExponentialBackoffPolicy.Provider(),
+ // TODO(zhangkun83): use a NameResolver registry for the "nameResolverFactory == null" case
+ nameResolverFactory == null ? DnsNameResolverFactory.getInstance() : nameResolverFactory,
+ loadBalancerFactory == null ? SimpleLoadBalancerFactory.getInstance() : loadBalancerFactory,
+ transportFactory, executor, userAgent, interceptors);
}
/**
@@ -99,4 +176,68 @@
*/
@Internal
protected abstract ClientTransportFactory buildTransportFactory();
+
+ private static class AuthorityOverridingTransportFactory implements ClientTransportFactory {
+ final ClientTransportFactory factory;
+ @Nullable final String authorityOverride;
+
+ AuthorityOverridingTransportFactory(
+ ClientTransportFactory factory, @Nullable String authorityOverride) {
+ this.factory = factory;
+ this.authorityOverride = authorityOverride;
+ }
+
+ @Override
+ public ClientTransport newClientTransport(SocketAddress serverAddress, String authority) {
+ return factory.newClientTransport(
+ serverAddress, authorityOverride != null ? authorityOverride : authority);
+ }
+
+ @Override
+ public int referenceCount() {
+ return factory.referenceCount();
+ }
+
+ @Override
+ public ReferenceCounted retain() {
+ factory.retain();
+ return this;
+ }
+
+ @Override
+ public ReferenceCounted release() {
+ factory.release();
+ return this;
+ }
+ }
+
+ private static class DirectAddressNameResolverFactory extends NameResolver.Factory {
+ final SocketAddress address;
+ final String authority;
+
+ DirectAddressNameResolverFactory(SocketAddress address, String authority) {
+ this.address = address;
+ this.authority = authority;
+ }
+
+ @Override
+ public NameResolver newNameResolver(URI notUsedUri) {
+ return new NameResolver() {
+ @Override
+ public String getServiceAuthority() {
+ return authority;
+ }
+
+ @Override
+ public void start(final Listener listener) {
+ listener.onUpdate(
+ Collections.singletonList(new ResolvedServerInfo(address, Attributes.EMPTY)),
+ Attributes.EMPTY);
+ }
+
+ @Override
+ public void shutdown() {}
+ };
+ }
+ }
}
diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
index de7c0aa..7c7a684 100644
--- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -96,7 +96,7 @@
* @return a future for client transport. If no more transports can be created, e.g., channel is
* shut down, the future's value will be {@code null}.
*/
- ListenableFuture<ClientTransport> get();
+ ListenableFuture<ClientTransport> get(CallOptions callOptions);
}
@@ -139,7 +139,7 @@
}
ClientStreamListener listener = new ClientStreamListenerImpl(observer);
- ListenableFuture<ClientTransport> transportFuture = clientTransportProvider.get();
+ ListenableFuture<ClientTransport> transportFuture = clientTransportProvider.get(callOptions);
if (transportFuture.isDone()) {
// Try to skip DelayedStream when possible to avoid the overhead of a volatile read in the
// fast path. If that fails, stream will stay null and DelayedStream will be created.
@@ -409,7 +409,7 @@
StreamCreationTask() {
this.transportFuture = Preconditions.checkNotNull(
- clientTransportProvider.get(), "transportFuture");
+ clientTransportProvider.get(callOptions), "transportFuture");
}
@Override
diff --git a/core/src/main/java/io/grpc/internal/ClientTransport.java b/core/src/main/java/io/grpc/internal/ClientTransport.java
index c2c88e4..fd9fa85 100644
--- a/core/src/main/java/io/grpc/internal/ClientTransport.java
+++ b/core/src/main/java/io/grpc/internal/ClientTransport.java
@@ -108,6 +108,8 @@
* that this method is called without {@link #shutdown} being called. If the argument to this
* function is {@link Status#isOk}, it is safe to immediately reconnect.
*
+ * <p>This is called exactly once, and must be called prior to {@link #transportTerminated}.
+ *
* @param s the reason for the shutdown.
*/
void transportShutdown(Status s);
@@ -115,7 +117,8 @@
/**
* The transport completed shutting down. All resources have been released.
*
- * <p> {@link #transportShutdown(Status)} must be called before calling this method.
+ * <p>This is called exactly once, and must be called after {@link #transportShutdown} has been
+ * called.
*/
void transportTerminated();
diff --git a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
index a36ade1..bc06a10 100644
--- a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
+++ b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
@@ -31,14 +31,15 @@
package io.grpc.internal;
+import java.net.SocketAddress;
+
/** Pre-configured factory for creating {@link ClientTransport} instances. */
public interface ClientTransportFactory extends ReferenceCounted {
- /** Creates an unstarted transport for exclusive use. */
- ClientTransport newClientTransport();
-
/**
- * Returns the authority of the channel. Typically, this should be in the form {@code host:port}.
- * Note that since there is not a scheme, there can't be a default port.
+ * Creates an unstarted transport for exclusive use.
+ *
+ * @param serverAddress the address that the transport is connected to
+ * @param authority the HTTP/2 authority of the server
*/
- String authority();
+ ClientTransport newClientTransport(SocketAddress serverAddress, String authority);
}
diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java
index 7f22fb3..b2b1660 100644
--- a/core/src/main/java/io/grpc/internal/GrpcUtil.java
+++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java
@@ -351,7 +351,7 @@
/**
* Shared executor for channels.
*/
- static final Resource<ExecutorService> SHARED_CHANNEL_EXECUTOR =
+ public static final Resource<ExecutorService> SHARED_CHANNEL_EXECUTOR =
new Resource<ExecutorService>() {
private static final String name = "grpc-default-executor";
@Override
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index bc01591..7c05c64 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -33,9 +33,11 @@
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
@@ -45,11 +47,21 @@
import io.grpc.Compressor;
import io.grpc.DecompressorRegistry;
import io.grpc.ExperimentalApi;
+import io.grpc.LoadBalancer;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
+import io.grpc.NameResolver;
+import io.grpc.ResolvedServerInfo;
+import io.grpc.Status;
+import io.grpc.TransportManager;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -90,8 +102,15 @@
*/
private final Channel interceptorChannel;
+ private final NameResolver nameResolver;
+ private final LoadBalancer loadBalancer;
+
+ /**
+ * Maps addresses to transports for that server.
+ */
@GuardedBy("lock")
- private TransportSet transportSet;
+ private final Map<SocketAddress, TransportSet> transports =
+ new HashMap<SocketAddress, TransportSet>();
@GuardedBy("lock")
private boolean shutdown;
@@ -101,55 +120,21 @@
private volatile Compressor defaultCompressor;
private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
-
@Override
- public ListenableFuture<ClientTransport> get() {
- TransportSet transportSetCopy;
+ public ListenableFuture<ClientTransport> get(CallOptions callOptions) {
synchronized (lock) {
if (shutdown) {
return NULL_VALUE_TRANSPORT_FUTURE;
}
- if (transportSet == null) {
- transportSet = new TransportSet(backoffPolicyProvider, transportFactory,
- scheduledExecutor, new TransportSet.Callback() {
- @Override
- public void onTerminated() {
- synchronized (lock) {
- if (shutdown) {
- transportSet = null;
- if (terminated) {
- log.warning("transportTerminated called after already terminated");
- }
- terminated = true;
- lock.notifyAll();
- onChannelTerminated();
- }
- }
- }
- });
- }
- transportSetCopy = transportSet;
}
- return transportSetCopy.obtainActiveTransport();
+ return loadBalancer.pickTransport(callOptions.getRequestKey());
}
};
- // TODO(zhangkun83): remove this in favor of the one that accepts BackoffPolicy.Provider
- ManagedChannelImpl(ClientTransportFactory transportFactory, @Nullable Executor executor,
- @Nullable String userAgent, List<ClientInterceptor> interceptors) {
- this(new ExponentialBackoffPolicy.Provider(), transportFactory, executor, userAgent,
- interceptors);
- }
-
- ManagedChannelImpl(BackoffPolicy.Provider backoffPolicyProvider,
+ ManagedChannelImpl(URI targetUri, BackoffPolicy.Provider backoffPolicyProvider,
+ NameResolver.Factory nameResolverFactory, LoadBalancer.Factory loadBalancerFactory,
ClientTransportFactory transportFactory, @Nullable Executor executor,
@Nullable String userAgent, List<ClientInterceptor> interceptors) {
- this.backoffPolicyProvider = backoffPolicyProvider;
- this.transportFactory = transportFactory;
- this.userAgent = userAgent;
- this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
- scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE);
-
if (executor == null) {
usingSharedExecutor = true;
this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
@@ -157,7 +142,28 @@
usingSharedExecutor = false;
this.executor = executor;
}
+ this.backoffPolicyProvider = backoffPolicyProvider;
+ this.nameResolver = nameResolverFactory.newNameResolver(targetUri);
+ Preconditions.checkArgument(this.nameResolver != null,
+ "The given NameResolverFactory cannot resolve %s", targetUri);
+ this.loadBalancer = loadBalancerFactory.newLoadBalancer(nameResolver.getServiceAuthority(), tm);
+ this.transportFactory = transportFactory;
+ this.userAgent = userAgent;
+ this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
+ scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE);
+ this.nameResolver.start(new NameResolver.Listener() {
+ @Override
+ public void onUpdate(List<ResolvedServerInfo> servers, Attributes config) {
+ loadBalancer.handleResolvedAddresses(servers, config);
+ }
+
+ @Override
+ public void onError(Status error) {
+ Preconditions.checkArgument(!error.isOk(), "the error status must not be OK");
+ loadBalancer.handleNameResolutionError(error);
+ }
+ });
}
/**
@@ -180,7 +186,7 @@
*/
@Override
public ManagedChannelImpl shutdown() {
- TransportSet transportSetCopy = null;
+ ArrayList<TransportSet> transportsCopy = new ArrayList<TransportSet>();
synchronized (lock) {
if (shutdown) {
return this;
@@ -188,16 +194,16 @@
shutdown = true;
// After shutdown there are no new calls, so no new cancellation tasks are needed
scheduledExecutor = SharedResourceHolder.release(TIMER_SERVICE, scheduledExecutor);
- if (transportSet == null) {
+ if (transports.isEmpty()) {
terminated = true;
lock.notifyAll();
onChannelTerminated();
} else {
- transportSetCopy = transportSet;
+ transportsCopy.addAll(transports.values());
}
}
- if (transportSetCopy != null) {
- transportSetCopy.shutdown();
+ for (TransportSet ts : transportsCopy) {
+ ts.shutdown();
}
return this;
}
@@ -276,7 +282,8 @@
@Override
public String authority() {
- return transportFactory.authority();
+ String authority = nameResolver.getServiceAuthority();
+ return Preconditions.checkNotNull(authority, "authority");
}
}
@@ -290,4 +297,43 @@
// Release the transport factory so that it can deallocate any resources.
transportFactory.release();
}
+
+ private final TransportManager tm = new TransportManager() {
+ @Override
+ public void updateRetainedTransports(SocketAddress[] addrs) {
+ // TODO(zhangkun83): warm-up new servers and discard removed servers.
+ }
+
+ @Override
+ public ListenableFuture<ClientTransport> getTransport(final SocketAddress addr) {
+ TransportSet ts;
+ synchronized (lock) {
+ if (shutdown) {
+ return null;
+ }
+ ts = transports.get(addr);
+ if (ts == null) {
+ ts = new TransportSet(addr, authority(), loadBalancer, backoffPolicyProvider,
+ transportFactory, scheduledExecutor, new TransportSet.Callback() {
+ @Override
+ public void onTerminated() {
+ synchronized (lock) {
+ transports.remove(addr);
+ if (shutdown && transports.isEmpty()) {
+ if (terminated) {
+ log.warning("transportTerminated called after already terminated");
+ }
+ terminated = true;
+ lock.notifyAll();
+ onChannelTerminated();
+ }
+ }
+ }
+ });
+ transports.put(addr, ts);
+ }
+ }
+ return ts.obtainActiveTransport();
+ }
+ };
}
diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java
index fc4e046..9d7effe 100644
--- a/core/src/main/java/io/grpc/internal/TransportSet.java
+++ b/core/src/main/java/io/grpc/internal/TransportSet.java
@@ -36,13 +36,16 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import io.grpc.LoadBalancer;
import io.grpc.Status;
+import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -63,6 +66,8 @@
}
private final Object lock = new Object();
+ private final SocketAddress server;
+ private final String authority;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Callback callback;
private final ClientTransportFactory transportFactory;
@@ -84,6 +89,8 @@
@GuardedBy("lock")
private final Collection<ClientTransport> transports = new ArrayList<ClientTransport>();
+ private final LoadBalancer loadBalancer;
+
@GuardedBy("lock")
private boolean shutdown;
@@ -93,9 +100,12 @@
*/
private volatile SettableFuture<ClientTransport> activeTransportFuture;
- TransportSet(BackoffPolicy.Provider backoffPolicyProvider,
- ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
- Callback callback) {
+ TransportSet(SocketAddress server, String authority, LoadBalancer loadBalancer,
+ BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory,
+ ScheduledExecutorService scheduledExecutor, Callback callback) {
+ this.server = server;
+ this.authority = authority;
+ this.loadBalancer = loadBalancer;
this.backoffPolicyProvider = backoffPolicyProvider;
this.transportFactory = transportFactory;
this.scheduledExecutor = scheduledExecutor;
@@ -156,8 +166,10 @@
if (shutdown) {
return;
}
- ClientTransport newActiveTransport = transportFactory.newClientTransport();
- log.info("Created transport '" + newActiveTransport);
+ ClientTransport newActiveTransport = transportFactory.newClientTransport(
+ server, authority);
+ log.log(Level.INFO, "Created transport {0} for {1}",
+ new Object[] {newActiveTransport, server});
transports.add(newActiveTransport);
newActiveTransport.start(
new TransportListener(newActiveTransport, activeTransportFuture));
@@ -222,30 +234,34 @@
@Override
public void transportReady() {
synchronized (lock) {
- log.info("Transport '" + transport + " is ready");
+ log.log(Level.INFO, "Transport {0} for {1} is ready", new Object[] {transport, server});
Preconditions.checkState(transportFuture.isDone(), "the transport future is not done");
if (isAttachedToActiveTransport()) {
reconnectPolicy = null;
}
}
+ loadBalancer.transportReady(server, transport);
}
@Override
public void transportShutdown(Status s) {
synchronized (lock) {
- log.info("Transport '" + transport + " is being shutdown");
+ log.log(Level.INFO, "Transport {0} for {1} is being shutdown",
+ new Object[] {transport, server});
Preconditions.checkState(transportFuture.isDone(), "the transport future is not done");
if (isAttachedToActiveTransport()) {
createActiveTransportFuture();
}
}
+ loadBalancer.transportShutdown(server, transport, s);
}
@Override
public void transportTerminated() {
boolean runCallback = false;
synchronized (lock) {
- log.info("Transport '" + transport + " is terminated");
+ log.log(Level.INFO, "Transport {0} for {1} is terminated",
+ new Object[] {transport, server});
Preconditions.checkState(!isAttachedToActiveTransport(),
"Listener is still attached to activeTransportFuture. "
+ "Seems transportTerminated was not called.");
diff --git a/core/src/test/java/io/grpc/ManagedChannelProviderTest.java b/core/src/test/java/io/grpc/ManagedChannelProviderTest.java
index ab03caf..c8eae6182 100644
--- a/core/src/test/java/io/grpc/ManagedChannelProviderTest.java
+++ b/core/src/test/java/io/grpc/ManagedChannelProviderTest.java
@@ -113,6 +113,11 @@
protected ManagedChannelBuilder<?> builderForAddress(String host, int port) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ protected ManagedChannelBuilder<?> builderForTarget(String target) {
+ throw new UnsupportedOperationException();
+ }
}
public static class Available0Provider extends BaseProvider {
diff --git a/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java b/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java
new file mode 100644
index 0000000..f79f618
--- /dev/null
+++ b/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import io.grpc.internal.ClientTransport;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.net.SocketAddress;
+import java.util.ArrayList;
+
+/** Unit test for {@link SimpleLoadBalancerFactory}. */
+@RunWith(JUnit4.class)
+public class SimpleLoadBalancerTest {
+ private LoadBalancer loadBalancer;
+
+ private ArrayList<ResolvedServerInfo> servers;
+
+ @Mock
+ private TransportManager mockTransportManager;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ loadBalancer = SimpleLoadBalancerFactory.getInstance().newLoadBalancer(
+ "fakeservice", mockTransportManager);
+ servers = new ArrayList<ResolvedServerInfo>();
+ for (int i = 0; i < 3; i++) {
+ servers.add(new ResolvedServerInfo(new FakeSocketAddress("server" + i), Attributes.EMPTY));
+ }
+ }
+
+ @Test
+ public void pickBeforeResolved() throws Exception {
+ ClientTransport mockTransport = mock(ClientTransport.class);
+ SettableFuture<ClientTransport> sourceFuture = SettableFuture.create();
+ when(mockTransportManager.getTransport(same(servers.get(0).getAddress())))
+ .thenReturn(sourceFuture);
+ ListenableFuture<ClientTransport> f1 = loadBalancer.pickTransport(null);
+ ListenableFuture<ClientTransport> f2 = loadBalancer.pickTransport(null);
+ assertNotNull(f1);
+ assertNotNull(f2);
+ assertNotSame(f1, f2);
+ assertFalse(f1.isDone());
+ assertFalse(f2.isDone());
+ verify(mockTransportManager, never()).getTransport(any(SocketAddress.class));
+ loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY);
+ verify(mockTransportManager, times(2)).getTransport(same(servers.get(0).getAddress()));
+ assertFalse(f1.isDone());
+ assertFalse(f2.isDone());
+ assertNotSame(sourceFuture, f1);
+ assertNotSame(sourceFuture, f2);
+ sourceFuture.set(mockTransport);
+ assertSame(mockTransport, f1.get());
+ assertSame(mockTransport, f2.get());
+ ListenableFuture<ClientTransport> f3 = loadBalancer.pickTransport(null);
+ assertSame(sourceFuture, f3);
+ verify(mockTransportManager, times(3)).getTransport(same(servers.get(0).getAddress()));
+ verifyNoMoreInteractions(mockTransportManager);
+ }
+
+ @Test
+ public void transportFailed() throws Exception {
+ ClientTransport mockTransport1 = mock(ClientTransport.class);
+ ClientTransport mockTransport2 = mock(ClientTransport.class);
+ when(mockTransportManager.getTransport(same(servers.get(0).getAddress()))).thenReturn(
+ Futures.immediateFuture(mockTransport1));
+ when(mockTransportManager.getTransport(same(servers.get(1).getAddress()))).thenReturn(
+ Futures.immediateFuture(mockTransport2));
+ loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY);
+ ListenableFuture<ClientTransport> f1 = loadBalancer.pickTransport(null);
+ ListenableFuture<ClientTransport> f2 = loadBalancer.pickTransport(null);
+ assertSame(mockTransport1, f1.get());
+ assertSame(mockTransport1, f2.get());
+ loadBalancer.transportShutdown(servers.get(0).getAddress(), mockTransport1, Status.INTERNAL);
+ ListenableFuture<ClientTransport> f3 = loadBalancer.pickTransport(null);
+ assertSame(mockTransport2, f3.get());
+ }
+
+ private static class FakeSocketAddress extends SocketAddress {
+ final String name;
+
+ FakeSocketAddress(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return "FakeSocketAddress-" + name;
+ }
+ }
+
+}
diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
index b81f639..5ede4c1 100644
--- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
@@ -94,7 +94,7 @@
final ClientStream stream = mock(ClientStream.class);
ClientTransportProvider provider = new ClientTransportProvider() {
@Override
- public ListenableFuture<ClientTransport> get() {
+ public ListenableFuture<ClientTransport> get(CallOptions callOptions) {
return Futures.immediateFuture(transport);
}
};
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index 1ed153f..3e52395 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -37,6 +37,7 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
@@ -47,6 +48,7 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
@@ -56,6 +58,9 @@
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
+import io.grpc.NameResolver;
+import io.grpc.ResolvedServerInfo;
+import io.grpc.SimpleLoadBalancerFactory;
import io.grpc.Status;
import io.grpc.StringMarshaller;
@@ -70,6 +75,8 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.net.SocketAddress;
+import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -81,15 +88,23 @@
/** Unit tests for {@link ManagedChannelImpl}. */
@RunWith(JUnit4.class)
public class ManagedChannelImplTest {
+ private static final List<ClientInterceptor> NO_INTERCEPTOR =
+ Collections.<ClientInterceptor>emptyList();
private final MethodDescriptor<String, Integer> method = MethodDescriptor.create(
MethodDescriptor.MethodType.UNKNOWN, "/service/method",
new StringMarshaller(), new IntegerMarshaller());
private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ private final String serviceName = "fake.example.com";
+ private final URI target = URI.create("//" + serviceName);
+ private final String authority = serviceName;
+ private final SocketAddress socketAddress = new SocketAddress() {};
+ private final ResolvedServerInfo server = new ResolvedServerInfo(socketAddress, Attributes.EMPTY);
@Mock
private ClientTransport mockTransport;
@Mock
private ClientTransportFactory mockTransportFactory;
+
private ManagedChannel channel;
@Mock
@@ -104,16 +119,18 @@
private ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
ArgumentCaptor.forClass(ClientStreamListener.class);
- private void createChannel(List<ClientInterceptor> interceptors) throws Exception {
- channel = new ManagedChannelImpl(new FakeBackoffPolicyProvider(), mockTransportFactory,
- executor, null, interceptors);
+ private ManagedChannel createChannel(
+ NameResolver.Factory nameResolverFactory, List<ClientInterceptor> interceptors) {
+ return new ManagedChannelImpl(target, new FakeBackoffPolicyProvider(),
+ nameResolverFactory, SimpleLoadBalancerFactory.getInstance(),
+ mockTransportFactory, executor, null, interceptors);
}
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
- createChannel(Collections.<ClientInterceptor>emptyList());
- when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport);
+ when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class)))
+ .thenReturn(mockTransport);
}
@After
@@ -123,6 +140,7 @@
@Test
public void immediateDeadlineExceeded() {
+ ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR);
ClientCall<String, Integer> call =
channel.newCall(method, CallOptions.DEFAULT.withDeadlineNanoTime(System.nanoTime()));
call.start(mockCallListener, new Metadata());
@@ -132,6 +150,7 @@
@Test
public void shutdownWithNoTransportsEverCreated() {
+ ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR);
verifyNoMoreInteractions(mockTransportFactory);
channel.shutdown();
assertTrue(channel.isShutdown());
@@ -140,6 +159,8 @@
@Test
public void twoCallsAndGracefulShutdown() {
+ ManagedChannel channel = createChannel(
+ new FakeNameResolverFactory(server), Collections.<ClientInterceptor>emptyList());
verifyNoMoreInteractions(mockTransportFactory);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
verifyNoMoreInteractions(mockTransportFactory);
@@ -148,11 +169,13 @@
ClientTransport mockTransport = mock(ClientTransport.class);
ClientStream mockStream = mock(ClientStream.class);
Metadata headers = new Metadata();
- when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport);
+ when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class)))
+ .thenReturn(mockTransport);
when(mockTransport.newStream(same(method), same(headers), any(ClientStreamListener.class)))
.thenReturn(mockStream);
call.start(mockCallListener, headers);
- verify(mockTransportFactory, timeout(1000)).newClientTransport();
+ verify(mockTransportFactory, timeout(1000))
+ .newClientTransport(same(socketAddress), eq(authority));
verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture());
ClientTransport.Listener transportListener = transportListenerCaptor.getValue();
verify(mockTransport, timeout(1000))
@@ -206,6 +229,7 @@
@Test
public void transportFailsOnStart() {
+ ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR);
Status goldenStatus = Status.INTERNAL.withDescription("wanted it to fail");
// mockTransport2 shuts immediately during start
@@ -230,10 +254,12 @@
when(mockTransport2.newStream(same(method), same(headers2), any(ClientStreamListener.class)))
.thenReturn(mockStream2);
// The factory returns the immediately shut-down transport first, then the normal one
- when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport2, mockTransport);
+ when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class)))
+ .thenReturn(mockTransport2, mockTransport);
call.start(mockCallListener2, headers2);
- verify(mockTransportFactory, timeout(1000).times(2)).newClientTransport();
+ verify(mockTransportFactory, timeout(1000).times(2))
+ .newClientTransport(same(socketAddress), eq(authority));
verify(mockTransport2, timeout(1000)).start(any(ClientTransport.Listener.class));
verify(mockTransport2, timeout(1000))
.newStream(same(method), same(headers2), streamListenerCaptor.capture());
@@ -268,7 +294,7 @@
transportListenerCaptor.getValue().transportTerminated();
assertTrue(channel.isTerminated());
- verify(mockTransportFactory, times(2)).newClientTransport();
+ verify(mockTransportFactory, times(2)).newClientTransport(same(socketAddress), eq(authority));
verifyNoMoreInteractions(mockTransport);
verifyNoMoreInteractions(mockTransport2);
verifyNoMoreInteractions(mockStream2);
@@ -286,13 +312,15 @@
return next.newCall(method, callOptions);
}
};
- createChannel(Arrays.asList(interceptor));
+ ManagedChannel channel = createChannel(
+ new FakeNameResolverFactory(server), Arrays.asList(interceptor));
assertNotNull(channel.newCall(method, CallOptions.DEFAULT));
assertEquals(1, atomic.get());
}
@Test
public void testNoDeadlockOnShutdown() {
+ ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR);
// Force creation of transport
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
Metadata headers = new Metadata();
@@ -341,6 +369,18 @@
transportListener.transportTerminated();
}
+ @Test
+ public void nameResolutionFailed() {
+ Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
+ ManagedChannel channel = createChannel(new FailingNameResolverFactory(error), NO_INTERCEPTOR);
+ ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
+ call.start(mockCallListener, new Metadata());
+ ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
+ verify(mockCallListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class));
+ Status status = statusCaptor.getValue();
+ assertSame(error, status);
+ }
+
private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
@Override
public BackoffPolicy get() {
@@ -352,4 +392,53 @@
};
}
}
+
+ private class FakeNameResolverFactory extends NameResolver.Factory {
+ final ResolvedServerInfo server;
+
+ FakeNameResolverFactory(ResolvedServerInfo server) {
+ this.server = server;
+ }
+
+ @Override
+ public NameResolver newNameResolver(final URI targetUri) {
+ assertEquals(null, targetUri.getScheme());
+ assertEquals(serviceName, targetUri.getAuthority());
+ return new NameResolver() {
+ @Override public String getServiceAuthority() {
+ assertNotNull(targetUri.toString() + " has authority", targetUri.getAuthority());
+ return targetUri.getAuthority();
+ }
+
+ @Override public void start(final Listener listener) {
+ listener.onUpdate(Collections.singletonList(server), Attributes.EMPTY);
+ }
+
+ @Override public void shutdown() {}
+ };
+ }
+ }
+
+ private class FailingNameResolverFactory extends NameResolver.Factory {
+ final Status error;
+
+ FailingNameResolverFactory(Status error) {
+ this.error = error;
+ }
+
+ @Override
+ public NameResolver newNameResolver(URI notUsedUri) {
+ return new NameResolver() {
+ @Override public String getServiceAuthority() {
+ return "irrelevant-authority";
+ }
+
+ @Override public void start(final Listener listener) {
+ listener.onError(error);
+ }
+
+ @Override public void shutdown() {}
+ };
+ }
+ }
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
index 8acac65..580f808 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
@@ -50,6 +50,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.net.URI;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
@@ -61,8 +62,6 @@
public class NettyChannelBuilder extends AbstractManagedChannelImplBuilder<NettyChannelBuilder> {
public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB
- private final SocketAddress serverAddress;
- private String authority;
private NegotiationType negotiationType = NegotiationType.TLS;
private Class<? extends Channel> channelType = NioSocketChannel.class;
@Nullable
@@ -78,30 +77,39 @@
* noticing changes to DNS.
*/
public static NettyChannelBuilder forAddress(SocketAddress serverAddress) {
- String authority;
- if (serverAddress instanceof InetSocketAddress) {
- InetSocketAddress address = (InetSocketAddress) serverAddress;
- authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
- } else {
- // Specialized address types are allowed to support custom Channel types so just assume their
- // toString() values are valid :authority values. We defer checking validity of authority
- // until buildTransportFactory() to provide the user an opportunity to override the value.
- authority = serverAddress.toString();
- }
- return new NettyChannelBuilder(serverAddress, authority);
+ return new NettyChannelBuilder(serverAddress);
}
/**
* Creates a new builder with the given host and port.
*/
public static NettyChannelBuilder forAddress(String host, int port) {
- return new NettyChannelBuilder(
- new InetSocketAddress(host, port), GrpcUtil.authorityFromHostAndPort(host, port));
+ return forAddress(new InetSocketAddress(host, port));
}
- protected NettyChannelBuilder(SocketAddress serverAddress, String authority) {
- this.serverAddress = serverAddress;
- this.authority = authority;
+ /**
+ * Creates a new builder with the given target URI that will be resolved by
+ * {@link io.grpc.NameResolver}.
+ */
+ public static NettyChannelBuilder forTarget(String targetUri) {
+ return new NettyChannelBuilder(URI.create(targetUri));
+ }
+
+ private NettyChannelBuilder(URI target) {
+ super(target);
+ }
+
+ protected NettyChannelBuilder(SocketAddress address) {
+ super(address, getAuthorityFromAddress(address));
+ }
+
+ private static String getAuthorityFromAddress(SocketAddress address) {
+ if (address instanceof InetSocketAddress) {
+ InetSocketAddress inetAddress = (InetSocketAddress) address;
+ return GrpcUtil.authorityFromHostAndPort(inetAddress.getHostString(), inetAddress.getPort());
+ } else {
+ return address.toString();
+ }
}
/**
@@ -180,28 +188,14 @@
}
@Override
- public final NettyChannelBuilder overrideAuthority(String authority) {
- this.authority = checkAuthority(authority);
- return this;
+ protected ClientTransportFactory buildTransportFactory() {
+ return new NettyTransportFactory(channelType, negotiationType, sslContext,
+ eventLoopGroup, flowControlWindow, maxMessageSize);
}
- /**
- * Verifies the authority is valid. This method exists as an escape hatch for putting in an
- * authority that is valid, but would fail the default validation provided by this implementation.
- */
- protected String checkAuthority(String authority) {
- return GrpcUtil.checkAuthority(authority);
- }
-
- @Override
- protected final ClientTransportFactory buildTransportFactory() {
- // Check authority, since non-inet ServerAddresses delay the authority check.
- checkAuthority(authority);
- return new NettyTransportFactory(serverAddress, authority, channelType, eventLoopGroup,
- flowControlWindow, createProtocolNegotiator(), maxMessageSize);
- }
-
- private ProtocolNegotiator createProtocolNegotiator() {
+ private static ProtocolNegotiator createProtocolNegotiator(
+ String authority, NegotiationType negotiationType, SslContext sslContext) {
+ ProtocolNegotiator negotiator;
switch (negotiationType) {
case PLAINTEXT:
return ProtocolNegotiators.plaintext();
@@ -223,29 +217,25 @@
private static class NettyTransportFactory extends AbstractReferenceCounted
implements ClientTransportFactory {
- private final SocketAddress serverAddress;
private final Class<? extends Channel> channelType;
+ private final NegotiationType negotiationType;
+ private final SslContext sslContext;
private final EventLoopGroup group;
private final boolean usingSharedGroup;
private final int flowControlWindow;
- private final ProtocolNegotiator negotiator;
private final int maxMessageSize;
- private final String authority;
- private NettyTransportFactory(SocketAddress serverAddress,
- String authority,
- Class<? extends Channel> channelType,
+ private NettyTransportFactory(Class<? extends Channel> channelType,
+ NegotiationType negotiationType,
+ SslContext sslContext,
EventLoopGroup group,
int flowControlWindow,
- ProtocolNegotiator negotiator,
int maxMessageSize) {
- this.serverAddress = serverAddress;
this.channelType = channelType;
+ this.negotiationType = negotiationType;
+ this.sslContext = sslContext;
this.flowControlWindow = flowControlWindow;
- this.negotiator = negotiator;
this.maxMessageSize = maxMessageSize;
- this.authority = authority;
-
usingSharedGroup = group == null;
if (usingSharedGroup) {
// The group was unspecified, using the shared group.
@@ -256,14 +246,11 @@
}
@Override
- public ClientTransport newClientTransport() {
- return new NettyClientTransport(serverAddress, channelType, group, negotiator,
- flowControlWindow, maxMessageSize, authority);
- }
-
- @Override
- public String authority() {
- return authority;
+ public ClientTransport newClientTransport(SocketAddress serverAddress, String authority) {
+ GrpcUtil.checkAuthority(authority);
+ return new NettyClientTransport(serverAddress, channelType, group,
+ createProtocolNegotiator(authority, negotiationType, sslContext),
+ flowControlWindow, maxMessageSize, authority);
}
@Override
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelProvider.java b/netty/src/main/java/io/grpc/netty/NettyChannelProvider.java
index 628f803..2b13b6e 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelProvider.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelProvider.java
@@ -51,4 +51,9 @@
protected NettyChannelBuilder builderForAddress(String name, int port) {
return NettyChannelBuilder.forAddress(name, port);
}
+
+ @Override
+ protected NettyChannelBuilder builderForTarget(String target) {
+ return NettyChannelBuilder.forTarget(target);
+ }
}
diff --git a/netty/src/test/java/io/grpc/netty/NettyChannelBuilderTest.java b/netty/src/test/java/io/grpc/netty/NettyChannelBuilderTest.java
index bbddd4e..06840bb 100644
--- a/netty/src/test/java/io/grpc/netty/NettyChannelBuilderTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyChannelBuilderTest.java
@@ -31,8 +31,6 @@
package io.grpc.netty;
-import static org.junit.Assert.assertEquals;
-
import io.grpc.internal.ClientTransportFactory;
import org.junit.Rule;
@@ -51,18 +49,28 @@
@Test
public void overrideAllowsInvalidAuthority() {
- NettyChannelBuilder builder = new NettyChannelBuilder(new SocketAddress(){}, "") {
+ NettyChannelBuilder builder = new NettyChannelBuilder(new SocketAddress(){}) {
@Override
protected String checkAuthority(String authority) {
return authority;
}
};
- ClientTransportFactory factory = builder.overrideAuthority("invalid_authority")
+ ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority")
.negotiationType(NegotiationType.PLAINTEXT)
.buildTransportFactory();
+ }
- assertEquals("invalid_authority", factory.authority());
+ @Test
+ public void failOverrideInvalidAuthority() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Invalid authority:");
+
+ NettyChannelBuilder builder = new NettyChannelBuilder(new SocketAddress(){});
+
+ ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority")
+ .negotiationType(NegotiationType.PLAINTEXT)
+ .buildTransportFactory();
}
@Test
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
index 02f7176..01b7381 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
@@ -50,6 +50,9 @@
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.SharedResourceHolder.Resource;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -99,19 +102,27 @@
return new OkHttpChannelBuilder(host, port);
}
+ /**
+ * Creates a new builder for the given target URI that will be resolved by
+ * {@link io.grpc.NameResolver}.
+ */
+ public static OkHttpChannelBuilder forTarget(String targetUri) {
+ return new OkHttpChannelBuilder(URI.create(targetUri));
+ }
+
private Executor transportExecutor;
- private final String host;
- private final int port;
- private String authority;
+
private SSLSocketFactory sslSocketFactory;
private ConnectionSpec connectionSpec = DEFAULT_CONNECTION_SPEC;
private NegotiationType negotiationType = NegotiationType.TLS;
private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
protected OkHttpChannelBuilder(String host, int port) {
- this.host = Preconditions.checkNotNull(host);
- this.port = port;
- this.authority = GrpcUtil.authorityFromHostAndPort(host, port);
+ this(URI.create("dns:///" + GrpcUtil.authorityFromHostAndPort(host, port)));
+ }
+
+ private OkHttpChannelBuilder(URI target) {
+ super(target);
}
/**
@@ -126,27 +137,6 @@
}
/**
- * Overrides the host used with TLS and HTTP virtual hosting. It does not change what host is
- * actually connected to. This method differs from {@link #overrideAuthority(String)} in that it
- * appends the port number to the host provided.
- *
- * <p>Should only used by tests.
- *
- * @deprecated use {@link #overrideAuthority} instead
- */
- @Deprecated
- public final OkHttpChannelBuilder overrideHostForAuthority(String host) {
- this.authority = GrpcUtil.authorityFromHostAndPort(host, this.port);
- return this;
- }
-
- @Override
- public final OkHttpChannelBuilder overrideAuthority(String authority) {
- this.authority = checkAuthority(authority);
- return this;
- }
-
- /**
* Sets the negotiation type for the HTTP/2 connection.
*
* <p>Default: <code>TLS</code>
@@ -205,18 +195,10 @@
@Override
protected final ClientTransportFactory buildTransportFactory() {
- return new OkHttpTransportFactory(host, port, authority, transportExecutor,
+ return new OkHttpTransportFactory(transportExecutor,
createSocketFactory(), connectionSpec, maxMessageSize);
}
- /**
- * Verifies the authority is valid. This method exists as an escape hatch for putting in an
- * authority that is valid, but would fail the default validation provided by this implementation.
- */
- protected String checkAuthority(String authority) {
- return GrpcUtil.checkAuthority(authority);
- }
-
private SSLSocketFactory createSocketFactory() {
switch (negotiationType) {
case TLS:
@@ -231,25 +213,16 @@
private static class OkHttpTransportFactory extends AbstractReferenceCounted
implements ClientTransportFactory {
- private final String host;
- private final int port;
- private final String authority;
private final Executor executor;
private final boolean usingSharedExecutor;
private final SSLSocketFactory socketFactory;
private final ConnectionSpec connectionSpec;
private final int maxMessageSize;
- private OkHttpTransportFactory(String host,
- int port,
- String authority,
- Executor executor,
+ private OkHttpTransportFactory(Executor executor,
SSLSocketFactory socketFactory,
ConnectionSpec connectionSpec,
int maxMessageSize) {
- this.host = host;
- this.port = port;
- this.authority = authority;
this.socketFactory = socketFactory;
this.connectionSpec = connectionSpec;
this.maxMessageSize = maxMessageSize;
@@ -264,14 +237,10 @@
}
@Override
- public ClientTransport newClientTransport() {
- return new OkHttpClientTransport(host, port, authority, executor, socketFactory,
- Utils.convertSpec(connectionSpec), maxMessageSize);
- }
-
- @Override
- public String authority() {
- return authority;
+ public ClientTransport newClientTransport(SocketAddress addr, String authority) {
+ InetSocketAddress inetSocketAddr = (InetSocketAddress) addr;
+ return new OkHttpClientTransport(inetSocketAddr.getHostString(), inetSocketAddr.getPort(),
+ authority, executor, socketFactory, Utils.convertSpec(connectionSpec), maxMessageSize);
}
@Override
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelProvider.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelProvider.java
index c915dcc..9b3e34f 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelProvider.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelProvider.java
@@ -61,4 +61,9 @@
protected OkHttpChannelBuilder builderForAddress(String name, int port) {
return OkHttpChannelBuilder.forAddress(name, port);
}
+
+ @Override
+ protected OkHttpChannelBuilder builderForTarget(String target) {
+ return OkHttpChannelBuilder.forTarget(target);
+ }
}
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java
index a1fe95d..b80a7fd 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java
@@ -31,8 +31,6 @@
package io.grpc.okhttp;
-import static org.junit.Assert.assertEquals;
-
import io.grpc.internal.ClientTransportFactory;
import org.junit.Rule;
@@ -55,11 +53,20 @@
}
};
- ClientTransportFactory factory = builder.overrideAuthority("invalid_authority")
+ ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority")
.negotiationType(NegotiationType.PLAINTEXT)
.buildTransportFactory();
+ }
- assertEquals("invalid_authority", factory.authority());
+ @Test
+ public void failOverrideInvalidAuthority() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Invalid authority:");
+ OkHttpChannelBuilder builder = new OkHttpChannelBuilder("good", 1234);
+
+ ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority")
+ .negotiationType(NegotiationType.PLAINTEXT)
+ .buildTransportFactory();
}
@Test