xds: support ring_hash as the endpoint-level LB policy (#7991)

Update LB policy config generation to support ring hash policy as the endpoint-level LB policy.

- Changed the CDS LB policy to accept RING_HASH as the endpoint LB policy from CDS updates. This configuration is directly passed to its child policy (aka, ClusterResolverLoadBalancer) in its config.

- Changed ClusterResolverLoadBalancer to generate different LB configs for its downstream LB policies, depending on the endpoint-level LB policies.
  - If the endpoint-level LB policy is ROUND_ROBIN, the downstream LB policy hierarchy is: PriorityLB -> ClusterImplLB -> WeightedTargetLB -> RoundRobinLB
  - If the endpoin-level LB policy is RNIG_HASH, the downstream LB policy hierarchy is: PriorityLB -> ClusterImplLB -> RingHashLB.
diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
index 7ee6009..d0286b2 100644
--- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
+++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
@@ -32,9 +32,11 @@
 import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
 import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
 import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
+import io.grpc.xds.RingHashLoadBalancer.RingHashConfig;
 import io.grpc.xds.XdsClient.CdsResourceWatcher;
 import io.grpc.xds.XdsClient.CdsUpdate;
 import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
+import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy;
 import io.grpc.xds.XdsLogger.XdsLogLevel;
 import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
 import java.util.ArrayDeque;
@@ -181,15 +183,16 @@
         helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
         return;
       }
-      String endpointPickingPolicy = root.result.lbPolicy();
-      LoadBalancerProvider localityPickingLbProvider =
-          lbRegistry.getProvider(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME);  // hardcoded
-      LoadBalancerProvider endpointPickingLbProvider =
-          lbRegistry.getProvider(endpointPickingPolicy);
+      LoadBalancerProvider lbProvider = null;
+      Object lbConfig = null;
+      if (root.result.lbPolicy() == LbPolicy.RING_HASH) {
+        lbProvider = lbRegistry.getProvider("ring_hash");
+        lbConfig = new RingHashConfig(root.result.minRingSize(), root.result.maxRingSize());
+      } else {
+        lbProvider = lbRegistry.getProvider("round_robin");
+      }
       ClusterResolverConfig config = new ClusterResolverConfig(
-          Collections.unmodifiableList(instances),
-          new PolicySelection(localityPickingLbProvider, null /* by cluster_resolver LB policy */),
-          new PolicySelection(endpointPickingLbProvider, null));
+          Collections.unmodifiableList(instances), new PolicySelection(lbProvider, lbConfig));
       if (childLb == null) {
         childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
       }
diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
index 3b8e89c..77678f4 100644
--- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
@@ -22,7 +22,6 @@
 
 import com.github.udpa.udpa.type.v1.TypedStruct;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.CaseFormat;
 import com.google.common.base.Joiner;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Strings;
@@ -832,8 +831,6 @@
     }
 
     CdsUpdate.Builder updateBuilder = structOrError.getStruct();
-    String lbPolicy = CaseFormat.UPPER_UNDERSCORE.to(
-        CaseFormat.LOWER_UNDERSCORE, cluster.getLbPolicy().name());
 
     if (cluster.getLbPolicy() == LbPolicy.RING_HASH) {
       RingHashLbConfig lbConfig = cluster.getRingHashLbConfig();
@@ -841,10 +838,10 @@
         throw new ResourceInvalidException(
             "Unsupported ring hash function: " + lbConfig.getHashFunction());
       }
-      updateBuilder.lbPolicy(lbPolicy, lbConfig.getMinimumRingSize().getValue(),
-          lbConfig.getMaximumRingSize().getValue());
+      updateBuilder.lbPolicy(CdsUpdate.LbPolicy.RING_HASH,
+          lbConfig.getMinimumRingSize().getValue(), lbConfig.getMaximumRingSize().getValue());
     } else if (cluster.getLbPolicy() == LbPolicy.ROUND_ROBIN) {
-      updateBuilder.lbPolicy(lbPolicy);
+      updateBuilder.lbPolicy(CdsUpdate.LbPolicy.ROUND_ROBIN);
     } else {
       throw new ResourceInvalidException("Unsupported lb policy: " + cluster.getLbPolicy());
     }
diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
index 37dc4e7..f29d932 100644
--- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
+++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
@@ -20,6 +20,7 @@
 import static io.grpc.ConnectivityState.CONNECTING;
 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
 import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
+import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
 import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -76,7 +77,8 @@
  * used in the downstream LB policies for fine-grained load balancing purposes.
  */
 final class ClusterResolverLoadBalancer extends LoadBalancer {
-
+  // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode
+  // to an empty locality.
   private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
   private final XdsLogger logger;
   private final String authority;
@@ -156,12 +158,7 @@
     private final Helper helper;
     private final List<String> clusters = new ArrayList<>();
     private final Map<String, ClusterState> clusterStates = new HashMap<>();
-    // An aggregate cluster is thought of as a cluster that groups the endpoints of the underlying
-    // clusters together for load balancing purposes only. Load balancing policies (both locality
-    // level and endpoint level) are configured by the aggregate cluster and apply to all of its
-    // underlying clusters.
-    private PolicySelection localityPickingPolicy;
-    private PolicySelection endpointPickingPolicy;
+    private PolicySelection endpointLbPolicy;
     private ResolvedAddresses resolvedAddresses;
     private LoadBalancer childLb;
 
@@ -175,20 +172,18 @@
       this.resolvedAddresses = resolvedAddresses;
       ClusterResolverConfig config =
           (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
-      localityPickingPolicy = config.localityPickingPolicy;
-      endpointPickingPolicy = config.endpointPickingPolicy;
+      endpointLbPolicy = config.lbPolicy;
       for (DiscoveryMechanism instance : config.discoveryMechanisms) {
         clusters.add(instance.cluster);
         ClusterState state;
         if (instance.type == DiscoveryMechanism.Type.EDS) {
           state = new EdsClusterState(instance.cluster, instance.edsServiceName,
               instance.lrsServerName, instance.maxConcurrentRequests, instance.tlsContext);
-          clusterStates.put(instance.cluster, state);
         } else {  // logical DNS
           state = new LogicalDnsClusterState(instance.cluster, instance.lrsServerName,
               instance.maxConcurrentRequests, instance.tlsContext);
-          clusterStates.put(instance.cluster, state);
         }
+        clusterStates.put(instance.cluster, state);
         state.start();
       }
     }
@@ -392,8 +387,11 @@
               for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
                 if (endpoint.isHealthy()) {
                   discard = false;
+                  long weight =
+                      (long) localityLbInfo.localityWeight() * endpoint.loadBalancingWeight();
                   Attributes attr = endpoint.eag().getAttributes().toBuilder()
-                      .set(InternalXdsAttributes.ATTR_LOCALITY, locality).build();
+                      .set(InternalXdsAttributes.ATTR_LOCALITY, locality)
+                      .set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight).build();
                   EquivalentAddressGroup eag =
                       new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr);
                   eag = AddressFilter.setPathFilter(
@@ -419,10 +417,10 @@
             }
             List<String> priorities = new ArrayList<>(prioritizedLocalityWeights.keySet());
             Collections.sort(priorities);
-            Map<String, PriorityChildConfig> priorityChildConfigs = generatePriorityChildConfigs(
-                name, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext,
-                localityPickingPolicy, endpointPickingPolicy, true, lbRegistry,
-                prioritizedLocalityWeights, dropOverloads);
+            Map<String, PriorityChildConfig> priorityChildConfigs =
+                generateEdsBasedPriorityChildConfigs(
+                    name, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext,
+                    endpointLbPolicy, lbRegistry, prioritizedLocalityWeights, dropOverloads);
             status = Status.OK;
             resolved = true;
             result = new ClusterResolutionResult(addresses, priorityChildConfigs, priorities);
@@ -532,9 +530,12 @@
                 return;
               }
               backoffPolicy = null;  // reset backoff sequence if succeeded
+              // Arbitrary priority notation for all DNS-resolved endpoints.
               String priorityName = priorityName(name, 0);  // value doesn't matter
               List<EquivalentAddressGroup> addresses = new ArrayList<>();
               for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
+                // No weight attribute is attached, all endpoint-level LB policy should be able
+                // to handle such it.
                 Attributes attr = eag.getAttributes().toBuilder().set(
                     InternalXdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY).build();
                 eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
@@ -542,12 +543,9 @@
                     eag, Arrays.asList(priorityName, LOGICAL_DNS_CLUSTER_LOCALITY.toString()));
                 addresses.add(eag);
               }
-              PolicySelection endpointPickingPolicy =
-                  new PolicySelection(lbRegistry.getProvider("pick_first"), null);
-              PriorityChildConfig priorityChildConfig = generatePriorityChildConfig(
+              PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
                   name, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext,
-                  endpointPickingPolicy, false, lbRegistry,
-                  Collections.<DropOverload>emptyList());
+                  lbRegistry, Collections.<DropOverload>emptyList());
               status = Status.OK;
               resolved = true;
               result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
@@ -614,58 +612,74 @@
   }
 
   /**
-   * Generates the config to be used in the priority LB policy for a single priority.
+   * Generates the config to be used in the priority LB policy for the single priority of
+   * logical DNS cluster.
    *
-   * <p>priority LB -> cluster_impl LB -> pick_first
+   * <p>priority LB -> cluster_impl LB (single hardcoded priority) -> pick_first
    */
-  private static PriorityChildConfig generatePriorityChildConfig(
+  private static PriorityChildConfig generateDnsBasedPriorityChildConfig(
       String cluster, @Nullable String edsServiceName, @Nullable String lrsServerName,
       @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
-      PolicySelection endpointPickingPolicy, boolean ignoreReresolution,
       LoadBalancerRegistry lbRegistry, List<DropOverload> dropOverloads) {
+    // Override endpoint-level LB policy with pick_first for logical DNS cluster.
+    PolicySelection endpointLbPolicy =
+        new PolicySelection(lbRegistry.getProvider("pick_first"), null);
     ClusterImplConfig clusterImplConfig =
         new ClusterImplConfig(cluster, edsServiceName, lrsServerName, maxConcurrentRequests,
-            dropOverloads, endpointPickingPolicy, tlsContext);
+            dropOverloads, endpointLbPolicy, tlsContext);
     LoadBalancerProvider clusterImplLbProvider =
         lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
     PolicySelection clusterImplPolicy =
         new PolicySelection(clusterImplLbProvider, clusterImplConfig);
-    return new PriorityChildConfig(clusterImplPolicy, ignoreReresolution);
+    return new PriorityChildConfig(clusterImplPolicy, false /* ignoreReresolution*/);
   }
 
   /**
-   * Generates configs to be used in the priority LB policy for priorities in the cluster.
+   * Generates configs to be used in the priority LB policy for priorities in an EDS cluster.
    *
-   * <p>priority LB -> cluster_impl LB (one per priority) -> weighted_target LB
-   * -> round_robin (one per locality))
+   * <p>priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB
+   * -> round_robin (one per locality)) / ring_hash
    */
-  private static Map<String, PriorityChildConfig> generatePriorityChildConfigs(
+  private static Map<String, PriorityChildConfig> generateEdsBasedPriorityChildConfigs(
       String cluster, @Nullable String edsServiceName, @Nullable String lrsServerName,
       @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
-      PolicySelection localityPickingPolicy, PolicySelection endpointPickingPolicy,
-      boolean ignoreReresolution, LoadBalancerRegistry lbRegistry,
+      PolicySelection endpointLbPolicy, LoadBalancerRegistry lbRegistry,
       Map<String, Map<Locality, Integer>> prioritizedLocalityWeights,
       List<DropOverload> dropOverloads) {
     Map<String, PriorityChildConfig> configs = new HashMap<>();
     for (String priority : prioritizedLocalityWeights.keySet()) {
-      Map<Locality, Integer> localityWeights = prioritizedLocalityWeights.get(priority);
-      Map<String, WeightedPolicySelection> targets = new HashMap<>();
-      for (Locality locality : localityWeights.keySet()) {
-        int weight = localityWeights.get(locality);
-        targets.put(localityName(locality),
-            new WeightedPolicySelection(weight, endpointPickingPolicy));
+      PolicySelection leafPolicy =  endpointLbPolicy;
+      // Depending on the endpoint-level load balancing policy, different LB hierarchy may be
+      // created. If the endpoint-level LB policy is round_robin, it creates a two-level LB
+      // hierarchy: a locality-level LB policy that balances load according to locality weights
+      // followed by an endpoint-level LB policy that simply rounds robin the endpoints within
+      // the locality. If the endpoint-level LB policy is ring_hash, it creates a unified LB
+      // policy that balances load by weighing the product of each endpoint's weight and the
+      // weight of the locality it belongs to.
+      if (endpointLbPolicy.getProvider().getPolicyName().equals("round_robin")) {
+        Map<Locality, Integer> localityWeights = prioritizedLocalityWeights.get(priority);
+        Map<String, WeightedPolicySelection> targets = new HashMap<>();
+        for (Locality locality : localityWeights.keySet()) {
+          int weight = localityWeights.get(locality);
+          WeightedPolicySelection target = new WeightedPolicySelection(weight, endpointLbPolicy);
+          targets.put(localityName(locality), target);
+        }
+        LoadBalancerProvider weightedTargetLbProvider =
+            lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME);
+        WeightedTargetConfig weightedTargetConfig =
+            new WeightedTargetConfig(Collections.unmodifiableMap(targets));
+        leafPolicy = new PolicySelection(weightedTargetLbProvider, weightedTargetConfig);
       }
-      PolicySelection localityPicking = new PolicySelection(
-          localityPickingPolicy.getProvider(),
-          new WeightedTargetConfig(Collections.unmodifiableMap(targets)));
       ClusterImplConfig clusterImplConfig =
           new ClusterImplConfig(cluster, edsServiceName, lrsServerName, maxConcurrentRequests,
-              dropOverloads, localityPicking, tlsContext);
+              dropOverloads, leafPolicy, tlsContext);
       LoadBalancerProvider clusterImplLbProvider =
           lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
       PolicySelection clusterImplPolicy =
           new PolicySelection(clusterImplLbProvider, clusterImplConfig);
-      configs.put(priority, new PriorityChildConfig(clusterImplPolicy, ignoreReresolution));
+      PriorityChildConfig priorityChildConfig =
+          new PriorityChildConfig(clusterImplPolicy, true /* ignoreReresolution */);
+      configs.put(priority, priorityChildConfig);
     }
     return configs;
   }
diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java
index 9442e88..e62c70c 100644
--- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java
+++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java
@@ -67,19 +67,17 @@
   static final class ClusterResolverConfig {
     // Ordered list of clusters to be resolved.
     final List<DiscoveryMechanism> discoveryMechanisms;
-    final PolicySelection localityPickingPolicy;
-    final PolicySelection endpointPickingPolicy;
+    // Endpoint-level load balancing policy with config (round_robin or ring_hash).
+    final PolicySelection lbPolicy;
 
-    ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms,
-        PolicySelection localityPickingPolicy, PolicySelection endpointPickingPolicy) {
+    ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, PolicySelection lbPolicy) {
       this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
-      this.localityPickingPolicy = checkNotNull(localityPickingPolicy, "localityPickingPolicy");
-      this.endpointPickingPolicy = checkNotNull(endpointPickingPolicy, "endpointPickingPolicy");
+      this.lbPolicy = checkNotNull(lbPolicy, "lbPolicy");
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(discoveryMechanisms, localityPickingPolicy, endpointPickingPolicy);
+      return Objects.hash(discoveryMechanisms, lbPolicy);
     }
 
     @Override
@@ -92,16 +90,14 @@
       }
       ClusterResolverConfig that = (ClusterResolverConfig) o;
       return discoveryMechanisms.equals(that.discoveryMechanisms)
-          && localityPickingPolicy.equals(that.localityPickingPolicy)
-          && endpointPickingPolicy.equals(that.endpointPickingPolicy);
+          && lbPolicy.equals(that.lbPolicy);
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(this)
           .add("discoveryMechanisms", discoveryMechanisms)
-          .add("localityPickingPolicy", localityPickingPolicy)
-          .add("endpointPickingPolicy", endpointPickingPolicy)
+          .add("lbPolicy", lbPolicy)
           .toString();
     }
 
diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java
index 73c57e7..37c07eb 100644
--- a/xds/src/main/java/io/grpc/xds/XdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/XdsClient.java
@@ -178,7 +178,7 @@
     abstract ClusterType clusterType();
 
     // Endpoint-level load balancing policy.
-    abstract String lbPolicy();
+    abstract LbPolicy lbPolicy();
 
     // Only valid if lbPolicy is "ring_hash".
     abstract long minRingSize();
@@ -251,6 +251,10 @@
       EDS, LOGICAL_DNS, AGGREGATE
     }
 
+    enum LbPolicy {
+      ROUND_ROBIN, RING_HASH
+    }
+
     // FIXME(chengyuanzhang): delete this after UpstreamTlsContext's toString() is fixed.
     @Override
     public final String toString() {
@@ -270,38 +274,37 @@
 
     @AutoValue.Builder
     abstract static class Builder {
-      // Private do not use.
+      // Private, use one of the static factory methods instead.
       protected abstract Builder clusterName(String clusterName);
 
-      // Private do not use.
+      // Private, use one of the static factory methods instead.
       protected abstract Builder clusterType(ClusterType clusterType);
 
-      // Private do not use.
-      protected abstract Builder lbPolicy(String lbPolicy);
+      abstract Builder lbPolicy(LbPolicy lbPolicy);
 
-      Builder lbPolicy(String lbPolicy, long minRingSize, long maxRingSize) {
+      Builder lbPolicy(LbPolicy lbPolicy, long minRingSize, long maxRingSize) {
         return this.lbPolicy(lbPolicy).minRingSize(minRingSize).maxRingSize(maxRingSize);
       }
 
-      // Private do not use.
+      // Private, use lbPolicy(LbPolicy, long, long).
       protected abstract Builder minRingSize(long minRingSize);
 
-      // Private do not use.
+      // Private, use lbPolicy(.LbPolicy, long, long)
       protected abstract Builder maxRingSize(long maxRingSize);
 
-      // Private do not use.
+      // Private, use CdsUpdate.forEds() instead.
       protected abstract Builder edsServiceName(String edsServiceName);
 
-      // Private do not use.
+      // Private, use one of the static factory methods instead.
       protected abstract Builder lrsServerName(String lrsServerName);
 
-      // Private do not use.
+      // Private, use one of the static factory methods instead.
       protected abstract Builder maxConcurrentRequests(Long maxConcurrentRequests);
 
-      // Private do not use.
+      // Private, use one of the static factory methods instead.
       protected abstract Builder upstreamTlsContext(UpstreamTlsContext upstreamTlsContext);
 
-      // Private do not use.
+      // Private, use CdsUpdate.forAggregate() instead.
       protected abstract Builder prioritizedClusterNames(List<String> prioritizedClusterNames);
 
       abstract CdsUpdate build();
diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java
index 9a3aa29..2287b8f 100644
--- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java
+++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java
@@ -18,7 +18,6 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
-import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -47,6 +46,9 @@
 import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
 import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
 import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
+import io.grpc.xds.RingHashLoadBalancer.RingHashConfig;
+import io.grpc.xds.XdsClient.CdsUpdate;
+import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy;
 import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -118,8 +120,8 @@
 
     when(helper.getSynchronizationContext()).thenReturn(syncContext);
     lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_RESOLVER_POLICY_NAME));
-    lbRegistry.register(new FakeLoadBalancerProvider(WEIGHTED_TARGET_POLICY_NAME));
     lbRegistry.register(new FakeLoadBalancerProvider("round_robin"));
+    lbRegistry.register(new FakeLoadBalancerProvider("ring_hash"));
     loadBalancer = new CdsLoadBalancer2(helper, lbRegistry);
     loadBalancer.handleResolvedAddresses(
         ResolvedAddresses.newBuilder()
@@ -144,8 +146,10 @@
 
   @Test
   public void discoverTopLevelEdsCluster() {
-    xdsClient.deliverEdsCluster(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L,
-        upstreamTlsContext);
+    CdsUpdate update =
+        CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
     assertThat(childBalancers).hasSize(1);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME);
@@ -154,15 +158,15 @@
     DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
     assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME,
         LRS_SERVER_NAME, 100L, upstreamTlsContext);
-    assertThat(childLbConfig.localityPickingPolicy.getProvider().getPolicyName())
-        .isEqualTo(WEIGHTED_TARGET_POLICY_NAME);
-    assertThat(childLbConfig.endpointPickingPolicy.getProvider().getPolicyName())
-        .isEqualTo("round_robin");
+    assertThat(childLbConfig.lbPolicy.getProvider().getPolicyName()).isEqualTo("round_robin");
   }
 
   @Test
   public void discoverTopLevelLogicalDnsCluster() {
-    xdsClient.deliverLogicalDnsCluster(CLUSTER, LRS_SERVER_NAME, 100L, upstreamTlsContext);
+    CdsUpdate update =
+        CdsUpdate.forLogicalDns(CLUSTER, LRS_SERVER_NAME, 100L, upstreamTlsContext)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
     assertThat(childBalancers).hasSize(1);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME);
@@ -171,10 +175,7 @@
     DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
     assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.LOGICAL_DNS, null,
         LRS_SERVER_NAME, 100L, upstreamTlsContext);
-    assertThat(childLbConfig.localityPickingPolicy.getProvider().getPolicyName())
-        .isEqualTo(WEIGHTED_TARGET_POLICY_NAME);
-    assertThat(childLbConfig.endpointPickingPolicy.getProvider().getPolicyName())
-        .isEqualTo("round_robin");
+    assertThat(childLbConfig.lbPolicy.getProvider().getPolicyName()).isEqualTo("round_robin");
   }
 
   @Test
@@ -189,7 +190,10 @@
 
   @Test
   public void nonAggregateCluster_resourceUpdate() {
-    xdsClient.deliverEdsCluster(CLUSTER, null, null, 100L, upstreamTlsContext);
+    CdsUpdate update =
+        CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
     assertThat(childBalancers).hasSize(1);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
@@ -197,7 +201,9 @@
     assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, null, null, 100L,
         upstreamTlsContext);
 
-    xdsClient.deliverEdsCluster(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, null);
+    update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, null)
+        .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
     childLbConfig = (ClusterResolverConfig) childBalancer.config;
     instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
     assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME,
@@ -206,7 +212,10 @@
 
   @Test
   public void nonAggregateCluster_resourceRevoked() {
-    xdsClient.deliverLogicalDnsCluster(CLUSTER, null, 100L, upstreamTlsContext);
+    CdsUpdate update =
+        CdsUpdate.forLogicalDns(CLUSTER, null, 100L, upstreamTlsContext)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
     assertThat(childBalancers).hasSize(1);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
@@ -225,27 +234,40 @@
   }
 
   @Test
-  public void discoveryAggregateCluster() {
+  public void discoverAggregateCluster() {
     String cluster1 = "cluster-01.googleapis.com";
     String cluster2 = "cluster-02.googleapis.com";
     // CLUSTER (aggr.) -> [cluster1 (aggr.), cluster2 (logical DNS)]
-    xdsClient.deliverAggregateCluster(CLUSTER, Arrays.asList(cluster1, cluster2));
+    CdsUpdate update =
+        CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2))
+            .lbPolicy(LbPolicy.RING_HASH, 100L, 1000L).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
     assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
     assertThat(childBalancers).isEmpty();
     String cluster3 = "cluster-03.googleapis.com";
     String cluster4 = "cluster-04.googleapis.com";
     // cluster1 (aggr.) -> [cluster3 (EDS), cluster4 (EDS)]
-    xdsClient.deliverAggregateCluster(cluster1, Arrays.asList(cluster3, cluster4));
+    CdsUpdate update1 =
+        CdsUpdate.forAggregate(cluster1, Arrays.asList(cluster3, cluster4))
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(cluster1, update1);
     assertThat(xdsClient.watchers.keySet()).containsExactly(
         CLUSTER, cluster1, cluster2, cluster3, cluster4);
     assertThat(childBalancers).isEmpty();
-    xdsClient.deliverEdsCluster(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L,
-        upstreamTlsContext);
+    CdsUpdate update3 =
+        CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(cluster3, update3);
     assertThat(childBalancers).isEmpty();
-    xdsClient.deliverLogicalDnsCluster(cluster2, null, 100L, null);
+    CdsUpdate update2 =
+        CdsUpdate.forLogicalDns(cluster2, null, 100L, null)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(cluster2, update2);
     assertThat(childBalancers).isEmpty();
-    xdsClient.deliverEdsCluster(cluster4, null, LRS_SERVER_NAME, 300L,
-        null);
+    CdsUpdate update4 =
+        CdsUpdate.forEds(cluster4, null, LRS_SERVER_NAME, 300L, null)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(cluster4, update4);
     assertThat(childBalancers).hasSize(1);  // all non-aggregate clusters discovered
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME);
@@ -258,17 +280,20 @@
         DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext);
     assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(2), cluster4,
         DiscoveryMechanism.Type.EDS, null, LRS_SERVER_NAME, 300L, null);
-    assertThat(childLbConfig.localityPickingPolicy.getProvider().getPolicyName())
-        .isEqualTo(WEIGHTED_TARGET_POLICY_NAME);
-    assertThat(childLbConfig.endpointPickingPolicy.getProvider().getPolicyName())
-        .isEqualTo("round_robin");
+    assertThat(childLbConfig.lbPolicy.getProvider().getPolicyName())
+        .isEqualTo("ring_hash");  // dominated by top-level cluster's config
+    assertThat(((RingHashConfig) childLbConfig.lbPolicy.getConfig()).minRingSize).isEqualTo(100L);
+    assertThat(((RingHashConfig) childLbConfig.lbPolicy.getConfig()).maxRingSize).isEqualTo(1000L);
   }
 
   @Test
   public void aggregateCluster_noNonAggregateClusterExits_returnErrorPicker() {
     String cluster1 = "cluster-01.googleapis.com";
     // CLUSTER (aggr.) -> [cluster1 (EDS)]
-    xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1));
+    CdsUpdate update =
+        CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
     assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);
     xdsClient.deliverResourceNotExist(cluster1);
     verify(helper).updateBalancingState(
@@ -283,11 +308,19 @@
     String cluster1 = "cluster-01.googleapis.com";
     String cluster2 = "cluster-02.googleapis.com";
     // CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)]
-    xdsClient.deliverAggregateCluster(CLUSTER, Arrays.asList(cluster1, cluster2));
+    CdsUpdate update =
+        CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2))
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
     assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
-    xdsClient.deliverLogicalDnsCluster(cluster2, LRS_SERVER_NAME, 100L, null);
-    xdsClient.deliverEdsCluster(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L,
-        upstreamTlsContext);
+    CdsUpdate update1 =
+        CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(cluster1, update1);
+    CdsUpdate update2 =
+        CdsUpdate.forLogicalDns(cluster2, LRS_SERVER_NAME, 100L, null)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(cluster2, update2);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
     assertThat(childLbConfig.discoveryMechanisms).hasSize(2);
@@ -321,11 +354,19 @@
     String cluster1 = "cluster-01.googleapis.com";
     String cluster2 = "cluster-02.googleapis.com";
     // CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)]
-    xdsClient.deliverAggregateCluster(CLUSTER, Arrays.asList(cluster1, cluster2));
+    CdsUpdate update =
+        CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2))
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
     assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
-    xdsClient.deliverLogicalDnsCluster(cluster2, LRS_SERVER_NAME, 100L, null);
-    xdsClient.deliverEdsCluster(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L,
-        upstreamTlsContext);
+    CdsUpdate update1 =
+        CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(cluster1, update1);
+    CdsUpdate update2 =
+        CdsUpdate.forLogicalDns(cluster2, LRS_SERVER_NAME, 100L, null)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(cluster2, update2);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
     assertThat(childLbConfig.discoveryMechanisms).hasSize(2);
@@ -349,20 +390,31 @@
   public void aggregateCluster_intermediateClusterChanges() {
     String cluster1 = "cluster-01.googleapis.com";
     // CLUSTER (aggr.) -> [cluster1]
-    xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1));
+    CdsUpdate update =
+        CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
     assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);
 
     // CLUSTER (aggr.) -> [cluster2 (aggr.)]
     String cluster2 = "cluster-02.googleapis.com";
-    xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster2));
+    update =
+        CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster2))
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
     assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2);
 
     // cluster2 (aggr.) -> [cluster3 (EDS)]
     String cluster3 = "cluster-03.googleapis.com";
-    xdsClient.deliverAggregateCluster(cluster2, Collections.singletonList(cluster3));
+    CdsUpdate update2 =
+        CdsUpdate.forAggregate(cluster2, Collections.singletonList(cluster3))
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(cluster2, update2);
     assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3);
-    xdsClient.deliverEdsCluster(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L,
-        upstreamTlsContext);
+    CdsUpdate update3 =
+        CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(cluster3, update3);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
     assertThat(childLbConfig.discoveryMechanisms).hasSize(1);
@@ -386,7 +438,10 @@
   public void aggregateCluster_discoveryErrorBeforeChildLbCreated_returnErrorPicker() {
     String cluster1 = "cluster-01.googleapis.com";
     // CLUSTER (aggr.) -> [cluster1]
-    xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1));
+    CdsUpdate update =
+        CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
     assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);
     Status error = Status.RESOURCE_EXHAUSTED.withDescription("OOM");
     xdsClient.deliverError(error);
@@ -400,8 +455,14 @@
   public void aggregateCluster_discoveryErrorAfterChildLbCreated_propagateToChildLb() {
     String cluster1 = "cluster-01.googleapis.com";
     // CLUSTER (aggr.) -> [cluster1 (logical DNS)]
-    xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1));
-    xdsClient.deliverLogicalDnsCluster(cluster1, LRS_SERVER_NAME, 200L, null);
+    CdsUpdate update =
+        CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
+    CdsUpdate update1 =
+        CdsUpdate.forLogicalDns(cluster1, LRS_SERVER_NAME, 200L, null)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(cluster1, update1);
     FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers);
     ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childLb.config;
     assertThat(childLbConfig.discoveryMechanisms).hasSize(1);
@@ -423,8 +484,10 @@
 
   @Test
   public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
-    xdsClient.deliverEdsCluster(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L,
-        upstreamTlsContext);
+    CdsUpdate update =
+        CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
+            .lbPolicy(LbPolicy.ROUND_ROBIN).build();
+    xdsClient.deliverCdsUpdate(CLUSTER, update);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     assertThat(childBalancer.shutdown).isFalse();
     loadBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription("unreachable"));
@@ -541,31 +604,8 @@
       watchers.remove(resourceName);
     }
 
-    private void deliverEdsCluster(String clusterName, @Nullable String edsServiceName,
-        @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests,
-        @Nullable UpstreamTlsContext tlsContext) {
+    private void deliverCdsUpdate(String clusterName, CdsUpdate update) {
       if (watchers.containsKey(clusterName)) {
-        CdsUpdate update = CdsUpdate.forEds(
-            clusterName, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext)
-            .lbPolicy("round_robin").build();
-        watchers.get(clusterName).onChanged(update);
-      }
-    }
-
-    private void deliverLogicalDnsCluster(String clusterName, @Nullable String lrsServerName,
-        @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext) {
-      if (watchers.containsKey(clusterName)) {
-        CdsUpdate update = CdsUpdate.forLogicalDns(
-            clusterName, lrsServerName, maxConcurrentRequests, tlsContext)
-            .lbPolicy("round_robin").build();
-        watchers.get(clusterName).onChanged(update);
-      }
-    }
-
-    private void deliverAggregateCluster(String clusterName, List<String> clusters) {
-      if (watchers.containsKey(clusterName)) {
-        CdsUpdate update = CdsUpdate.forAggregate(clusterName, clusters)
-            .lbPolicy("round_robin").build();
         watchers.get(clusterName).onChanged(update);
       }
     }
diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
index 692bf9e..20a9da6 100644
--- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
+++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
@@ -62,6 +62,7 @@
 import io.grpc.xds.XdsClient.CdsResourceWatcher;
 import io.grpc.xds.XdsClient.CdsUpdate;
 import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
+import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy;
 import io.grpc.xds.XdsClient.EdsResourceWatcher;
 import io.grpc.xds.XdsClient.EdsUpdate;
 import io.grpc.xds.XdsClient.LdsResourceWatcher;
@@ -1132,7 +1133,7 @@
     assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
     assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
     assertThat(cdsUpdate.edsServiceName()).isNull();
-    assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
+    assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
     assertThat(cdsUpdate.lrsServerName()).isNull();
     assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
     assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@@ -1157,7 +1158,7 @@
     assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
     assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
     assertThat(cdsUpdate.edsServiceName()).isNull();
-    assertThat(cdsUpdate.lbPolicy()).isEqualTo("ring_hash");
+    assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.RING_HASH);
     assertThat(cdsUpdate.minRingSize()).isEqualTo(10L);
     assertThat(cdsUpdate.maxRingSize()).isEqualTo(100L);
     assertThat(cdsUpdate.lrsServerName()).isNull();
@@ -1183,7 +1184,7 @@
     CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
     assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
     assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.AGGREGATE);
-    assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
+    assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
     assertThat(cdsUpdate.prioritizedClusterNames()).containsExactlyElementsIn(candidates).inOrder();
     verifyResourceMetadataAcked(CDS, CDS_RESOURCE, clusterAggregate, VERSION_1, TIME_INCREMENT);
     verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
@@ -1204,7 +1205,7 @@
     assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
     assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
     assertThat(cdsUpdate.edsServiceName()).isNull();
-    assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
+    assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
     assertThat(cdsUpdate.lrsServerName()).isNull();
     assertThat(cdsUpdate.maxConcurrentRequests()).isEqualTo(200L);
     assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@@ -1268,7 +1269,7 @@
     assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
     assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
     assertThat(cdsUpdate.edsServiceName()).isNull();
-    assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
+    assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
     assertThat(cdsUpdate.lrsServerName()).isNull();
     assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
     assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@@ -1306,7 +1307,7 @@
     CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
     assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
     assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS);
-    assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
+    assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
     assertThat(cdsUpdate.lrsServerName()).isNull();
     assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
     assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@@ -1323,7 +1324,7 @@
     assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
     assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
     assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
-    assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
+    assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
     assertThat(cdsUpdate.lrsServerName()).isEqualTo("");
     assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
     assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@@ -1344,7 +1345,7 @@
     assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
     assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
     assertThat(cdsUpdate.edsServiceName()).isNull();
-    assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
+    assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
     assertThat(cdsUpdate.lrsServerName()).isNull();
     assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
     assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@@ -1392,7 +1393,7 @@
     CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
     assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
     assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS);
-    assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
+    assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
     assertThat(cdsUpdate.lrsServerName()).isNull();
     assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
     assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@@ -1401,7 +1402,7 @@
     assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo);
     assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
     assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
-    assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
+    assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
     assertThat(cdsUpdate.lrsServerName()).isEqualTo("");
     assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
     assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@@ -1410,7 +1411,7 @@
     assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo);
     assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
     assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
-    assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
+    assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
     assertThat(cdsUpdate.lrsServerName()).isEqualTo("");
     assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
     assertThat(cdsUpdate.upstreamTlsContext()).isNull();
diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java
index 264f623..eb1576f 100644
--- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java
+++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java
@@ -65,6 +65,7 @@
 import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
 import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
 import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
+import io.grpc.xds.RingHashLoadBalancer.RingHashConfig;
 import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
 import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
 import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
@@ -113,6 +114,12 @@
           CommonTlsContextTestsUtil.CLIENT_KEY_FILE,
           CommonTlsContextTestsUtil.CLIENT_PEM_FILE,
           CommonTlsContextTestsUtil.CA_PEM_FILE);
+  private final DiscoveryMechanism edsDiscoveryMechanism1 =
+      DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, tlsContext);
+  private final DiscoveryMechanism edsDiscoveryMechanism2 =
+      DiscoveryMechanism.forEds(CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L, tlsContext);
+  private final DiscoveryMechanism logicalDnsDiscoveryMechanism =
+      DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, LRS_SERVER_NAME, 300L, null);
 
   private final SynchronizationContext syncContext = new SynchronizationContext(
       new Thread.UncaughtExceptionHandler() {
@@ -126,8 +133,8 @@
   private final NameResolverRegistry nsRegistry = new NameResolverRegistry();
   private final PolicySelection roundRobin =
       new PolicySelection(new FakeLoadBalancerProvider("round_robin"), null);
-  private final PolicySelection weightedTarget =
-      new PolicySelection(new FakeLoadBalancerProvider(WEIGHTED_TARGET_POLICY_NAME), null);
+  private final PolicySelection ringHash = new PolicySelection(
+      new FakeLoadBalancerProvider("ring_hash"), new RingHashConfig(10L, 100L));
   private final List<FakeLoadBalancer> childBalancers = new ArrayList<>();
   private final List<FakeNameResolver> resolvers = new ArrayList<>();
   private final FakeXdsClient xdsClient = new FakeXdsClient();
@@ -165,6 +172,7 @@
 
     lbRegistry.register(new FakeLoadBalancerProvider(PRIORITY_POLICY_NAME));
     lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_IMPL_POLICY_NAME));
+    lbRegistry.register(new FakeLoadBalancerProvider(WEIGHTED_TARGET_POLICY_NAME));
     lbRegistry.register(
         new FakeLoadBalancerProvider("pick_first")); // needed by logical_dns
     URI targetUri = new URI(AUTHORITY);
@@ -199,8 +207,65 @@
   }
 
   @Test
+  public void edsClustersWithRingHashEndpointLbPolicy() {
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Collections.singletonList(edsDiscoveryMechanism1), ringHash);
+    deliverLbConfig(config);
+    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
+    assertThat(childBalancers).isEmpty();
+
+    // One priority with two localities of different weights.
+    EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
+    EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
+    LocalityLbEndpoints localityLbEndpoints1 =
+        LocalityLbEndpoints.create(
+            Collections.singletonList(
+                LbEndpoint.create(endpoint1, 100 /* loadBalancingWeight */, true)),
+            10 /* localityWeight */, 1 /* priority */);
+    LocalityLbEndpoints localityLbEndpoints2 =
+        LocalityLbEndpoints.create(
+            Collections.singletonList(
+                LbEndpoint.create(endpoint2, 60 /* loadBalancingWeight */, true)),
+            50 /* localityWeight */, 1 /* priority */);
+    xdsClient.deliverClusterLoadAssignment(
+        EDS_SERVICE_NAME1,
+        ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
+    assertThat(childBalancers).hasSize(1);
+    FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
+    assertThat(childBalancer.addresses).hasSize(2);
+    EquivalentAddressGroup addr1 = childBalancer.addresses.get(0);
+    EquivalentAddressGroup addr2 = childBalancer.addresses.get(1);
+    assertThat(addr1.getAddresses()).isEqualTo(endpoint1.getAddresses());
+    assertThat(addr1.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT))
+        .isEqualTo(10 * 100);
+    assertThat(addr2.getAddresses()).isEqualTo(endpoint2.getAddresses());
+    assertThat(addr2.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT))
+        .isEqualTo(50 * 60);
+    assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME);
+    PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config;
+    assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER1 + "[priority1]");
+    PriorityChildConfig priorityChildConfig =
+        Iterables.getOnlyElement(priorityLbConfig.childConfigs.values());
+    assertThat(priorityChildConfig.ignoreReresolution).isTrue();
+    assertThat(priorityChildConfig.policySelection.getProvider().getPolicyName())
+        .isEqualTo(CLUSTER_IMPL_POLICY_NAME);
+    ClusterImplConfig clusterImplConfig =
+        (ClusterImplConfig) priorityChildConfig.policySelection.getConfig();
+    assertClusterImplConfig(clusterImplConfig, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L,
+        tlsContext, Collections.<DropOverload>emptyList(), "ring_hash");
+    RingHashConfig ringHashConfig =
+        (RingHashConfig) clusterImplConfig.childPolicy.getConfig();
+    assertThat(ringHashConfig.minRingSize).isEqualTo(10L);
+    assertThat(ringHashConfig.maxRingSize).isEqualTo(100L);
+  }
+
+  @Test
   public void onlyEdsClusters_receivedEndpoints() {
-    deliverConfigWithEdsClusters();  // CLUSTER1 and CLUSTER2
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
+    deliverLbConfig(config);
+    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
+    assertThat(childBalancers).isEmpty();
     // CLUSTER1 has priority 1 (priority3), which has locality 2, which has endpoint3.
     // CLUSTER2 has priority 1 (priority1) and 2 (priority2); priority1 has locality1,
     // which has endpoint1 and endpoint2; priority2 has locality3, which has endpoint4.
@@ -209,11 +274,19 @@
     EquivalentAddressGroup endpoint3 = makeAddress("endpoint-addr-3");
     EquivalentAddressGroup endpoint4 = makeAddress("endpoint-addr-4");
     LocalityLbEndpoints localityLbEndpoints1 =
-        buildLocalityLbEndpoints(1, 70, ImmutableMap.of(endpoint1, true, endpoint2, true));
+        LocalityLbEndpoints.create(
+            Arrays.asList(
+                LbEndpoint.create(endpoint1, 100, true),
+                LbEndpoint.create(endpoint2, 100, true)),
+            70 /* localityWeight */, 1 /* priority */);
     LocalityLbEndpoints localityLbEndpoints2 =
-        buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint3, true));
+        LocalityLbEndpoints.create(
+            Collections.singletonList(LbEndpoint.create(endpoint3, 100, true)),
+            10 /* localityWeight */, 1 /* priority */);
     LocalityLbEndpoints localityLbEndpoints3 =
-        buildLocalityLbEndpoints(2, 20, Collections.singletonMap(endpoint4, true));
+        LocalityLbEndpoints.create(
+            Collections.singletonList(LbEndpoint.create(endpoint4, 100, true)),
+            20 /* localityWeight */, 2 /* priority */);
     String priority1 = CLUSTER2 + "[priority1]";
     String priority2 = CLUSTER2 + "[priority2]";
     String priority3 = CLUSTER1 + "[priority1]";
@@ -291,7 +364,11 @@
 
   @Test
   public void onlyEdsClusters_resourceNeverExist_returnErrorPicker() {
-    deliverConfigWithEdsClusters();  // CLUSTER1 and CLUSTER2
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
+    deliverLbConfig(config);
+    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
+    assertThat(childBalancers).isEmpty();
     reset(helper);
     xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1);
     verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
@@ -308,14 +385,22 @@
 
   @Test
   public void onlyEdsClusters_allResourcesRevoked_shutDownChildLbPolicy() {
-    deliverConfigWithEdsClusters();  // CLUSTER1 and CLUSTER2
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
+    deliverLbConfig(config);
+    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
+    assertThat(childBalancers).isEmpty();
     reset(helper);
     EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
     EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
     LocalityLbEndpoints localityLbEndpoints1 =
-        buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true));
+        LocalityLbEndpoints.create(
+            Collections.singletonList(LbEndpoint.create(endpoint1, 100, true)),
+            10 /* localityWeight */, 1 /* priority */);
     LocalityLbEndpoints localityLbEndpoints2 =
-        buildLocalityLbEndpoints(2, 20, Collections.singletonMap(endpoint2, true));
+        LocalityLbEndpoints.create(
+            Collections.singletonList(LbEndpoint.create(endpoint2, 100, true)),
+            20 /* localityWeight */, 2 /* priority */);
     xdsClient.deliverClusterLoadAssignment(
         EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints1));
     xdsClient.deliverClusterLoadAssignment(
@@ -333,25 +418,19 @@
     assertPicker(pickerCaptor.getValue(), expectedError, null);
   }
 
-  private void deliverConfigWithEdsClusters() {
-    DiscoveryMechanism instance1 =
-        DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, tlsContext);
-    DiscoveryMechanism instance2 =
-        DiscoveryMechanism.forEds(CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L, tlsContext);
-    ClusterResolverConfig config =
-        new ClusterResolverConfig(Arrays.asList(instance1, instance2), weightedTarget, roundRobin);
-    deliverLbConfig(config);
-    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
-    assertThat(childBalancers).isEmpty();
-  }
-
   @Test
   public void handleEdsResource_ignoreUnhealthyEndpoints() {
-    deliverConfigWithSingleEdsCluster();  // CLUSTER1
+    ClusterResolverConfig config =
+        new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
+    deliverLbConfig(config);
     EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
     EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
     LocalityLbEndpoints localityLbEndpoints =
-        buildLocalityLbEndpoints(1, 10, ImmutableMap.of(endpoint1, false, endpoint2, true));
+        LocalityLbEndpoints.create(
+            Arrays.asList(
+                LbEndpoint.create(endpoint1, 100, false /* isHealthy */),
+                LbEndpoint.create(endpoint2, 100, true /* isHealthy */)),
+            10 /* localityWeight */, 1 /* priority */);
     xdsClient.deliverClusterLoadAssignment(
         EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
@@ -361,13 +440,19 @@
 
   @Test
   public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() {
-    deliverConfigWithSingleEdsCluster();  // CLUSTER1
+    ClusterResolverConfig config =
+        new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
+    deliverLbConfig(config);
     EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
     EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
     LocalityLbEndpoints localityLbEndpoints1 =
-        buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, false));
+        LocalityLbEndpoints.create(
+            Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */)),
+            10 /* localityWeight */, 1 /* priority */);
     LocalityLbEndpoints localityLbEndpoints2 =
-        buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint2, true));
+        LocalityLbEndpoints.create(
+            Collections.singletonList(LbEndpoint.create(endpoint2, 100, true /* isHealthy */)),
+            10 /* localityWeight */, 1 /* priority */);
     String priority = CLUSTER1 + "[priority1]";
     xdsClient.deliverClusterLoadAssignment(
         EDS_SERVICE_NAME1,
@@ -385,29 +470,38 @@
 
   @Test
   public void handleEdsResource_ignorePrioritiesWithNoHealthyEndpoints() {
-    deliverConfigWithSingleEdsCluster();   // CLUSTER1
+    ClusterResolverConfig config =
+        new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
+    deliverLbConfig(config);
     EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
     EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
     LocalityLbEndpoints localityLbEndpoints1 =
-        buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, false));
+        LocalityLbEndpoints.create(
+            Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */)),
+            10 /* localityWeight */, 1 /* priority */);
     LocalityLbEndpoints localityLbEndpoints2 =
-        buildLocalityLbEndpoints(2, 10, Collections.singletonMap(endpoint2, true));
+        LocalityLbEndpoints.create(
+            Collections.singletonList(LbEndpoint.create(endpoint2, 200, true /* isHealthy */)),
+            10 /* localityWeight */, 2 /* priority */);
     String priority2 = CLUSTER1 + "[priority2]";
     xdsClient.deliverClusterLoadAssignment(
         EDS_SERVICE_NAME1,
         ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
 
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
-    PriorityLbConfig config = (PriorityLbConfig) childBalancer.config;
-    assertThat(config.priorities).containsExactly(priority2);
+    assertThat(((PriorityLbConfig) childBalancer.config).priorities).containsExactly(priority2);
   }
 
   @Test
   public void handleEdsResource_noHealthyEndpoint() {
-    deliverConfigWithSingleEdsCluster();   // CLUSTER1
+    ClusterResolverConfig config =
+        new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
+    deliverLbConfig(config);
     EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
     LocalityLbEndpoints localityLbEndpoints =
-        buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint, false));
+        LocalityLbEndpoints.create(
+            Collections.singletonList(LbEndpoint.create(endpoint, 100, false /* isHealthy */)),
+            10 /* localityWeight */, 1 /* priority */);
     xdsClient.deliverClusterLoadAssignment(EDS_SERVICE_NAME1,
         Collections.singletonMap(locality1, localityLbEndpoints));  // single endpoint, unhealthy
 
@@ -418,19 +512,13 @@
         Status.UNAVAILABLE.withDescription("No usable endpoint"), null);
   }
 
-  private void deliverConfigWithSingleEdsCluster() {
-    DiscoveryMechanism instance =
-        DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, null);
-    ClusterResolverConfig config =
-        new ClusterResolverConfig(Collections.singletonList(instance), weightedTarget, roundRobin);
-    deliverLbConfig(config);
-    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
-    assertThat(childBalancers).isEmpty();
-  }
-
   @Test
   public void onlyLogicalDnsCluster_endpointsResolved() {
-    deliverConfigWithSingleLogicalDnsCluster();
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
+    deliverLbConfig(config);
+    assertThat(resolvers).hasSize(1);
+    assertThat(childBalancers).isEmpty();
     EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
     EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
     FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
@@ -447,14 +535,18 @@
         .isEqualTo(CLUSTER_IMPL_POLICY_NAME);
     ClusterImplConfig clusterImplConfig =
         (ClusterImplConfig) priorityChildConfig.policySelection.getConfig();
-    assertClusterImplConfig(clusterImplConfig, CLUSTER_DNS, null, LRS_SERVER_NAME, 100L, null,
+    assertClusterImplConfig(clusterImplConfig, CLUSTER_DNS, null, LRS_SERVER_NAME, 300L, null,
         Collections.<DropOverload>emptyList(), "pick_first");
     assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), childBalancer.addresses);
   }
 
   @Test
   public void onlyLogicalDnsCluster_handleRefreshNameResolution() {
-    deliverConfigWithSingleLogicalDnsCluster();
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
+    deliverLbConfig(config);
+    assertThat(resolvers).hasSize(1);
+    assertThat(childBalancers).isEmpty();
     EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
     EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
     FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
@@ -470,7 +562,11 @@
   public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
     InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider,
         backoffPolicy1, backoffPolicy2);
-    deliverConfigWithSingleLogicalDnsCluster();
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
+    deliverLbConfig(config);
+    assertThat(resolvers).hasSize(1);
+    assertThat(childBalancers).isEmpty();
     FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
     Status error = Status.UNAVAILABLE.withDescription("cannot reach DNS server");
     resolver.deliverError(error);
@@ -513,7 +609,11 @@
   @Test
   public void onlyLogicalDnsCluster_refreshNameResolutionRaceWithResolutionError() {
     InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
-    deliverConfigWithSingleLogicalDnsCluster();
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
+    deliverLbConfig(config);
+    assertThat(resolvers).hasSize(1);
+    assertThat(childBalancers).isEmpty();
     EquivalentAddressGroup endpoint = makeAddress("endpoint-addr");
     FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
     resolver.deliverEndpointAddresses(Collections.singletonList(endpoint));
@@ -547,26 +647,23 @@
     inOrder.verifyNoMoreInteractions();
   }
 
-  private void deliverConfigWithSingleLogicalDnsCluster() {
-    DiscoveryMechanism instance =
-        DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, LRS_SERVER_NAME, 100L, null);
-    ClusterResolverConfig config =
-        new ClusterResolverConfig(Collections.singletonList(instance), weightedTarget, roundRobin);
-    deliverLbConfig(config);
-    assertThat(resolvers).hasSize(1);
-    assertThat(childBalancers).isEmpty();
-  }
-
   @Test
   public void edsClustersAndLogicalDnsCluster_receivedEndpoints() {
-    deliverConfigWithEdsAndLogicalDnsClusters();  // CLUSTER1 and CLUSTER_DNS
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
+    deliverLbConfig(config);
+    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
+    assertThat(resolvers).hasSize(1);
+    assertThat(childBalancers).isEmpty();
     EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");  // DNS endpoint
     EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");  // DNS endpoint
     EquivalentAddressGroup endpoint3 = makeAddress("endpoint-addr-3");  // EDS endpoint
     FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
     resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2));
     LocalityLbEndpoints localityLbEndpoints =
-        buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint3, true));
+        LocalityLbEndpoints.create(
+            Collections.singletonList(LbEndpoint.create(endpoint3, 100, true)),
+            10 /* localityWeight */, 1 /* priority */);
     xdsClient.deliverClusterLoadAssignment(
         EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
 
@@ -587,7 +684,12 @@
 
   @Test
   public void noEdsResourceExists_useDnsResolutionResults() {
-    deliverConfigWithEdsAndLogicalDnsClusters();
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
+    deliverLbConfig(config);
+    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
+    assertThat(resolvers).hasSize(1);
+    assertThat(childBalancers).isEmpty();
     reset(helper);
     xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1);
     verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
@@ -609,11 +711,18 @@
 
   @Test
   public void edsResourceRevoked_dnsResolutionError_shutDownChildLbPolicyAndReturnErrorPicker() {
-    deliverConfigWithEdsAndLogicalDnsClusters();
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
+    deliverLbConfig(config);
+    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
+    assertThat(resolvers).hasSize(1);
+    assertThat(childBalancers).isEmpty();
     reset(helper);
     EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
     LocalityLbEndpoints localityLbEndpoints =
-        buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint, true));
+        LocalityLbEndpoints.create(
+            Collections.singletonList(LbEndpoint.create(endpoint, 100, true)),
+            10 /* localityWeight */, 1 /* priority */);
     xdsClient.deliverClusterLoadAssignment(
         EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
     FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
@@ -634,11 +743,18 @@
 
   @Test
   public void resolutionErrorAfterChildLbCreated_propagateErrorIfAllClustersEncounterError() {
-    deliverConfigWithEdsAndLogicalDnsClusters();
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
+    deliverLbConfig(config);
+    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
+    assertThat(resolvers).hasSize(1);
+    assertThat(childBalancers).isEmpty();
     reset(helper);
     EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
     LocalityLbEndpoints localityLbEndpoints =
-        buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint, true));
+        LocalityLbEndpoints.create(
+            Collections.singletonList(LbEndpoint.create(endpoint, 100, true)),
+            10 /* localityWeight */, 1 /* priority */);
     xdsClient.deliverClusterLoadAssignment(
         EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);  // child LB created
@@ -656,7 +772,12 @@
 
   @Test
   public void resolutionErrorBeforeChildLbCreated_returnErrorPickerIfAllClustersEncounterError() {
-    deliverConfigWithEdsAndLogicalDnsClusters();
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
+    deliverLbConfig(config);
+    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
+    assertThat(resolvers).hasSize(1);
+    assertThat(childBalancers).isEmpty();
     reset(helper);
     xdsClient.deliverError(Status.UNIMPLEMENTED.withDescription("not found"));
     assertThat(childBalancers).isEmpty();
@@ -672,7 +793,12 @@
 
   @Test
   public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErrorPicker() {
-    deliverConfigWithEdsAndLogicalDnsClusters();
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
+    deliverLbConfig(config);
+    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
+    assertThat(resolvers).hasSize(1);
+    assertThat(childBalancers).isEmpty();
     reset(helper);
     Status upstreamError = Status.UNAVAILABLE.withDescription("unreachable");
     loadBalancer.handleNameResolutionError(upstreamError);
@@ -683,12 +809,19 @@
 
   @Test
   public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
-    deliverConfigWithEdsAndLogicalDnsClusters();
+    ClusterResolverConfig config = new ClusterResolverConfig(
+        Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
+    deliverLbConfig(config);
+    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
+    assertThat(resolvers).hasSize(1);
+    assertThat(childBalancers).isEmpty();
     reset(helper);
     EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
     EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
     LocalityLbEndpoints localityLbEndpoints =
-        buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true));
+        LocalityLbEndpoints.create(
+            Collections.singletonList(LbEndpoint.create(endpoint1, 100, true)),
+            10 /* localityWeight */, 1 /* priority */);
     xdsClient.deliverClusterLoadAssignment(
         EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
     FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
@@ -706,19 +839,6 @@
         any(ConnectivityState.class), any(SubchannelPicker.class));
   }
 
-  private void deliverConfigWithEdsAndLogicalDnsClusters() {
-    DiscoveryMechanism instance1 =
-        DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, null);
-    DiscoveryMechanism instance2 =
-        DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, LRS_SERVER_NAME, 200L, null);
-    ClusterResolverConfig config =
-        new ClusterResolverConfig(Arrays.asList(instance1, instance2), weightedTarget, roundRobin);
-    deliverLbConfig(config);
-    assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
-    assertThat(resolvers).hasSize(1);
-    assertThat(childBalancers).isEmpty();
-  }
-
   private void deliverLbConfig(ClusterResolverConfig config) {
     loadBalancer.handleResolvedAddresses(
         ResolvedAddresses.newBuilder()
@@ -765,16 +885,6 @@
     }
   }
 
-  private static LocalityLbEndpoints buildLocalityLbEndpoints(
-      int priority, int localityWeight, Map<EquivalentAddressGroup, Boolean> managedEndpoints) {
-    List<LbEndpoint> endpoints = new ArrayList<>();
-    for (EquivalentAddressGroup addr : managedEndpoints.keySet()) {
-      boolean status = managedEndpoints.get(addr);
-      endpoints.add(LbEndpoint.create(addr, 100 /* unused */, status));
-    }
-    return LocalityLbEndpoints.create(endpoints, localityWeight, priority);
-  }
-
   private static EquivalentAddressGroup makeAddress(final String name) {
     class FakeSocketAddress extends SocketAddress {
       private final String name;