services: HealthCheckingLoadBalancer logs to ChannelLogger (#5042)

Log the event that health check is disabled due to UNIMPLEMENTED as required in the spec:
https://github.com/grpc/proposal/blob/master/A17-client-side-health-checking.md

Also log every Subchannel state change that is affected by health-checking, i.e., the state changes when the raw Subchannel state is READY and health-check is running.

Tracking issue: #4932
diff --git a/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java b/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java
index e5f15d0..b2fc2af 100644
--- a/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java
+++ b/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java
@@ -27,6 +27,8 @@
 import com.google.common.base.Objects;
 import io.grpc.Attributes;
 import io.grpc.CallOptions;
+import io.grpc.ChannelLogger;
+import io.grpc.ChannelLogger.ChannelLogLevel;
 import io.grpc.ClientCall;
 import io.grpc.ConnectivityStateInfo;
 import io.grpc.EquivalentAddressGroup;
@@ -202,6 +204,7 @@
     private final ScheduledExecutorService timerService;
 
     private Subchannel subchannel;
+    private ChannelLogger subchannelLogger;
 
     // Set when RPC started. Cleared when the RPC has closed or abandoned.
     @Nullable
@@ -232,6 +235,7 @@
     void init(Subchannel subchannel) {
       checkState(this.subchannel == null, "init() already called");
       this.subchannel = checkNotNull(subchannel, "subchannel");
+      this.subchannelLogger = checkNotNull(subchannel.getChannelLogger(), "subchannelLogger");
     }
 
     void setServiceName(@Nullable String newServiceName) {
@@ -254,7 +258,6 @@
         // A connection was lost.  We will reset disabled flag because health check
         // may be available on the new connection.
         disabled = false;
-        // TODO(zhangkun83): record this to channel tracer
       }
       this.rawState = rawState;
       adjustHealthCheck();
@@ -282,6 +285,7 @@
     }
 
     private void startRpc() {
+      checkState(serviceName != null, "serviceName is null");
       checkState(activeRpc == null, "previous health-checking RPC has not been cleaned up");
       checkState(subchannel != null, "init() not called");
       // Optimization suggested by @markroth: if we are already READY and starting the health
@@ -289,6 +293,8 @@
       // name, we don't go to CONNECTING, otherwise there will be artificial delays on RPCs
       // waiting for the health check to respond.
       if (!Objects.equal(concludedState.getState(), READY)) {
+        subchannelLogger.log(
+            ChannelLogLevel.INFO, "CONNECTING: Starting health-check for \"{0}\"", serviceName);
         gotoState(ConnectivityStateInfo.forNonError(CONNECTING));
       }
       activeRpc = new HcStream();
@@ -378,27 +384,34 @@
       void handleResponse(HealthCheckResponse response) {
         callHasResponded = true;
         backoffPolicy = null;
+        ServingStatus status = response.getStatus();
         // running == true means the Subchannel's state (rawState) is READY
-        if (Objects.equal(response.getStatus(), ServingStatus.SERVING)) {
+        if (Objects.equal(status, ServingStatus.SERVING)) {
+          subchannelLogger.log(ChannelLogLevel.INFO, "READY: health-check responded SERVING");
           gotoState(ConnectivityStateInfo.forNonError(READY));
         } else {
+          subchannelLogger.log(
+              ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check responded {0}", status);
           gotoState(
               ConnectivityStateInfo.forTransientFailure(
                   Status.UNAVAILABLE.withDescription(
                       "Health-check service responded "
-                      + response.getStatus() + " for '" + callServiceName + "'")));
+                      + status + " for '" + callServiceName + "'")));
         }
         call.request(1);
       }
 
       void handleStreamClosed(Status status) {
         if (Objects.equal(status.getCode(), Code.UNIMPLEMENTED)) {
-          // TODO(zhangkun83): record this to channel tracer
           disabled = true;
+          subchannelLogger.log(ChannelLogLevel.ERROR, "Health-check disabled: {0}", status);
+          subchannelLogger.log(ChannelLogLevel.INFO, "{0} (no health-check)", rawState);
           gotoState(rawState);
           return;
         }
         long delayNanos = 0;
+        subchannelLogger.log(
+            ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check stream closed with {0}", status);
         gotoState(
             ConnectivityStateInfo.forTransientFailure(
                 Status.UNAVAILABLE.withDescription(
@@ -416,6 +429,8 @@
           startRpc();
         } else {
           checkState(!isRetryTimerPending(), "Retry double scheduled");
+          subchannelLogger.log(
+              ChannelLogLevel.DEBUG, "Will retry health-check after {0} ns", delayNanos);
           retryTimer = syncContext.schedule(
               retryTask, delayNanos, TimeUnit.NANOSECONDS, timerService);
         }
diff --git a/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java b/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java
index d347807..e1d4fd4 100644
--- a/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java
+++ b/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java
@@ -41,6 +41,8 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import io.grpc.Attributes;
 import io.grpc.Channel;
+import io.grpc.ChannelLogger;
+import io.grpc.ChannelLogger.ChannelLogLevel;
 import io.grpc.ConnectivityState;
 import io.grpc.ConnectivityStateInfo;
 import io.grpc.Context;
@@ -69,6 +71,8 @@
 import io.grpc.internal.ServiceConfigUtil;
 import io.grpc.stub.StreamObserver;
 import java.net.SocketAddress;
+import java.text.MessageFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
@@ -105,7 +109,7 @@
   @SuppressWarnings({"rawtypes", "unchecked"})
   private final List<EquivalentAddressGroup>[] eagLists = new List[NUM_SUBCHANNELS];
   private List<EquivalentAddressGroup> resolvedAddressList;
-  private final Subchannel[] subchannels = new Subchannel[NUM_SUBCHANNELS];
+  private final FakeSubchannel[] subchannels = new FakeSubchannel[NUM_SUBCHANNELS];
   private final ManagedChannel[] channels = new ManagedChannel[NUM_SUBCHANNELS];
   private final Server[] servers = new Server[NUM_SUBCHANNELS];
   private final HealthImpl[] healthImpls = new HealthImpl[NUM_SUBCHANNELS];
@@ -268,7 +272,7 @@
 
     for (int i = NUM_SUBCHANNELS - 1; i >= 0; i--) {
       // Not starting health check until underlying Subchannel is READY
-      Subchannel subchannel = subchannels[i];
+      FakeSubchannel subchannel = subchannels[i];
       HealthImpl healthImpl = healthImpls[i];
       InOrder inOrder = inOrder(origLb);
       hcLbEventDelivery.handleSubchannelState(
@@ -286,6 +290,7 @@
           same(subchannel), eq(ConnectivityStateInfo.forNonError(IDLE)));
       verifyNoMoreInteractions(origLb);
 
+      assertThat(subchannel.logs).isEmpty();
       assertThat(healthImpl.calls).isEmpty();
       hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
       assertThat(healthImpl.calls).hasSize(1);
@@ -297,6 +302,10 @@
           same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
       verifyNoMoreInteractions(origLb);
 
+      assertThat(subchannel.logs).containsExactly(
+          "INFO: CONNECTING: Starting health-check for \"FooService\"");
+      subchannel.logs.clear();
+
       // Simulate a series of responses.
       for (ServingStatus servingStatus :
                new ServingStatus[] {
@@ -307,18 +316,23 @@
         if (servingStatus == ServingStatus.SERVING) {
           inOrder.verify(origLb).handleSubchannelState(
               same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+          assertThat(subchannel.logs).containsExactly(
+              "INFO: READY: health-check responded SERVING");
         } else {
           inOrder.verify(origLb).handleSubchannelState(
               same(subchannel),unavailableStateWithMsg(
                   "Health-check service responded " + servingStatus + " for 'FooService'"));
+          assertThat(subchannel.logs).containsExactly(
+              "INFO: TRANSIENT_FAILURE: health-check responded " + servingStatus);
         }
+        subchannel.logs.clear();
         verifyNoMoreInteractions(origLb);
       }
     }
 
     // origLb shuts down Subchannels
     for (int i = 0; i < NUM_SUBCHANNELS; i++) {
-      Subchannel subchannel = subchannels[i];
+      FakeSubchannel subchannel = subchannels[i];
 
       ServerSideCall serverCall = healthImpls[i].calls.peek();
       assertThat(serverCall.cancelled).isFalse();
@@ -330,6 +344,7 @@
       assertThat(serverCall.cancelled).isTrue();
       verify(origLb).handleSubchannelState(
           same(subchannel), eq(ConnectivityStateInfo.forNonError(SHUTDOWN)));
+      assertThat(subchannel.logs).isEmpty();
     }
 
     for (int i = 0; i < NUM_SUBCHANNELS; i++) {
@@ -365,6 +380,7 @@
     ServerSideCall serverCall0 = healthImpls[0].calls.poll();
     ServerSideCall serverCall1 = healthImpls[1].calls.poll();
 
+    subchannels[0].logs.clear();
     // subchannels[0] gets UNIMPLEMENTED for health checking, which will disable health
     // checking and it'll use the original state, which is currently READY.
     // In reality UNIMPLEMENTED is generated by GRPC server library, but the client can't tell
@@ -372,6 +388,9 @@
     serverCall0.responseObserver.onError(Status.UNIMPLEMENTED.asException());
     inOrder.verify(origLb).handleSubchannelState(
         same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(READY)));
+    assertThat(subchannels[0].logs).containsExactly(
+        "ERROR: Health-check disabled: " + Status.UNIMPLEMENTED,
+        "INFO: READY (no health-check)").inOrder();
 
     // subchannels[1] has normal health checking
     serverCall1.responseObserver.onNext(makeResponse(ServingStatus.NOT_SERVING));
@@ -412,7 +431,7 @@
     verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
     verifyNoMoreInteractions(origLb);
 
-    Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+    FakeSubchannel subchannel = (FakeSubchannel) createSubchannel(0, Attributes.EMPTY);
     assertThat(subchannel).isSameAs(subchannels[0]);
     InOrder inOrder = inOrder(origLb, backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
 
@@ -423,6 +442,7 @@
     assertThat(healthImpl.calls).hasSize(1);
     assertThat(clock.getPendingTasks()).isEmpty();
 
+    subchannel.logs.clear();
     // Server closes the health checking RPC without any response
     healthImpl.calls.poll().responseObserver.onCompleted();
 
@@ -431,6 +451,9 @@
         same(subchannel),
         unavailableStateWithMsg(
             "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'"));
+    assertThat(subchannel.logs).containsExactly(
+        "INFO: TRANSIENT_FAILURE: health-check stream closed with " + Status.OK,
+        "DEBUG: Will retry health-check after 11 ns").inOrder();
 
     // Retry with backoff is scheduled
     inOrder.verify(backoffPolicyProvider).get();
@@ -440,6 +463,7 @@
     verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 11);
     assertThat(clock.getPendingTasks()).isEmpty();
     
+    subchannel.logs.clear();
     // Server closes the health checking RPC without any response
     healthImpl.calls.poll().responseObserver.onError(Status.CANCELLED.asException());
 
@@ -449,6 +473,9 @@
         unavailableStateWithMsg(
             "Health-check stream unexpectedly closed with "
             + Status.CANCELLED + " for 'TeeService'"));
+    assertThat(subchannel.logs).containsExactly(
+        "INFO: TRANSIENT_FAILURE: health-check stream closed with " + Status.CANCELLED,
+        "DEBUG: Will retry health-check after 21 ns").inOrder();
 
     // Retry with backoff
     inOrder.verify(backoffPolicy1).nextBackoffNanos();
@@ -1029,6 +1056,18 @@
     final List<EquivalentAddressGroup> eagList;
     final Attributes attrs;
     final Channel channel;
+    final ArrayList<String> logs = new ArrayList<String>();
+    private final ChannelLogger logger = new ChannelLogger() {
+        @Override
+        public void log(ChannelLogLevel level, String msg) {
+          logs.add(level + ": " + msg);
+        }
+
+        @Override
+        public void log(ChannelLogLevel level, String template, Object... args) {
+          log(level, MessageFormat.format(template, args));
+        }
+      };
 
     FakeSubchannel(List<EquivalentAddressGroup> eagList, Attributes attrs, Channel channel) {
       this.eagList = Collections.unmodifiableList(eagList);
@@ -1060,6 +1099,11 @@
     public Channel asChannel() {
       return channel;
     }
+
+    @Override
+    public ChannelLogger getChannelLogger() {
+      return logger;
+    }
   }
 
   private class FakeHelper extends Helper {
@@ -1073,7 +1117,7 @@
         }
       }
       checkState(index >= 0, "addrs " + addrs + " not found");
-      Subchannel subchannel = new FakeSubchannel(addrs, attrs, channels[index]);
+      FakeSubchannel subchannel = new FakeSubchannel(addrs, attrs, channels[index]);
       checkState(subchannels[index] == null, "subchannels[" + index + "] already created");
       subchannels[index] = subchannel;
       return subchannel;