blob: b20464494b984d1816bf5f5e724162f8627ec3d0 [file] [log] [blame]
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if !_WIN32
#include "async-unix.h"
#include "thread.h"
#include "debug.h"
#include "io.h"
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <kj/compat/gtest.h>
#include <pthread.h>
#include <algorithm>
#include <sys/wait.h>
#include <sys/time.h>
#include <errno.h>
#include "mutex.h"
#if __BIONIC__
// Android's Bionic defines SIGRTMIN but using it in sigaddset() throws EINVAL, which means we
// definitely can't actually use RT signals.
#undef SIGRTMIN
#endif
namespace kj {
namespace {
// Pauses the calling thread for 10 milliseconds. Tests call this from a helper thread (or the
// main thread) to give the other side time to reach the state under test.
inline void delay() {
  constexpr useconds_t DELAY_MICROSECONDS = 10000;
  usleep(DELAY_MICROSECONDS);
}
// EXPECT_SI_CODE(expected, actual) checks the si_code field of a received siginfo_t.
//
// On Linux and Cygwin it is simply EXPECT_EQ. On every other platform it expands to nothing,
// because on OSX si_code seems to be zero when SI_USER is expected.
#if __linux__ || __CYGWIN__
#define EXPECT_SI_CODE EXPECT_EQ
#else
#define EXPECT_SI_CODE(a,b)
#endif
// Registers with UnixEventPort every signal these tests wait on, plus child-exit notification.
// A function-local static flag makes all calls after the first successful one a no-op.
void captureSignals() {
  static bool alreadyCaptured = false;
  if (alreadyCaptured) return;

  // We use SIGIO and SIGURG as our test signals because they're two signals that we can be
  // reasonably confident won't otherwise be delivered to any KJ or Cap'n Proto test. We can't
  // use SIGUSR1 because it is reserved by UnixEventPort and SIGUSR2 is used by Valgrind on OSX.
  UnixEventPort::captureSignal(SIGURG);
  UnixEventPort::captureSignal(SIGIO);

#ifdef SIGRTMIN
  // Only where real-time signals are usable (SIGRTMIN is #undef'd above for Bionic).
  UnixEventPort::captureSignal(SIGRTMIN);
#endif

  UnixEventPort::captureChildExit();

  // Only mark as done once every registration above has completed.
  alreadyCaptured = true;
}
TEST(AsyncUnixTest, Signals) {
// Basic case: send SIGURG to our own process, then wait for it via onSignal() and check the
// siginfo_t that comes back.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
// Note the signal is sent before onSignal() is called; the wait below must still observe it.
kill(getpid(), SIGURG);
siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
EXPECT_EQ(SIGURG, info.si_signo);
// si_code is only verified on platforms where EXPECT_SI_CODE is a real check.
EXPECT_SI_CODE(SI_USER, info.si_code);
}
#if defined(SIGRTMIN) && !__BIONIC__ && !(__linux__ && __mips__)
TEST(AsyncUnixTest, SignalWithValue) {
// This tests that if we use sigqueue() to attach a value to the signal, that value is received
// correctly. Note that this only works on platforms that support real-time signals -- even
// though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT
// signals. Hence this test won't run on e.g. Mac OSX.
//
// Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
//
// Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
// machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
// Sad. https://github.com/capnproto/capnproto/issues/204
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
// Zero the whole union before setting the int member, so every byte of the payload is defined.
union sigval value;
memset(&value, 0, sizeof(value));
value.sival_int = 123;
// ENOSYS is treated as "feature unavailable" and skips the test; any other error fails it.
KJ_SYSCALL_HANDLE_ERRORS(sigqueue(getpid(), SIGURG, value)) {
case ENOSYS:
// sigqueue() not supported. Maybe running on WSL.
KJ_LOG(WARNING, "sigqueue() is not implemented by your system; skipping test");
return;
default:
KJ_FAIL_SYSCALL("sigqueue(getpid(), SIGURG, value)", error);
}
siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
EXPECT_EQ(SIGURG, info.si_signo);
EXPECT_SI_CODE(SI_QUEUE, info.si_code);
// The integer attached via sigqueue() must come back unchanged.
EXPECT_EQ(123, info.si_value.sival_int);
}
TEST(AsyncUnixTest, SignalWithPointerValue) {
// Like SignalWithValue, but the value attached with sigqueue() is a pointer (sival_ptr) rather
// than an int (sival_int). This tests that the pointer is received intact. Note that this only
// works on platforms that support real-time signals -- even though the signal we're sending is
// SIGURG, the sigqueue() system call is introduced by RT signals. Hence this test won't run on
// e.g. Mac OSX.
//
// Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
//
// Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
// machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
// Sad. https://github.com/capnproto/capnproto/issues/204
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
// Zero the whole union before setting the pointer member, so every byte of the payload is
// defined. The address of the local `port` is used simply as a recognizable pointer value.
union sigval value;
memset(&value, 0, sizeof(value));
value.sival_ptr = &port;
// ENOSYS is treated as "feature unavailable" and skips the test; any other error fails it.
KJ_SYSCALL_HANDLE_ERRORS(sigqueue(getpid(), SIGURG, value)) {
case ENOSYS:
// sigqueue() not supported. Maybe running on WSL.
KJ_LOG(WARNING, "sigqueue() is not implemented by your system; skipping test");
return;
default:
KJ_FAIL_SYSCALL("sigqueue(getpid(), SIGURG, value)", error);
}
siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
EXPECT_EQ(SIGURG, info.si_signo);
EXPECT_SI_CODE(SI_QUEUE, info.si_code);
// The pointer attached via sigqueue() must come back unchanged.
EXPECT_EQ(&port, info.si_value.sival_ptr);
}
#endif
TEST(AsyncUnixTest, SignalsMultiListen) {
// Listens for two different signals at once but sends only one of them; only the matching
// listener may fire.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
// Background listener for SIGIO, which is never sent in this test. It fails the test if it
// ever resolves or if it is rejected with an exception.
port.onSignal(SIGIO).then([](siginfo_t&&) {
KJ_FAIL_EXPECT("Received wrong signal.");
}).detach([](kj::Exception&& exception) {
KJ_FAIL_EXPECT(exception);
});
kill(getpid(), SIGURG);
siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
EXPECT_EQ(SIGURG, info.si_signo);
EXPECT_SI_CODE(SI_USER, info.si_code);
}
#if !__CYGWIN32__
// Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
// deliver SIGIO, if you reverse the order of the waits). Since this doesn't occur on any other
// platform I'm assuming it's a Cygwin bug.
TEST(AsyncUnixTest, SignalsMultiReceive) {
// Sends two different signals back-to-back before waiting on either, then waits for each in
// turn; both must be delivered.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
kill(getpid(), SIGURG);
kill(getpid(), SIGIO);
// First wait: SIGURG.
siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
EXPECT_EQ(SIGURG, info.si_signo);
EXPECT_SI_CODE(SI_USER, info.si_code);
// Second wait: SIGIO, which was sent before the first wait even started.
info = port.onSignal(SIGIO).wait(waitScope);
EXPECT_EQ(SIGIO, info.si_signo);
EXPECT_SI_CODE(SI_USER, info.si_code);
}
#endif // !__CYGWIN32__
TEST(AsyncUnixTest, SignalsAsync) {
// Verifies that a signal sent by another thread, while this thread is already waiting, wakes
// the wait.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
// Arrange for a signal to be sent from another thread.
// The helper thread sleeps briefly first so the main thread has time to enter wait(), and it
// targets the main thread specifically with pthread_kill().
pthread_t mainThread = pthread_self();
Thread thread([&]() {
delay();
pthread_kill(mainThread, SIGURG);
});
siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
EXPECT_EQ(SIGURG, info.si_signo);
// SI_TKILL is only checked on Linux.
#if __linux__
EXPECT_SI_CODE(SI_TKILL, info.si_code);
#endif
}
#if !__CYGWIN32__
// Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
// deliver SIGIO, if you reverse the order of the waits). Since this doesn't occur on any other
// platform I'm assuming it's a Cygwin bug.
TEST(AsyncUnixTest, SignalsNoWait) {
// Verify that UnixEventPort::poll() correctly receives pending signals.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
bool receivedSigurg = false;
bool receivedSigio = false;
// Two detached listeners, one per signal; each records that it fired and checks the siginfo_t.
port.onSignal(SIGURG).then([&](siginfo_t&& info) {
receivedSigurg = true;
EXPECT_EQ(SIGURG, info.si_signo);
EXPECT_SI_CODE(SI_USER, info.si_code);
}).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
port.onSignal(SIGIO).then([&](siginfo_t&& info) {
receivedSigio = true;
EXPECT_EQ(SIGIO, info.si_signo);
EXPECT_SI_CODE(SI_USER, info.si_code);
}).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
kill(getpid(), SIGURG);
kill(getpid(), SIGIO);
// The checks below pin down exactly when the callbacks may run: not when the signals are sent,
// not when the loop runs before the port has been polled, and not during poll() itself -- only
// once the loop runs again after poll().
EXPECT_FALSE(receivedSigurg);
EXPECT_FALSE(receivedSigio);
loop.run();
EXPECT_FALSE(receivedSigurg);
EXPECT_FALSE(receivedSigio);
port.poll();
EXPECT_FALSE(receivedSigurg);
EXPECT_FALSE(receivedSigio);
loop.run();
EXPECT_TRUE(receivedSigurg);
EXPECT_TRUE(receivedSigio);
}
#endif // !__CYGWIN32__
TEST(AsyncUnixTest, ReadObserver) {
// Verifies that an FdObserver on the read end of a pipe reports readability after data is
// written, and (Linux only) that atEndHint() reflects whether the write end has been closed.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
int pipefds[2];
KJ_SYSCALL(pipe(pipefds));
kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
UnixEventPort::FdObserver observer(port, infd, UnixEventPort::FdObserver::OBSERVE_READ);
KJ_SYSCALL(write(outfd, "foo", 3));
observer.whenBecomesReadable().wait(waitScope);
#if __linux__ // platform known to support POLLRDHUP
// Write end still open: the hint must be present and false.
EXPECT_FALSE(KJ_ASSERT_NONNULL(observer.atEndHint()));
char buffer[4096];
ssize_t n;
KJ_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
EXPECT_EQ(3, n);
// Write more data and then close the write end (assigning nullptr to the AutoCloseFd), so the
// next readability event coincides with the writer having hung up.
KJ_SYSCALL(write(outfd, "bar", 3));
outfd = nullptr;
observer.whenBecomesReadable().wait(waitScope);
// Write end closed: the hint must be present and true.
EXPECT_TRUE(KJ_ASSERT_NONNULL(observer.atEndHint()));
#endif
}
TEST(AsyncUnixTest, ReadObserverMultiListen) {
// Observes two pipes at once but writes to only one of them; only that pipe's observer may
// report readability.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
// "Bogus" pipe: nothing is ever written to it. Its fds are closed by the deferred block, which
// is declared before the observer and therefore runs after the observer has been destroyed.
int bogusPipefds[2];
KJ_SYSCALL(pipe(bogusPipefds));
KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });
UnixEventPort::FdObserver bogusObserver(port, bogusPipefds[0],
UnixEventPort::FdObserver::OBSERVE_READ);
// Fails the test if the bogus pipe ever becomes readable or the promise is rejected.
bogusObserver.whenBecomesReadable().then([]() {
ADD_FAILURE() << "Received wrong poll.";
}).detach([](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr();
});
// The real pipe: written to, and waited on.
int pipefds[2];
KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
UnixEventPort::FdObserver observer(port, pipefds[0],
UnixEventPort::FdObserver::OBSERVE_READ);
KJ_SYSCALL(write(pipefds[1], "foo", 3));
observer.whenBecomesReadable().wait(waitScope);
}
TEST(AsyncUnixTest, ReadObserverMultiReceive) {
// Makes two pipes readable before waiting on either, then waits on both observers in turn;
// both readability events must be delivered.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
// First pipe, already holding data by the time we wait.
int pipefds[2];
KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
UnixEventPort::FdObserver observer(port, pipefds[0],
UnixEventPort::FdObserver::OBSERVE_READ);
KJ_SYSCALL(write(pipefds[1], "foo", 3));
// Second pipe, likewise.
int pipefds2[2];
KJ_SYSCALL(pipe(pipefds2));
KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
UnixEventPort::FdObserver observer2(port, pipefds2[0],
UnixEventPort::FdObserver::OBSERVE_READ);
KJ_SYSCALL(write(pipefds2[1], "bar", 3));
// Create both promises up front so both are outstanding during the first wait.
auto promise1 = observer.whenBecomesReadable();
auto promise2 = observer2.whenBecomesReadable();
promise1.wait(waitScope);
promise2.wait(waitScope);
}
TEST(AsyncUnixTest, ReadObserverAsync) {
// Verifies that a readability wait is woken by a write performed on another thread while this
// thread is already waiting.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
// Make a pipe and wait on its read end while another thread writes to it.
int pipefds[2];
KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
UnixEventPort::FdObserver observer(port, pipefds[0],
UnixEventPort::FdObserver::OBSERVE_READ);
// The writer sleeps briefly first so that the main thread has time to enter wait().
Thread thread([&]() {
delay();
KJ_SYSCALL(write(pipefds[1], "foo", 3));
});
// Wait for the event in this thread.
observer.whenBecomesReadable().wait(waitScope);
}
TEST(AsyncUnixTest, ReadObserverNoWait) {
// Verify that UnixEventPort::poll() correctly receives pending FD events.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
int pipefds[2];
KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
UnixEventPort::FdObserver observer(port, pipefds[0],
UnixEventPort::FdObserver::OBSERVE_READ);
int pipefds2[2];
KJ_SYSCALL(pipe(pipefds2));
KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
UnixEventPort::FdObserver observer2(port, pipefds2[0],
UnixEventPort::FdObserver::OBSERVE_READ);
// Each detached continuation bumps the counter when its pipe becomes readable.
int receivedCount = 0;
observer.whenBecomesReadable().then([&]() {
receivedCount++;
}).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
observer2.whenBecomesReadable().then([&]() {
receivedCount++;
}).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
KJ_SYSCALL(write(pipefds[1], "foo", 3));
KJ_SYSCALL(write(pipefds2[1], "bar", 3));
// The checks below pin down exactly when the continuations may run: not at write time, not
// when the loop runs before the port has been polled, and not during poll() itself -- only
// once the loop runs again after poll().
EXPECT_EQ(0, receivedCount);
loop.run();
EXPECT_EQ(0, receivedCount);
port.poll();
EXPECT_EQ(0, receivedCount);
loop.run();
EXPECT_EQ(2, receivedCount);
}
// Puts `fd` into non-blocking mode by setting O_NONBLOCK among its file status flags. When the
// flag is already set, the F_SETFL call is skipped. fcntl() failures are reported by KJ_SYSCALL.
static void setNonblocking(int fd) {
  int flags;
  KJ_SYSCALL(flags = fcntl(fd, F_GETFL));

  // Nothing to do if the descriptor is already non-blocking.
  if (flags & O_NONBLOCK) return;

  KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
}
TEST(AsyncUnixTest, WriteObserver) {
// Verifies that whenBecomesWritable() does not fire while a pipe's buffer is full, and does
// fire once the reader has drained it.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
int pipefds[2];
KJ_SYSCALL(pipe(pipefds));
kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
// Both ends must be non-blocking: the fill and drain loops below run until the call reports
// that it can make no further progress.
setNonblocking(outfd);
setNonblocking(infd);
UnixEventPort::FdObserver observer(port, outfd, UnixEventPort::FdObserver::OBSERVE_WRITE);
// Fill buffer.
// The loop ends when write() stops succeeding (n < 0).
ssize_t n;
do {
KJ_NONBLOCKING_SYSCALL(n = write(outfd, "foo", 3));
} while (n >= 0);
bool writable = false;
auto promise = observer.whenBecomesWritable()
.then([&]() { writable = true; }).eagerlyEvaluate(nullptr);
// Give the loop and the port a full turn; with the buffer full, nothing may have fired.
loop.run();
port.poll();
loop.run();
EXPECT_FALSE(writable);
// Empty the read end so that the write end becomes writable. Note that Linux implements a
// high watermark / low watermark heuristic which means that only reading one byte is not
// sufficient. The amount we have to read is in fact architecture-dependent -- it appears to be
// 1 page. To be safe, we read everything.
char buffer[4096];
do {
KJ_NONBLOCKING_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
} while (n > 0);
// Another full turn; now the writability notification must have been delivered.
loop.run();
port.poll();
loop.run();
EXPECT_TRUE(writable);
}
#if !__APPLE__
// Disabled on macOS due to https://github.com/capnproto/capnproto/issues/374.
TEST(AsyncUnixTest, UrgentObserver) {
// Verify that FdObserver correctly detects availability of out-of-band data.
// Availability of out-of-band data is implementation-specific.
// The TCP/IP stacks of Linux and OS X support out-of-band messages for TCP sockets, which is
// what this test uses.
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
int tmpFd;
char c;
// Spawn a TCP server
// The port number is left as zero in the bind() call; getsockname() then fills `saddr` with
// the address actually bound, which the client side connects to below.
KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
kj::AutoCloseFd serverFd(tmpFd);
sockaddr_in saddr;
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
KJ_SYSCALL(bind(serverFd, reinterpret_cast<sockaddr*>(&saddr), sizeof(saddr)));
socklen_t saddrLen = sizeof(saddr);
KJ_SYSCALL(getsockname(serverFd, reinterpret_cast<sockaddr*>(&saddr), &saddrLen));
KJ_SYSCALL(listen(serverFd, 1));
// Create a pipe that we'll use to signal if sending with MSG_OOB returns EINVAL.
int failpipe[2];
KJ_SYSCALL(pipe(failpipe));
KJ_DEFER({
close(failpipe[0]);
close(failpipe[1]);
});
// Accept one connection, send in-band and OOB byte, wait for a quit message
// Note: `tmpFd` and `c` declared inside the lambda are the thread's own locals; they shadow
// the outer variables of the same names, so the two threads do not share them.
Thread thread([&]() {
int tmpFd;
char c;
sockaddr_in caddr;
socklen_t caddrLen = sizeof(caddr);
KJ_SYSCALL(tmpFd = accept(serverFd, reinterpret_cast<sockaddr*>(&caddr), &caddrLen));
kj::AutoCloseFd clientFd(tmpFd);
delay();
// Workaround: OS X won't signal POLLPRI without POLLIN. Also enqueue some in-band data.
c = 'i';
KJ_SYSCALL(send(clientFd, &c, 1, 0));
c = 'o';
KJ_SYSCALL_HANDLE_ERRORS(send(clientFd, &c, 1, MSG_OOB)) {
case EINVAL:
// Looks like MSG_OOB is not supported. (This is the case e.g. on WSL.)
// Tell the main thread by making the fail pipe readable.
KJ_SYSCALL(write(failpipe[1], &c, 1));
break;
default:
KJ_FAIL_SYSCALL("send(..., MSG_OOB)", error);
}
// Block until the main thread sends the 'q' (quit) byte, keeping the connection open until
// the main thread is done with it.
KJ_SYSCALL(recv(clientFd, &c, 1, 0));
EXPECT_EQ('q', c);
});
// On the way out of the test, shut down and close the listening socket. This is declared
// after `thread`, so it runs before the thread object is destroyed.
KJ_DEFER({ shutdown(serverFd, SHUT_RDWR); serverFd = nullptr; });
KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
kj::AutoCloseFd clientFd(tmpFd);
KJ_SYSCALL(connect(clientFd, reinterpret_cast<sockaddr*>(&saddr), saddrLen));
UnixEventPort::FdObserver observer(port, clientFd,
UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT);
UnixEventPort::FdObserver failObserver(port, failpipe[0],
UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT);
// Race "urgent data arrived" (true) against "server reported MSG_OOB unsupported" (false);
// whichever happens first decides which branch below runs.
auto promise = observer.whenUrgentDataAvailable().then([]() { return true; });
auto failPromise = failObserver.whenBecomesReadable().then([]() { return false; });
bool oobSupported = promise.exclusiveJoin(kj::mv(failPromise)).wait(waitScope);
if (oobSupported) {
#if __CYGWIN__
// On Cygwin, reading the urgent byte first causes the subsequent regular read to block until
// such a time as the connection closes -- and then the byte is successfully returned. This
// seems to be a cygwin bug.
KJ_SYSCALL(recv(clientFd, &c, 1, 0));
EXPECT_EQ('i', c);
KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
EXPECT_EQ('o', c);
#else
// Attempt to read the urgent byte prior to reading the in-band byte.
KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
EXPECT_EQ('o', c);
KJ_SYSCALL(recv(clientFd, &c, 1, 0));
EXPECT_EQ('i', c);
#endif
} else {
KJ_LOG(WARNING, "MSG_OOB doesn't seem to be supported on your platform.");
}
// Allow server thread to let its clientFd go out of scope.
c = 'q';
KJ_SYSCALL(send(clientFd, &c, 1, 0));
KJ_SYSCALL(shutdown(clientFd, SHUT_RDWR));
}
#endif
TEST(AsyncUnixTest, SteadyTimers) {
  // Schedules several timers out of order (including duplicates and one in the past) and
  // verifies that every one of them fires, and that none fires before its scheduled time.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  auto& timer = port.getTimer();
  auto start = timer.now();

  kj::Vector<TimePoint> expected;
  kj::Vector<TimePoint> actual;

  auto addTimer = [&](Duration delay) {
    // A negative delay names a time before `start`; the earliest such a timer can be observed
    // firing is `start` itself, hence the max().
    expected.add(max(start + delay, start));
    timer.atTime(start + delay).then([&]() {
      actual.add(timer.now());
    }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
  };

  addTimer(30 * MILLISECONDS);
  addTimer(40 * MILLISECONDS);
  addTimer(20350 * MICROSECONDS);
  addTimer(30 * MILLISECONDS);
  addTimer(-10 * MILLISECONDS);

  // `actual` is filled in firing order, so compare it against the expected times in sorted
  // order. (This assumes timers fire in order of their scheduled time.)
  std::sort(expected.begin(), expected.end());

  // Wait until just after the latest scheduled time so that every timer has had a chance to fire.
  timer.atTime(expected.back() + MILLISECONDS).wait(waitScope);

  ASSERT_EQ(expected.size(), actual.size());
  // Use an unsigned index to match the type of size(); the previous `int` index made this a
  // signed/unsigned comparison.
  for (size_t i = 0; i < expected.size(); ++i) {
    KJ_EXPECT(expected[i] <= actual[i], "Actual time for timer i is too early.",
              i, ((expected[i] - actual[i]) / NANOSECONDS));
  }
}
// Set to true by dummySignalHandler(); tests reset it to false before arming a signal and then
// check it to learn whether the handler ran.
bool dummySignalHandlerCalled = false;

// Signal handler whose only effect is to record that it was invoked. The signal number is
// ignored.
void dummySignalHandler(int /*signo*/) { dummySignalHandlerCalled = true; }
TEST(AsyncUnixTest, InterruptedTimer) {
  // Verifies that a timer wait which gets interrupted by an unrelated signal (EINTR) still waits
  // for the full timeout instead of returning early.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

#if __linux__
  // Linux timeslices are 1ms.
  constexpr auto OS_SLOWNESS_FACTOR = 1;
#else
  // OSX timeslices are 10ms, so we need longer timeouts to avoid flakiness.
  // To be safe we'll assume other OS's are similar.
  constexpr auto OS_SLOWNESS_FACTOR = 10;
#endif

  // Schedule a timer event in 100ms (scaled by OS_SLOWNESS_FACTOR).
  auto& timer = port.getTimer();
  auto start = timer.now();
  constexpr auto timeout = 100 * MILLISECONDS * OS_SLOWNESS_FACTOR;

  // Arrange SIGALRM to be delivered in 50ms, handled in an empty signal handler. This will cause
  // our wait to be interrupted with EINTR. We should nevertheless continue waiting for the right
  // amount of time.
  dummySignalHandlerCalled = false;
  if (signal(SIGALRM, &dummySignalHandler) == SIG_ERR) {
    KJ_FAIL_SYSCALL("signal(SIGALRM)", errno);
  }
  struct itimerval itv;
  memset(&itv, 0, sizeof(itv));
  itv.it_value.tv_usec = 50000 * OS_SLOWNESS_FACTOR;  // signal after 50ms (scaled)
  // Check the result: if the interval timer cannot be armed, fail right here rather than with a
  // misleading "handler was not called" expectation further down.
  KJ_SYSCALL(setitimer(ITIMER_REAL, &itv, nullptr));

  timer.afterDelay(timeout).wait(waitScope);

  KJ_EXPECT(dummySignalHandlerCalled);
  KJ_EXPECT(timer.now() - start >= timeout);
  KJ_EXPECT(timer.now() - start <= timeout + (timeout / 5));  // allow 20% error
}
TEST(AsyncUnixTest, Wake) {
// Exercises UnixEventPort::wake() against both poll() and wait(), with the wake-up arriving
// before and during the call. Throughout, the return value of poll()/wait() is expected to be
// true exactly when a wake() was consumed.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
// No wake pending: poll() reports false. One wake() makes exactly one poll() report true.
EXPECT_FALSE(port.poll());
port.wake();
EXPECT_TRUE(port.poll());
EXPECT_FALSE(port.poll());
// A wake() issued beforehand makes wait() return true.
port.wake();
EXPECT_TRUE(port.wait());
// A wait() that ends because a timer is due (scheduled for "now"), not because of wake(), is
// expected to return false.
{
auto promise = port.getTimer().atTime(port.getTimer().now());
EXPECT_FALSE(port.wait());
}
// Test wake() when already wait()ing.
{
Thread thread([&]() {
delay();
port.wake();
});
EXPECT_TRUE(port.wait());
}
// Test wait() after wake() already happened.
{
Thread thread([&]() {
port.wake();
});
delay();
EXPECT_TRUE(port.wait());
}
// Test wake() during poll() busy loop.
{
Thread thread([&]() {
delay();
port.wake();
});
EXPECT_FALSE(port.poll());
while (!port.poll()) {}
}
// Test poll() when wake() already delivered.
{
EXPECT_FALSE(port.poll());
Thread thread([&]() {
port.wake();
});
do {
delay();
} while (!port.poll());
}
}
// Exit status that exitSignalHandler() terminates the process with. Must be assigned before the
// handler can run.
int exitCodeForSignal = 0;

// Signal handler that ends the process immediately via _exit(), using exitCodeForSignal as the
// exit status. The signal number is ignored. Never returns.
[[noreturn]] void exitSignalHandler(int /*signo*/) {
  const int status = exitCodeForSignal;
  _exit(status);
}
// Helper that forks a child process for the ChildProcess test and tracks its exit.
//
// The child does nothing but sleep in pause(); on SIGTERM it exits with the exit code given to
// the constructor. The parent side holds the child's pid and a promise for its wait status.
struct TestChild {
// Pid of the forked child. NOTE(review): presumably UnixEventPort::onChildExit() resets this to
// null once the child has been reaped, which is what lets the destructor skip children that
// already exited -- confirm against UnixEventPort.
kj::Maybe<pid_t> pid;
// Resolves to the child's wait status (as interpreted with WIFEXITED() etc. by the test).
kj::Promise<int> promise = nullptr;
// Forks the child and registers for its exit with `port`. `exitCode` is the status the child
// will exit with when it receives SIGTERM.
TestChild(UnixEventPort& port, int exitCode) {
pid_t p;
KJ_SYSCALL(p = fork());
if (p == 0) {
// Child process from here on; this branch never returns.
// Arrange for SIGTERM to cause the process to exit normally.
exitCodeForSignal = exitCode;
signal(SIGTERM, &exitSignalHandler);
// Unblock SIGTERM in the child (the ChildProcess test blocks it before constructing children).
sigset_t sigs;
sigemptyset(&sigs);
sigaddset(&sigs, SIGTERM);
pthread_sigmask(SIG_UNBLOCK, &sigs, nullptr);
for (;;) pause();
}
pid = p;
promise = port.onChildExit(pid);
}
// If a pid is still recorded, forcibly kills the child and synchronously reaps it. Syscall
// failures here are treated as recoverable: the destructor just returns.
~TestChild() noexcept(false) {
KJ_IF_MAYBE(p, pid) {
KJ_SYSCALL(::kill(*p, SIGKILL)) { return; }
int status;
KJ_SYSCALL(waitpid(*p, &status, 0)) { return; }
}
}
// Sends signal `signo` to the child. Requires that a pid is still recorded.
void kill(int signo) {
KJ_SYSCALL(::kill(KJ_REQUIRE_NONNULL(pid), signo));
}
KJ_DISALLOW_COPY(TestChild);
};
TEST(AsyncUnixTest, ChildProcess) {
  // Verifies that onChildExit() reports both a normal exit (with the right exit code) and a
  // death by signal, and that an unrelated, still-running child is not reported as exited.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Block SIGTERM so that we can carefully un-block it in children.
  sigset_t sigs, oldsigs;
  KJ_SYSCALL(sigemptyset(&sigs));
  KJ_SYSCALL(sigaddset(&sigs, SIGTERM));

  // pthread_sigmask() reports failure by *returning* an error number; it does not return -1 and
  // set errno. KJ_SYSCALL() therefore cannot detect its failures, so check the result by hand.
  int blockError = pthread_sigmask(SIG_BLOCK, &sigs, &oldsigs);
  if (blockError != 0) {
    KJ_FAIL_SYSCALL("pthread_sigmask(SIG_BLOCK, &sigs, &oldsigs)", blockError);
  }
  // Restore the original signal mask on the way out. A failure here is reported but treated as
  // recoverable, since this runs during scope exit.
  KJ_DEFER({
    int restoreError = pthread_sigmask(SIG_SETMASK, &oldsigs, nullptr);
    if (restoreError != 0) {
      KJ_FAIL_SYSCALL("pthread_sigmask(SIG_SETMASK, &oldsigs, nullptr)", restoreError) { break; }
    }
  });

  // child1: exits normally in response to SIGTERM, with the requested exit code.
  TestChild child1(port, 123);
  KJ_EXPECT(!child1.promise.poll(waitScope));

  child1.kill(SIGTERM);

  {
    int status = child1.promise.wait(waitScope);
    KJ_EXPECT(WIFEXITED(status));
    KJ_EXPECT(WEXITSTATUS(status) == 123);
  }

  // child2 is killed outright; child3 is left running to check that it is not affected.
  TestChild child2(port, 234);
  TestChild child3(port, 345);

  KJ_EXPECT(!child2.promise.poll(waitScope));
  KJ_EXPECT(!child3.promise.poll(waitScope));

  child2.kill(SIGKILL);

  {
    int status = child2.promise.wait(waitScope);
    KJ_EXPECT(!WIFEXITED(status));
    KJ_EXPECT(WIFSIGNALED(status));
    KJ_EXPECT(WTERMSIG(status) == SIGKILL);
  }

  KJ_EXPECT(!child3.promise.poll(waitScope));

  // child3 will be killed and synchronously waited on the way out.
}
#if !__CYGWIN__
// TODO(someday): Figure out why whenWriteDisconnected() never resolves on Cygwin.
KJ_TEST("UnixEventPort whenWriteDisconnected()") {
// Verifies that whenWriteDisconnected() resolves when the peer end of a socket pair is closed,
// and not before -- even if a readability event on the same observer completed earlier.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
int fds_[2];
KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds_));
kj::AutoCloseFd fds[2] = { kj::AutoCloseFd(fds_[0]), kj::AutoCloseFd(fds_[1]) };
UnixEventPort::FdObserver observer(port, fds[0], UnixEventPort::FdObserver::OBSERVE_READ);
// At one point, the poll()-based version of UnixEventPort had a bug where if some other event
// had completed previously, whenWriteDisconnected() would stop being watched for. So we watch
// for readability as well and check that that goes away first.
auto readablePromise = observer.whenBecomesReadable();
auto hupPromise = observer.whenWriteDisconnected();
// Nothing has happened yet: neither promise may be ready.
KJ_EXPECT(!readablePromise.poll(waitScope));
KJ_EXPECT(!hupPromise.poll(waitScope));
// Writing on the peer end completes the readability promise.
KJ_SYSCALL(write(fds[1], "foo", 3));
KJ_ASSERT(readablePromise.poll(waitScope));
readablePromise.wait(waitScope);
// Drain the three bytes that were written.
{
char junk[16];
ssize_t n;
KJ_SYSCALL(n = read(fds[0], junk, 16));
KJ_EXPECT(n == 3);
}
// The peer is still open, so the disconnect promise must still be pending.
KJ_EXPECT(!hupPromise.poll(waitScope));
// Close the peer end (assigning nullptr to the AutoCloseFd); now the disconnect must be seen.
fds[1] = nullptr;
KJ_ASSERT(hupPromise.poll(waitScope));
hupPromise.wait(waitScope);
}
KJ_TEST("UnixEventPort FdObserver(..., flags=0)::whenWriteDisconnected()") {
// Verifies that given `0' as a `flags' argument,
// FdObserver still observes whenWriteDisconnected().
//
// This can be useful to watch disconnection on a blocking file descriptor.
// See discussion: https://github.com/capnproto/capnproto/issues/924
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
int pipefds[2];
KJ_SYSCALL(pipe(pipefds));
kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
// Observe the pipe's write end, with no OBSERVE_* flags at all.
UnixEventPort::FdObserver observer(port, outfd, 0);
auto hupPromise = observer.whenWriteDisconnected();
// Read end still open: the promise must be pending.
KJ_EXPECT(!hupPromise.poll(waitScope));
// Close the read end (assigning nullptr to the AutoCloseFd); the disconnect must now be seen.
infd = nullptr;
KJ_ASSERT(hupPromise.poll(waitScope));
hupPromise.wait(waitScope);
}
#endif
KJ_TEST("UnixEventPort poll for signals") {
// Verifies that signals raised while promises for them are outstanding are picked up by a
// single UnixEventPort::poll() that is also consuming a wake().
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
auto promise1 = port.onSignal(SIGURG);
auto promise2 = port.onSignal(SIGIO);
// Nothing raised yet: neither promise may be ready.
KJ_EXPECT(!promise1.poll(waitScope));
KJ_EXPECT(!promise2.poll(waitScope));
KJ_SYSCALL(raise(SIGURG));
KJ_SYSCALL(raise(SIGIO));
// Also queue a wake-up, so that the poll() below has a wake to report (returns true).
port.wake();
KJ_EXPECT(port.poll());
// After that one poll(), both signal promises must be ready.
KJ_EXPECT(promise1.poll(waitScope));
KJ_EXPECT(promise2.poll(waitScope));
promise1.wait(waitScope);
promise2.wait(waitScope);
}
#if defined(SIGRTMIN) && !__CYGWIN__ && !__aarch64__
// TODO(someday): Figure out why RT signals don't seem to work correctly on Cygwin. It looks like
// only the first signal is delivered, like how non-RT signals work. Is it possible Cygwin
// advertises RT signal support but doesn't actually implement them correctly? I can't find any
// information on the internet about this and TBH I don't care about Cygwin enough to dig in.
// TODO(someday): Figure out why RT signals don't work under qemu-user emulating aarch64 on
// Debian Buster.
//
// Queues three SIGRTMIN signals carrying the values 123, 124, 125 and then checks that
// onSignal(SIGRTMIN) yields them one at a time in that order, with nothing left over afterwards.
// If `doPoll` is true, each promise is additionally required to be ready when poll()ed before
// it is wait()ed on.
void testRtSignals(UnixEventPort& port, WaitScope& waitScope, bool doPoll) {
union sigval value;
memset(&value, 0, sizeof(value));
// Queue three copies of the signal upfront.
for (uint i = 0; i < 3; i++) {
value.sival_int = 123 + i;
KJ_SYSCALL(sigqueue(getpid(), SIGRTMIN, value));
}
// Now wait for them.
for (uint i = 0; i < 3; i++) {
auto promise = port.onSignal(SIGRTMIN);
if (doPoll) {
KJ_ASSERT(promise.poll(waitScope));
}
auto info = promise.wait(waitScope);
// Values must arrive in the order they were queued.
KJ_EXPECT(info.si_value.sival_int == 123 + i);
}
// All three instances have been consumed; a fourth listener must not be ready.
KJ_EXPECT(!port.onSignal(SIGRTMIN).poll(waitScope));
}
KJ_TEST("UnixEventPort can receive multiple queued instances of an RT signal") {
// Runs testRtSignals() twice on the same port: once polling each promise before waiting on it,
// and once waiting directly.
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
testRtSignals(port, waitScope, true);
// Test again, but don't poll() the promises. This may test a different code path, if poll() and
// wait() are very different in how they read signals. (For the poll(2)-based implementation of
// UnixEventPort, they are indeed pretty different.)
testRtSignals(port, waitScope, false);
}
#endif
} // namespace
} // namespace kj
#endif // !_WIN32