Move multiple-port ServerImpl to NettyServer (#7674)
Change InternalServer to handle multiple addresses and implemented in NettyServer.
It makes ServerImpl to have a single transport server, and this single transport server (NettyServer) will bind to all listening addresses during bootstrap. (#7674)
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServer.java b/core/src/main/java/io/grpc/inprocess/InProcessServer.java
index a422506..7922ebd 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessServer.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessServer.java
@@ -83,11 +83,21 @@
}
@Override
+ public List<? extends SocketAddress> getListenSocketAddresses() {
+ return Collections.singletonList(getListenSocketAddress());
+ }
+
+ @Override
public InternalInstrumented<SocketStats> getListenSocketStats() {
return null;
}
@Override
+ public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
+ return null;
+ }
+
+ @Override
public void shutdown() {
if (!registry.remove(name, this)) {
throw new AssertionError();
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
index f768c60..25291c2 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
@@ -33,7 +33,6 @@
import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder;
import io.grpc.internal.SharedResourcePool;
import java.io.File;
-import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
@@ -109,7 +108,7 @@
final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder {
@Override
- public List<? extends InternalServer> buildClientTransportServers(
+ public InternalServer buildClientTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
return buildTransportServers(streamTracerFactories);
}
@@ -187,9 +186,9 @@
return this;
}
- List<InProcessServer> buildTransportServers(
+ InProcessServer buildTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
- return Collections.singletonList(new InProcessServer(this, streamTracerFactories));
+ return new InProcessServer(this, streamTracerFactories);
}
@Override
diff --git a/core/src/main/java/io/grpc/internal/InternalServer.java b/core/src/main/java/io/grpc/internal/InternalServer.java
index 389b24b..0445ae3 100644
--- a/core/src/main/java/io/grpc/internal/InternalServer.java
+++ b/core/src/main/java/io/grpc/internal/InternalServer.java
@@ -20,12 +20,13 @@
import io.grpc.InternalInstrumented;
import java.io.IOException;
import java.net.SocketAddress;
+import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
/**
- * An object that accepts new incoming connections. This would commonly encapsulate a bound socket
- * that {@code accept()}s new connections.
+ * An object that accepts new incoming connections on one or more listening socket addresses.
+ * This would commonly encapsulate a bound socket that {@code accept()}s new connections.
*/
@ThreadSafe
public interface InternalServer {
@@ -49,13 +50,25 @@
void shutdown();
/**
- * Returns the listening socket address. May change after {@link start(ServerListener)} is
+ * Returns the first listening socket address. May change after {@link start(ServerListener)} is
* called.
*/
SocketAddress getListenSocketAddress();
/**
- * Returns the listen socket stats of this server. May return {@code null}.
+ * Returns the first listen socket stats of this server. May return {@code null}.
*/
@Nullable InternalInstrumented<SocketStats> getListenSocketStats();
+
+ /**
+ * Returns a list of listening socket addresses. May change after {@link start(ServerListener)}
+ * is called.
+ */
+ List<? extends SocketAddress> getListenSocketAddresses();
+
+ /**
+ * Returns a list of listen socket stats of this server. May return {@code null}.
+ */
+ @Nullable List<InternalInstrumented<SocketStats>> getListenSocketStatsList();
+
}
diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java
index a400fca..6c66aac 100644
--- a/core/src/main/java/io/grpc/internal/ServerImpl.java
+++ b/core/src/main/java/io/grpc/internal/ServerImpl.java
@@ -110,12 +110,11 @@
@GuardedBy("lock") private boolean serverShutdownCallbackInvoked;
@GuardedBy("lock") private boolean terminated;
/** Service encapsulating something similar to an accept() socket. */
- private final List<? extends InternalServer> transportServers;
+ private final InternalServer transportServer;
private final Object lock = new Object();
@GuardedBy("lock") private boolean transportServersTerminated;
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */
@GuardedBy("lock") private final Set<ServerTransport> transports = new HashSet<>();
- @GuardedBy("lock") private int activeTransportServers;
private final Context rootContext;
@@ -131,20 +130,18 @@
* Construct a server.
*
* @param builder builder with configuration for server
- * @param transportServers transport servers that will create new incoming transports
+ * @param transportServer transport servers that will create new incoming transports
* @param rootContext context that callbacks for new RPCs should be derived from
*/
ServerImpl(
ServerImplBuilder builder,
- List<? extends InternalServer> transportServers,
+ InternalServer transportServer,
Context rootContext) {
this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
this.fallbackRegistry =
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
- Preconditions.checkNotNull(transportServers, "transportServers");
- Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided");
- this.transportServers = new ArrayList<>(transportServers);
+ this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
this.logId =
InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle()));
// Fork from the passed in context so that it does not propagate cancellation, it only
@@ -179,10 +176,7 @@
// Start and wait for any ports to actually be bound.
ServerListenerImpl listener = new ServerListenerImpl();
- for (InternalServer ts : transportServers) {
- ts.start(listener);
- activeTransportServers++;
- }
+ transportServer.start(listener);
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
started = true;
return this;
@@ -195,8 +189,7 @@
synchronized (lock) {
checkState(started, "Not started");
checkState(!terminated, "Already terminated");
- for (InternalServer ts : transportServers) {
- SocketAddress addr = ts.getListenSocketAddress();
+ for (SocketAddress addr: transportServer.getListenSocketAddresses()) {
if (addr instanceof InetSocketAddress) {
return ((InetSocketAddress) addr).getPort();
}
@@ -216,11 +209,7 @@
private List<SocketAddress> getListenSocketsIgnoringLifecycle() {
synchronized (lock) {
- List<SocketAddress> addrs = new ArrayList<>(transportServers.size());
- for (InternalServer ts : transportServers) {
- addrs.add(ts.getListenSocketAddress());
- }
- return Collections.unmodifiableList(addrs);
+ return Collections.unmodifiableList(transportServer.getListenSocketAddresses());
}
}
@@ -268,9 +257,7 @@
}
}
if (shutdownTransportServers) {
- for (InternalServer ts : transportServers) {
- ts.shutdown();
- }
+ transportServer.shutdown();
}
return this;
}
@@ -388,8 +375,7 @@
ArrayList<ServerTransport> copiedTransports;
Status shutdownNowStatusCopy;
synchronized (lock) {
- activeTransportServers--;
- if (activeTransportServers != 0) {
+ if (serverShutdownCallbackInvoked) {
return;
}
@@ -662,12 +648,9 @@
@Override
public ListenableFuture<ServerStats> getStats() {
ServerStats.Builder builder = new ServerStats.Builder();
- for (InternalServer ts : transportServers) {
- // TODO(carl-mastrangelo): remove the list and just add directly.
- InternalInstrumented<SocketStats> stats = ts.getListenSocketStats();
- if (stats != null ) {
- builder.addListenSockets(Collections.singletonList(stats));
- }
+ List<InternalInstrumented<SocketStats>> stats = transportServer.getListenSocketStatsList();
+ if (stats != null ) {
+ builder.addListenSockets(stats);
}
serverCallTracer.updateBuilder(builder);
SettableFuture<ServerStats> ret = SettableFuture.create();
@@ -679,7 +662,7 @@
public String toString() {
return MoreObjects.toStringHelper(this)
.add("logId", logId.getId())
- .add("transportServers", transportServers)
+ .add("transportServer", transportServer)
.toString();
}
diff --git a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java
index 9208394..04e6059 100644
--- a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java
@@ -97,7 +97,7 @@
* is meant for Transport implementors and should not be used by normal users.
*/
public interface ClientTransportServersBuilder {
- List<? extends InternalServer> buildClientTransportServers(
+ InternalServer buildClientTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories);
}
diff --git a/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java b/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java
index 9f56a35..6095e29 100644
--- a/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java
+++ b/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java
@@ -22,7 +22,6 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
-import com.google.common.collect.Iterables;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.FakeClock;
import io.grpc.internal.ObjectPool;
@@ -55,8 +54,8 @@
@Test
public void scheduledExecutorService_default() {
InProcessServerBuilder builder = InProcessServerBuilder.forName("foo");
- InProcessServer server = Iterables.getOnlyElement(
- builder.buildTransportServers(new ArrayList<ServerStreamTracer.Factory>()));
+ InProcessServer server =
+ builder.buildTransportServers(new ArrayList<ServerStreamTracer.Factory>());
ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
server.getScheduledExecutorServicePool();
@@ -80,8 +79,8 @@
InProcessServerBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
assertSame(builder, builder1);
- InProcessServer server = Iterables.getOnlyElement(
- builder1.buildTransportServers(new ArrayList<ServerStreamTracer.Factory>()));
+ InProcessServer server =
+ builder1.buildTransportServers(new ArrayList<ServerStreamTracer.Factory>());
ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
server.getScheduledExecutorServicePool();
diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
index f7e325a..a987937 100644
--- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
+++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
@@ -19,7 +19,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import com.google.common.collect.ImmutableList;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
@@ -55,16 +54,16 @@
public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();
@Override
- protected List<? extends InternalServer> newServer(
+ protected InternalServer newServer(
List<ServerStreamTracer.Factory> streamTracerFactories) {
InProcessServerBuilder builder = InProcessServerBuilder
.forName(TRANSPORT_NAME)
.maxInboundMetadataSize(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
- return ImmutableList.of(new InProcessServer(builder, streamTracerFactories));
+ return new InProcessServer(builder, streamTracerFactories);
}
@Override
- protected List<? extends InternalServer> newServer(
+ protected InternalServer newServer(
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
return newServer(streamTracerFactories);
}
diff --git a/core/src/test/java/io/grpc/inprocess/StandaloneInProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/StandaloneInProcessTransportTest.java
index 6d8f3a1..152fdf2 100644
--- a/core/src/test/java/io/grpc/inprocess/StandaloneInProcessTransportTest.java
+++ b/core/src/test/java/io/grpc/inprocess/StandaloneInProcessTransportTest.java
@@ -16,7 +16,6 @@
package io.grpc.inprocess;
-import com.google.common.collect.ImmutableList;
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalInstrumented;
import io.grpc.ServerStreamTracer;
@@ -31,6 +30,7 @@
import io.grpc.internal.SharedResourcePool;
import java.io.IOException;
import java.net.SocketAddress;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
@@ -52,13 +52,13 @@
private TestServer currentServer;
@Override
- protected List<? extends InternalServer> newServer(
+ protected InternalServer newServer(
List<ServerStreamTracer.Factory> streamTracerFactories) {
- return ImmutableList.of(new TestServer(streamTracerFactories));
+ return new TestServer(streamTracerFactories);
}
@Override
- protected List<? extends InternalServer> newServer(
+ protected InternalServer newServer(
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
return newServer(streamTracerFactories);
}
@@ -127,10 +127,21 @@
}
@Override
+ public List<SocketAddress> getListenSocketAddresses() {
+ return Collections.singletonList(getListenSocketAddress());
+ }
+
+ @Override
@Nullable
public InternalInstrumented<SocketStats> getListenSocketStats() {
return null;
}
+
+ @Override
+ @Nullable
+ public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
+ return null;
+ }
}
/** Wraps the server listener to ensure we don't accept new transports after shutdown. */
diff --git a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java
index 5c2b9c5..e19db3e 100644
--- a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java
+++ b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java
@@ -40,7 +40,6 @@
import static org.mockito.Mockito.verify;
import com.google.common.base.Objects;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.MoreExecutors;
@@ -118,13 +117,13 @@
* Returns a new server that when started will be able to be connected to from the client. Each
* returned instance should be new and yet be accessible by new client transports.
*/
- protected abstract List<? extends InternalServer> newServer(
+ protected abstract InternalServer newServer(
List<ServerStreamTracer.Factory> streamTracerFactories);
/**
* Builds a new server that is listening on the same port as the given server instance does.
*/
- protected abstract List<? extends InternalServer> newServer(
+ protected abstract InternalServer newServer(
int port, List<ServerStreamTracer.Factory> streamTracerFactories);
/**
@@ -230,7 +229,7 @@
@Before
public void setUp() {
- server = Iterables.getOnlyElement(newServer(Arrays.asList(serverStreamTracerFactory)));
+ server = newServer(Arrays.asList(serverStreamTracerFactory));
callOptions = CallOptions.DEFAULT.withStreamTracerFactory(clientStreamTracerFactory);
}
@@ -401,8 +400,7 @@
if (addr instanceof InetSocketAddress) {
port = ((InetSocketAddress) addr).getPort();
}
- InternalServer server2 =
- Iterables.getOnlyElement(newServer(port, Arrays.asList(serverStreamTracerFactory)));
+ InternalServer server2 = newServer(port, Arrays.asList(serverStreamTracerFactory));
thrown.expect(IOException.class);
server2.start(new MockServerListener());
}
@@ -421,7 +419,7 @@
assumeTrue("transport is not using InetSocketAddress", port != -1);
server.shutdown();
- server = Iterables.getOnlyElement(newServer(port, Arrays.asList(serverStreamTracerFactory)));
+ server = newServer(port, Arrays.asList(serverStreamTracerFactory));
boolean success;
Thread.currentThread().interrupt();
try {
@@ -473,7 +471,7 @@
// resources. There may be cases this is impossible in the future, but for now it is a useful
// property.
serverListener = new MockServerListener();
- server = Iterables.getOnlyElement(newServer(port, Arrays.asList(serverStreamTracerFactory)));
+ server = newServer(port, Arrays.asList(serverStreamTracerFactory));
server.start(serverListener);
// Try to "flush" out any listener notifications on client and server. This also ensures that
diff --git a/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java b/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java
index 1d6a2ec..ad8cf41 100644
--- a/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java
+++ b/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java
@@ -46,7 +46,7 @@
builder = new ServerImplBuilder(
new ClientTransportServersBuilder() {
@Override
- public List<? extends InternalServer> buildClientTransportServers(
+ public InternalServer buildClientTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
throw new UnsupportedOperationException();
}
diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java
index 5ca3685..5b5f538 100644
--- a/core/src/test/java/io/grpc/internal/ServerImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java
@@ -44,7 +44,6 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
@@ -90,7 +89,6 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -205,7 +203,7 @@
builder = new ServerImplBuilder(
new ClientTransportServersBuilder() {
@Override
- public List<? extends InternalServer> buildClientTransportServers(
+ public InternalServer buildClientTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
throw new UnsupportedOperationException();
}
@@ -226,39 +224,19 @@
}
@Test
- public void multiport() throws Exception {
- final CountDownLatch starts = new CountDownLatch(2);
- final CountDownLatch shutdowns = new CountDownLatch(2);
-
- final class Serv extends SimpleServer {
+ public void getListenSockets() throws Exception {
+ int port = 800;
+ final List<InetSocketAddress> addresses =
+ Collections.singletonList(new InetSocketAddress(800));
+ transportServer = new SimpleServer() {
@Override
- public void start(ServerListener listener) throws IOException {
- super.start(listener);
- starts.countDown();
+ public List<InetSocketAddress> getListenSocketAddresses() {
+ return addresses;
}
-
- @Override
- public void shutdown() {
- super.shutdown();
- shutdowns.countDown();
- }
- }
-
- SimpleServer transportServer1 = new Serv();
- SimpleServer transportServer2 = new Serv();
- assertNull(server);
- builder.fallbackHandlerRegistry(fallbackRegistry);
- builder.executorPool = executorPool;
- server = new ServerImpl(
- builder, ImmutableList.of(transportServer1, transportServer2), SERVER_CONTEXT);
-
- server.start();
- assertTrue(starts.await(1, TimeUnit.SECONDS));
- assertEquals(2, shutdowns.getCount());
-
- server.shutdown();
- assertTrue(shutdowns.await(1, TimeUnit.SECONDS));
- assertTrue(server.awaitTermination(1, TimeUnit.SECONDS));
+ };
+ createAndStartServer();
+ assertEquals(port, server.getPort());
+ assertThat(server.getListenSockets()).isEqualTo(addresses);
}
@Test
@@ -1131,15 +1109,22 @@
@Test
public void getPort() throws Exception {
final InetSocketAddress addr = new InetSocketAddress(65535);
+ final List<InetSocketAddress> addrs = Collections.singletonList(addr);
transportServer = new SimpleServer() {
@Override
- public SocketAddress getListenSocketAddress() {
+ public InetSocketAddress getListenSocketAddress() {
return addr;
}
+
+ @Override
+ public List<InetSocketAddress> getListenSocketAddresses() {
+ return addrs;
+ }
};
createAndStartServer();
assertThat(server.getPort()).isEqualTo(addr.getPort());
+ assertThat(server.getListenSockets()).isEqualTo(addrs);
}
@Test
@@ -1431,7 +1416,7 @@
builder.fallbackHandlerRegistry(fallbackRegistry);
builder.executorPool = executorPool;
- server = new ServerImpl(builder, Collections.singletonList(transportServer), SERVER_CONTEXT);
+ server = new ServerImpl(builder, transportServer, SERVER_CONTEXT);
}
private void verifyExecutorsAcquired() {
@@ -1470,11 +1455,21 @@
}
@Override
+ public List<InetSocketAddress> getListenSocketAddresses() {
+ return Collections.singletonList(new InetSocketAddress(12345));
+ }
+
+ @Override
public InternalInstrumented<SocketStats> getListenSocketStats() {
return null;
}
@Override
+ public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
+ return null;
+ }
+
+ @Override
public void shutdown() {
listener.serverShutdown();
}
diff --git a/netty/src/main/java/io/grpc/netty/InternalNettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/InternalNettyServerBuilder.java
index 915de27..876c55e 100644
--- a/netty/src/main/java/io/grpc/netty/InternalNettyServerBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/InternalNettyServerBuilder.java
@@ -30,7 +30,7 @@
*/
@Internal
public final class InternalNettyServerBuilder {
- public static List<NettyServer> buildTransportServers(NettyServerBuilder builder,
+ public static NettyServer buildTransportServers(NettyServerBuilder builder,
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
return builder.buildTransportServers(streamTracerFactories);
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java
index 5962207..4c16ac5 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -45,17 +45,26 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.ChannelGroupFuture;
+import io.netty.channel.group.ChannelGroupFutureListener;
+import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -66,7 +75,7 @@
private static final Logger log = Logger.getLogger(InternalServer.class.getName());
private final InternalLogId logId;
- private final SocketAddress address;
+ private final List<? extends SocketAddress> addresses;
private final ChannelFactory<? extends ServerChannel> channelFactory;
private final Map<ChannelOption<?>, ?> channelOptions;
private final Map<ChannelOption<?>, ?> childChannelOptions;
@@ -78,7 +87,7 @@
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ServerListener listener;
- private Channel channel;
+ private final ChannelGroup channelGroup;
private final boolean autoFlowControl;
private final int flowControlWindow;
private final int maxMessageSize;
@@ -96,11 +105,14 @@
private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
private final TransportTracer.Factory transportTracerFactory;
private final InternalChannelz channelz;
- // Only modified in event loop but safe to read any time.
- private volatile InternalInstrumented<SocketStats> listenSocketStats;
+ private volatile List<InternalInstrumented<SocketStats>> listenSocketStatsList =
+ Collections.emptyList();
+ private volatile boolean terminated;
+ private final EventLoop bossExecutor;
NettyServer(
- SocketAddress address, ChannelFactory<? extends ServerChannel> channelFactory,
+ List<? extends SocketAddress> addresses,
+ ChannelFactory<? extends ServerChannel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions,
Map<ChannelOption<?>, ?> childChannelOptions,
ObjectPool<? extends EventLoopGroup> bossGroupPool,
@@ -116,7 +128,7 @@
long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
Attributes eagAttributes, InternalChannelz channelz) {
- this.address = address;
+ this.addresses = checkNotNull(addresses, "addresses");
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
checkNotNull(channelOptions, "channelOptions");
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
@@ -126,6 +138,8 @@
this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
this.forceHeapBuffer = forceHeapBuffer;
this.bossGroup = bossGroupPool.getObject();
+ this.bossExecutor = bossGroup.next();
+ this.channelGroup = new DefaultChannelGroup(this.bossExecutor);
this.workerGroup = workerGroupPool.getObject();
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
@@ -144,32 +158,53 @@
this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
this.channelz = Preconditions.checkNotNull(channelz);
- this.logId =
- InternalLogId.allocate(getClass(), address != null ? address.toString() : "No address");
+ this.logId = InternalLogId.allocate(getClass(), addresses.isEmpty() ? "No address" :
+ String.valueOf(addresses));
}
@Override
public SocketAddress getListenSocketAddress() {
- if (channel == null) {
+ Iterator<Channel> it = channelGroup.iterator();
+ if (it.hasNext()) {
+ return it.next().localAddress();
+ } else {
// server is not listening/bound yet, just return the original port.
- return address;
+ return addresses.isEmpty() ? null : addresses.get(0);
}
- return channel.localAddress();
+ }
+
+ @Override
+ public List<SocketAddress> getListenSocketAddresses() {
+ List<SocketAddress> listenSocketAddresses = new ArrayList<>();
+ for (Channel c: channelGroup) {
+ listenSocketAddresses.add(c.localAddress());
+ }
+ // server is not listening/bound yet, just return the original ports.
+ if (listenSocketAddresses.isEmpty()) {
+ listenSocketAddresses.addAll(addresses);
+ }
+ return listenSocketAddresses;
}
@Override
public InternalInstrumented<SocketStats> getListenSocketStats() {
- return listenSocketStats;
+ List<InternalInstrumented<SocketStats>> savedListenSocketStatsList = listenSocketStatsList;
+ return savedListenSocketStatsList.isEmpty() ? null : savedListenSocketStatsList.get(0);
+ }
+
+ @Override
+ public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
+ return listenSocketStatsList;
}
@Override
public void start(ServerListener serverListener) throws IOException {
listener = checkNotNull(serverListener, "serverListener");
- ServerBootstrap b = new ServerBootstrap();
+ final ServerBootstrap b = new ServerBootstrap();
b.option(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer));
b.childOption(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer));
- b.group(bossGroup, workerGroup);
+ b.group(bossExecutor, workerGroup);
b.channelFactory(channelFactory);
// For non-socket based channel, the option will be ignored.
b.childOption(SO_KEEPALIVE, true);
@@ -226,8 +261,8 @@
ServerTransportListener transportListener;
// This is to order callbacks on the listener, not to guard access to channel.
synchronized (NettyServer.this) {
- if (channel != null && !channel.isOpen()) {
- // Server already shutdown.
+ if (terminated) {
+ // Server already terminated.
ch.close();
return;
}
@@ -258,51 +293,77 @@
ch.closeFuture().addListener(loopReleaser);
}
});
- // Bind and start to accept incoming connections.
- ChannelFuture future = b.bind(address);
- // We'd love to observe interruption, but if interrupted we will need to close the channel,
- // which itself would need an await() to guarantee the port is not used when the method returns.
- // See #6850
- future.awaitUninterruptibly();
- if (!future.isSuccess()) {
- throw new IOException(String.format("Failed to bind to address %s", address), future.cause());
+ Future<Map<ChannelFuture, SocketAddress>> bindCallFuture =
+ bossExecutor.submit(
+ new Callable<Map<ChannelFuture, SocketAddress>>() {
+ @Override
+ public Map<ChannelFuture, SocketAddress> call() {
+ Map<ChannelFuture, SocketAddress> bindFutures = new HashMap<>();
+ for (SocketAddress address: addresses) {
+ ChannelFuture future = b.bind(address);
+ channelGroup.add(future.channel());
+ bindFutures.put(future, address);
+ }
+ return bindFutures;
+ }
+ }
+ );
+ Map<ChannelFuture, SocketAddress> channelFutures =
+ bindCallFuture.awaitUninterruptibly().getNow();
+
+ if (!bindCallFuture.isSuccess()) {
+ channelGroup.close().awaitUninterruptibly();
+ throw new IOException(String.format("Failed to bind to addresses %s",
+ addresses), bindCallFuture.cause());
}
- channel = future.channel();
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- listenSocketStats = new ListenSocket(channel);
- channelz.addListenSocket(listenSocketStats);
+ final List<InternalInstrumented<SocketStats>> socketStats = new ArrayList<>();
+ for (Map.Entry<ChannelFuture, SocketAddress> entry: channelFutures.entrySet()) {
+ // We'd love to observe interruption, but if interrupted we will need to close the channel,
+ // which itself would need an await() to guarantee the port is not used when the method
+ // returns. See #6850
+ final ChannelFuture future = entry.getKey();
+ if (!future.awaitUninterruptibly().isSuccess()) {
+ channelGroup.close().awaitUninterruptibly();
+ throw new IOException(String.format("Failed to bind to address %s",
+ entry.getValue()), future.cause());
}
- });
+ final InternalInstrumented<SocketStats> listenSocketStats =
+ new ListenSocket(future.channel());
+ channelz.addListenSocket(listenSocketStats);
+ socketStats.add(listenSocketStats);
+ future.channel().closeFuture().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ channelz.removeListenSocket(listenSocketStats);
+ }
+ });
+ }
+ listenSocketStatsList = Collections.unmodifiableList(socketStats);
}
@Override
public void shutdown() {
- if (channel == null || !channel.isOpen()) {
- // Already closed.
+ if (terminated) {
return;
}
- channel.close().addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- log.log(Level.WARNING, "Error shutting down server", future.cause());
- }
- InternalInstrumented<SocketStats> stats = listenSocketStats;
- listenSocketStats = null;
- if (stats != null) {
- channelz.removeListenSocket(stats);
- }
- sharedResourceReferenceCounter.release();
- protocolNegotiator.close();
- synchronized (NettyServer.this) {
- listener.serverShutdown();
- }
- }
- });
+ ChannelGroupFuture groupFuture = channelGroup.close()
+ .addListener(new ChannelGroupFutureListener() {
+ @Override
+ public void operationComplete(ChannelGroupFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ log.log(Level.WARNING, "Error closing server channel group", future.cause());
+ }
+ sharedResourceReferenceCounter.release();
+ protocolNegotiator.close();
+ listenSocketStatsList = Collections.emptyList();
+ synchronized (NettyServer.this) {
+ listener.serverShutdown();
+ terminated = true;
+ }
+ }
+ });
try {
- channel.closeFuture().await();
+ groupFuture.await();
} catch (InterruptedException e) {
log.log(Level.FINE, "Interrupted while shutting down", e);
Thread.currentThread().interrupt();
@@ -318,7 +379,7 @@
public String toString() {
return MoreObjects.toStringHelper(this)
.add("logId", logId.getId())
- .add("address", address)
+ .add("addresses", addresses)
.toString();
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
index f243c10..9b67cd2 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
@@ -54,7 +54,6 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -165,7 +164,7 @@
private final class NettyClientTransportServersBuilder implements ClientTransportServersBuilder {
@Override
- public List<? extends InternalServer> buildClientTransportServers(
+ public InternalServer buildClientTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
return buildTransportServers(streamTracerFactories);
}
@@ -623,27 +622,22 @@
}
@CheckReturnValue
- List<NettyServer> buildTransportServers(
+ NettyServer buildTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
assertEventLoopsAndChannelType();
ProtocolNegotiator negotiator = protocolNegotiatorFactory.newNegotiator(
this.serverImplBuilder.getExecutorPool());
- List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
- for (SocketAddress listenAddress : listenAddresses) {
- NettyServer transportServer = new NettyServer(
- listenAddress, channelFactory, channelOptions, childChannelOptions,
- bossEventLoopGroupPool, workerEventLoopGroupPool, forceHeapBuffer, negotiator,
- streamTracerFactories, transportTracerFactory, maxConcurrentCallsPerConnection,
- autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize,
- keepAliveTimeInNanos, keepAliveTimeoutInNanos,
- maxConnectionIdleInNanos, maxConnectionAgeInNanos,
- maxConnectionAgeGraceInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos,
- eagAttributes, this.serverImplBuilder.getChannelz());
- transportServers.add(transportServer);
- }
- return Collections.unmodifiableList(transportServers);
+ return new NettyServer(
+ listenAddresses, channelFactory, channelOptions, childChannelOptions,
+ bossEventLoopGroupPool, workerEventLoopGroupPool, forceHeapBuffer, negotiator,
+ streamTracerFactories, transportTracerFactory, maxConcurrentCallsPerConnection,
+ autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize,
+ keepAliveTimeInNanos, keepAliveTimeoutInNanos,
+ maxConnectionIdleInNanos, maxConnectionAgeInNanos,
+ maxConnectionAgeGraceInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos,
+ eagAttributes, this.serverImplBuilder.getChannelz());
}
@VisibleForTesting
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index 7bff844..fe3e604 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -773,7 +773,7 @@
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
server = new NettyServer(
- TestUtils.testServerAddress(new InetSocketAddress(0)),
+ TestUtils.testServerAddresses(new InetSocketAddress(0)),
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new HashMap<ChannelOption<?>, Object>(),
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java b/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java
index 44e13d8..6d81923 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java
@@ -26,7 +26,6 @@
import io.netty.channel.local.LocalServerChannel;
import io.netty.handler.ssl.SslContext;
import java.net.InetSocketAddress;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Rule;
import org.junit.Test;
@@ -46,12 +45,12 @@
private NettyServerBuilder builder = NettyServerBuilder.forPort(8080);
@Test
- public void createMultipleServers() {
+ public void addMultipleListenAddresses() {
builder.addListenAddress(new InetSocketAddress(8081));
- List<NettyServer> servers =
+ NettyServer server =
builder.buildTransportServers(ImmutableList.<ServerStreamTracer.Factory>of());
- Truth.assertThat(servers).hasSize(2);
+ Truth.assertThat(server.getListenSocketAddresses()).hasSize(2);
}
@Test
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
index 94c42cf..3f277ed 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
@@ -19,9 +19,18 @@
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.InternalChannelz.id;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.InternalChannelz;
@@ -36,31 +45,63 @@
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TransportTracer;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFactory;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
import io.netty.channel.ReflectiveChannelFactory;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AsciiString;
+import io.netty.util.concurrent.Future;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class NettyServerTest {
private final InternalChannelz channelz = new InternalChannelz();
private final NioEventLoopGroup eventLoop = new NioEventLoopGroup(1);
+ private final ChannelFactory<NioServerSocketChannel> channelFactory =
+ new ReflectiveChannelFactory<>(NioServerSocketChannel.class);
+
+ @Mock
+ EventLoopGroup mockEventLoopGroup;
+ @Mock
+ EventLoop mockEventLoop;
+ @Mock
+ Future<Map<ChannelFuture, SocketAddress>> bindFuture;
+
+ @Before
+ public void setup() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ when(mockEventLoopGroup.next()).thenReturn(mockEventLoop);
+ when(mockEventLoop
+ .submit(ArgumentMatchers.<Callable<Map<ChannelFuture, SocketAddress>>>any()))
+ .thenReturn(bindFuture);
+ }
@After
public void tearDown() throws Exception {
@@ -90,7 +131,7 @@
NoHandlerProtocolNegotiator protocolNegotiator = new NoHandlerProtocolNegotiator();
NettyServer ns = new NettyServer(
- addr,
+ Arrays.asList(addr),
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new HashMap<ChannelOption<?>, Object>(),
@@ -135,10 +176,146 @@
}
@Test
+ public void multiPortStartStopGet() throws Exception {
+ InetSocketAddress addr1 = new InetSocketAddress(0);
+ InetSocketAddress addr2 = new InetSocketAddress(0);
+
+ NettyServer ns = new NettyServer(
+ Arrays.asList(addr1, addr2),
+ new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
+ new HashMap<ChannelOption<?>, Object>(),
+ new HashMap<ChannelOption<?>, Object>(),
+ new FixedObjectPool<>(eventLoop),
+ new FixedObjectPool<>(eventLoop),
+ false,
+ ProtocolNegotiators.plaintext(),
+ Collections.<ServerStreamTracer.Factory>emptyList(),
+ TransportTracer.getDefaultFactory(),
+ 1, // ignore
+ false, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, 1, // ignore
+ 1, 1, // ignore
+ true, 0, // ignore
+ Attributes.EMPTY,
+ channelz);
+ final SettableFuture<Void> shutdownCompleted = SettableFuture.create();
+ ns.start(new ServerListener() {
+ @Override
+ public ServerTransportListener transportCreated(ServerTransport transport) {
+ return new NoopServerTransportListener();
+ }
+
+ @Override
+ public void serverShutdown() {
+ shutdownCompleted.set(null);
+ }
+ });
+
+ // SocketStats won't be available until the event loop task of adding SocketStats created by
+ // ns.start() complete. So submit a noop task and await until it's drained.
+ eventLoop.submit(new Runnable() {
+ @Override
+ public void run() {}
+ }).await(5, TimeUnit.SECONDS);
+
+ assertEquals(2, ns.getListenSocketAddresses().size());
+ for (SocketAddress address: ns.getListenSocketAddresses()) {
+ assertThat(((InetSocketAddress) address).getPort()).isGreaterThan(0);
+ }
+
+ List<InternalInstrumented<SocketStats>> stats = ns.getListenSocketStatsList();
+ assertEquals(2, ns.getListenSocketStatsList().size());
+ for (InternalInstrumented<SocketStats> listenSocket : stats) {
+ assertSame(listenSocket, channelz.getSocket(id(listenSocket)));
+ // very basic sanity check of the contents
+ SocketStats socketStats = listenSocket.getStats().get();
+ assertThat(ns.getListenSocketAddresses()).contains(socketStats.local);
+ assertNull(socketStats.remote);
+ }
+
+ // Cleanup
+ ns.shutdown();
+ shutdownCompleted.get();
+
+ // listen socket is removed
+ for (InternalInstrumented<SocketStats> listenSocket : stats) {
+ assertNull(channelz.getSocket(id(listenSocket)));
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void multiPortConnections() throws Exception {
+ InetSocketAddress addr1 = new InetSocketAddress(0);
+ InetSocketAddress addr2 = new InetSocketAddress(0);
+ final CountDownLatch allPortsConnectedCountDown = new CountDownLatch(2);
+
+ NettyServer ns = new NettyServer(
+ Arrays.asList(addr1, addr2),
+ new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
+ new HashMap<ChannelOption<?>, Object>(),
+ new HashMap<ChannelOption<?>, Object>(),
+ new FixedObjectPool<>(eventLoop),
+ new FixedObjectPool<>(eventLoop),
+ false,
+ ProtocolNegotiators.plaintext(),
+ Collections.<ServerStreamTracer.Factory>emptyList(),
+ TransportTracer.getDefaultFactory(),
+ 1, // ignore
+ false, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, 1, // ignore
+ 1, 1, // ignore
+ true, 0, // ignore
+ Attributes.EMPTY,
+ channelz);
+ final SettableFuture<Void> shutdownCompleted = SettableFuture.create();
+ ns.start(new ServerListener() {
+ @Override
+ public ServerTransportListener transportCreated(ServerTransport transport) {
+ allPortsConnectedCountDown.countDown();
+ return new NoopServerTransportListener();
+ }
+
+ @Override
+ public void serverShutdown() {
+ shutdownCompleted.set(null);
+ }
+ });
+
+ // SocketStats won't be available until the event loop task of adding SocketStats created by
+ // ns.start() complete. So submit a noop task and await until it's drained.
+ eventLoop.submit(new Runnable() {
+ @Override
+ public void run() {}
+ }).await(5, TimeUnit.SECONDS);
+
+ List<SocketAddress> serverSockets = ns.getListenSocketAddresses();
+ assertEquals(2, serverSockets.size());
+
+ for (int i = 0; i < 2; i++) {
+ Socket socket = new Socket();
+ socket.connect(serverSockets.get(i), /* timeout= */ 8000);
+ socket.close();
+ }
+ allPortsConnectedCountDown.await();
+ // Cleanup
+ ns.shutdown();
+ shutdownCompleted.get();
+ }
+
+ @Test
public void getPort_notStarted() {
InetSocketAddress addr = new InetSocketAddress(0);
+ List<InetSocketAddress> addresses = Collections.singletonList(addr);
NettyServer ns = new NettyServer(
- addr,
+ addresses,
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new HashMap<ChannelOption<?>, Object>(),
@@ -161,6 +338,7 @@
channelz);
assertThat(ns.getListenSocketAddress()).isEqualTo(addr);
+ assertThat(ns.getListenSocketAddresses()).isEqualTo(addresses);
}
@Test(timeout = 60000)
@@ -211,7 +389,7 @@
TestProtocolNegotiator protocolNegotiator = new TestProtocolNegotiator();
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
- addr,
+ Arrays.asList(addr),
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
childChannelOptions,
@@ -258,7 +436,7 @@
public void channelzListenSocket() throws Exception {
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
- addr,
+ Arrays.asList(addr),
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new HashMap<ChannelOption<?>, Object>(),
@@ -320,6 +498,110 @@
assertNull(channelz.getSocket(id(listenSocket)));
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testBindScheduleFailure() throws Exception {
+ when(bindFuture.awaitUninterruptibly()).thenReturn(bindFuture);
+ when(bindFuture.isSuccess()).thenReturn(false);
+ when(bindFuture.getNow()).thenReturn(null);
+ Throwable mockCause = mock(Throwable.class);
+ when(bindFuture.cause()).thenReturn(mockCause);
+ Future<Void> mockFuture = (Future<Void>) mock(Future.class);
+ doReturn(mockFuture).when(mockEventLoopGroup).submit(any(Runnable.class));
+ SocketAddress addr = new InetSocketAddress(0);
+ verifyServerNotStart(Collections.singletonList(addr), mockEventLoopGroup,
+ IOException.class, "Failed to bind to addresses " + Arrays.asList(addr));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testBindFailure() throws Exception {
+ when(bindFuture.awaitUninterruptibly()).thenReturn(bindFuture);
+ ChannelFuture future = mock(ChannelFuture.class);
+ when(future.awaitUninterruptibly()).thenReturn(future);
+ when(future.isSuccess()).thenReturn(false);
+ Channel channel = channelFactory.newChannel();
+ eventLoop.register(channel);
+ when(future.channel()).thenReturn(channel);
+ Throwable mockCause = mock(Throwable.class);
+ when(future.cause()).thenReturn(mockCause);
+ SocketAddress addr = new InetSocketAddress(0);
+ Map<ChannelFuture, SocketAddress> map = ImmutableMap.of(future, addr);
+ when(bindFuture.getNow()).thenReturn(map);
+ when(bindFuture.isSuccess()).thenReturn(true);
+ Future<Void> mockFuture = (Future<Void>) mock(Future.class);
+ doReturn(mockFuture).when(mockEventLoopGroup).submit(any(Runnable.class));
+ verifyServerNotStart(Collections.singletonList(addr), mockEventLoopGroup,
+ IOException.class, "Failed to bind to address " + addr);
+ }
+
+ @Test
+ public void testBindPartialFailure() throws Exception {
+ SocketAddress add1 = new InetSocketAddress(0);
+ SocketAddress add2 = new InetSocketAddress(2);
+ SocketAddress add3 = new InetSocketAddress(2);
+ verifyServerNotStart(ImmutableList.of(add1, add2, add3), eventLoop,
+ IOException.class, "Failed to bind to address " + add3);
+ }
+
+ private void verifyServerNotStart(List<SocketAddress> addr, EventLoopGroup ev,
+ Class<?> expectedException, String expectedMessage)
+ throws Exception {
+ NettyServer ns = getServer(addr, ev);
+ try {
+ ns.start(new ServerListener() {
+ @Override
+ public ServerTransportListener transportCreated(ServerTransport transport) {
+ return new NoopServerTransportListener();
+ }
+
+ @Override
+ public void serverShutdown() {
+ }
+ });
+ } catch (Exception ex) {
+ assertTrue(expectedException.isInstance(ex));
+ assertThat(ex.getMessage()).isEqualTo(expectedMessage);
+ assertFalse(addr.isEmpty());
+ // Listener tasks are executed on the event loop, so await until noop task is drained.
+ ev.submit(new Runnable() {
+ @Override
+ public void run() {}
+ }).await(5, TimeUnit.SECONDS);
+ assertThat(ns.getListenSocketAddress()).isEqualTo(addr.get(0));
+ assertThat(ns.getListenSocketAddresses()).isEqualTo(addr);
+ assertTrue(ns.getListenSocketStatsList().isEmpty());
+ assertNull(ns.getListenSocketStats());
+ return;
+ }
+ fail();
+ }
+
+ private NettyServer getServer(List<SocketAddress> addr, EventLoopGroup ev) {
+ return new NettyServer(
+ addr,
+ new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
+ new HashMap<ChannelOption<?>, Object>(),
+ new HashMap<ChannelOption<?>, Object>(),
+ new FixedObjectPool<>(ev),
+ new FixedObjectPool<>(ev),
+ false,
+ ProtocolNegotiators.plaintext(),
+ Collections.<ServerStreamTracer.Factory>emptyList(),
+ TransportTracer.getDefaultFactory(),
+ 1, // ignore
+ false, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, 1, // ignore
+ 1, 1, // ignore
+ true, 0, // ignore
+ Attributes.EMPTY,
+ channelz);
+ }
+
private static class NoopServerTransportListener implements ServerTransportListener {
@Override public void streamCreated(ServerStream stream, String method, Metadata headers) {}
diff --git a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java
index 7280ba6..6aa6ca1 100644
--- a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java
@@ -63,7 +63,7 @@
}
@Override
- protected List<? extends InternalServer> newServer(
+ protected InternalServer newServer(
List<ServerStreamTracer.Factory> streamTracerFactories) {
return NettyServerBuilder
.forAddress(new InetSocketAddress("localhost", 0))
@@ -73,7 +73,7 @@
}
@Override
- protected List<? extends InternalServer> newServer(
+ protected InternalServer newServer(
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
return NettyServerBuilder
.forAddress(new InetSocketAddress("localhost", port))
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java
index 1f812fe..44e493c 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java
@@ -51,7 +51,7 @@
}
@Override
- protected List<? extends InternalServer> newServer(
+ protected InternalServer newServer(
List<ServerStreamTracer.Factory> streamTracerFactories) {
NettyServerBuilder builder = NettyServerBuilder
.forPort(0)
@@ -61,7 +61,7 @@
}
@Override
- protected List<? extends InternalServer> newServer(
+ protected InternalServer newServer(
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
NettyServerBuilder builder = NettyServerBuilder
.forAddress(new InetSocketAddress(port))
diff --git a/testing/src/main/java/io/grpc/internal/testing/TestUtils.java b/testing/src/main/java/io/grpc/internal/testing/TestUtils.java
index 6b467ef..e715b19 100644
--- a/testing/src/main/java/io/grpc/internal/testing/TestUtils.java
+++ b/testing/src/main/java/io/grpc/internal/testing/TestUtils.java
@@ -78,6 +78,24 @@
}
/**
+ * Creates a new list of {@link InetSocketAddress} on localhost that overrides the host with
+ * {@link #TEST_SERVER_HOST}.
+ */
+ public static List<InetSocketAddress> testServerAddresses(InetSocketAddress... originalSockAddr) {
+ try {
+ InetAddress inetAddress = InetAddress.getByName("localhost");
+ inetAddress = InetAddress.getByAddress(TEST_SERVER_HOST, inetAddress.getAddress());
+ List<InetSocketAddress> addresses = new ArrayList<>();
+ for (InetSocketAddress orig: originalSockAddr) {
+ addresses.add(new InetSocketAddress(inetAddress, orig.getPort()));
+ }
+ return addresses;
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
* Returns the ciphers preferred to use during tests. They may be chosen because they are widely
* available or because they are fast. There is no requirement that they provide confidentiality
* or integrity.