testing: create test client for XDS federation integration tests (#9798)
This client can be used as a part of XDS federation integration tests. It can concurrently perform RPCs with different stubs using different underlying XDS servers.
For example, one might perform proxyless service mesh and DirectPath RPCs in the same process with flags like:
```
--server_uris=xds:///${PSM_TARGET},google-c2p:///${DIRECTPATH_TARGET} \
--credentials_types=INSECURE_CREDENTIALS,compute_engine_channel_creds \
--test_case=rpc_soak \
--soak_iterations=10 \
--soak_max_failures=0 \
--soak_per_iteration_max_acceptable_latency_ms=2500 \
--soak_overall_timeout_seconds=300
```
diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle
index 5fdb306..f8c5561 100644
--- a/interop-testing/build.gradle
+++ b/interop-testing/build.gradle
@@ -194,6 +194,13 @@
classpath = startScriptsClasspath.get()
}
+def xds_federation_test_client = tasks.register("xds_federation_test_client", CreateStartScripts) {
+ mainClass = "io.grpc.testing.integration.XdsFederationTestClient"
+ applicationName = "xds-federation-test-client"
+ outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
+ classpath = startScriptsClasspath.get()
+}
+
distributions.shadow.contents.into("bin") {
from(test_client)
from(test_server)
@@ -204,6 +211,7 @@
from(grpclb_fallback_test_client)
from(xds_test_client)
from(xds_test_server)
+ from(xds_federation_test_client)
fileMode = 0755
}
diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java
index b7298ce..6a272d3 100644
--- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java
+++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java
@@ -2025,6 +2025,7 @@
* and channel creation behavior.
*/
public void performSoakTest(
+ String serverUri,
boolean resetChannelPerIteration,
int soakIterations,
int maxFailures,
@@ -2057,21 +2058,22 @@
SoakIterationResult result = performOneSoakIteration(soakStub);
SocketAddress peer = clientCallCapture
.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
- System.err.print(
+ StringBuilder logStr = new StringBuilder(
String.format(
Locale.US,
- "soak iteration: %d elapsed_ms: %d peer: %s",
- i, result.getLatencyMs(), peer != null ? peer.toString() : "null"));
+ "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s",
+ i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri));
if (!result.getStatus().equals(Status.OK)) {
totalFailures++;
- System.err.println(String.format(" failed: %s", result.getStatus()));
+ logStr.append(String.format(" failed: %s", result.getStatus()));
} else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) {
totalFailures++;
- System.err.println(
+ logStr.append(
" exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs);
} else {
- System.err.println(" succeeded");
+ logStr.append(" succeeded");
}
+ System.err.println(logStr.toString());
iterationsDone++;
latencies.recordValue(result.getLatencyMs());
long remainingNs = earliestNextStartNs - System.nanoTime();
@@ -2084,20 +2086,12 @@
System.err.println(
String.format(
Locale.US,
- "soak test ran: %d / %d iterations\n"
- + "total failures: %d\n"
- + "max failures threshold: %d\n"
- + "max acceptable per iteration latency ms: %d\n"
- + " p50 soak iteration latency: %d ms\n"
- + " p90 soak iteration latency: %d ms\n"
- + "p100 soak iteration latency: %d ms\n"
- + "See breakdown above for which iterations succeeded, failed, and "
- + "why for more info.",
+ "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. "
+ + "p50: %d ms, p90: %d ms, p100: %d ms",
+ serverUri,
iterationsDone,
soakIterations,
totalFailures,
- maxFailures,
- maxAcceptablePerIterationLatencyMs,
latencies.getValueAtPercentile(50),
latencies.getValueAtPercentile(90),
latencies.getValueAtPercentile(100)));
@@ -2105,8 +2099,9 @@
String timeoutErrorMessage =
String.format(
Locale.US,
- "soak test consumed all %d seconds of time and quit early, only "
- + "having ran %d out of desired %d iterations.",
+ "(server_uri: %s) soak test consumed all %d seconds of time and quit early, "
+ + "only having ran %d out of desired %d iterations.",
+ serverUri,
overallTimeoutSeconds,
iterationsDone,
soakIterations);
@@ -2115,8 +2110,9 @@
String tooManyFailuresErrorMessage =
String.format(
Locale.US,
- "soak test total failures: %d exceeds max failures threshold: %d.",
- totalFailures, maxFailures);
+ "(server_uri: %s) soak test total failures: %d exceeds max failures "
+ + "threshold: %d.",
+ serverUri, totalFailures, maxFailures);
assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures);
}
diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java
index 049cee5..4e3cb90 100644
--- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java
+++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java
@@ -461,6 +461,7 @@
case RPC_SOAK: {
tester.performSoakTest(
+ serverHost,
false /* resetChannelPerIteration */,
soakIterations,
soakMaxFailures,
@@ -472,6 +473,7 @@
case CHANNEL_SOAK: {
tester.performSoakTest(
+ serverHost,
true /* resetChannelPerIteration */,
soakIterations,
soakMaxFailures,
diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsFederationTestClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsFederationTestClient.java
new file mode 100644
index 0000000..a15a59e
--- /dev/null
+++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsFederationTestClient.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2023 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.testing.integration;
+
+import static org.junit.Assert.assertTrue;
+
+import io.grpc.ChannelCredentials;
+import io.grpc.InsecureChannelCredentials;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.alts.ComputeEngineChannelCredentials;
+import io.grpc.netty.NettyChannelBuilder;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+/**
+ * Test client that can be used to verify that XDS federation works. A list of
+ * server URIs (which can each be load balanced by different XDS servers), can
+ * be configured via flags. A separate thread is created for each of these clients
+ * and the configured test (either rpc_soak or channel_soak) is ran for each client
+ * on each thread.
+ */
+public final class XdsFederationTestClient {
+ private static final Logger logger =
+ Logger.getLogger(XdsFederationTestClient.class.getName());
+
+ /**
+ * Entry point.
+ */
+ public static void main(String[] args) throws Exception {
+ final XdsFederationTestClient client = new XdsFederationTestClient();
+ client.parseArgs(args);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ @SuppressWarnings("CatchAndPrintStackTrace")
+ public void run() {
+ System.out.println("Shutting down");
+ try {
+ client.tearDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ client.setUp();
+ try {
+ client.run();
+ } finally {
+ client.tearDown();
+ }
+ System.exit(0);
+ }
+
+ private String serverUris = "";
+ private String credentialsTypes = "";
+ private int soakIterations = 10;
+ private int soakMaxFailures = 0;
+ private int soakPerIterationMaxAcceptableLatencyMs = 1000;
+ private int soakOverallTimeoutSeconds = 10;
+ private int soakMinTimeMsBetweenRpcs = 0;
+ private String testCase = "rpc_soak";
+ private ArrayList<InnerClient> clients = new ArrayList<InnerClient>();
+
+ private void parseArgs(String[] args) {
+ boolean usage = false;
+ for (String arg : args) {
+ if (!arg.startsWith("--")) {
+ System.err.println("All arguments must start with '--': " + arg);
+ usage = true;
+ break;
+ }
+ String[] parts = arg.substring(2).split("=", 2);
+ String key = parts[0];
+ if ("help".equals(key)) {
+ usage = true;
+ break;
+ }
+ if (parts.length != 2) {
+ System.err.println("All arguments must be of the form --arg=value");
+ usage = true;
+ break;
+ }
+ String value = parts[1];
+ if ("server_uris".equals(key)) {
+ serverUris = value;
+ } else if ("credentials_types".equals(key)) {
+ credentialsTypes = value;
+ } else if ("test_case".equals(key)) {
+ testCase = value;
+ } else if ("soak_iterations".equals(key)) {
+ soakIterations = Integer.valueOf(value);
+ } else if ("soak_max_failures".equals(key)) {
+ soakMaxFailures = Integer.valueOf(value);
+ } else if ("soak_per_iteration_max_acceptable_latency_ms".equals(key)) {
+ soakPerIterationMaxAcceptableLatencyMs = Integer.valueOf(value);
+ } else if ("soak_overall_timeout_seconds".equals(key)) {
+ soakOverallTimeoutSeconds = Integer.valueOf(value);
+ } else if ("soak_min_time_ms_between_rpcs".equals(key)) {
+ soakMinTimeMsBetweenRpcs = Integer.valueOf(value);
+ } else {
+ System.err.println("Unknown argument: " + key);
+ usage = true;
+ break;
+ }
+ }
+ if (usage) {
+ XdsFederationTestClient c = new XdsFederationTestClient();
+ System.out.println(
+ "Usage: [ARGS...]"
+ + "\n"
+ + "\n --server_uris Comma separated list of server "
+ + "URIs to make RPCs to. Default: "
+ + c.serverUris
+ + "\n --credentials_types Comma-separated list of "
+ + "\n credentials, each entry is used "
+ + "\n for the server of the "
+ + "\n corresponding index in server_uris. "
+ + "\n Supported values: "
+ + "compute_engine_channel_creds,INSECURE_CREDENTIALS. Default: "
+ + c.credentialsTypes
+ + "\n --soak_iterations The number of iterations to use "
+ + "\n for the two tests: rpc_soak and "
+ + "\n channel_soak. Default: "
+ + c.soakIterations
+ + "\n --soak_max_failures The number of iterations in soak "
+ + "\n tests that are allowed to fail "
+ + "\n (either due to non-OK status code "
+ + "\n or exceeding the per-iteration max "
+ + "\n acceptable latency). Default: "
+ + c.soakMaxFailures
+ + "\n --soak_per_iteration_max_acceptable_latency_ms"
+ + "\n The number of milliseconds a "
+ + "\n single iteration in the two soak "
+ + "\n tests (rpc_soak and channel_soak) "
+ + "\n should take. Default: "
+ + c.soakPerIterationMaxAcceptableLatencyMs
+ + "\n --soak_overall_timeout_seconds The overall number of seconds "
+ + "\n after which a soak test should "
+ + "\n stop and fail, if the desired "
+ + "\n number of iterations have not yet "
+ + "\n completed. Default: "
+ + c.soakOverallTimeoutSeconds
+ + "\n --soak_min_time_ms_between_rpcs The minimum time in milliseconds "
+ + "\n between consecutive RPCs in a soak "
+ + "\n test (rpc_soak or channel_soak), "
+ + "\n useful for limiting QPS. Default: "
+ + c.soakMinTimeMsBetweenRpcs
+ + "\n --test_case=TEST_CASE Test case to run. Valid options are:"
+ + "\n rpc_soak: sends --soak_iterations large_unary RPCs"
+ + "\n channel_soak: sends --soak_iterations RPCs, rebuilding the channel "
+ + "each time."
+ + "\n Default: " + c.testCase
+ );
+ System.exit(1);
+ }
+ }
+
+ void setUp() {
+ String[] uris = serverUris.split(",", -1);
+ String[] creds = credentialsTypes.split(",", -1);
+ if (uris.length == 0) {
+ throw new IllegalArgumentException("--server_uris is empty");
+ }
+ if (uris.length != creds.length) {
+ throw new IllegalArgumentException("Number of entries in --server_uris "
+ + "does not match number of entries in --credentials_types");
+ }
+ for (int i = 0; i < uris.length; i++) {
+ clients.add(new InnerClient(creds[i], uris[i]));
+ }
+ for (InnerClient c : clients) {
+ c.setUp();
+ }
+ }
+
+ private synchronized void tearDown() {
+ try {
+ for (InnerClient c : clients) {
+ c.tearDown();
+ }
+ } catch (RuntimeException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Wraps a single client stub configuration and executes a
+ * soak test case with that configuration.
+ */
+ class InnerClient extends AbstractInteropTest {
+ private String credentialsType;
+ private String serverUri;
+ private boolean runSucceeded = false;
+
+ public InnerClient(String credentialsType, String serverUri) {
+ this.credentialsType = credentialsType;
+ this.serverUri = serverUri;
+ }
+
+ /**
+ * Indicates whether run succeeded or not. This must only be called
+ * after run() has finished.
+ */
+ public boolean runSucceeded() {
+ return runSucceeded;
+ }
+
+ /**
+ * Run the intended soak test.
+ */
+ public void run() {
+ boolean resetChannelPerIteration;
+ if (testCase.equals("rpc_soak")) {
+ resetChannelPerIteration = false;
+ } else if (testCase.equals("channel_soak")) {
+ resetChannelPerIteration = true;
+ } else {
+ throw new RuntimeException("invalid testcase: " + testCase);
+ }
+ try {
+ performSoakTest(
+ serverUri,
+ resetChannelPerIteration,
+ soakIterations,
+ soakMaxFailures,
+ soakPerIterationMaxAcceptableLatencyMs,
+ soakMinTimeMsBetweenRpcs,
+ soakOverallTimeoutSeconds);
+ logger.info("Test case: " + testCase + " done for server: " + serverUri);
+ runSucceeded = true;
+ } catch (Exception e) {
+ logger.info("Test case: " + testCase + " failed for server: " + serverUri);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected ManagedChannelBuilder<?> createChannelBuilder() {
+ ChannelCredentials channelCredentials;
+ if (credentialsType.equals("compute_engine_channel_creds")) {
+ channelCredentials = ComputeEngineChannelCredentials.create();
+ } else if (credentialsType.equals("INSECURE_CREDENTIALS")) {
+ channelCredentials = InsecureChannelCredentials.create();
+ } else {
+ throw new IllegalArgumentException(
+ "Unknown custom credentials: " + credentialsType);
+ }
+ return NettyChannelBuilder.forTarget(serverUri, channelCredentials)
+ .keepAliveTime(3600, TimeUnit.SECONDS)
+ .keepAliveTimeout(20, TimeUnit.SECONDS);
+ }
+ }
+
+ private void run() throws Exception {
+ logger.info("Begin test case: " + testCase);
+ ArrayList<Thread> threads = new ArrayList<Thread>();
+ for (InnerClient c : clients) {
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ c.run();
+ }
+ });
+ t.start();
+ threads.add(t);
+ }
+ for (Thread t : threads) {
+ t.join();
+ }
+ for (InnerClient c : clients) {
+ assertTrue(c.runSucceeded());
+ }
+ logger.info("Test case: " + testCase + " done for all clients!");
+ }
+}