services: HealthCheckingLoadBalancer logs to ChannelLogger (#5042)
Log the event that health checking is disabled due to UNIMPLEMENTED, as required by the spec:
https://github.com/grpc/proposal/blob/master/A17-client-side-health-checking.md
Also log every Subchannel state change that is affected by health checking, i.e., the state changes that occur while the raw Subchannel state is READY and the health check is running.
Tracking issue: #4932
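
For context, here is a minimal illustrative sketch (not part of the diff) of the logging pattern this change introduces: the per-Subchannel ChannelLogger obtained via Subchannel.getChannelLogger() is used to record health-check-driven state changes. The class and method names below are hypothetical; only the ChannelLogger and Subchannel APIs are real.

import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.LoadBalancer.Subchannel;

final class HealthCheckLoggingSketch {
  // Hypothetical helper: logs the start of a health-check RPC the same way
  // HealthCheckingLoadBalancerFactory does in this change.
  static void logHealthCheckStart(Subchannel subchannel, String serviceName) {
    ChannelLogger logger = subchannel.getChannelLogger();
    // Templated message; placeholders like {0} are expanded by the ChannelLogger
    // implementation (the test fake in this diff uses MessageFormat).
    logger.log(
        ChannelLogLevel.INFO, "CONNECTING: Starting health-check for \"{0}\"", serviceName);
  }
}
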
diff --git a/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java b/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java
index e5f15d0..b2fc2af 100644
--- a/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java
+++ b/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java
@@ -27,6 +27,8 @@
import com.google.common.base.Objects;
import io.grpc.Attributes;
import io.grpc.CallOptions;
+import io.grpc.ChannelLogger;
+import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ClientCall;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
@@ -202,6 +204,7 @@
private final ScheduledExecutorService timerService;
private Subchannel subchannel;
+ private ChannelLogger subchannelLogger;
// Set when RPC started. Cleared when the RPC has closed or abandoned.
@Nullable
@@ -232,6 +235,7 @@
void init(Subchannel subchannel) {
checkState(this.subchannel == null, "init() already called");
this.subchannel = checkNotNull(subchannel, "subchannel");
+ this.subchannelLogger = checkNotNull(subchannel.getChannelLogger(), "subchannelLogger");
}
void setServiceName(@Nullable String newServiceName) {
@@ -254,7 +258,6 @@
// A connection was lost. We will reset disabled flag because health check
// may be available on the new connection.
disabled = false;
- // TODO(zhangkun83): record this to channel tracer
}
this.rawState = rawState;
adjustHealthCheck();
@@ -282,6 +285,7 @@
}
private void startRpc() {
+ checkState(serviceName != null, "serviceName is null");
checkState(activeRpc == null, "previous health-checking RPC has not been cleaned up");
checkState(subchannel != null, "init() not called");
// Optimization suggested by @markroth: if we are already READY and starting the health
@@ -289,6 +293,8 @@
// name, we don't go to CONNECTING, otherwise there will be artificial delays on RPCs
// waiting for the health check to respond.
if (!Objects.equal(concludedState.getState(), READY)) {
+ subchannelLogger.log(
+ ChannelLogLevel.INFO, "CONNECTING: Starting health-check for \"{0}\"", serviceName);
gotoState(ConnectivityStateInfo.forNonError(CONNECTING));
}
activeRpc = new HcStream();
@@ -378,27 +384,34 @@
void handleResponse(HealthCheckResponse response) {
callHasResponded = true;
backoffPolicy = null;
+ ServingStatus status = response.getStatus();
// running == true means the Subchannel's state (rawState) is READY
- if (Objects.equal(response.getStatus(), ServingStatus.SERVING)) {
+ if (Objects.equal(status, ServingStatus.SERVING)) {
+ subchannelLogger.log(ChannelLogLevel.INFO, "READY: health-check responded SERVING");
gotoState(ConnectivityStateInfo.forNonError(READY));
} else {
+ subchannelLogger.log(
+ ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check responded {0}", status);
gotoState(
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription(
"Health-check service responded "
- + response.getStatus() + " for '" + callServiceName + "'")));
+ + status + " for '" + callServiceName + "'")));
}
call.request(1);
}
void handleStreamClosed(Status status) {
if (Objects.equal(status.getCode(), Code.UNIMPLEMENTED)) {
- // TODO(zhangkun83): record this to channel tracer
disabled = true;
+ subchannelLogger.log(ChannelLogLevel.ERROR, "Health-check disabled: {0}", status);
+ subchannelLogger.log(ChannelLogLevel.INFO, "{0} (no health-check)", rawState);
gotoState(rawState);
return;
}
long delayNanos = 0;
+ subchannelLogger.log(
+ ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check stream closed with {0}", status);
gotoState(
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription(
@@ -416,6 +429,8 @@
startRpc();
} else {
checkState(!isRetryTimerPending(), "Retry double scheduled");
+ subchannelLogger.log(
+ ChannelLogLevel.DEBUG, "Will retry health-check after {0} ns", delayNanos);
retryTimer = syncContext.schedule(
retryTask, delayNanos, TimeUnit.NANOSECONDS, timerService);
}
diff --git a/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java b/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java
index d347807..e1d4fd4 100644
--- a/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java
+++ b/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java
@@ -41,6 +41,8 @@
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Attributes;
import io.grpc.Channel;
+import io.grpc.ChannelLogger;
+import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.Context;
@@ -69,6 +71,8 @@
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.stub.StreamObserver;
import java.net.SocketAddress;
+import java.text.MessageFormat;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
@@ -105,7 +109,7 @@
@SuppressWarnings({"rawtypes", "unchecked"})
private final List<EquivalentAddressGroup>[] eagLists = new List[NUM_SUBCHANNELS];
private List<EquivalentAddressGroup> resolvedAddressList;
- private final Subchannel[] subchannels = new Subchannel[NUM_SUBCHANNELS];
+ private final FakeSubchannel[] subchannels = new FakeSubchannel[NUM_SUBCHANNELS];
private final ManagedChannel[] channels = new ManagedChannel[NUM_SUBCHANNELS];
private final Server[] servers = new Server[NUM_SUBCHANNELS];
private final HealthImpl[] healthImpls = new HealthImpl[NUM_SUBCHANNELS];
@@ -268,7 +272,7 @@
for (int i = NUM_SUBCHANNELS - 1; i >= 0; i--) {
// Not starting health check until underlying Subchannel is READY
- Subchannel subchannel = subchannels[i];
+ FakeSubchannel subchannel = subchannels[i];
HealthImpl healthImpl = healthImpls[i];
InOrder inOrder = inOrder(origLb);
hcLbEventDelivery.handleSubchannelState(
@@ -286,6 +290,7 @@
same(subchannel), eq(ConnectivityStateInfo.forNonError(IDLE)));
verifyNoMoreInteractions(origLb);
+ assertThat(subchannel.logs).isEmpty();
assertThat(healthImpl.calls).isEmpty();
hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
assertThat(healthImpl.calls).hasSize(1);
@@ -297,6 +302,10 @@
same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
verifyNoMoreInteractions(origLb);
+ assertThat(subchannel.logs).containsExactly(
+ "INFO: CONNECTING: Starting health-check for \"FooService\"");
+ subchannel.logs.clear();
+
// Simulate a series of responses.
for (ServingStatus servingStatus :
new ServingStatus[] {
@@ -307,18 +316,23 @@
if (servingStatus == ServingStatus.SERVING) {
inOrder.verify(origLb).handleSubchannelState(
same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+ assertThat(subchannel.logs).containsExactly(
+ "INFO: READY: health-check responded SERVING");
} else {
inOrder.verify(origLb).handleSubchannelState(
same(subchannel), unavailableStateWithMsg(
"Health-check service responded " + servingStatus + " for 'FooService'"));
+ assertThat(subchannel.logs).containsExactly(
+ "INFO: TRANSIENT_FAILURE: health-check responded " + servingStatus);
}
+ subchannel.logs.clear();
verifyNoMoreInteractions(origLb);
}
}
// origLb shuts down Subchannels
for (int i = 0; i < NUM_SUBCHANNELS; i++) {
- Subchannel subchannel = subchannels[i];
+ FakeSubchannel subchannel = subchannels[i];
ServerSideCall serverCall = healthImpls[i].calls.peek();
assertThat(serverCall.cancelled).isFalse();
@@ -330,6 +344,7 @@
assertThat(serverCall.cancelled).isTrue();
verify(origLb).handleSubchannelState(
same(subchannel), eq(ConnectivityStateInfo.forNonError(SHUTDOWN)));
+ assertThat(subchannel.logs).isEmpty();
}
for (int i = 0; i < NUM_SUBCHANNELS; i++) {
@@ -365,6 +380,7 @@
ServerSideCall serverCall0 = healthImpls[0].calls.poll();
ServerSideCall serverCall1 = healthImpls[1].calls.poll();
+ subchannels[0].logs.clear();
// subchannels[0] gets UNIMPLEMENTED for health checking, which will disable health
// checking and it'll use the original state, which is currently READY.
// In reality UNIMPLEMENTED is generated by GRPC server library, but the client can't tell
@@ -372,6 +388,9 @@
serverCall0.responseObserver.onError(Status.UNIMPLEMENTED.asException());
inOrder.verify(origLb).handleSubchannelState(
same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(READY)));
+ assertThat(subchannels[0].logs).containsExactly(
+ "ERROR: Health-check disabled: " + Status.UNIMPLEMENTED,
+ "INFO: READY (no health-check)").inOrder();
// subchannels[1] has normal health checking
serverCall1.responseObserver.onNext(makeResponse(ServingStatus.NOT_SERVING));
@@ -412,7 +431,7 @@
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
verifyNoMoreInteractions(origLb);
- Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+ FakeSubchannel subchannel = (FakeSubchannel) createSubchannel(0, Attributes.EMPTY);
assertThat(subchannel).isSameAs(subchannels[0]);
InOrder inOrder = inOrder(origLb, backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
@@ -423,6 +442,7 @@
assertThat(healthImpl.calls).hasSize(1);
assertThat(clock.getPendingTasks()).isEmpty();
+ subchannel.logs.clear();
// Server closes the health checking RPC without any response
healthImpl.calls.poll().responseObserver.onCompleted();
@@ -431,6 +451,9 @@
same(subchannel),
unavailableStateWithMsg(
"Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'"));
+ assertThat(subchannel.logs).containsExactly(
+ "INFO: TRANSIENT_FAILURE: health-check stream closed with " + Status.OK,
+ "DEBUG: Will retry health-check after 11 ns").inOrder();
// Retry with backoff is scheduled
inOrder.verify(backoffPolicyProvider).get();
@@ -440,6 +463,7 @@
verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 11);
assertThat(clock.getPendingTasks()).isEmpty();
+ subchannel.logs.clear();
// Server closes the health checking RPC without any response
healthImpl.calls.poll().responseObserver.onError(Status.CANCELLED.asException());
@@ -449,6 +473,9 @@
unavailableStateWithMsg(
"Health-check stream unexpectedly closed with "
+ Status.CANCELLED + " for 'TeeService'"));
+ assertThat(subchannel.logs).containsExactly(
+ "INFO: TRANSIENT_FAILURE: health-check stream closed with " + Status.CANCELLED,
+ "DEBUG: Will retry health-check after 21 ns").inOrder();
// Retry with backoff
inOrder.verify(backoffPolicy1).nextBackoffNanos();
@@ -1029,6 +1056,18 @@
final List<EquivalentAddressGroup> eagList;
final Attributes attrs;
final Channel channel;
+ final ArrayList<String> logs = new ArrayList<String>();
+ private final ChannelLogger logger = new ChannelLogger() {
+ @Override
+ public void log(ChannelLogLevel level, String msg) {
+ logs.add(level + ": " + msg);
+ }
+
+ @Override
+ public void log(ChannelLogLevel level, String template, Object... args) {
+ log(level, MessageFormat.format(template, args));
+ }
+ };
FakeSubchannel(List<EquivalentAddressGroup> eagList, Attributes attrs, Channel channel) {
this.eagList = Collections.unmodifiableList(eagList);
@@ -1060,6 +1099,11 @@
public Channel asChannel() {
return channel;
}
+
+ @Override
+ public ChannelLogger getChannelLogger() {
+ return logger;
+ }
}
private class FakeHelper extends Helper {
@@ -1073,7 +1117,7 @@
}
}
checkState(index >= 0, "addrs " + addrs + " not found");
- Subchannel subchannel = new FakeSubchannel(addrs, attrs, channels[index]);
+ FakeSubchannel subchannel = new FakeSubchannel(addrs, attrs, channels[index]);
checkState(subchannels[index] == null, "subchannels[" + index + "] already created");
subchannels[index] = subchannel;
return subchannel;