xds: refactor LoadReportClient with xds v2 dropped (#9788)
diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java
index 3318dfb..d6a3679 100644
--- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java
+++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java
@@ -25,7 +25,6 @@
import com.google.common.base.Supplier;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
-import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceStub;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
import io.grpc.Channel;
@@ -158,7 +157,7 @@
return;
}
checkState(lrsStream == null, "previous lbStream has not been cleared yet");
- lrsStream = new LrsStreamV3();
+ lrsStream = new LrsStream();
retryStopwatch.reset().start();
Context prevContext = context.attach();
try {
@@ -168,24 +167,73 @@
}
}
- // TODO(zivy@): The abstract class was used to support xds v2 and v3. Remove abstract here since
- // v2 is dropped and v3 is the only supported version now.
- private abstract class LrsStream {
+ private final class LrsStream {
boolean initialResponseReceived;
boolean closed;
long intervalNano = -1;
boolean reportAllClusters;
List<String> clusterNames; // clusters to report loads for, if not report all.
ScheduledHandle loadReportTimer;
+ StreamObserver<LoadStatsRequest> lrsRequestWriterV3;
- abstract void start();
+ void start() {
+ StreamObserver<LoadStatsResponse> lrsResponseReaderV3 =
+ new StreamObserver<LoadStatsResponse>() {
+ @Override
+ public void onNext(final LoadStatsResponse response) {
+ syncContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
+ handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
+ Durations.toNanos(response.getLoadReportingInterval()));
+ }
+ });
+ }
- abstract void sendLoadStatsRequest(List<ClusterStats> clusterStatsList);
+ @Override
+ public void onError(final Throwable t) {
+ syncContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ handleRpcError(t);
+ }
+ });
+ }
- abstract void sendError(Exception error);
+ @Override
+ public void onCompleted() {
+ syncContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ handleRpcCompleted();
+ }
+ });
+ }
+ };
+ lrsRequestWriterV3 = LoadReportingServiceGrpc.newStub(channel).withWaitForReady()
+ .streamLoadStats(lrsResponseReaderV3);
+ logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
+ sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
+ }
- final void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
- long loadReportIntervalNano) {
+ void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
+ LoadStatsRequest.Builder requestBuilder =
+ LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode());
+ for (ClusterStats stats : clusterStatsList) {
+ requestBuilder.addClusterStats(buildClusterStats(stats));
+ }
+ LoadStatsRequest request = requestBuilder.build();
+ lrsRequestWriterV3.onNext(request);
+ logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
+ }
+
+ void sendError(Exception error) {
+ lrsRequestWriterV3.onError(error);
+ }
+
+ void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
+ long loadReportIntervalNano) {
if (closed) {
return;
}
@@ -205,11 +253,11 @@
scheduleNextLoadReport();
}
- final void handleRpcError(Throwable t) {
+ void handleRpcError(Throwable t) {
handleStreamClosed(Status.fromThrowable(t));
}
- final void handleRpcCompleted() {
+ void handleRpcCompleted() {
handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
}
@@ -291,70 +339,6 @@
lrsStream = null;
}
}
- }
-
- private final class LrsStreamV3 extends LrsStream {
- StreamObserver<LoadStatsRequest> lrsRequestWriterV3;
-
- @Override
- void start() {
- StreamObserver<LoadStatsResponse> lrsResponseReaderV3 =
- new StreamObserver<LoadStatsResponse>() {
- @Override
- public void onNext(final LoadStatsResponse response) {
- syncContext.execute(new Runnable() {
- @Override
- public void run() {
- logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
- handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
- Durations.toNanos(response.getLoadReportingInterval()));
- }
- });
- }
-
- @Override
- public void onError(final Throwable t) {
- syncContext.execute(new Runnable() {
- @Override
- public void run() {
- handleRpcError(t);
- }
- });
- }
-
- @Override
- public void onCompleted() {
- syncContext.execute(new Runnable() {
- @Override
- public void run() {
- handleRpcCompleted();
- }
- });
- }
- };
- LoadReportingServiceStub stubV3 =
- LoadReportingServiceGrpc.newStub(channel);
- lrsRequestWriterV3 = stubV3.withWaitForReady().streamLoadStats(lrsResponseReaderV3);
- logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
- sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
- }
-
- @Override
- void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
- LoadStatsRequest.Builder requestBuilder =
- LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode());
- for (ClusterStats stats : clusterStatsList) {
- requestBuilder.addClusterStats(buildClusterStats(stats));
- }
- LoadStatsRequest request = requestBuilder.build();
- lrsRequestWriterV3.onNext(request);
- logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
- }
-
- @Override
- void sendError(Exception error) {
- lrsRequestWriterV3.onError(error);
- }
private io.envoyproxy.envoy.config.endpoint.v3.ClusterStats buildClusterStats(
ClusterStats stats) {
diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
index c04cdd3..1394491 100644
--- a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
+++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
@@ -71,7 +71,6 @@
/**
* Unit tests for {@link LoadReportClient}.
*/
-// TODO(chengyuanzhang): missing LRS V3 test.
@RunWith(JUnit4.class)
public class LoadReportClientTest {
// bootstrap node identifier