Load-balancing ManagedChannelImpl.

- Add NameResolver and LoadBalancer interfaces.
- ManagedChannelImpl now uses NameResolver and LoadBalancer for
  transport selection, which may return transports to multiple
  addresses.
- Transports are still owned by ManagedChannelImpl, which implements
  TransportManager interface that is provided to LoadBalancer, so that
  LoadBalancer doesn't worry about Transport lifecycles.
- Channel builders can be created by forTarget() that accepts the fully
  qualified target name, which is an URI. (TODO) it's not tested.
- The old address-based construction pattern is supported by using
  AbstractManagedChannelImplBuilder.DirectAddressNameResolver.
- (TODO) DnsNameResolver and SimpleLoadBalancer are currently
  incomplete. They merely work for the single-address scenario.
diff --git a/core/src/main/java/io/grpc/Attributes.java b/core/src/main/java/io/grpc/Attributes.java
new file mode 100644
index 0000000..9508ecf
--- /dev/null
+++ b/core/src/main/java/io/grpc/Attributes.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import com.google.common.base.Preconditions;
+
+import java.util.HashMap;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * An immutable type-safe container of attributes.
+ */
+@ExperimentalApi
+@Immutable
+public final class Attributes {
+
+  private final HashMap<String, Object> data = new HashMap<String, Object>();
+
+  public static final Attributes EMPTY = new Attributes();
+
+  private Attributes() {
+  }
+
+  /**
+   * Gets the value for the key, or {@code null} if it's not present.
+   */
+  @SuppressWarnings("unchecked")
+  @Nullable
+  public <T> T get(Key<T> key) {
+    return (T) data.get(key.name);
+  }
+
+  /**
+   * Create a new builder.
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static final class Key<T> {
+    private final String name;
+
+    /**
+     * Construct the key.
+     *
+     * @param name the name, which should be namespaced like com.foo.BarAttribute to avoid
+     *             collision.
+     */
+    public Key(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return name;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return data.toString();
+  }
+
+  public static final class Builder {
+    private Attributes product;
+
+    private Builder() {
+      this.product = new Attributes();
+    }
+
+    public <T> void set(Key<T> key, T value) {
+      product.data.put(key.name, value);
+    }
+
+    /**
+     * Build the attributes. Can only be called once.
+     */
+    public Attributes build() {
+      Preconditions.checkState(product != null, "Already built");
+      Attributes result = product;
+      product = null;
+      return result;
+    }
+  }
+}
diff --git a/core/src/main/java/io/grpc/CallOptions.java b/core/src/main/java/io/grpc/CallOptions.java
index f73c2ed..5d7f2e2 100644
--- a/core/src/main/java/io/grpc/CallOptions.java
+++ b/core/src/main/java/io/grpc/CallOptions.java
@@ -62,6 +62,9 @@
   @Nullable
   private String authority;
 
+  @Nullable
+  private RequestKey requestKey;
+
   /**
    * Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
    * generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
@@ -131,6 +134,25 @@
   }
 
   /**
+   * Returns a new {@code CallOptions} with a request key for affinity-based routing.
+   */
+  @ExperimentalApi
+  public CallOptions withRequestKey(@Nullable RequestKey requestKey) {
+    CallOptions newOptions = new CallOptions(this);
+    newOptions.requestKey = requestKey;
+    return newOptions;
+  }
+
+  /**
+   * Returns the request key for affinity-based routing.
+   */
+  @ExperimentalApi
+  @Nullable
+  public RequestKey getRequestKey() {
+    return requestKey;
+  }
+
+  /**
    * Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
    * generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
    * services, even if those services are hosted on different domain names. That assumes the
@@ -155,6 +177,7 @@
     deadlineNanoTime = other.deadlineNanoTime;
     compressor = other.compressor;
     authority = other.authority;
+    requestKey = other.requestKey;
   }
 
   @SuppressWarnings("deprecation") // guava 14.0
diff --git a/core/src/main/java/io/grpc/DnsNameResolverFactory.java b/core/src/main/java/io/grpc/DnsNameResolverFactory.java
new file mode 100644
index 0000000..e436747
--- /dev/null
+++ b/core/src/main/java/io/grpc/DnsNameResolverFactory.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import com.google.common.base.Preconditions;
+
+import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.SharedResourceHolder;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory for DNS-based {@link NameResolver}s.
+ *
+ * <p>The format of the target URI is {@code "[dns://[<DNS_server_address>]/]<name>"}.
+ */
+@ExperimentalApi
+public final class DnsNameResolverFactory extends NameResolver.Factory {
+
+  private static final DnsNameResolverFactory instance = new DnsNameResolverFactory();
+
+  @Override
+  public NameResolver newNameResolver(URI targetUri) {
+    String scheme = targetUri.getScheme();
+    if (scheme == null) {
+      return new DnsNameResolver(null, targetUri.toString());
+    } else if (scheme.equals("dns")) {
+      String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
+      Preconditions.checkArgument(targetPath.startsWith("/"),
+          "the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
+      String name = targetPath.substring(1);
+      return new DnsNameResolver(targetUri.getAuthority(), name);
+    } else {
+      return null;
+    }
+  }
+
+  private DnsNameResolverFactory() {
+  }
+
+  public static DnsNameResolverFactory getInstance() {
+    return instance;
+  }
+
+  private static class DnsNameResolver extends NameResolver {
+    private final String authority;
+    private final String host;
+    private final int port;
+    private ExecutorService executor;
+
+    DnsNameResolver(@Nullable String nsAuthority, String name) {
+      // TODO: if a DNS server is provided as nsAuthority, use it.
+      // https://www.captechconsulting.com/blogs/accessing-the-dusty-corners-of-dns-with-java
+
+      // Must prepend a "//" to the name when constructing a URI, otherwise
+      // the authority and host of the resulted URI would be null.
+      URI nameUri = URI.create("//" + name);
+      authority = Preconditions.checkNotNull(nameUri.getAuthority(),
+          "nameUri (%s) doesn't have an authority", nameUri);
+      host = Preconditions.checkNotNull(nameUri.getHost(), "host");
+      port = nameUri.getPort();
+      Preconditions.checkArgument(port > 0, "port (%s) must be positive", port);
+    }
+
+    @Override
+    public String getServiceAuthority() {
+      return authority;
+    }
+
+    @Override
+    public synchronized void start(final Listener listener) {
+      Preconditions.checkState(executor == null, "already started");
+      executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          InetAddress[] inetAddrs;
+          try {
+            inetAddrs = InetAddress.getAllByName(host);
+          } catch (Exception e) {
+            listener.onError(Status.UNAVAILABLE.withCause(e));
+            return;
+          }
+          ArrayList<ResolvedServerInfo> servers
+              = new ArrayList<ResolvedServerInfo>(inetAddrs.length);
+          for (int i = 0; i < inetAddrs.length; i++) {
+            InetAddress inetAddr = inetAddrs[i];
+            servers.add(
+                new ResolvedServerInfo(new InetSocketAddress(inetAddr, port), Attributes.EMPTY));
+          }
+          listener.onUpdate(servers, Attributes.EMPTY);
+        }
+      });
+    }
+
+    @Override
+    public synchronized void shutdown() {
+      if (executor != null) {
+        executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java
new file mode 100644
index 0000000..3449021
--- /dev/null
+++ b/core/src/main/java/io/grpc/LoadBalancer.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import io.grpc.internal.ClientTransport;
+
+import java.net.SocketAddress;
+import java.util.List;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A pluggable component that receives resolved addresses from {@link NameResolver} and provides the
+ * channel a usable transport when asked.
+ *
+ * <p>Note to implementations: all methods are expected to return quickly. Any work that may block
+ * should be done asynchronously.
+ */
+// TODO(zhangkun83): since it's also used for non-loadbalancing cases like pick-first,
+// "RequestRouter" might be a better name.
+@ExperimentalApi
+@ThreadSafe
+public abstract class LoadBalancer {
+  /**
+   * Pick a transport that Channel will use for next RPC.
+   *
+   * @param requestKey for affinity-based routing
+   */
+  public abstract ListenableFuture<ClientTransport> pickTransport(
+      @Nullable RequestKey requestKey);
+
+  /**
+   * Shuts down this {@code LoadBalancer}.
+   */
+  public void shutdown() { }
+
+  /**
+   * Handles newly resolved addresses and service config from name resolution system.
+   *
+   * <p>Implementations should not modify the given {@code servers}.
+   */
+  public void handleResolvedAddresses(List<ResolvedServerInfo> servers, Attributes config) { }
+
+  /**
+   * Handles an error from the name resolution system.
+   *
+   * @param error a non-OK status
+   */
+  public void handleNameResolutionError(Status error) { }
+
+  /**
+   * Called when a transport is fully connected and ready to accept traffic.
+   */
+  public void transportReady(SocketAddress addr, ClientTransport transport) { }
+
+  /**
+   * Called when a transport is shutting down.
+   */
+  public void transportShutdown(SocketAddress addr, ClientTransport transport, Status s) { }
+
+  public abstract static class Factory {
+    /**
+     * Creates a {@link LoadBalancer} that will be used inside a channel.
+     *
+     * @param serviceName the DNS-style service name, which is also the authority
+     * @param tm the interface where an {@code LoadBalancer} implementation gets connected
+     *               transports from
+     */
+    public abstract LoadBalancer newLoadBalancer(String serviceName, TransportManager tm);
+  }
+}
diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
index 0e5e33b..17c3091 100644
--- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java
+++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
@@ -44,6 +44,11 @@
     return ManagedChannelProvider.provider().builderForAddress(name, port);
   }
 
+  @ExperimentalApi
+  public static ManagedChannelBuilder<?> forTarget(String target) {
+    return ManagedChannelProvider.provider().builderForTarget(target);
+  }
+
   /**
    * Provides a custom executor.
    *
@@ -99,6 +104,24 @@
   @ExperimentalApi("primarily for testing")
   public abstract T usePlaintext(boolean skipNegotiation);
 
+  /*
+   * Provides a custom {@link NameResolver.Factory} for the channel.
+   *
+   * <p>If this method is not called, the builder will look up in the global resolver registry for
+   * a factory for the provided target.
+   */
+  @ExperimentalApi
+  public abstract T nameResolverFactory(NameResolver.Factory resolverFactory);
+
+  /**
+   * Provides a custom {@link LoadBalancer.Factory} for the channel.
+   *
+   * <p>If this method is not called, the builder will use {@link SimpleLoadBalancerFactory} for the
+   * channel.
+   */
+  @ExperimentalApi
+  public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory);
+
   /**
    * Builds a channel using the given parameters.
    */
diff --git a/core/src/main/java/io/grpc/ManagedChannelProvider.java b/core/src/main/java/io/grpc/ManagedChannelProvider.java
index 991b8e1..434a007 100644
--- a/core/src/main/java/io/grpc/ManagedChannelProvider.java
+++ b/core/src/main/java/io/grpc/ManagedChannelProvider.java
@@ -106,6 +106,12 @@
    */
   protected abstract ManagedChannelBuilder<?> builderForAddress(String name, int port);
 
+  /**
+   * Creates a new builder with the given target URI.
+   */
+  @ExperimentalApi
+  protected abstract ManagedChannelBuilder<?> builderForTarget(String target);
+
   public static final class ProviderNotFoundException extends RuntimeException {
     public ProviderNotFoundException(String msg) {
       super(msg);
diff --git a/core/src/main/java/io/grpc/NameResolver.java b/core/src/main/java/io/grpc/NameResolver.java
new file mode 100644
index 0000000..22a5e61
--- /dev/null
+++ b/core/src/main/java/io/grpc/NameResolver.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import java.net.URI;
+import java.util.List;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A pluggable component that resolves a target URI (which is broken down into 3 parts as described
+ * below) and return addresses to the caller.
+ *
+ * <p>The format of the target URI is {@code "[<scheme>:]<scheme-specific-string>"}
+ *
+ * <p>{@code NameResolver} has no knowledge of load-balancing. The addresses of a target may be
+ * changed over time, thus the caller registers a {@link Listener} to receive continuous updates.
+ */
+@ExperimentalApi
+@ThreadSafe
+public abstract class NameResolver {
+  /**
+   * Returns the authority, which is also the name of the service.
+   *
+   * <p>An implementation must generate it locally and must keep it unchanged.
+   */
+  public abstract String getServiceAuthority();
+
+  /**
+   * Starts the resolution.
+   *
+   * @param listener used to receive updates on the target
+   */
+  public abstract void start(Listener listener);
+
+  /**
+   * Stops the resolution. Updates to the Listener will stop.
+   */
+  public abstract void shutdown();
+
+  public abstract static class Factory {
+    /**
+     * Creates a {@link NameResolver} for the given target URI, or {@code null} if the given URI
+     * cannot be resolved by this factory.
+     */
+    @Nullable
+    public abstract NameResolver newNameResolver(URI targetUri);
+  }
+
+  /**
+   * Receives address updates.
+   *
+   * <p>All methods are expected to return quickly.
+   */
+  @ThreadSafe
+  public interface Listener {
+    /**
+     * Handles updates on resolved addresses and config.
+     *
+     * <p>Implementations will not modify the given {@code servers}.
+     */
+    void onUpdate(List<ResolvedServerInfo> servers, Attributes config);
+
+    /**
+     * Handles an error from the resolver.
+     *
+     * @param error a non-OK status
+     */
+    void onError(Status error);
+  }
+}
diff --git a/core/src/main/java/io/grpc/RequestKey.java b/core/src/main/java/io/grpc/RequestKey.java
new file mode 100644
index 0000000..6ba6bcf
--- /dev/null
+++ b/core/src/main/java/io/grpc/RequestKey.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+/**
+ * A key generated from an RPC request, and to be used for affinity-based
+ * routing.
+ */
+@ExperimentalApi
+public final class RequestKey {
+
+  // TODO(zhangkun83): materialize this class once we decide the form of the affinity key.
+  private RequestKey() {
+  }
+}
diff --git a/core/src/main/java/io/grpc/ResolvedServerInfo.java b/core/src/main/java/io/grpc/ResolvedServerInfo.java
new file mode 100644
index 0000000..2e914de
--- /dev/null
+++ b/core/src/main/java/io/grpc/ResolvedServerInfo.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import java.net.SocketAddress;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * The information about a server from a {@link NameResolver}.
+ */
+@ExperimentalApi
+@Immutable
+public final class ResolvedServerInfo {
+  private final SocketAddress address;
+  private final Attributes attributes;
+
+  /**
+   * Constructor.
+   *
+   * @param address the address object
+   * @param attributes attributes associated with this address.
+   */
+  public ResolvedServerInfo(SocketAddress address, Attributes attributes) {
+    this.address = address;
+    this.attributes = attributes;
+  }
+
+  /**
+   * Returns the address.
+   */
+  public SocketAddress getAddress() {
+    return address;
+  }
+
+  /**
+   * Returns the associated attributes.
+   */
+  public Attributes getAttributes() {
+    return attributes;
+  }
+
+  @Override
+  public String toString() {
+    return "[address=" + address + ", attrs=" + attributes + "]";
+  }
+}
diff --git a/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java b/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java
new file mode 100644
index 0000000..b8d07f0
--- /dev/null
+++ b/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2014, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import io.grpc.internal.ClientTransport;
+
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * A {@link LoadBalancer} that provides simple round-robin and pick-first routing mechanism over the
+ * addresses from the {@link NameResolver}.
+ */
+// TODO(zhangkun83): Only pick-first is implemented. We need to implement round-robin.
+public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory {
+
+  private static final SimpleLoadBalancerFactory instance = new SimpleLoadBalancerFactory();
+
+  private SimpleLoadBalancerFactory() {
+  }
+
+  public static SimpleLoadBalancerFactory getInstance() {
+    return instance;
+  }
+
+  @Override
+  public LoadBalancer newLoadBalancer(String serviceName, TransportManager tm) {
+    return new SimpleLoadBalancer(tm);
+  }
+
+  private static class SimpleLoadBalancer extends LoadBalancer {
+    @GuardedBy("servers")
+    private final List<ResolvedServerInfo> servers = new ArrayList<ResolvedServerInfo>();
+    @GuardedBy("servers")
+    private int currentServerIndex;
+    // TODO(zhangkun83): virtually any LoadBalancer would need to handle picks before name
+    // resolution is done, we may want to move the related logic into ManagedChannelImpl.
+    @GuardedBy("servers")
+    private List<SettableFuture<ClientTransport>> pendingPicks;
+    @GuardedBy("servers")
+    private StatusException nameResolutionError;
+
+    private final TransportManager tm;
+
+    private SimpleLoadBalancer(TransportManager tm) {
+      this.tm = tm;
+    }
+
+    @Override
+    public ListenableFuture<ClientTransport> pickTransport(@Nullable RequestKey requestKey) {
+      ResolvedServerInfo currentServer;
+      synchronized (servers) {
+        if (servers.isEmpty()) {
+          if (nameResolutionError != null) {
+            return Futures.immediateFailedFuture(nameResolutionError);
+          }
+          SettableFuture<ClientTransport> future = SettableFuture.create();
+          if (pendingPicks == null) {
+            pendingPicks = new ArrayList<SettableFuture<ClientTransport>>();
+          }
+          pendingPicks.add(future);
+          return future;
+        }
+        currentServer = servers.get(currentServerIndex);
+      }
+      return tm.getTransport(currentServer.getAddress());
+    }
+
+    @Override
+    public void handleResolvedAddresses(
+        List<ResolvedServerInfo> updatedServers, Attributes config) {
+      List<SettableFuture<ClientTransport>> pendingPicksCopy = null;
+      ResolvedServerInfo currentServer = null;
+      synchronized (servers) {
+        nameResolutionError = null;
+        servers.clear();
+        for (ResolvedServerInfo addr : updatedServers) {
+          servers.add(addr);
+        }
+        if (!servers.isEmpty()) {
+          pendingPicksCopy = pendingPicks;
+          pendingPicks = null;
+          if (currentServerIndex >= servers.size()) {
+            currentServerIndex = 0;
+          }
+          currentServer = servers.get(currentServerIndex);
+        }
+      }
+      if (pendingPicksCopy != null) {
+        // If pendingPicksCopy != null, then servers.isEmpty() == false, then
+        // currentServer must have been assigned.
+        Preconditions.checkState(currentServer != null, "currentServer is null");
+        for (final SettableFuture<ClientTransport> pendingPick : pendingPicksCopy) {
+          ListenableFuture<ClientTransport> future = tm.getTransport(currentServer.getAddress());
+          Futures.addCallback(future, new FutureCallback<ClientTransport>() {
+            @Override public void onSuccess(ClientTransport result) {
+              pendingPick.set(result);
+            }
+
+            @Override public void onFailure(Throwable t) {
+              pendingPick.setException(t);
+            }
+          });
+        }
+      }
+    }
+
+    @Override
+    public void handleNameResolutionError(Status error) {
+      List<SettableFuture<ClientTransport>> pendingPicksCopy = null;
+      StatusException statusException = error.asException();
+      synchronized (servers) {
+        pendingPicksCopy = pendingPicks;
+        pendingPicks = null;
+        nameResolutionError = statusException;
+      }
+      if (pendingPicksCopy != null) {
+        for (SettableFuture<ClientTransport> pendingPick : pendingPicksCopy) {
+          pendingPick.setException(statusException);
+        }
+      }
+    }
+
+    @Override
+    public void transportShutdown(SocketAddress addr, ClientTransport transport, Status s) {
+      if (!s.isOk()) {
+        // If the current transport is shut down due to error, move on to the next address in the
+        // list
+        synchronized (servers) {
+          if (addr.equals(servers.get(currentServerIndex).getAddress())) {
+            currentServerIndex++;
+            if (currentServerIndex >= servers.size()) {
+              currentServerIndex = 0;
+            }
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/io/grpc/TransportManager.java b/core/src/main/java/io/grpc/TransportManager.java
new file mode 100644
index 0000000..1848bc7
--- /dev/null
+++ b/core/src/main/java/io/grpc/TransportManager.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import io.grpc.internal.ClientTransport;
+
+import java.net.SocketAddress;
+
+/**
+ * Manages transport life-cycles and provide ready-to-use transports.
+ */
+@ExperimentalApi
+public abstract class TransportManager {
+  /**
+   * Advises this {@code TransportManager} to retain transports only to these servers, for warming
+   * up connections and discarding unused connections.
+   */
+  public abstract void updateRetainedTransports(SocketAddress[] addrs);
+
+  /**
+   * Returns the future of a transport for the given server.
+   *
+   * <p>If the channel has been shut down, the value of the future will be {@code null}.
+   */
+  // TODO(zhangkun83): GrpcLoadBalancer will use this to get transport to connect to LB servers,
+  // which would have a different authority than the primary servers. We need to figure out how to
+  // do it.
+  public abstract ListenableFuture<ClientTransport> getTransport(SocketAddress addr);
+}
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
index 4fb9038..4eeb1c9 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
@@ -39,6 +39,8 @@
 import io.grpc.internal.ClientTransport;
 import io.grpc.internal.ClientTransportFactory;
 
+import java.net.SocketAddress;
+
 /**
  * Builder for a channel that issues in-process requests. Clients identify the in-process server by
  * its name.
@@ -59,9 +61,9 @@
   }
 
   private final String name;
-  private String authority = "localhost";
 
   private InProcessChannelBuilder(String name) {
+    super(new InProcessSocketAddress(name), "localhost");
     this.name = Preconditions.checkNotNull(name);
   }
 
@@ -74,39 +76,39 @@
   }
 
   @Override
-  public InProcessChannelBuilder overrideAuthority(String authority) {
-    this.authority = authority;
-    return this;
-  }
-
-  @Override
   protected ClientTransportFactory buildTransportFactory() {
-    return new InProcessClientTransportFactory(name, authority);
+    return new InProcessClientTransportFactory(name);
   }
 
   private static class InProcessClientTransportFactory extends AbstractReferenceCounted
           implements ClientTransportFactory {
     private final String name;
-    private final String authority;
 
-    private InProcessClientTransportFactory(String name, String authority) {
+    private InProcessClientTransportFactory(String name) {
       this.name = name;
-      this.authority = authority;
     }
 
     @Override
-    public ClientTransport newClientTransport() {
+    public ClientTransport newClientTransport(SocketAddress addr, String authority) {
       return new InProcessTransport(name);
     }
 
     @Override
-    public String authority() {
-      return authority;
-    }
-
-    @Override
     protected void deallocate() {
       // Do nothing.
     }
   }
+
+  private static class InProcessSocketAddress extends SocketAddress {
+    final String name;
+
+    InProcessSocketAddress(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return name;
+    }
+  }
 }
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index 5097c9f..b2d4766 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -31,12 +31,23 @@
 
 package io.grpc.internal;
 
-import io.grpc.ClientInterceptor;
-import io.grpc.Internal;
-import io.grpc.ManagedChannelBuilder;
+import com.google.common.base.Preconditions;
 
+import io.grpc.Attributes;
+import io.grpc.ClientInterceptor;
+import io.grpc.DnsNameResolverFactory;
+import io.grpc.Internal;
+import io.grpc.LoadBalancer;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.NameResolver;
+import io.grpc.ResolvedServerInfo;
+import io.grpc.SimpleLoadBalancerFactory;
+
+import java.net.SocketAddress;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -54,9 +65,34 @@
   private Executor executor;
   private final List<ClientInterceptor> interceptors = new ArrayList<ClientInterceptor>();
 
+  private final URI target;
+
+  @Nullable
+  private final SocketAddress directServerAddress;
+
   @Nullable
   private String userAgent;
 
+  @Nullable
+  private String authorityOverride;
+
+  @Nullable
+  private NameResolver.Factory nameResolverFactory;
+
+  @Nullable
+  private LoadBalancer.Factory loadBalancerFactory;
+
+  protected AbstractManagedChannelImplBuilder(URI target) {
+    this.target = Preconditions.checkNotNull(target);
+    this.directServerAddress = null;
+  }
+
+  protected AbstractManagedChannelImplBuilder(SocketAddress directServerAddress, String authority) {
+    this.target = URI.create("direct-address:///" + directServerAddress);
+    this.directServerAddress = directServerAddress;
+    this.nameResolverFactory = new DirectAddressNameResolverFactory(directServerAddress, authority);
+  }
+
   @Override
   public final T executor(Executor executor) {
     this.executor = executor;
@@ -74,6 +110,24 @@
     return intercept(Arrays.asList(interceptors));
   }
 
+  @Override
+  public final T nameResolverFactory(NameResolver.Factory resolverFactory) {
+    Preconditions.checkState(directServerAddress == null,
+        "directServerAddress is set (%s), which forbids the use of NameResolverFactory",
+        directServerAddress);
+    this.nameResolverFactory = resolverFactory;
+    return thisT();
+  }
+
+  @Override
+  public final T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory) {
+    Preconditions.checkState(directServerAddress == null,
+        "directServerAddress is set (%s), which forbids the use of LoadBalancerFactory",
+        directServerAddress);
+    this.loadBalancerFactory = loadBalancerFactory;
+    return thisT();
+  }
+
   private T thisT() {
     @SuppressWarnings("unchecked")
     T thisT = (T) this;
@@ -87,9 +141,32 @@
   }
 
   @Override
+  public final T overrideAuthority(String authority) {
+    this.authorityOverride = checkAuthority(authority);
+    return thisT();
+  }
+
+  /**
+   * Verifies the authority is valid.  This method exists as an escape hatch for putting in an
+   * authority that is valid, but would fail the default validation provided by this
+   * implementation.
+   */
+  protected String checkAuthority(String authority) {
+    return GrpcUtil.checkAuthority(authority);
+  }
+
+  @Override
   public ManagedChannelImpl build() {
-    ClientTransportFactory transportFactory = buildTransportFactory();
-    return new ManagedChannelImpl(transportFactory, executor, userAgent, interceptors);
+    ClientTransportFactory transportFactory = new AuthorityOverridingTransportFactory(
+        buildTransportFactory(), authorityOverride);
+    return new ManagedChannelImpl(
+        target,
+        // TODO(carl-mastrangelo): Allow clients to pass this in
+        new ExponentialBackoffPolicy.Provider(),
+        // TODO(zhangkun83): use a NameResolver registry for the "nameResolverFactory == null" case
+        nameResolverFactory == null ? DnsNameResolverFactory.getInstance() : nameResolverFactory,
+        loadBalancerFactory == null ? SimpleLoadBalancerFactory.getInstance() : loadBalancerFactory,
+        transportFactory, executor, userAgent, interceptors);
   }
 
   /**
@@ -99,4 +176,68 @@
    */
   @Internal
   protected abstract ClientTransportFactory buildTransportFactory();
+
+  private static class AuthorityOverridingTransportFactory implements ClientTransportFactory {
+    final ClientTransportFactory factory;
+    @Nullable final String authorityOverride;
+
+    AuthorityOverridingTransportFactory(
+        ClientTransportFactory factory, @Nullable String authorityOverride) {
+      this.factory = factory;
+      this.authorityOverride = authorityOverride;
+    }
+
+    @Override
+    public ClientTransport newClientTransport(SocketAddress serverAddress, String authority) {
+      return factory.newClientTransport(
+          serverAddress, authorityOverride != null ? authorityOverride : authority);
+    }
+
+    @Override
+    public int referenceCount() {
+      return factory.referenceCount();
+    }
+
+    @Override
+    public ReferenceCounted retain() {
+      factory.retain();
+      return this;
+    }
+
+    @Override
+    public ReferenceCounted release() {
+      factory.release();
+      return this;
+    }
+  }
+
+  private static class DirectAddressNameResolverFactory extends NameResolver.Factory {
+    final SocketAddress address;
+    final String authority;
+
+    DirectAddressNameResolverFactory(SocketAddress address, String authority) {
+      this.address = address;
+      this.authority = authority;
+    }
+
+    @Override
+    public NameResolver newNameResolver(URI notUsedUri) {
+      return new NameResolver() {
+        @Override
+        public String getServiceAuthority() {
+          return authority;
+        }
+
+        @Override
+        public void start(final Listener listener) {
+          listener.onUpdate(
+              Collections.singletonList(new ResolvedServerInfo(address, Attributes.EMPTY)),
+              Attributes.EMPTY);
+        }
+
+        @Override
+        public void shutdown() {}
+      };
+    }
+  }
 }
diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
index de7c0aa..7c7a684 100644
--- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -96,7 +96,7 @@
      * @return a future for client transport. If no more transports can be created, e.g., channel is
      *         shut down, the future's value will be {@code null}.
      */
-    ListenableFuture<ClientTransport> get();
+    ListenableFuture<ClientTransport> get(CallOptions callOptions);
   }
 
 
@@ -139,7 +139,7 @@
     }
 
     ClientStreamListener listener = new ClientStreamListenerImpl(observer);
-    ListenableFuture<ClientTransport> transportFuture = clientTransportProvider.get();
+    ListenableFuture<ClientTransport> transportFuture = clientTransportProvider.get(callOptions);
     if (transportFuture.isDone()) {
       // Try to skip DelayedStream when possible to avoid the overhead of a volatile read in the
       // fast path. If that fails, stream will stay null and DelayedStream will be created.
@@ -409,7 +409,7 @@
 
       StreamCreationTask() {
         this.transportFuture = Preconditions.checkNotNull(
-            clientTransportProvider.get(), "transportFuture");
+            clientTransportProvider.get(callOptions), "transportFuture");
       }
 
       @Override
diff --git a/core/src/main/java/io/grpc/internal/ClientTransport.java b/core/src/main/java/io/grpc/internal/ClientTransport.java
index c2c88e4..fd9fa85 100644
--- a/core/src/main/java/io/grpc/internal/ClientTransport.java
+++ b/core/src/main/java/io/grpc/internal/ClientTransport.java
@@ -108,6 +108,8 @@
      * that this method is called without {@link #shutdown} being called.  If the argument to this
      * function is {@link Status#isOk}, it is safe to immediately reconnect.
      *
+     * <p>This is called exactly once, and must be called prior to {@link #transportTerminated}.
+     *
      * @param s the reason for the shutdown.
      */
     void transportShutdown(Status s);
@@ -115,7 +117,8 @@
     /**
      * The transport completed shutting down. All resources have been released.
      *
-     * <p> {@link #transportShutdown(Status)} must be called before calling this method.
+     * <p>This is called exactly once, and must be called after {@link #transportShutdown} has been
+     * called.
      */
     void transportTerminated();
 
diff --git a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
index a36ade1..bc06a10 100644
--- a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
+++ b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
@@ -31,14 +31,15 @@
 
 package io.grpc.internal;
 
+import java.net.SocketAddress;
+
 /** Pre-configured factory for creating {@link ClientTransport} instances. */
 public interface ClientTransportFactory extends ReferenceCounted {
-  /** Creates an unstarted transport for exclusive use. */
-  ClientTransport newClientTransport();
-
   /**
-   * Returns the authority of the channel. Typically, this should be in the form {@code host:port}.
-   * Note that since there is not a scheme, there can't be a default port.
+   * Creates an unstarted transport for exclusive use.
+   *
+   * @param serverAddress the address that the transport is connected to
+   * @param authority the HTTP/2 authority of the server
    */
-  String authority();
+  ClientTransport newClientTransport(SocketAddress serverAddress, String authority);
 }
diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java
index 7f22fb3..b2b1660 100644
--- a/core/src/main/java/io/grpc/internal/GrpcUtil.java
+++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java
@@ -351,7 +351,7 @@
   /**
    * Shared executor for channels.
    */
-  static final Resource<ExecutorService> SHARED_CHANNEL_EXECUTOR =
+  public static final Resource<ExecutorService> SHARED_CHANNEL_EXECUTOR =
       new Resource<ExecutorService>() {
         private static final String name = "grpc-default-executor";
         @Override
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index bc01591..7c05c64 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -33,9 +33,11 @@
 
 import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
 
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
+import io.grpc.Attributes;
 import io.grpc.CallOptions;
 import io.grpc.Channel;
 import io.grpc.ClientCall;
@@ -45,11 +47,21 @@
 import io.grpc.Compressor;
 import io.grpc.DecompressorRegistry;
 import io.grpc.ExperimentalApi;
+import io.grpc.LoadBalancer;
 import io.grpc.ManagedChannel;
 import io.grpc.MethodDescriptor;
+import io.grpc.NameResolver;
+import io.grpc.ResolvedServerInfo;
+import io.grpc.Status;
+import io.grpc.TransportManager;
 import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
 
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -90,8 +102,15 @@
    */
   private final Channel interceptorChannel;
 
+  private final NameResolver nameResolver;
+  private final LoadBalancer loadBalancer;
+
+  /**
+   * Maps addresses to transports for that server.
+   */
   @GuardedBy("lock")
-  private TransportSet transportSet;
+  private final Map<SocketAddress, TransportSet> transports =
+      new HashMap<SocketAddress, TransportSet>();
 
   @GuardedBy("lock")
   private boolean shutdown;
@@ -101,55 +120,21 @@
   private volatile Compressor defaultCompressor;
 
   private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
-
     @Override
-    public ListenableFuture<ClientTransport> get() {
-      TransportSet transportSetCopy;
+    public ListenableFuture<ClientTransport> get(CallOptions callOptions) {
       synchronized (lock) {
         if (shutdown) {
           return NULL_VALUE_TRANSPORT_FUTURE;
         }
-        if (transportSet == null) {
-          transportSet = new TransportSet(backoffPolicyProvider, transportFactory,
-              scheduledExecutor, new TransportSet.Callback() {
-                @Override
-                public void onTerminated() {
-                  synchronized (lock) {
-                    if (shutdown) {
-                      transportSet = null;
-                      if (terminated) {
-                        log.warning("transportTerminated called after already terminated");
-                      }
-                      terminated = true;
-                      lock.notifyAll();
-                      onChannelTerminated();
-                    }
-                  }
-                }
-              });
-        }
-        transportSetCopy = transportSet;
       }
-      return transportSetCopy.obtainActiveTransport();
+      return loadBalancer.pickTransport(callOptions.getRequestKey());
     }
   };
 
-  // TODO(zhangkun83): remove this in favor of the one that accepts BackoffPolicy.Provider
-  ManagedChannelImpl(ClientTransportFactory transportFactory, @Nullable Executor executor,
-      @Nullable String userAgent, List<ClientInterceptor> interceptors) {
-    this(new ExponentialBackoffPolicy.Provider(), transportFactory, executor, userAgent,
-        interceptors);
-  }
-
-  ManagedChannelImpl(BackoffPolicy.Provider backoffPolicyProvider,
+  ManagedChannelImpl(URI targetUri, BackoffPolicy.Provider backoffPolicyProvider,
+      NameResolver.Factory nameResolverFactory, LoadBalancer.Factory loadBalancerFactory,
       ClientTransportFactory transportFactory, @Nullable Executor executor,
       @Nullable String userAgent, List<ClientInterceptor> interceptors) {
-    this.backoffPolicyProvider = backoffPolicyProvider;
-    this.transportFactory = transportFactory;
-    this.userAgent = userAgent;
-    this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
-    scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE);
-
     if (executor == null) {
       usingSharedExecutor = true;
       this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
@@ -157,7 +142,28 @@
       usingSharedExecutor = false;
       this.executor = executor;
     }
+    this.backoffPolicyProvider = backoffPolicyProvider;
+    this.nameResolver = nameResolverFactory.newNameResolver(targetUri);
+    Preconditions.checkArgument(this.nameResolver != null,
+        "The given NameResolverFactory cannot resolve %s", targetUri);
+    this.loadBalancer = loadBalancerFactory.newLoadBalancer(nameResolver.getServiceAuthority(), tm);
+    this.transportFactory = transportFactory;
+    this.userAgent = userAgent;
+    this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
+    scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE);
 
+    this.nameResolver.start(new NameResolver.Listener() {
+      @Override
+      public void onUpdate(List<ResolvedServerInfo> servers, Attributes config) {
+        loadBalancer.handleResolvedAddresses(servers, config);
+      }
+
+      @Override
+      public void onError(Status error) {
+        Preconditions.checkArgument(!error.isOk(), "the error status must not be OK");
+        loadBalancer.handleNameResolutionError(error);
+      }
+    });
   }
 
   /**
@@ -180,7 +186,7 @@
    */
   @Override
   public ManagedChannelImpl shutdown() {
-    TransportSet transportSetCopy = null;
+    ArrayList<TransportSet> transportsCopy = new ArrayList<TransportSet>();
     synchronized (lock) {
       if (shutdown) {
         return this;
@@ -188,16 +194,16 @@
       shutdown = true;
       // After shutdown there are no new calls, so no new cancellation tasks are needed
       scheduledExecutor = SharedResourceHolder.release(TIMER_SERVICE, scheduledExecutor);
-      if (transportSet == null) {
+      if (transports.isEmpty()) {
         terminated = true;
         lock.notifyAll();
         onChannelTerminated();
       } else {
-        transportSetCopy = transportSet;
+        transportsCopy.addAll(transports.values());
       }
     }
-    if (transportSetCopy != null) {
-      transportSetCopy.shutdown();
+    for (TransportSet ts : transportsCopy) {
+      ts.shutdown();
     }
     return this;
   }
@@ -276,7 +282,8 @@
 
     @Override
     public String authority() {
-      return transportFactory.authority();
+      String authority = nameResolver.getServiceAuthority();
+      return Preconditions.checkNotNull(authority, "authority");
     }
   }
 
@@ -290,4 +297,43 @@
     // Release the transport factory so that it can deallocate any resources.
     transportFactory.release();
   }
+
+  private final TransportManager tm = new TransportManager() {
+    @Override
+    public void updateRetainedTransports(SocketAddress[] addrs) {
+      // TODO(zhangkun83): warm-up new servers and discard removed servers.
+    }
+
+    @Override
+    public ListenableFuture<ClientTransport> getTransport(final SocketAddress addr) {
+      TransportSet ts;
+      synchronized (lock) {
+        if (shutdown) {
+          return null;
+        }
+        ts = transports.get(addr);
+        if (ts == null) {
+          ts = new TransportSet(addr, authority(), loadBalancer, backoffPolicyProvider,
+              transportFactory, scheduledExecutor, new TransportSet.Callback() {
+                @Override
+                public void onTerminated() {
+                  synchronized (lock) {
+                    transports.remove(addr);
+                    if (shutdown && transports.isEmpty()) {
+                      if (terminated) {
+                        log.warning("transportTerminated called after already terminated");
+                      }
+                      terminated = true;
+                      lock.notifyAll();
+                      onChannelTerminated();
+                    }
+                  }
+                }
+              });
+          transports.put(addr, ts);
+        }
+      }
+      return ts.obtainActiveTransport();
+    }
+  };
 }
diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java
index fc4e046..9d7effe 100644
--- a/core/src/main/java/io/grpc/internal/TransportSet.java
+++ b/core/src/main/java/io/grpc/internal/TransportSet.java
@@ -36,13 +36,16 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
+import io.grpc.LoadBalancer;
 import io.grpc.Status;
 
+import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.annotation.Nullable;
@@ -63,6 +66,8 @@
   }
 
   private final Object lock = new Object();
+  private final SocketAddress server;
+  private final String authority;
   private final BackoffPolicy.Provider backoffPolicyProvider;
   private final Callback callback;
   private final ClientTransportFactory transportFactory;
@@ -84,6 +89,8 @@
   @GuardedBy("lock")
   private final Collection<ClientTransport> transports = new ArrayList<ClientTransport>();
 
+  private final LoadBalancer loadBalancer;
+
   @GuardedBy("lock")
   private boolean shutdown;
 
@@ -93,9 +100,12 @@
    */
   private volatile SettableFuture<ClientTransport> activeTransportFuture;
 
-  TransportSet(BackoffPolicy.Provider backoffPolicyProvider,
-      ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
-      Callback callback) {
+  TransportSet(SocketAddress server, String authority, LoadBalancer loadBalancer,
+      BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory,
+      ScheduledExecutorService scheduledExecutor, Callback callback) {
+    this.server = server;
+    this.authority = authority;
+    this.loadBalancer = loadBalancer;
     this.backoffPolicyProvider = backoffPolicyProvider;
     this.transportFactory = transportFactory;
     this.scheduledExecutor = scheduledExecutor;
@@ -156,8 +166,10 @@
           if (shutdown) {
             return;
           }
-          ClientTransport newActiveTransport = transportFactory.newClientTransport();
-          log.info("Created transport '" + newActiveTransport);
+          ClientTransport newActiveTransport = transportFactory.newClientTransport(
+              server, authority);
+          log.log(Level.INFO, "Created transport {0} for {1}",
+              new Object[] {newActiveTransport, server});
           transports.add(newActiveTransport);
           newActiveTransport.start(
               new TransportListener(newActiveTransport, activeTransportFuture));
@@ -222,30 +234,34 @@
     @Override
     public void transportReady() {
       synchronized (lock) {
-        log.info("Transport '" + transport + " is ready");
+        log.log(Level.INFO, "Transport {0} for {1} is ready", new Object[] {transport, server});
         Preconditions.checkState(transportFuture.isDone(), "the transport future is not done");
         if (isAttachedToActiveTransport()) {
           reconnectPolicy = null;
         }
       }
+      loadBalancer.transportReady(server, transport);
     }
 
     @Override
     public void transportShutdown(Status s) {
       synchronized (lock) {
-        log.info("Transport '" + transport + " is being shutdown");
+        log.log(Level.INFO, "Transport {0} for {1} is being shutdown",
+            new Object[] {transport, server});
         Preconditions.checkState(transportFuture.isDone(), "the transport future is not done");
         if (isAttachedToActiveTransport()) {
           createActiveTransportFuture();
         }
       }
+      loadBalancer.transportShutdown(server, transport, s);
     }
 
     @Override
     public void transportTerminated() {
       boolean runCallback = false;
       synchronized (lock) {
-        log.info("Transport '" + transport + " is terminated");
+        log.log(Level.INFO, "Transport {0} for {1} is terminated",
+            new Object[] {transport, server});
         Preconditions.checkState(!isAttachedToActiveTransport(),
             "Listener is still attached to activeTransportFuture. "
             + "Seems transportTerminated was not called.");
diff --git a/core/src/test/java/io/grpc/ManagedChannelProviderTest.java b/core/src/test/java/io/grpc/ManagedChannelProviderTest.java
index ab03caf..c8eae6182 100644
--- a/core/src/test/java/io/grpc/ManagedChannelProviderTest.java
+++ b/core/src/test/java/io/grpc/ManagedChannelProviderTest.java
@@ -113,6 +113,11 @@
     protected ManagedChannelBuilder<?> builderForAddress(String host, int port) {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    protected ManagedChannelBuilder<?> builderForTarget(String target) {
+      throw new UnsupportedOperationException();
+    }
   }
 
   public static class Available0Provider extends BaseProvider {
diff --git a/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java b/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java
new file mode 100644
index 0000000..f79f618
--- /dev/null
+++ b/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import io.grpc.internal.ClientTransport;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.net.SocketAddress;
+import java.util.ArrayList;
+
+/** Unit test for {@link SimpleLoadBalancerFactory}. */
+@RunWith(JUnit4.class)
+public class SimpleLoadBalancerTest {
+  private LoadBalancer loadBalancer;
+
+  private ArrayList<ResolvedServerInfo> servers;
+
+  @Mock
+  private TransportManager mockTransportManager;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    loadBalancer = SimpleLoadBalancerFactory.getInstance().newLoadBalancer(
+        "fakeservice", mockTransportManager);
+    servers = new ArrayList<ResolvedServerInfo>();
+    for (int i = 0; i < 3; i++) {
+      servers.add(new ResolvedServerInfo(new FakeSocketAddress("server" + i), Attributes.EMPTY));
+    }
+  }
+
+  @Test
+  public void pickBeforeResolved() throws Exception {
+    ClientTransport mockTransport = mock(ClientTransport.class);
+    SettableFuture<ClientTransport> sourceFuture = SettableFuture.create();
+    when(mockTransportManager.getTransport(same(servers.get(0).getAddress())))
+        .thenReturn(sourceFuture);
+    ListenableFuture<ClientTransport> f1 = loadBalancer.pickTransport(null);
+    ListenableFuture<ClientTransport> f2 = loadBalancer.pickTransport(null);
+    assertNotNull(f1);
+    assertNotNull(f2);
+    assertNotSame(f1, f2);
+    assertFalse(f1.isDone());
+    assertFalse(f2.isDone());
+    verify(mockTransportManager, never()).getTransport(any(SocketAddress.class));
+    loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY);
+    verify(mockTransportManager, times(2)).getTransport(same(servers.get(0).getAddress()));
+    assertFalse(f1.isDone());
+    assertFalse(f2.isDone());
+    assertNotSame(sourceFuture, f1);
+    assertNotSame(sourceFuture, f2);
+    sourceFuture.set(mockTransport);
+    assertSame(mockTransport, f1.get());
+    assertSame(mockTransport, f2.get());
+    ListenableFuture<ClientTransport> f3 = loadBalancer.pickTransport(null);
+    assertSame(sourceFuture, f3);
+    verify(mockTransportManager, times(3)).getTransport(same(servers.get(0).getAddress()));
+    verifyNoMoreInteractions(mockTransportManager);
+  }
+
+  @Test
+  public void transportFailed() throws Exception {
+    ClientTransport mockTransport1 = mock(ClientTransport.class);
+    ClientTransport mockTransport2 = mock(ClientTransport.class);
+    when(mockTransportManager.getTransport(same(servers.get(0).getAddress()))).thenReturn(
+        Futures.immediateFuture(mockTransport1));
+    when(mockTransportManager.getTransport(same(servers.get(1).getAddress()))).thenReturn(
+        Futures.immediateFuture(mockTransport2));
+    loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY);
+    ListenableFuture<ClientTransport> f1 = loadBalancer.pickTransport(null);
+    ListenableFuture<ClientTransport> f2 = loadBalancer.pickTransport(null);
+    assertSame(mockTransport1, f1.get());
+    assertSame(mockTransport1, f2.get());
+    loadBalancer.transportShutdown(servers.get(0).getAddress(), mockTransport1, Status.INTERNAL);
+    ListenableFuture<ClientTransport> f3 = loadBalancer.pickTransport(null);
+    assertSame(mockTransport2, f3.get());
+  }
+
+  private static class FakeSocketAddress extends SocketAddress {
+    final String name;
+
+    FakeSocketAddress(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return "FakeSocketAddress-" + name;
+    }
+  }
+
+}
diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
index b81f639..5ede4c1 100644
--- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java
@@ -94,7 +94,7 @@
     final ClientStream stream = mock(ClientStream.class);
     ClientTransportProvider provider = new ClientTransportProvider() {
       @Override
-      public ListenableFuture<ClientTransport> get() {
+      public ListenableFuture<ClientTransport> get(CallOptions callOptions) {
         return Futures.immediateFuture(transport);
       }
     };
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index 1ed153f..3e52395 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -37,6 +37,7 @@
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doAnswer;
@@ -47,6 +48,7 @@
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import io.grpc.Attributes;
 import io.grpc.CallOptions;
 import io.grpc.Channel;
 import io.grpc.ClientCall;
@@ -56,6 +58,9 @@
 import io.grpc.ManagedChannel;
 import io.grpc.Metadata;
 import io.grpc.MethodDescriptor;
+import io.grpc.NameResolver;
+import io.grpc.ResolvedServerInfo;
+import io.grpc.SimpleLoadBalancerFactory;
 import io.grpc.Status;
 import io.grpc.StringMarshaller;
 
@@ -70,6 +75,8 @@
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.net.SocketAddress;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -81,15 +88,23 @@
 /** Unit tests for {@link ManagedChannelImpl}. */
 @RunWith(JUnit4.class)
 public class ManagedChannelImplTest {
+  private static final List<ClientInterceptor> NO_INTERCEPTOR =
+      Collections.<ClientInterceptor>emptyList();
   private final MethodDescriptor<String, Integer> method = MethodDescriptor.create(
       MethodDescriptor.MethodType.UNKNOWN, "/service/method",
       new StringMarshaller(), new IntegerMarshaller());
   private final ExecutorService executor = Executors.newSingleThreadExecutor();
+  private final String serviceName = "fake.example.com";
+  private final URI target = URI.create("//" + serviceName);
+  private final String authority = serviceName;
+  private final SocketAddress socketAddress = new SocketAddress() {};
+  private final ResolvedServerInfo server = new ResolvedServerInfo(socketAddress, Attributes.EMPTY);
 
   @Mock
   private ClientTransport mockTransport;
   @Mock
   private ClientTransportFactory mockTransportFactory;
+
   private ManagedChannel channel;
 
   @Mock
@@ -104,16 +119,18 @@
   private ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
       ArgumentCaptor.forClass(ClientStreamListener.class);
 
-  private void createChannel(List<ClientInterceptor> interceptors) throws Exception {
-    channel = new ManagedChannelImpl(new FakeBackoffPolicyProvider(), mockTransportFactory,
-        executor, null, interceptors);
+  private ManagedChannel createChannel(
+      NameResolver.Factory nameResolverFactory, List<ClientInterceptor> interceptors) {
+    return new ManagedChannelImpl(target, new FakeBackoffPolicyProvider(),
+        nameResolverFactory, SimpleLoadBalancerFactory.getInstance(),
+        mockTransportFactory, executor, null, interceptors);
   }
 
   @Before
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
-    createChannel(Collections.<ClientInterceptor>emptyList());
-    when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport);
+    when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class)))
+        .thenReturn(mockTransport);
   }
 
   @After
@@ -123,6 +140,7 @@
 
   @Test
   public void immediateDeadlineExceeded() {
+    ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR);
     ClientCall<String, Integer> call =
         channel.newCall(method, CallOptions.DEFAULT.withDeadlineNanoTime(System.nanoTime()));
     call.start(mockCallListener, new Metadata());
@@ -132,6 +150,7 @@
 
   @Test
   public void shutdownWithNoTransportsEverCreated() {
+    ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR);
     verifyNoMoreInteractions(mockTransportFactory);
     channel.shutdown();
     assertTrue(channel.isShutdown());
@@ -140,6 +159,8 @@
 
   @Test
   public void twoCallsAndGracefulShutdown() {
+    ManagedChannel channel = createChannel(
+        new FakeNameResolverFactory(server), Collections.<ClientInterceptor>emptyList());
     verifyNoMoreInteractions(mockTransportFactory);
     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
     verifyNoMoreInteractions(mockTransportFactory);
@@ -148,11 +169,13 @@
     ClientTransport mockTransport = mock(ClientTransport.class);
     ClientStream mockStream = mock(ClientStream.class);
     Metadata headers = new Metadata();
-    when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport);
+    when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class)))
+        .thenReturn(mockTransport);
     when(mockTransport.newStream(same(method), same(headers), any(ClientStreamListener.class)))
         .thenReturn(mockStream);
     call.start(mockCallListener, headers);
-    verify(mockTransportFactory, timeout(1000)).newClientTransport();
+    verify(mockTransportFactory, timeout(1000))
+        .newClientTransport(same(socketAddress), eq(authority));
     verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture());
     ClientTransport.Listener transportListener = transportListenerCaptor.getValue();
     verify(mockTransport, timeout(1000))
@@ -206,6 +229,7 @@
 
   @Test
   public void transportFailsOnStart() {
+    ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR);
     Status goldenStatus = Status.INTERNAL.withDescription("wanted it to fail");
 
     // mockTransport2 shuts immediately during start
@@ -230,10 +254,12 @@
     when(mockTransport2.newStream(same(method), same(headers2), any(ClientStreamListener.class)))
         .thenReturn(mockStream2);
     // The factory returns the immediately shut-down transport first, then the normal one
-    when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport2, mockTransport);
+    when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class)))
+        .thenReturn(mockTransport2, mockTransport);
 
     call.start(mockCallListener2, headers2);
-    verify(mockTransportFactory, timeout(1000).times(2)).newClientTransport();
+    verify(mockTransportFactory, timeout(1000).times(2))
+        .newClientTransport(same(socketAddress), eq(authority));
     verify(mockTransport2, timeout(1000)).start(any(ClientTransport.Listener.class));
     verify(mockTransport2, timeout(1000))
         .newStream(same(method), same(headers2), streamListenerCaptor.capture());
@@ -268,7 +294,7 @@
     transportListenerCaptor.getValue().transportTerminated();
     assertTrue(channel.isTerminated());
 
-    verify(mockTransportFactory, times(2)).newClientTransport();
+    verify(mockTransportFactory, times(2)).newClientTransport(same(socketAddress), eq(authority));
     verifyNoMoreInteractions(mockTransport);
     verifyNoMoreInteractions(mockTransport2);
     verifyNoMoreInteractions(mockStream2);
@@ -286,13 +312,15 @@
         return next.newCall(method, callOptions);
       }
     };
-    createChannel(Arrays.asList(interceptor));
+    ManagedChannel channel = createChannel(
+        new FakeNameResolverFactory(server), Arrays.asList(interceptor));
     assertNotNull(channel.newCall(method, CallOptions.DEFAULT));
     assertEquals(1, atomic.get());
   }
 
   @Test
   public void testNoDeadlockOnShutdown() {
+    ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR);
     // Force creation of transport
     ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
     Metadata headers = new Metadata();
@@ -341,6 +369,18 @@
     transportListener.transportTerminated();
   }
 
+  @Test
+  public void nameResolutionFailed() {
+    Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
+    ManagedChannel channel = createChannel(new FailingNameResolverFactory(error), NO_INTERCEPTOR);
+    ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
+    call.start(mockCallListener, new Metadata());
+    ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
+    verify(mockCallListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class));
+    Status status = statusCaptor.getValue();
+    assertSame(error, status);
+  }
+
   private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
     @Override
     public BackoffPolicy get() {
@@ -352,4 +392,53 @@
       };
     }
   }
+
+  private class FakeNameResolverFactory extends NameResolver.Factory {
+    final ResolvedServerInfo server;
+
+    FakeNameResolverFactory(ResolvedServerInfo server) {
+      this.server = server;
+    }
+
+    @Override
+    public NameResolver newNameResolver(final URI targetUri) {
+      assertEquals(null, targetUri.getScheme());
+      assertEquals(serviceName, targetUri.getAuthority());
+      return new NameResolver() {
+        @Override public String getServiceAuthority() {
+          assertNotNull(targetUri.toString() + " has authority", targetUri.getAuthority());
+          return targetUri.getAuthority();
+        }
+
+        @Override public void start(final Listener listener) {
+          listener.onUpdate(Collections.singletonList(server), Attributes.EMPTY);
+        }
+
+        @Override public void shutdown() {}
+      };
+    }
+  }
+
+  private class FailingNameResolverFactory extends NameResolver.Factory {
+    final Status error;
+
+    FailingNameResolverFactory(Status error) {
+      this.error = error;
+    }
+
+    @Override
+    public NameResolver newNameResolver(URI notUsedUri) {
+      return new NameResolver() {
+        @Override public String getServiceAuthority() {
+          return "irrelevant-authority";
+        }
+
+        @Override public void start(final Listener listener) {
+          listener.onError(error);
+        }
+
+        @Override public void shutdown() {}
+      };
+    }
+  }
 }
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
index 8acac65..580f808 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
@@ -50,6 +50,7 @@
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.net.URI;
 
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLException;
@@ -61,8 +62,6 @@
 public class NettyChannelBuilder extends AbstractManagedChannelImplBuilder<NettyChannelBuilder> {
   public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB
 
-  private final SocketAddress serverAddress;
-  private String authority;
   private NegotiationType negotiationType = NegotiationType.TLS;
   private Class<? extends Channel> channelType = NioSocketChannel.class;
   @Nullable
@@ -78,30 +77,39 @@
    * noticing changes to DNS.
    */
   public static NettyChannelBuilder forAddress(SocketAddress serverAddress) {
-    String authority;
-    if (serverAddress instanceof InetSocketAddress) {
-      InetSocketAddress address = (InetSocketAddress) serverAddress;
-      authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
-    } else {
-      // Specialized address types are allowed to support custom Channel types so just assume their
-      // toString() values are valid :authority values. We defer checking validity of authority
-      // until buildTransportFactory() to provide the user an opportunity to override the value.
-      authority = serverAddress.toString();
-    }
-    return new NettyChannelBuilder(serverAddress, authority);
+    return new NettyChannelBuilder(serverAddress);
   }
 
   /**
    * Creates a new builder with the given host and port.
    */
   public static NettyChannelBuilder forAddress(String host, int port) {
-    return new NettyChannelBuilder(
-        new InetSocketAddress(host, port), GrpcUtil.authorityFromHostAndPort(host, port));
+    return forAddress(new InetSocketAddress(host, port));
   }
 
-  protected NettyChannelBuilder(SocketAddress serverAddress, String authority) {
-    this.serverAddress = serverAddress;
-    this.authority = authority;
+  /**
+   * Creates a new builder with the given target URI that will be resolved by
+   * {@link io.grpc.NameResolver}.
+   */
+  public static NettyChannelBuilder forTarget(String targetUri) {
+    return new NettyChannelBuilder(URI.create(targetUri));
+  }
+
+  private NettyChannelBuilder(URI target) {
+    super(target);
+  }
+
+  protected NettyChannelBuilder(SocketAddress address) {
+    super(address, getAuthorityFromAddress(address));
+  }
+
+  private static String getAuthorityFromAddress(SocketAddress address) {
+    if (address instanceof InetSocketAddress) {
+      InetSocketAddress inetAddress = (InetSocketAddress) address;
+      return GrpcUtil.authorityFromHostAndPort(inetAddress.getHostString(), inetAddress.getPort());
+    } else {
+      return address.toString();
+    }
   }
 
   /**
@@ -180,28 +188,14 @@
   }
 
   @Override
-  public final NettyChannelBuilder overrideAuthority(String authority) {
-    this.authority = checkAuthority(authority);
-    return this;
+  protected ClientTransportFactory buildTransportFactory() {
+    return new NettyTransportFactory(channelType, negotiationType, sslContext,
+        eventLoopGroup, flowControlWindow, maxMessageSize);
   }
 
-  /**
-   * Verifies the authority is valid.  This method exists as an escape hatch for putting in an
-   * authority that is valid, but would fail the default validation provided by this implementation.
-   */
-  protected String checkAuthority(String authority) {
-    return GrpcUtil.checkAuthority(authority);
-  }
-
-  @Override
-  protected final ClientTransportFactory buildTransportFactory() {
-    // Check authority, since non-inet ServerAddresses delay the authority check.
-    checkAuthority(authority);
-    return new NettyTransportFactory(serverAddress, authority, channelType, eventLoopGroup,
-        flowControlWindow, createProtocolNegotiator(), maxMessageSize);
-  }
-
-  private ProtocolNegotiator createProtocolNegotiator() {
+  private static ProtocolNegotiator createProtocolNegotiator(
+      String authority, NegotiationType negotiationType, SslContext sslContext) {
+    ProtocolNegotiator negotiator;
     switch (negotiationType) {
       case PLAINTEXT:
         return ProtocolNegotiators.plaintext();
@@ -223,29 +217,25 @@
 
   private static class NettyTransportFactory extends AbstractReferenceCounted
           implements ClientTransportFactory {
-    private final SocketAddress serverAddress;
     private final Class<? extends Channel> channelType;
+    private final NegotiationType negotiationType;
+    private final SslContext sslContext;
     private final EventLoopGroup group;
     private final boolean usingSharedGroup;
     private final int flowControlWindow;
-    private final ProtocolNegotiator negotiator;
     private final int maxMessageSize;
-    private final String authority;
 
-    private NettyTransportFactory(SocketAddress serverAddress,
-                                  String authority,
-                                  Class<? extends Channel> channelType,
+    private NettyTransportFactory(Class<? extends Channel> channelType,
+                                  NegotiationType negotiationType,
+                                  SslContext sslContext,
                                   EventLoopGroup group,
                                   int flowControlWindow,
-                                  ProtocolNegotiator negotiator,
                                   int maxMessageSize) {
-      this.serverAddress = serverAddress;
       this.channelType = channelType;
+      this.negotiationType = negotiationType;
+      this.sslContext = sslContext;
       this.flowControlWindow = flowControlWindow;
-      this.negotiator = negotiator;
       this.maxMessageSize = maxMessageSize;
-      this.authority = authority;
-
       usingSharedGroup = group == null;
       if (usingSharedGroup) {
         // The group was unspecified, using the shared group.
@@ -256,14 +246,11 @@
     }
 
     @Override
-    public ClientTransport newClientTransport() {
-      return new NettyClientTransport(serverAddress, channelType, group, negotiator,
-              flowControlWindow, maxMessageSize, authority);
-    }
-
-    @Override
-    public String authority() {
-      return authority;
+    public ClientTransport newClientTransport(SocketAddress serverAddress, String authority) {
+      GrpcUtil.checkAuthority(authority);
+      return new NettyClientTransport(serverAddress, channelType, group,
+          createProtocolNegotiator(authority, negotiationType, sslContext),
+          flowControlWindow, maxMessageSize, authority);
     }
 
     @Override
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelProvider.java b/netty/src/main/java/io/grpc/netty/NettyChannelProvider.java
index 628f803..2b13b6e 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelProvider.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelProvider.java
@@ -51,4 +51,9 @@
   protected NettyChannelBuilder builderForAddress(String name, int port) {
     return NettyChannelBuilder.forAddress(name, port);
   }
+
+  @Override
+  protected NettyChannelBuilder builderForTarget(String target) {
+    return NettyChannelBuilder.forTarget(target);
+  }
 }
diff --git a/netty/src/test/java/io/grpc/netty/NettyChannelBuilderTest.java b/netty/src/test/java/io/grpc/netty/NettyChannelBuilderTest.java
index bbddd4e..06840bb 100644
--- a/netty/src/test/java/io/grpc/netty/NettyChannelBuilderTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyChannelBuilderTest.java
@@ -31,8 +31,6 @@
 
 package io.grpc.netty;
 
-import static org.junit.Assert.assertEquals;
-
 import io.grpc.internal.ClientTransportFactory;
 
 import org.junit.Rule;
@@ -51,18 +49,28 @@
 
   @Test
   public void overrideAllowsInvalidAuthority() {
-    NettyChannelBuilder builder = new NettyChannelBuilder(new SocketAddress(){}, "") {
+    NettyChannelBuilder builder = new NettyChannelBuilder(new SocketAddress(){}) {
       @Override
       protected String checkAuthority(String authority) {
         return authority;
       }
     };
 
-    ClientTransportFactory factory = builder.overrideAuthority("invalid_authority")
+    ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority")
         .negotiationType(NegotiationType.PLAINTEXT)
         .buildTransportFactory();
+  }
 
-    assertEquals("invalid_authority", factory.authority());
+  @Test
+  public void failOverrideInvalidAuthority() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid authority:");
+
+    NettyChannelBuilder builder = new NettyChannelBuilder(new SocketAddress(){});
+
+    ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority")
+        .negotiationType(NegotiationType.PLAINTEXT)
+        .buildTransportFactory();
   }
 
   @Test
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
index 02f7176..01b7381 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
@@ -50,6 +50,9 @@
 import io.grpc.internal.SharedResourceHolder;
 import io.grpc.internal.SharedResourceHolder.Resource;
 
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -99,19 +102,27 @@
     return new OkHttpChannelBuilder(host, port);
   }
 
+  /**
+   * Creates a new builder for the given target URI that will be resolved by
+   * {@link io.grpc.NameResolver}.
+   */
+  public static OkHttpChannelBuilder forTarget(String targetUri) {
+    return new OkHttpChannelBuilder(URI.create(targetUri));
+  }
+
   private Executor transportExecutor;
-  private final String host;
-  private final int port;
-  private String authority;
+
   private SSLSocketFactory sslSocketFactory;
   private ConnectionSpec connectionSpec = DEFAULT_CONNECTION_SPEC;
   private NegotiationType negotiationType = NegotiationType.TLS;
   private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
 
   protected OkHttpChannelBuilder(String host, int port) {
-    this.host = Preconditions.checkNotNull(host);
-    this.port = port;
-    this.authority = GrpcUtil.authorityFromHostAndPort(host, port);
+    this(URI.create("dns:///" + GrpcUtil.authorityFromHostAndPort(host, port)));
+  }
+
+  private OkHttpChannelBuilder(URI target) {
+    super(target);
   }
 
   /**
@@ -126,27 +137,6 @@
   }
 
   /**
-   * Overrides the host used with TLS and HTTP virtual hosting. It does not change what host is
-   * actually connected to. This method differs from {@link #overrideAuthority(String)} in that it
-   * appends the port number to the host provided.
-   *
-   * <p>Should only used by tests.
-   *
-   * @deprecated use {@link #overrideAuthority} instead
-   */
-  @Deprecated
-  public final OkHttpChannelBuilder overrideHostForAuthority(String host) {
-    this.authority = GrpcUtil.authorityFromHostAndPort(host, this.port);
-    return this;
-  }
-
-  @Override
-  public final OkHttpChannelBuilder overrideAuthority(String authority) {
-    this.authority = checkAuthority(authority);
-    return this;
-  }
-
-  /**
    * Sets the negotiation type for the HTTP/2 connection.
    *
    * <p>Default: <code>TLS</code>
@@ -205,18 +195,10 @@
 
   @Override
   protected final ClientTransportFactory buildTransportFactory() {
-    return new OkHttpTransportFactory(host, port, authority, transportExecutor,
+    return new OkHttpTransportFactory(transportExecutor,
             createSocketFactory(), connectionSpec, maxMessageSize);
   }
 
-  /**
-   * Verifies the authority is valid.  This method exists as an escape hatch for putting in an
-   * authority that is valid, but would fail the default validation provided by this implementation.
-   */
-  protected String checkAuthority(String authority) {
-    return GrpcUtil.checkAuthority(authority);
-  }
-
   private SSLSocketFactory createSocketFactory() {
     switch (negotiationType) {
       case TLS:
@@ -231,25 +213,16 @@
 
   private static class OkHttpTransportFactory extends AbstractReferenceCounted
           implements ClientTransportFactory {
-    private final String host;
-    private final int port;
-    private final String authority;
     private final Executor executor;
     private final boolean usingSharedExecutor;
     private final SSLSocketFactory socketFactory;
     private final ConnectionSpec connectionSpec;
     private final int maxMessageSize;
 
-    private OkHttpTransportFactory(String host,
-                                   int port,
-                                   String authority,
-                                   Executor executor,
+    private OkHttpTransportFactory(Executor executor,
                                    SSLSocketFactory socketFactory,
                                    ConnectionSpec connectionSpec,
                                    int maxMessageSize) {
-      this.host = host;
-      this.port = port;
-      this.authority = authority;
       this.socketFactory = socketFactory;
       this.connectionSpec = connectionSpec;
       this.maxMessageSize = maxMessageSize;
@@ -264,14 +237,10 @@
     }
 
     @Override
-    public ClientTransport newClientTransport() {
-      return new OkHttpClientTransport(host, port, authority, executor, socketFactory,
-          Utils.convertSpec(connectionSpec), maxMessageSize);
-    }
-
-    @Override
-    public String authority() {
-      return authority;
+    public ClientTransport newClientTransport(SocketAddress addr, String authority) {
+      InetSocketAddress inetSocketAddr = (InetSocketAddress) addr;
+      return new OkHttpClientTransport(inetSocketAddr.getHostString(), inetSocketAddr.getPort(),
+          authority, executor, socketFactory, Utils.convertSpec(connectionSpec), maxMessageSize);
     }
 
     @Override
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelProvider.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelProvider.java
index c915dcc..9b3e34f 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelProvider.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelProvider.java
@@ -61,4 +61,9 @@
   protected OkHttpChannelBuilder builderForAddress(String name, int port) {
     return OkHttpChannelBuilder.forAddress(name, port);
   }
+
+  @Override
+  protected OkHttpChannelBuilder builderForTarget(String target) {
+    return OkHttpChannelBuilder.forTarget(target);
+  }
 }
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java
index a1fe95d..b80a7fd 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java
@@ -31,8 +31,6 @@
 
 package io.grpc.okhttp;
 
-import static org.junit.Assert.assertEquals;
-
 import io.grpc.internal.ClientTransportFactory;
 
 import org.junit.Rule;
@@ -55,11 +53,20 @@
       }
     };
 
-    ClientTransportFactory factory = builder.overrideAuthority("invalid_authority")
+    ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority")
         .negotiationType(NegotiationType.PLAINTEXT)
         .buildTransportFactory();
+  }
 
-    assertEquals("invalid_authority", factory.authority());
+  @Test
+  public void failOverrideInvalidAuthority() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid authority:");
+    OkHttpChannelBuilder builder = new OkHttpChannelBuilder("good", 1234);
+
+    ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority")
+        .negotiationType(NegotiationType.PLAINTEXT)
+        .buildTransportFactory();
   }
 
   @Test