xds: refactor LoadReportClient with xds v2 dropped (#9788)

diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java
index 3318dfb..d6a3679 100644
--- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java
+++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java
@@ -25,7 +25,6 @@
 import com.google.common.base.Supplier;
 import com.google.protobuf.util.Durations;
 import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
-import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceStub;
 import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
 import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
 import io.grpc.Channel;
@@ -158,7 +157,7 @@
       return;
     }
     checkState(lrsStream == null, "previous lbStream has not been cleared yet");
-    lrsStream = new LrsStreamV3();
+    lrsStream = new LrsStream();
     retryStopwatch.reset().start();
     Context prevContext = context.attach();
     try {
@@ -168,24 +167,73 @@
     }
   }
 
-  // TODO(zivy@): The abstract class was used to support xds v2 and v3. Remove abstract here since
-  // v2 is dropped and v3 is the only supported version now.
-  private abstract class LrsStream {
+  private final class LrsStream {
     boolean initialResponseReceived;
     boolean closed;
     long intervalNano = -1;
     boolean reportAllClusters;
     List<String> clusterNames;  // clusters to report loads for, if not report all.
     ScheduledHandle loadReportTimer;
+    StreamObserver<LoadStatsRequest> lrsRequestWriterV3;
 
-    abstract void start();
+    void start() {
+      StreamObserver<LoadStatsResponse> lrsResponseReaderV3 =
+          new StreamObserver<LoadStatsResponse>() {
+            @Override
+            public void onNext(final LoadStatsResponse response) {
+              syncContext.execute(new Runnable() {
+                @Override
+                public void run() {
+                  logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
+                  handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
+                      Durations.toNanos(response.getLoadReportingInterval()));
+                }
+              });
+            }
 
-    abstract void sendLoadStatsRequest(List<ClusterStats> clusterStatsList);
+            @Override
+            public void onError(final Throwable t) {
+              syncContext.execute(new Runnable() {
+                @Override
+                public void run() {
+                  handleRpcError(t);
+                }
+              });
+            }
 
-    abstract void sendError(Exception error);
+            @Override
+            public void onCompleted() {
+              syncContext.execute(new Runnable() {
+                @Override
+                public void run() {
+                  handleRpcCompleted();
+                }
+              });
+            }
+          };
+      lrsRequestWriterV3 = LoadReportingServiceGrpc.newStub(channel).withWaitForReady()
+          .streamLoadStats(lrsResponseReaderV3);
+      logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
+      sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
+    }
 
-    final void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
-        long loadReportIntervalNano) {
+    void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
+      LoadStatsRequest.Builder requestBuilder =
+          LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode());
+      for (ClusterStats stats : clusterStatsList) {
+        requestBuilder.addClusterStats(buildClusterStats(stats));
+      }
+      LoadStatsRequest request = requestBuilder.build();
+      lrsRequestWriterV3.onNext(request);
+      logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
+    }
+
+    void sendError(Exception error) {
+      lrsRequestWriterV3.onError(error);
+    }
+
+    void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
+                           long loadReportIntervalNano) {
       if (closed) {
         return;
       }
@@ -205,11 +253,11 @@
       scheduleNextLoadReport();
     }
 
-    final void handleRpcError(Throwable t) {
+    void handleRpcError(Throwable t) {
       handleStreamClosed(Status.fromThrowable(t));
     }
 
-    final void handleRpcCompleted() {
+    void handleRpcCompleted() {
       handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
     }
 
@@ -291,70 +339,6 @@
         lrsStream = null;
       }
     }
-  }
-
-  private final class LrsStreamV3 extends LrsStream {
-    StreamObserver<LoadStatsRequest> lrsRequestWriterV3;
-
-    @Override
-    void start() {
-      StreamObserver<LoadStatsResponse> lrsResponseReaderV3 =
-          new StreamObserver<LoadStatsResponse>() {
-            @Override
-            public void onNext(final LoadStatsResponse response) {
-              syncContext.execute(new Runnable() {
-                @Override
-                public void run() {
-                  logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
-                  handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
-                      Durations.toNanos(response.getLoadReportingInterval()));
-                }
-              });
-            }
-
-            @Override
-            public void onError(final Throwable t) {
-              syncContext.execute(new Runnable() {
-                @Override
-                public void run() {
-                  handleRpcError(t);
-                }
-              });
-            }
-
-            @Override
-            public void onCompleted() {
-              syncContext.execute(new Runnable() {
-                @Override
-                public void run() {
-                  handleRpcCompleted();
-                }
-              });
-            }
-          };
-      LoadReportingServiceStub stubV3 =
-          LoadReportingServiceGrpc.newStub(channel);
-      lrsRequestWriterV3 = stubV3.withWaitForReady().streamLoadStats(lrsResponseReaderV3);
-      logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
-      sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
-    }
-
-    @Override
-    void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
-      LoadStatsRequest.Builder requestBuilder =
-          LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode());
-      for (ClusterStats stats : clusterStatsList) {
-        requestBuilder.addClusterStats(buildClusterStats(stats));
-      }
-      LoadStatsRequest request = requestBuilder.build();
-      lrsRequestWriterV3.onNext(request);
-      logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
-    }
-
-    @Override
-    void sendError(Exception error) {
-      lrsRequestWriterV3.onError(error);
-    }
 
     private io.envoyproxy.envoy.config.endpoint.v3.ClusterStats buildClusterStats(
         ClusterStats stats) {
diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
index c04cdd3..1394491 100644
--- a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
+++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
@@ -71,7 +71,6 @@
 /**
  * Unit tests for {@link LoadReportClient}.
  */
-// TODO(chengyuanzhang): missing LRS V3 test.
 @RunWith(JUnit4.class)
 public class LoadReportClientTest {
   // bootstrap node identifier