// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if !_WIN32
// For Win32 implementation, see async-io-win32.c++.
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include "async-io.h"
#include "async-io-internal.h"
#include "async-unix.h"
#include "debug.h"
#include "thread.h"
#include "io.h"
#include "miniposix.h"
#include <unistd.h>
#include <sys/uio.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stddef.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <set>
#include <poll.h>
#include <limits.h>
#include <sys/ioctl.h>
#if !defined(SO_PEERCRED) && defined(LOCAL_PEERCRED)
#include <sys/ucred.h>
#endif
#if !defined(SOL_LOCAL) && (__FreeBSD__ || __DragonFly__)
// On DragonFly or FreeBSD < 12.2 you're supposed to use 0 for SOL_LOCAL.
#define SOL_LOCAL 0
#endif
namespace kj {
namespace {
void setNonblocking(int fd) {
#ifdef FIONBIO
int opt = 1;
KJ_SYSCALL(ioctl(fd, FIONBIO, &opt));
#else
int flags;
KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
if ((flags & O_NONBLOCK) == 0) {
KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
}
#endif
}
void setCloseOnExec(int fd) {
#ifdef FIOCLEX
KJ_SYSCALL(ioctl(fd, FIOCLEX));
#else
int flags;
KJ_SYSCALL(flags = fcntl(fd, F_GETFD));
if ((flags & FD_CLOEXEC) == 0) {
KJ_SYSCALL(fcntl(fd, F_SETFD, flags | FD_CLOEXEC));
}
#endif
}
static constexpr uint NEW_FD_FLAGS =
#if __linux__ && !__BIONIC__
LowLevelAsyncIoProvider::ALREADY_CLOEXEC | LowLevelAsyncIoProvider::ALREADY_NONBLOCK |
#endif
LowLevelAsyncIoProvider::TAKE_OWNERSHIP;
// We always try to open FDs with CLOEXEC and NONBLOCK already set on Linux, but on other platforms
// this is not possible.
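//
// As a small usage sketch (mirroring lookupHost() below), the intended pairing is to create
// the FD with the modes already set where the platform allows it, then wrap it:
//
//   int fds[2];
//   KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));  // Linux-only creation flags
//   auto input = lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS);
//
// On Linux (non-Bionic), the ALREADY_* bits let the wrapper skip redundant fcntl()/ioctl()
// calls; elsewhere NEW_FD_FLAGS is just TAKE_OWNERSHIP and the wrapper sets the modes itself.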
class OwnedFileDescriptor {
public:
OwnedFileDescriptor(int fd, uint flags): fd(fd), flags(flags) {
if (flags & LowLevelAsyncIoProvider::ALREADY_NONBLOCK) {
KJ_DREQUIRE(fcntl(fd, F_GETFL) & O_NONBLOCK, "You claimed you set NONBLOCK, but you didn't.");
} else {
setNonblocking(fd);
}
if (flags & LowLevelAsyncIoProvider::TAKE_OWNERSHIP) {
if (flags & LowLevelAsyncIoProvider::ALREADY_CLOEXEC) {
KJ_DREQUIRE(fcntl(fd, F_GETFD) & FD_CLOEXEC,
"You claimed you set CLOEXEC, but you didn't.");
} else {
setCloseOnExec(fd);
}
}
}
~OwnedFileDescriptor() noexcept(false) {
// Don't use SYSCALL() here because close() should not be repeated on EINTR.
if ((flags & LowLevelAsyncIoProvider::TAKE_OWNERSHIP) && close(fd) < 0) {
KJ_FAIL_SYSCALL("close", errno, fd) {
// Recoverable exceptions are safe in destructors.
break;
}
}
}
protected:
const int fd;
private:
uint flags;
};
// =======================================================================================
class AsyncStreamFd: public OwnedFileDescriptor, public AsyncCapabilityStream {
public:
AsyncStreamFd(UnixEventPort& eventPort, int fd, uint flags)
: OwnedFileDescriptor(fd, flags),
eventPort(eventPort),
observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ_WRITE) {}
virtual ~AsyncStreamFd() noexcept(false) {}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return tryReadInternal(buffer, minBytes, maxBytes, nullptr, 0, {0,0})
.then([](ReadResult r) { return r.byteCount; });
}
Promise<ReadResult> tryReadWithFds(void* buffer, size_t minBytes, size_t maxBytes,
AutoCloseFd* fdBuffer, size_t maxFds) override {
return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, {0,0});
}
Promise<ReadResult> tryReadWithStreams(
void* buffer, size_t minBytes, size_t maxBytes,
Own<AsyncCapabilityStream>* streamBuffer, size_t maxStreams) override {
auto fdBuffer = kj::heapArray<AutoCloseFd>(maxStreams);
auto promise = tryReadInternal(buffer, minBytes, maxBytes, fdBuffer.begin(), maxStreams, {0,0});
return promise.then([this, fdBuffer = kj::mv(fdBuffer), streamBuffer]
(ReadResult result) mutable {
for (auto i: kj::zeroTo(result.capCount)) {
streamBuffer[i] = kj::heap<AsyncStreamFd>(eventPort, fdBuffer[i].release(),
LowLevelAsyncIoProvider::TAKE_OWNERSHIP | LowLevelAsyncIoProvider::ALREADY_CLOEXEC);
}
return result;
});
}
Promise<void> write(const void* buffer, size_t size) override {
ssize_t n;
KJ_NONBLOCKING_SYSCALL(n = ::write(fd, buffer, size)) {
// Error.
// We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
// a bug that exists in both Clang and GCC:
// http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
// http://llvm.org/bugs/show_bug.cgi?id=12286
goto error;
}
if (false) {
error:
return kj::READY_NOW;
}
if (n < 0) {
// EAGAIN -- need to wait for writability and try again.
return observer.whenBecomesWritable().then([=]() {
return write(buffer, size);
});
} else if (n == size) {
// All done.
return READY_NOW;
} else {
// Fewer than `size` bytes were written, but we CANNOT assume we're out of buffer space, as
// Linux is known to return partial reads/writes when interrupted by a signal -- yes, even
// for non-blocking operations. So, we'll need to write() again now, even though it will
// almost certainly fail with EAGAIN. See comments in the read path for more info.
buffer = reinterpret_cast<const byte*>(buffer) + n;
size -= n;
return write(buffer, size);
}
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
if (pieces.size() == 0) {
return writeInternal(nullptr, nullptr, nullptr);
} else {
return writeInternal(pieces[0], pieces.slice(1, pieces.size()), nullptr);
}
}
Promise<void> writeWithFds(ArrayPtr<const byte> data,
ArrayPtr<const ArrayPtr<const byte>> moreData,
ArrayPtr<const int> fds) override {
return writeInternal(data, moreData, fds);
}
Promise<void> writeWithStreams(ArrayPtr<const byte> data,
ArrayPtr<const ArrayPtr<const byte>> moreData,
Array<Own<AsyncCapabilityStream>> streams) override {
auto fds = KJ_MAP(stream, streams) {
return downcast<AsyncStreamFd>(*stream).fd;
};
auto promise = writeInternal(data, moreData, fds);
return promise.attach(kj::mv(fds), kj::mv(streams));
}
#if __linux__
Maybe<Promise<uint64_t>> tryPumpFrom(
AsyncInputStream& inputParam, uint64_t amount = kj::maxValue) override {
AsyncStreamFd& input = KJ_UNWRAP_OR_RETURN(
kj::dynamicDowncastIfAvailable<AsyncStreamFd>(inputParam), nullptr);
// The input is another AsyncStreamFd, so perhaps we can do an optimized pump with splice().
// Before we resort to a bunch of syscalls, let's see whether the pump is small enough to be
// fully satisfied immediately. This optimizes for the case of small streams, e.g. a short
// HTTP body.
byte buffer[4096];
size_t pos = 0;
size_t initialAmount = kj::min(sizeof(buffer), amount);
bool eof = false;
// Read into the buffer until it's full or there are no bytes available. Note that we'd expect
// a single call to read() to pull as much data out of the socket as possible (up to our buffer
// size), so you might think the loop is unnecessary. The reason we want to do a second read(),
// though, is to find out if we're at EOF or merely waiting for more data. In the EOF case,
// we can end the pump early without splicing.
while (pos < initialAmount) {
ssize_t n;
KJ_NONBLOCKING_SYSCALL(n = ::read(input.fd, buffer + pos, initialAmount - pos));
if (n <= 0) {
eof = n == 0;
break;
}
pos += n;
}
// Write the bytes that we just read back out to the output.
{
ssize_t n;
KJ_NONBLOCKING_SYSCALL(n = ::write(fd, buffer, pos));
if (n < 0) n = 0; // treat EAGAIN as "zero bytes written"
if (size_t(n) < pos) {
// Oh crap, the output buffer is full. This should be rare. But, now we're going to have
// to copy the remaining bytes into the heap to do an async write.
auto leftover = kj::heapArray<byte>(buffer + n, pos - n);
auto promise = write(leftover.begin(), leftover.size());
promise = promise.attach(kj::mv(leftover));
if (eof || pos == amount) {
return promise.then([pos]() -> uint64_t { return pos; });
} else {
return promise.then([&input, this, pos, amount]() {
return splicePumpFrom(input, pos, amount);
});
}
}
}
if (eof || pos == amount) {
// We finished the pump in one go, so don't splice.
return Promise<uint64_t>(uint64_t(pos));
} else {
// Use splice for the rest of the pump.
return splicePumpFrom(input, pos, amount);
}
}
private:
static constexpr size_t MAX_SPLICE_LEN = 1 << 20;
// Maximum value we'll pass for the `len` argument of `splice()`. Linux does not like it when we
// use `kj::maxValue` here so we clamp it. Note that the actual value of this constant is
// irrelevant as long as it is more than the pipe buffer size (typically 64k) and less than
// whatever value makes Linux unhappy. All actual operations will be clamped to the buffer size.
// (And if the buffer size is for some reason larger than this, that's OK too, we just won't
// end up using the whole buffer.)
Promise<uint64_t> splicePumpFrom(AsyncStreamFd& input, uint64_t readSoFar, uint64_t limit) {
// splice() requires that either its input or its output is a pipe. But chances are neither
// `input.fd` nor `this->fd` is a pipe -- in most use cases they are sockets. In order to take
// advantage of splice(), then, we need to allocate a pipe to act as the middleman, so we can
// splice() from the input to the pipe, and then from the pipe to the output.
//
// You might wonder why this pipe middleman is required. Why can't splice() go directly from
// a socket to a socket? Linus Torvalds attempts to explain here:
// https://yarchive.net/comp/linux/splice.html
//
// The short version is that the pipe itself is equivalent to an in-memory buffer. In a naive
// pump implementation, we allocate a buffer, read() into it and write() out. With splice(),
// we allocate a kernelspace buffer by allocating a pipe, then we splice() into the pipe and
// splice() back out.
// Linux normally allocates pipe buffers of 64k (16 pages of 4k each). However, when
// /proc/sys/fs/pipe-user-pages-soft is hit, then Linux will start allocating 4k (1 page)
// buffers instead, and will give an error if we try to increase it.
//
// The soft limit defaults to 16384 pages, which we'd hit after 1024 pipes -- totally possible
// in a big server. 64k is a nice buffer size, but even 4k is better than not using splice, so
// we'll live with whatever buffer size the kernel gives us.
//
// There is a second, "hard" limit, /proc/sys/fs/pipe-user-pages-hard, at which point Linux
// will start refusing to allocate pipes at all. In this case we fall back to an unoptimized
// pump. However, this limit defaults to unlimited, so this won't ever happen unless someone
// has manually changed the limit. That's probably dangerous since if the app allocates pipes
// anywhere else in its codebase, it probably doesn't have any fallbacks in those places, so
// things will break anyway... to avoid that we'd need to self-regulate the number of pipes
// we allocate here to avoid coming close to the hard limit, but that's a lot of effort so I'm
// not going to bother!
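//
// In sketch form (illustrative only; the real loop below adds nonblocking retries, limit
// tracking, and careful accounting of what's buffered in the pipe), one round trip of the
// pipe-middleman pattern is:
//
//   int p[2];
//   KJ_SYSCALL(pipe2(p, O_NONBLOCK | O_CLOEXEC));
//   // Move up to `len` bytes: socket -> pipe, then pipe -> socket.
//   ssize_t nIn = splice(input.fd, nullptr, p[1], nullptr, len,
//                        SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
//   ssize_t nOut = splice(p[0], nullptr, fd, nullptr, nIn,
//                         SPLICE_F_MOVE | SPLICE_F_NONBLOCK);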
int pipeFds[2];
KJ_SYSCALL_HANDLE_ERRORS(pipe2(pipeFds, O_NONBLOCK | O_CLOEXEC)) {
case ENFILE:
// Probably hit the limit on pipe buffers, fall back to unoptimized pump.
return unoptimizedPumpTo(input, *this, limit, readSoFar);
default:
KJ_FAIL_SYSCALL("pipe2()", error);
}
AutoCloseFd pipeIn(pipeFds[0]), pipeOut(pipeFds[1]);
return splicePumpLoop(input, pipeFds[0], pipeFds[1], readSoFar, limit, 0)
.attach(kj::mv(pipeIn), kj::mv(pipeOut));
}
Promise<uint64_t> splicePumpLoop(AsyncStreamFd& input, int pipeIn, int pipeOut,
uint64_t readSoFar, uint64_t limit, size_t bufferedAmount) {
for (;;) {
while (bufferedAmount > 0) {
// First flush out whatever is in the pipe buffer.
ssize_t n;
KJ_NONBLOCKING_SYSCALL(n = splice(pipeIn, nullptr, fd, nullptr,
MAX_SPLICE_LEN, SPLICE_F_MOVE | SPLICE_F_NONBLOCK));
if (n > 0) {
KJ_ASSERT(n <= bufferedAmount, "splice pipe larger than bufferedAmount?");
bufferedAmount -= n;
} else {
KJ_ASSERT(n < 0, "splice pipe empty before bufferedAmount reached?", bufferedAmount);
return observer.whenBecomesWritable()
.then([this, &input, pipeIn, pipeOut, readSoFar, limit, bufferedAmount]() {
return splicePumpLoop(input, pipeIn, pipeOut, readSoFar, limit, bufferedAmount);
});
}
}
// Now the pipe buffer is empty, so we can try to read some more.
{
if (readSoFar >= limit) {
// Hit the limit, we're done.
KJ_ASSERT(readSoFar == limit);
return readSoFar;
}
ssize_t n;
KJ_NONBLOCKING_SYSCALL(n = splice(input.fd, nullptr, pipeOut, nullptr,
kj::min(limit - readSoFar, MAX_SPLICE_LEN), SPLICE_F_MOVE | SPLICE_F_NONBLOCK));
if (n == 0) {
// EOF.
return readSoFar;
} else if (n < 0) {
// No data available, wait.
return input.observer.whenBecomesReadable()
.then([this, &input, pipeIn, pipeOut, readSoFar, limit]() {
return splicePumpLoop(input, pipeIn, pipeOut, readSoFar, limit, 0);
});
}
readSoFar += n;
bufferedAmount = n;
}
}
}
public:
#endif // __linux__
Promise<void> whenWriteDisconnected() override {
KJ_IF_MAYBE(p, writeDisconnectedPromise) {
return p->addBranch();
} else {
auto fork = observer.whenWriteDisconnected().fork();
auto result = fork.addBranch();
writeDisconnectedPromise = kj::mv(fork);
return kj::mv(result);
}
}
void shutdownWrite() override {
// There's no legitimate way to get an AsyncStreamFd that isn't a socket through the
// UnixAsyncIoProvider interface.
KJ_SYSCALL(shutdown(fd, SHUT_WR));
}
void abortRead() override {
// There's no legitimate way to get an AsyncStreamFd that isn't a socket through the
// UnixAsyncIoProvider interface.
KJ_SYSCALL(shutdown(fd, SHUT_RD));
}
void getsockopt(int level, int option, void* value, uint* length) override {
socklen_t socklen = *length;
KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
*length = socklen;
}
void setsockopt(int level, int option, const void* value, uint length) override {
KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
}
void getsockname(struct sockaddr* addr, uint* length) override {
socklen_t socklen = *length;
KJ_SYSCALL(::getsockname(fd, addr, &socklen));
*length = socklen;
}
void getpeername(struct sockaddr* addr, uint* length) override {
socklen_t socklen = *length;
KJ_SYSCALL(::getpeername(fd, addr, &socklen));
*length = socklen;
}
kj::Maybe<int> getFd() const override {
return fd;
}
void registerAncillaryMessageHandler(
kj::Function<void(kj::ArrayPtr<AncillaryMessage>)> fn) override {
ancillaryMsgCallback = kj::mv(fn);
}
Promise<void> waitConnected() {
// Wait until initial connection has completed. This actually just waits until it is writable.
// Can't just go directly to observer.whenBecomesWritable() because of edge triggering. We
// need to explicitly check if the socket is already connected.
struct pollfd pollfd;
memset(&pollfd, 0, sizeof(pollfd));
pollfd.fd = fd;
pollfd.events = POLLOUT;
int pollResult;
KJ_SYSCALL(pollResult = poll(&pollfd, 1, 0));
if (pollResult == 0) {
// Not ready yet. We can safely use the edge-triggered observer.
return observer.whenBecomesWritable();
} else {
// Ready now.
return kj::READY_NOW;
}
}
private:
UnixEventPort& eventPort;
UnixEventPort::FdObserver observer;
Maybe<ForkedPromise<void>> writeDisconnectedPromise;
Maybe<Function<void(ArrayPtr<AncillaryMessage>)>> ancillaryMsgCallback;
Promise<ReadResult> tryReadInternal(void* buffer, size_t minBytes, size_t maxBytes,
AutoCloseFd* fdBuffer, size_t maxFds,
ReadResult alreadyRead) {
// `alreadyRead` is the number of bytes we have already received via previous reads -- minBytes,
// maxBytes, and buffer have already been adjusted to account for them, but this count must
// be included in the final return value.
ssize_t n;
if (maxFds == 0 && ancillaryMsgCallback == nullptr) {
KJ_NONBLOCKING_SYSCALL(n = ::read(fd, buffer, maxBytes)) {
// Error.
// We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
// a bug that exists in both Clang and GCC:
// http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
// http://llvm.org/bugs/show_bug.cgi?id=12286
goto error;
}
} else {
struct msghdr msg;
memset(&msg, 0, sizeof(msg));
struct iovec iov;
memset(&iov, 0, sizeof(iov));
iov.iov_base = buffer;
iov.iov_len = maxBytes;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
// Allocate space to receive a cmsg.
size_t msgBytes;
if (ancillaryMsgCallback == nullptr) {
#if __APPLE__ || __FreeBSD__
// Until very recently (late 2018 / early 2019), FreeBSD suffered from a bug in which, when
// an SCM_RIGHTS message was truncated on delivery, it would not close the FDs that weren't
// delivered -- they would simply leak: https://bugs.freebsd.org/131876
//
// My testing indicates that MacOS has this same bug as of today (April 2019). I don't know
// if they plan to fix it or are even aware of it.
//
// To handle both cases, we will always provide space to receive 512 FDs. Hopefully, this is
// greater than the maximum number of FDs that these kernels will transmit in one message
// PLUS enough space for any other ancillary messages that could be sent before the
// SCM_RIGHTS message to push it back in the buffer. I couldn't find any firm documentation
// on these limits, though -- I only know that Linux is limited to 253, and I saw a hint in
// a comment in someone else's application that suggested FreeBSD is the same. Hopefully,
// then, this is sufficient to prevent attacks. But if not, there's nothing more we can do;
// it's really up to the kernel to fix this.
msgBytes = CMSG_SPACE(sizeof(int) * 512);
#else
msgBytes = CMSG_SPACE(sizeof(int) * maxFds);
#endif
} else {
// If we want room for ancillary messages instead of or in addition to FDs, just use the
// same amount of cushion as in the MacOS/FreeBSD case above.
// Someday we may want to allow customization here, but there's no immediate use for it.
msgBytes = CMSG_SPACE(sizeof(int) * 512);
}
// On Linux, CMSG_SPACE will align to a word-size boundary, but on Mac it always aligns to a
// 32-bit boundary. I guess aligning to 32 bits helps avoid the problem where you
// surprisingly end up with space for two file descriptors when you only wanted one. However,
// cmsghdr's preferred alignment is word-size (it contains a size_t). If we stack-allocate
// the buffer, we need to make sure it is aligned properly (maybe not on x64, but maybe on
// other platforms), so we want to allocate an array of words (we use void*). So... we use
// CMSG_SPACE() and then additionally round up to deal with Mac.
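// For example (on a typical 64-bit Linux; exact numbers are platform-dependent), maxFds == 2
// gives msgBytes = CMSG_SPACE(8) = 16 bytes of cmsghdr + 8 bytes of payload = 24 bytes, which
// rounds up to msgWords == 3.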
size_t msgWords = (msgBytes + sizeof(void*) - 1) / sizeof(void*);
KJ_STACK_ARRAY(void*, cmsgSpace, msgWords, 16, 256);
auto cmsgBytes = cmsgSpace.asBytes();
memset(cmsgBytes.begin(), 0, cmsgBytes.size());
msg.msg_control = cmsgBytes.begin();
msg.msg_controllen = msgBytes;
#ifdef MSG_CMSG_CLOEXEC
static constexpr int RECVMSG_FLAGS = MSG_CMSG_CLOEXEC;
#else
static constexpr int RECVMSG_FLAGS = 0;
#endif
KJ_NONBLOCKING_SYSCALL(n = ::recvmsg(fd, &msg, RECVMSG_FLAGS)) {
// Error.
// We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
// a bug that exists in both Clang and GCC:
// http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
// http://llvm.org/bugs/show_bug.cgi?id=12286
goto error;
}
if (n >= 0) {
// Process all messages.
//
// WARNING DANGER: We have to be VERY careful not to miss a file descriptor here, because
// if we do, then that FD will never be closed, and a malicious peer could exploit this to
// fill up our FD table, creating a DoS attack. Some things to keep in mind:
// - CMSG_SPACE() could have rounded up the space for alignment purposes, and this could
// mean we permitted the kernel to deliver more file descriptors than `maxFds`. We need
// to close the extras.
// - We can receive multiple ancillary messages at once. In particular, there is also
// SCM_CREDENTIALS. The sender decides what to send. They could send SCM_CREDENTIALS
// first followed by SCM_RIGHTS. We need to make sure we see both.
size_t nfds = 0;
size_t spaceLeft = msg.msg_controllen;
Vector<AncillaryMessage> ancillaryMessages;
for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (spaceLeft >= CMSG_LEN(0) &&
cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
// Some operating systems (like MacOS) do not adjust cmsg_len when the message is
// truncated. We must do so ourselves or risk overrunning the buffer.
auto len = kj::min(cmsg->cmsg_len, spaceLeft);
auto data = arrayPtr(reinterpret_cast<int*>(CMSG_DATA(cmsg)),
(len - CMSG_LEN(0)) / sizeof(int));
kj::Vector<kj::AutoCloseFd> trashFds;
for (auto fd: data) {
kj::AutoCloseFd ownFd(fd);
if (nfds < maxFds) {
fdBuffer[nfds++] = kj::mv(ownFd);
} else {
trashFds.add(kj::mv(ownFd));
}
}
} else if (spaceLeft >= CMSG_LEN(0) && ancillaryMsgCallback != nullptr) {
auto len = kj::min(cmsg->cmsg_len, spaceLeft);
auto data = ArrayPtr<const byte>(CMSG_DATA(cmsg), len - CMSG_LEN(0));
ancillaryMessages.add(cmsg->cmsg_level, cmsg->cmsg_type, data);
}
if (spaceLeft >= CMSG_LEN(0) && spaceLeft >= cmsg->cmsg_len) {
spaceLeft -= cmsg->cmsg_len;
} else {
spaceLeft = 0;
}
}
#ifndef MSG_CMSG_CLOEXEC
for (size_t i = 0; i < nfds; i++) {
setCloseOnExec(fdBuffer[i]);
}
#endif
if (ancillaryMessages.size() > 0) {
KJ_IF_MAYBE(fn, ancillaryMsgCallback) {
(*fn)(ancillaryMessages.asPtr());
}
}
alreadyRead.capCount += nfds;
fdBuffer += nfds;
maxFds -= nfds;
}
}
if (false) {
error:
return alreadyRead;
}
if (n < 0) {
// Read would block.
return observer.whenBecomesReadable().then([=]() {
return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead);
});
} else if (n == 0) {
// EOF -OR- maxBytes == 0.
return alreadyRead;
} else if (implicitCast<size_t>(n) >= minBytes) {
// We read enough to stop here.
alreadyRead.byteCount += n;
return alreadyRead;
} else {
// The kernel returned fewer bytes than we asked for (and fewer than we need).
buffer = reinterpret_cast<byte*>(buffer) + n;
minBytes -= n;
maxBytes -= n;
alreadyRead.byteCount += n;
// According to David Klempner, who works on Stubby at Google, we sadly CANNOT assume that
// we've consumed the whole read buffer here. If a signal is delivered in the middle of a
// read() -- yes, even a non-blocking read -- it can cause the kernel to return a partial
// result, with data still in the buffer.
// https://bugzilla.kernel.org/show_bug.cgi?id=199131
// https://twitter.com/CaptainSegfault/status/1112622245531144194
//
// Unfortunately, we have no choice but to issue more read()s until it either tells us EOF
// or EAGAIN. We used to have an optimization here using observer.atEndHint() (when it is
// non-null) to avoid a redundant call to read(). Alas...
return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead);
}
}
Promise<void> writeInternal(ArrayPtr<const byte> firstPiece,
ArrayPtr<const ArrayPtr<const byte>> morePieces,
ArrayPtr<const int> fds) {
const size_t iovmax = kj::miniposix::iovMax();
// If there are more than IOV_MAX pieces, we'll only write the first IOV_MAX for now, and
// then we'll loop later.
KJ_STACK_ARRAY(struct iovec, iov, kj::min(1 + morePieces.size(), iovmax), 16, 128);
size_t iovTotal = 0;
// writev() interface is not const-correct. :(
iov[0].iov_base = const_cast<byte*>(firstPiece.begin());
iov[0].iov_len = firstPiece.size();
iovTotal += iov[0].iov_len;
for (uint i = 1; i < iov.size(); i++) {
iov[i].iov_base = const_cast<byte*>(morePieces[i - 1].begin());
iov[i].iov_len = morePieces[i - 1].size();
iovTotal += iov[i].iov_len;
}
if (iovTotal == 0) {
KJ_REQUIRE(fds.size() == 0, "can't write FDs without bytes");
return kj::READY_NOW;
}
ssize_t n;
if (fds.size() == 0) {
KJ_NONBLOCKING_SYSCALL(n = ::writev(fd, iov.begin(), iov.size()), iovTotal, iov.size()) {
// Error.
// We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
// a bug that exists in both Clang and GCC:
// http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
// http://llvm.org/bugs/show_bug.cgi?id=12286
goto error;
}
} else {
struct msghdr msg;
memset(&msg, 0, sizeof(msg));
msg.msg_iov = iov.begin();
msg.msg_iovlen = iov.size();
// Allocate space to send a cmsg.
size_t msgBytes = CMSG_SPACE(sizeof(int) * fds.size());
// On Linux, CMSG_SPACE will align to a word-size boundary, but on Mac it always aligns to a
// 32-bit boundary. I guess aligning to 32 bits helps avoid the problem where you
// surprisingly end up with space for two file descriptors when you only wanted one. However,
// cmsghdr's preferred alignment is word-size (it contains a size_t). If we stack-allocate
// the buffer, we need to make sure it is aligned properly (maybe not on x64, but maybe on
// other platforms), so we want to allocate an array of words (we use void*). So... we use
// CMSG_SPACE() and then additionally round up to deal with Mac.
size_t msgWords = (msgBytes + sizeof(void*) - 1) / sizeof(void*);
KJ_STACK_ARRAY(void*, cmsgSpace, msgWords, 16, 256);
auto cmsgBytes = cmsgSpace.asBytes();
memset(cmsgBytes.begin(), 0, cmsgBytes.size());
msg.msg_control = cmsgBytes.begin();
msg.msg_controllen = msgBytes;
struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int) * fds.size());
memcpy(CMSG_DATA(cmsg), fds.begin(), fds.asBytes().size());
KJ_NONBLOCKING_SYSCALL(n = ::sendmsg(fd, &msg, 0)) {
// Error.
// We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
// a bug that exists in both Clang and GCC:
// http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
// http://llvm.org/bugs/show_bug.cgi?id=12286
goto error;
}
}
if (false) {
error:
return kj::READY_NOW;
}
if (n < 0) {
// Got EAGAIN. Nothing was written.
return observer.whenBecomesWritable().then([=]() {
return writeInternal(firstPiece, morePieces, fds);
});
} else if (n == 0) {
// Why would a sendmsg() with a non-empty message ever return 0 when writing to a stream
// socket? If there's no room in the send buffer, it should fail with EAGAIN. If the
// connection is closed, it should fail with EPIPE. Various documents and forum posts around
// the internet claim this can happen but no one seems to know when. My guess is it can only
// happen if we try to send an empty message -- which we didn't. So I think this is
// impossible. If it is possible, we need to figure out how to correctly handle it, which
// depends on what caused it.
//
// Note in particular that if 0 is a valid return here, and we sent an SCM_RIGHTS message,
// we need to know whether the message was sent or not, in order to decide whether to retry
// sending it!
KJ_FAIL_ASSERT("non-empty sendmsg() returned 0");
}
// Non-zero bytes were written. This also implies that *all* FDs were written.
// Discard all data that was written, then issue a new write for what's left (if any).
for (;;) {
if (n < firstPiece.size()) {
// Only part of the first piece was consumed. Slice off what was written and write again.
firstPiece = firstPiece.slice(n, firstPiece.size());
iovTotal -= n;
if (iovTotal == 0) {
// Oops, what actually happened is that we hit the IOV_MAX limit: everything we actually
// submitted was fully written, and this piece was never submitted at all. Don't wait.
return writeInternal(firstPiece, morePieces, nullptr);
}
// As with read(), we cannot assume that a short write() really means the write buffer is
// full (see comments in the read path above). We have to write again.
return writeInternal(firstPiece, morePieces, nullptr);
} else if (morePieces.size() == 0) {
// First piece was fully-consumed and there are no more pieces, so we're done.
KJ_DASSERT(n == firstPiece.size(), n);
return READY_NOW;
} else {
// First piece was fully consumed, so move on to the next piece.
n -= firstPiece.size();
iovTotal -= firstPiece.size();
firstPiece = morePieces[0];
morePieces = morePieces.slice(1, morePieces.size());
}
}
}
};
#if __linux__
constexpr size_t AsyncStreamFd::MAX_SPLICE_LEN;
#endif // __linux__
// =======================================================================================
class SocketAddress {
public:
SocketAddress(const void* sockaddr, uint len): addrlen(len) {
KJ_REQUIRE(len <= sizeof(addr), "Sorry, your sockaddr is too big for me.");
memcpy(&addr.generic, sockaddr, len);
}
bool operator<(const SocketAddress& other) const {
// So we can use std::set<SocketAddress>... see DNS lookup code.
if (wildcard < other.wildcard) return true;
if (wildcard > other.wildcard) return false;
if (addrlen < other.addrlen) return true;
if (addrlen > other.addrlen) return false;
return memcmp(&addr.generic, &other.addr.generic, addrlen) < 0;
}
const struct sockaddr* getRaw() const { return &addr.generic; }
socklen_t getRawSize() const { return addrlen; }
int socket(int type) const {
bool isStream = type == SOCK_STREAM;
int result;
#if __linux__ && !__BIONIC__
type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
KJ_SYSCALL(result = ::socket(addr.generic.sa_family, type, 0));
if (isStream && (addr.generic.sa_family == AF_INET ||
addr.generic.sa_family == AF_INET6)) {
// TODO(perf): As a hack for the 0.4 release we are always setting
// TCP_NODELAY because Nagle's algorithm pretty much kills Cap'n Proto's
// RPC protocol. Later, we should extend the interface to provide more
// control over this. Perhaps write() should have a flag which
// specifies whether to pass MSG_MORE.
int one = 1;
KJ_SYSCALL(setsockopt(
result, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one)));
}
return result;
}
void bind(int sockfd) const {
#if !defined(__OpenBSD__)
if (wildcard) {
// Disable IPV6_V6ONLY because we want to handle both ipv4 and ipv6 on this socket. (The
// default value of this option varies across platforms.)
int value = 0;
KJ_SYSCALL(setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY, &value, sizeof(value)));
}
#endif
KJ_SYSCALL(::bind(sockfd, &addr.generic, addrlen), toString());
}
uint getPort() const {
switch (addr.generic.sa_family) {
case AF_INET: return ntohs(addr.inet4.sin_port);
case AF_INET6: return ntohs(addr.inet6.sin6_port);
default: return 0;
}
}
String toString() const {
if (wildcard) {
return str("*:", getPort());
}
switch (addr.generic.sa_family) {
case AF_INET: {
char buffer[INET6_ADDRSTRLEN];
if (inet_ntop(addr.inet4.sin_family, &addr.inet4.sin_addr,
buffer, sizeof(buffer)) == nullptr) {
KJ_FAIL_SYSCALL("inet_ntop", errno) { break; }
return heapString("(inet_ntop error)");
}
return str(buffer, ':', ntohs(addr.inet4.sin_port));
}
case AF_INET6: {
char buffer[INET6_ADDRSTRLEN];
if (inet_ntop(addr.inet6.sin6_family, &addr.inet6.sin6_addr,
buffer, sizeof(buffer)) == nullptr) {
KJ_FAIL_SYSCALL("inet_ntop", errno) { break; }
return heapString("(inet_ntop error)");
}
return str('[', buffer, "]:", ntohs(addr.inet6.sin6_port));
}
case AF_UNIX: {
auto path = _::safeUnixPath(&addr.unixDomain, addrlen);
if (path.size() > 0 && path[0] == '\0') {
return str("unix-abstract:", path.slice(1, path.size()));
} else {
return str("unix:", path);
}
}
default:
return str("(unknown address family ", addr.generic.sa_family, ")");
}
}
static Promise<Array<SocketAddress>> lookupHost(
LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint,
_::NetworkFilter& filter);
// Perform a DNS lookup.
static Promise<Array<SocketAddress>> parse(
LowLevelAsyncIoProvider& lowLevel, StringPtr str, uint portHint, _::NetworkFilter& filter) {
// TODO(someday): Allow commas in `str`.
SocketAddress result;
if (str.startsWith("unix:")) {
StringPtr path = str.slice(strlen("unix:"));
KJ_REQUIRE(path.size() < sizeof(addr.unixDomain.sun_path),
"Unix domain socket address is too long.", str);
KJ_REQUIRE(path.size() == strlen(path.cStr()),
"Unix domain socket address contains NULL. Use"
" 'unix-abstract:' for the abstract namespace.");
result.addr.unixDomain.sun_family = AF_UNIX;
strcpy(result.addr.unixDomain.sun_path, path.cStr());
result.addrlen = offsetof(struct sockaddr_un, sun_path) + path.size() + 1;
if (!result.parseAllowedBy(filter)) {
KJ_FAIL_REQUIRE("unix sockets blocked by restrictPeers()");
return Array<SocketAddress>();
}
auto array = kj::heapArrayBuilder<SocketAddress>(1);
array.add(result);
return array.finish();
}
if (str.startsWith("unix-abstract:")) {
StringPtr path = str.slice(strlen("unix-abstract:"));
KJ_REQUIRE(path.size() + 1 < sizeof(addr.unixDomain.sun_path),
"Unix domain socket address is too long.", str);
result.addr.unixDomain.sun_family = AF_UNIX;
result.addr.unixDomain.sun_path[0] = '\0';
// Although not strictly required by Linux, we also copy the trailing
// NULL terminator so that we can safely read the path back in toString().
memcpy(result.addr.unixDomain.sun_path + 1, path.cStr(), path.size() + 1);
result.addrlen = offsetof(struct sockaddr_un, sun_path) + path.size() + 1;
if (!result.parseAllowedBy(filter)) {
KJ_FAIL_REQUIRE("abstract unix sockets blocked by restrictPeers()");
return Array<SocketAddress>();
}
auto array = kj::heapArrayBuilder<SocketAddress>(1);
array.add(result);
return array.finish();
}
// Try to separate the address and port.
ArrayPtr<const char> addrPart;
Maybe<StringPtr> portPart;
int af;
if (str.startsWith("[")) {
// Address starts with a bracket, which is a common way to write an ip6 address with a port,
// since without brackets around the address part, the port looks like another segment of
// the address.
af = AF_INET6;
size_t closeBracket = KJ_ASSERT_NONNULL(str.findLast(']'),
"Unclosed '[' in address string.", str);
addrPart = str.slice(1, closeBracket);
if (str.size() > closeBracket + 1) {
KJ_REQUIRE(str.slice(closeBracket + 1).startsWith(":"),
"Expected port suffix after ']'.", str);
portPart = str.slice(closeBracket + 2);
}
} else {
KJ_IF_MAYBE(colon, str.findFirst(':')) {
if (str.slice(*colon + 1).findFirst(':') == nullptr) {
// There is exactly one colon and no brackets, so it must be an ip4 address with port.
af = AF_INET;
addrPart = str.slice(0, *colon);
portPart = str.slice(*colon + 1);
} else {
// There are two or more colons and no brackets, so the whole thing must be an ip6
// address with no port.
af = AF_INET6;
addrPart = str;
}
} else {
// No colons, so it must be an ip4 address without port.
af = AF_INET;
addrPart = str;
}
}
// Parse the port.
unsigned long port;
KJ_IF_MAYBE(portText, portPart) {
char* endptr;
port = strtoul(portText->cStr(), &endptr, 0);
if (portText->size() == 0 || *endptr != '\0') {
// Not a number. Maybe it's a service name. Fall back to DNS.
return lookupHost(lowLevel, kj::heapString(addrPart), kj::heapString(*portText), portHint,
filter);
}
KJ_REQUIRE(port < 65536, "Port number too large.");
} else {
port = portHint;
}
// Check for wildcard.
if (addrPart.size() == 1 && addrPart[0] == '*') {
result.wildcard = true;
#if defined(__OpenBSD__)
// On OpenBSD, all sockets are either v4-only or v6-only, so use v4 as a
// temporary workaround for wildcards.
result.addrlen = sizeof(addr.inet4);
result.addr.inet4.sin_family = AF_INET;
result.addr.inet4.sin_port = htons(port);
#else
// Create an ip6 socket and set IPV6_V6ONLY to 0 later.
result.addrlen = sizeof(addr.inet6);
result.addr.inet6.sin6_family = AF_INET6;
result.addr.inet6.sin6_port = htons(port);
#endif
auto array = kj::heapArrayBuilder<SocketAddress>(1);
array.add(result);
return array.finish();
}
void* addrTarget;
if (af == AF_INET6) {
result.addrlen = sizeof(addr.inet6);
result.addr.inet6.sin6_family = AF_INET6;
result.addr.inet6.sin6_port = htons(port);
addrTarget = &result.addr.inet6.sin6_addr;
} else {
result.addrlen = sizeof(addr.inet4);
result.addr.inet4.sin_family = AF_INET;
result.addr.inet4.sin_port = htons(port);
addrTarget = &result.addr.inet4.sin_addr;
}
if (addrPart.size() < INET6_ADDRSTRLEN - 1) {
// addrPart is not necessarily NUL-terminated so we have to make a copy. :(
char buffer[INET6_ADDRSTRLEN];
memcpy(buffer, addrPart.begin(), addrPart.size());
buffer[addrPart.size()] = '\0';
// OK, parse it!
switch (inet_pton(af, buffer, addrTarget)) {
case 1: {
// success.
if (!result.parseAllowedBy(filter)) {
KJ_FAIL_REQUIRE("address family blocked by restrictPeers()");
return Array<SocketAddress>();
}
auto array = kj::heapArrayBuilder<SocketAddress>(1);
array.add(result);
return array.finish();
}
case 0:
// It's apparently not a simple address... fall back to DNS.
break;
default:
KJ_FAIL_SYSCALL("inet_pton", errno, af, addrPart);
}
}
return lookupHost(lowLevel, kj::heapString(addrPart), nullptr, port, filter);
}
static SocketAddress getLocalAddress(int sockfd) {
SocketAddress result;
result.addrlen = sizeof(addr);
KJ_SYSCALL(getsockname(sockfd, &result.addr.generic, &result.addrlen));
return result;
}
bool allowedBy(LowLevelAsyncIoProvider::NetworkFilter& filter) {
return filter.shouldAllow(&addr.generic, addrlen);
}
bool parseAllowedBy(_::NetworkFilter& filter) {
return filter.shouldAllowParse(&addr.generic, addrlen);
}
kj::Own<PeerIdentity> getIdentity(LowLevelAsyncIoProvider& llaiop,
LowLevelAsyncIoProvider::NetworkFilter& filter,
AsyncIoStream& stream) const;
private:
SocketAddress() {
// We need to memset the whole object 0 otherwise Valgrind gets unhappy when we write it to a
// pipe, due to the padding bytes being uninitialized.
memset(this, 0, sizeof(*this));
}
socklen_t addrlen;
bool wildcard = false;
union {
struct sockaddr generic;
struct sockaddr_in inet4;
struct sockaddr_in6 inet6;
struct sockaddr_un unixDomain;
struct sockaddr_storage storage;
} addr;
struct LookupParams;
class LookupReader;
};
class SocketAddress::LookupReader {
// Reads SocketAddresses off of a pipe coming from another thread that is performing
// getaddrinfo.
public:
LookupReader(kj::Own<Thread>&& thread, kj::Own<AsyncInputStream>&& input,
_::NetworkFilter& filter)
: thread(kj::mv(thread)), input(kj::mv(input)), filter(filter) {}
~LookupReader() {
if (thread) thread->detach();
}
Promise<Array<SocketAddress>> read() {
return input->tryRead(&current, sizeof(current), sizeof(current)).then(
[this](size_t n) -> Promise<Array<SocketAddress>> {
if (n < sizeof(current)) {
thread = nullptr;
// getaddrinfo()'s docs seem to say it will never return an empty list, but let's check
// anyway.
KJ_REQUIRE(addresses.size() > 0, "DNS lookup returned no permitted addresses.") { break; }
return addresses.releaseAsArray();
} else {
// getaddrinfo() can return multiple copies of the same address for several reasons.
// A major one is that we don't give it a socket type (SOCK_STREAM vs. SOCK_DGRAM), so
// it may return two copies of the same address, one for each type, unless it explicitly
// knows that the service name given is specific to one type. But we can't tell it a type,
// because we don't actually know which one the user wants, and if we specify SOCK_STREAM
// while the user specified a UDP service name then they'll get a resolution error which
// is lame. (At least, I think that's how it works.)
//
// So we instead resort to de-duping results.
if (alreadySeen.insert(current).second) {
if (current.parseAllowedBy(filter)) {
addresses.add(current);
}
}
return read();
}
});
}
private:
kj::Own<Thread> thread;
kj::Own<AsyncInputStream> input;
_::NetworkFilter& filter;
SocketAddress current;
kj::Vector<SocketAddress> addresses;
std::set<SocketAddress> alreadySeen;
};
struct SocketAddress::LookupParams {
kj::String host;
kj::String service;
};
Promise<Array<SocketAddress>> SocketAddress::lookupHost(
LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint,
_::NetworkFilter& filter) {
// This shitty function spawns a thread to run getaddrinfo(). Unfortunately, getaddrinfo() is
// the only cross-platform DNS API and it is blocking.
//
// TODO(perf): Use a thread pool? Maybe kj::Thread should use a thread pool automatically?
// Maybe use the various platform-specific asynchronous DNS libraries? Please do not implement
// a custom DNS resolver...
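//
// The cross-thread protocol is deliberately trivial: the worker thread writes each result to
// the pipe as a raw, fixed-size SocketAddress struct (hence the KJ_ASSERT_CAN_MEMCPY below),
// and LookupReader::read() consumes exactly sizeof(SocketAddress) bytes per address until the
// writer closes its end.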
int fds[2];
#if __linux__ && !__BIONIC__
KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));
#else
KJ_SYSCALL(pipe(fds));
#endif
auto input = lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS);
int outFd = fds[1];
LookupParams params = { kj::mv(host), kj::mv(service) };
auto thread = heap<Thread>(kj::mvCapture(params, [outFd,portHint](LookupParams&& params) {
FdOutputStream output((AutoCloseFd(outFd)));
struct addrinfo* list;
int status = getaddrinfo(
params.host == "*" ? nullptr : params.host.cStr(),
params.service == nullptr ? nullptr : params.service.cStr(),
nullptr, &list);
if (status == 0) {
KJ_DEFER(freeaddrinfo(list));
struct addrinfo* cur = list;
while (cur != nullptr) {
if (params.service == nullptr) {
switch (cur->ai_addr->sa_family) {
case AF_INET:
((struct sockaddr_in*)cur->ai_addr)->sin_port = htons(portHint);
break;
case AF_INET6:
((struct sockaddr_in6*)cur->ai_addr)->sin6_port = htons(portHint);
break;
default:
break;
}
}
SocketAddress addr;
if (params.host == "*") {
// Set up a wildcard SocketAddress. Only use the port number returned by getaddrinfo().
addr.wildcard = true;
addr.addrlen = sizeof(addr.addr.inet6);
addr.addr.inet6.sin6_family = AF_INET6;
switch (cur->ai_addr->sa_family) {
case AF_INET:
addr.addr.inet6.sin6_port = ((struct sockaddr_in*)cur->ai_addr)->sin_port;
break;
case AF_INET6:
addr.addr.inet6.sin6_port = ((struct sockaddr_in6*)cur->ai_addr)->sin6_port;
break;
default:
addr.addr.inet6.sin6_port = htons(portHint);
break;
}
} else {
addr.addrlen = cur->ai_addrlen;
memcpy(&addr.addr.generic, cur->ai_addr, cur->ai_addrlen);
}
KJ_ASSERT_CAN_MEMCPY(SocketAddress);
output.write(&addr, sizeof(addr));
cur = cur->ai_next;
}
} else if (status == EAI_SYSTEM) {
KJ_FAIL_SYSCALL("getaddrinfo", errno, params.host, params.service) {
return;
}
} else {
KJ_FAIL_REQUIRE("DNS lookup failed.",
params.host, params.service, gai_strerror(status)) {
return;
}
}
}));
auto reader = heap<LookupReader>(kj::mv(thread), kj::mv(input), filter);
return reader->read().attach(kj::mv(reader));
}
// =======================================================================================
class FdConnectionReceiver final: public ConnectionReceiver, public OwnedFileDescriptor {
public:
FdConnectionReceiver(LowLevelAsyncIoProvider& lowLevel,
UnixEventPort& eventPort, int fd,
LowLevelAsyncIoProvider::NetworkFilter& filter, uint flags)
: OwnedFileDescriptor(fd, flags), lowLevel(lowLevel), eventPort(eventPort), filter(filter),
observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ) {}
Promise<Own<AsyncIoStream>> accept() override {
return acceptImpl(false).then([](AuthenticatedStream&& a) { return kj::mv(a.stream); });
}
Promise<AuthenticatedStream> acceptAuthenticated() override {
return acceptImpl(true);
}
Promise<AuthenticatedStream> acceptImpl(bool authenticated) {
int newFd;
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
retry:
#if __linux__ && !__BIONIC__
newFd = ::accept4(fd, reinterpret_cast<struct sockaddr*>(&addr), &addrlen,
SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
newFd = ::accept(fd, reinterpret_cast<struct sockaddr*>(&addr), &addrlen);
#endif
if (newFd >= 0) {
kj::AutoCloseFd ownFd(newFd);
if (!filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), addrlen)) {
// Ignore disallowed address.
return acceptImpl(authenticated);
} else {
// TODO(perf): As a hack for the 0.4 release we are always setting
// TCP_NODELAY because Nagle's algorithm pretty much kills Cap'n Proto's
// RPC protocol. Later, we should extend the interface to provide more
// control over this. Perhaps write() should have a flag which
// specifies whether to pass MSG_MORE.
int one = 1;
KJ_SYSCALL_HANDLE_ERRORS(::setsockopt(
ownFd.get(), IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one))) {
case EOPNOTSUPP:
case ENOPROTOOPT: // (returned for AF_UNIX in cygwin)
#if __FreeBSD__
case EINVAL: // (returned for AF_UNIX in FreeBSD)
#endif
break;
default:
KJ_FAIL_SYSCALL("setsocketopt(IPPROTO_TCP, TCP_NODELAY)", error);
}
AuthenticatedStream result;
result.stream = heap<AsyncStreamFd>(eventPort, ownFd.release(), NEW_FD_FLAGS);
if (authenticated) {
result.peerIdentity = SocketAddress(reinterpret_cast<struct sockaddr*>(&addr), addrlen)
.getIdentity(lowLevel, filter, *result.stream);
}
return kj::mv(result);
}
} else {
int error = errno;
switch (error) {
case EAGAIN:
#if EAGAIN != EWOULDBLOCK
case EWOULDBLOCK:
#endif
// Not ready yet.
return observer.whenBecomesReadable().then([this,authenticated]() {
return acceptImpl(authenticated);
});
case EINTR:
case ENETDOWN:
#ifdef EPROTO
// EPROTO is not defined on OpenBSD.
case EPROTO:
#endif
case EHOSTDOWN:
case EHOSTUNREACH:
case ENETUNREACH:
case ECONNABORTED:
case ETIMEDOUT:
// According to the Linux man page, accept() may report an error if the accepted
// connection is already broken. In this case, we really ought to just ignore it and
// keep waiting. But it's hard to say exactly what errors are such network errors and
// which ones are permanent errors. We've made a guess here.
goto retry;
default:
KJ_FAIL_SYSCALL("accept", error);
}
}
}
uint getPort() override {
return SocketAddress::getLocalAddress(fd).getPort();
}
void getsockopt(int level, int option, void* value, uint* length) override {
socklen_t socklen = *length;
KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
*length = socklen;
}
void setsockopt(int level, int option, const void* value, uint length) override {
KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
}
void getsockname(struct sockaddr* addr, uint* length) override {
socklen_t socklen = *length;
KJ_SYSCALL(::getsockname(fd, addr, &socklen));
*length = socklen;
}
public:
LowLevelAsyncIoProvider& lowLevel;
UnixEventPort& eventPort;
LowLevelAsyncIoProvider::NetworkFilter& filter;
UnixEventPort::FdObserver observer;
};
class DatagramPortImpl final: public DatagramPort, public OwnedFileDescriptor {
public:
DatagramPortImpl(LowLevelAsyncIoProvider& lowLevel, UnixEventPort& eventPort, int fd,
LowLevelAsyncIoProvider::NetworkFilter& filter, uint flags)
: OwnedFileDescriptor(fd, flags), lowLevel(lowLevel), eventPort(eventPort), filter(filter),
observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ |
UnixEventPort::FdObserver::OBSERVE_WRITE) {}
Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) override;
Promise<size_t> send(
ArrayPtr<const ArrayPtr<const byte>> pieces, NetworkAddress& destination) override;
class ReceiverImpl;
Own<DatagramReceiver> makeReceiver(DatagramReceiver::Capacity capacity) override;
uint getPort() override {
return SocketAddress::getLocalAddress(fd).getPort();
}
void getsockopt(int level, int option, void* value, uint* length) override {
socklen_t socklen = *length;
KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
*length = socklen;
}
void setsockopt(int level, int option, const void* value, uint length) override {
KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
}
public:
LowLevelAsyncIoProvider& lowLevel;
UnixEventPort& eventPort;
LowLevelAsyncIoProvider::NetworkFilter& filter;
UnixEventPort::FdObserver observer;
};
class LowLevelAsyncIoProviderImpl final: public LowLevelAsyncIoProvider {
public:
LowLevelAsyncIoProviderImpl()
: eventLoop(eventPort), waitScope(eventLoop) {}
inline WaitScope& getWaitScope() { return waitScope; }
Own<AsyncInputStream> wrapInputFd(int fd, uint flags = 0) override {
return heap<AsyncStreamFd>(eventPort, fd, flags);
}
Own<AsyncOutputStream> wrapOutputFd(int fd, uint flags = 0) override {
return heap<AsyncStreamFd>(eventPort, fd, flags);
}
Own<AsyncIoStream> wrapSocketFd(int fd, uint flags = 0) override {
return heap<AsyncStreamFd>(eventPort, fd, flags);
}
Own<AsyncCapabilityStream> wrapUnixSocketFd(Fd fd, uint flags = 0) override {
return heap<AsyncStreamFd>(eventPort, fd, flags);
}
Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
int fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) override {
// It's important that we construct the AsyncStreamFd first, so that `flags` are honored,
// especially setting nonblocking mode and taking ownership.
auto result = heap<AsyncStreamFd>(eventPort, fd, flags);
// Unfortunately connect() doesn't fit the mold of KJ_NONBLOCKING_SYSCALL, since it indicates
// non-blocking using EINPROGRESS.
for (;;) {
if (::connect(fd, addr, addrlen) < 0) {
int error = errno;
if (error == EINPROGRESS) {
// Fine.
break;
} else if (error != EINTR) {
auto address = SocketAddress(addr, addrlen).toString();
KJ_FAIL_SYSCALL("connect()", error, address) { break; }
return Own<AsyncIoStream>();
}
} else {
// no error
break;
}
}
auto connected = result->waitConnected();
return connected.then(kj::mvCapture(result, [fd](Own<AsyncIoStream>&& stream) {
int err;
socklen_t errlen = sizeof(err);
KJ_SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen));
if (err != 0) {
KJ_FAIL_SYSCALL("connect()", err) { break; }
}
return kj::mv(stream);
}));
}
Own<ConnectionReceiver> wrapListenSocketFd(
int fd, NetworkFilter& filter, uint flags = 0) override {
return heap<FdConnectionReceiver>(*this, eventPort, fd, filter, flags);
}
Own<DatagramPort> wrapDatagramSocketFd(
int fd, NetworkFilter& filter, uint flags = 0) override {
return heap<DatagramPortImpl>(*this, eventPort, fd, filter, flags);
}
Timer& getTimer() override { return eventPort.getTimer(); }
UnixEventPort& getEventPort() { return eventPort; }
private:
UnixEventPort eventPort;
EventLoop eventLoop;
WaitScope waitScope;
};
// =======================================================================================
class NetworkAddressImpl final: public NetworkAddress {
public:
NetworkAddressImpl(LowLevelAsyncIoProvider& lowLevel,
LowLevelAsyncIoProvider::NetworkFilter& filter,
Array<SocketAddress> addrs)
: lowLevel(lowLevel), filter(filter), addrs(kj::mv(addrs)) {}
Promise<Own<AsyncIoStream>> connect() override {
auto addrsCopy = heapArray(addrs.asPtr());
auto promise = connectImpl(lowLevel, filter, addrsCopy, false);
return promise.attach(kj::mv(addrsCopy))
.then([](AuthenticatedStream&& a) { return kj::mv(a.stream); });
}
Promise<AuthenticatedStream> connectAuthenticated() override {
auto addrsCopy = heapArray(addrs.asPtr());
auto promise = connectImpl(lowLevel, filter, addrsCopy, true);
return promise.attach(kj::mv(addrsCopy));
}
Own<ConnectionReceiver> listen() override {
auto makeReceiver = [&](SocketAddress& addr) {
int fd = addr.socket(SOCK_STREAM);
{
KJ_ON_SCOPE_FAILURE(close(fd));
// We always enable SO_REUSEADDR because having to take your server down for five minutes
// before it can restart really sucks.
int optval = 1;
KJ_SYSCALL(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));
addr.bind(fd);
// TODO(someday): Let queue size be specified explicitly in string addresses.
KJ_SYSCALL(::listen(fd, SOMAXCONN));
}
return lowLevel.wrapListenSocketFd(fd, filter, NEW_FD_FLAGS);
};
if (addrs.size() == 1) {
return makeReceiver(addrs[0]);
} else {
return newAggregateConnectionReceiver(KJ_MAP(addr, addrs) { return makeReceiver(addr); });
}
}
Own<DatagramPort> bindDatagramPort() override {
if (addrs.size() > 1) {
KJ_LOG(WARNING, "Bind address resolved to multiple addresses. Only the first address will "
"be used. If this is incorrect, specify the address numerically. This may be fixed "
"in the future.", addrs[0].toString());
}
int fd = addrs[0].socket(SOCK_DGRAM);
{
KJ_ON_SCOPE_FAILURE(close(fd));
// We always enable SO_REUSEADDR because having to take your server down for five minutes
// before it can restart really sucks.
int optval = 1;
KJ_SYSCALL(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));
addrs[0].bind(fd);
}
return lowLevel.wrapDatagramSocketFd(fd, filter, NEW_FD_FLAGS);
}
Own<NetworkAddress> clone() override {
return kj::heap<NetworkAddressImpl>(lowLevel, filter, kj::heapArray(addrs.asPtr()));
}
String toString() override {
return strArray(KJ_MAP(addr, addrs) { return addr.toString(); }, ",");
}
const SocketAddress& chooseOneAddress() {
KJ_REQUIRE(addrs.size() > 0, "No addresses available.");
return addrs[counter++ % addrs.size()];
}
private:
LowLevelAsyncIoProvider& lowLevel;
LowLevelAsyncIoProvider::NetworkFilter& filter;
Array<SocketAddress> addrs;
uint counter = 0;
static Promise<AuthenticatedStream> connectImpl(
LowLevelAsyncIoProvider& lowLevel,
LowLevelAsyncIoProvider::NetworkFilter& filter,
ArrayPtr<SocketAddress> addrs,
bool authenticated) {
KJ_ASSERT(addrs.size() > 0);
return kj::evalNow([&]() -> Promise<Own<AsyncIoStream>> {
if (!addrs[0].allowedBy(filter)) {
return KJ_EXCEPTION(FAILED, "connect() blocked by restrictPeers()");
} else {
int fd = addrs[0].socket(SOCK_STREAM);
return lowLevel.wrapConnectingSocketFd(
fd, addrs[0].getRaw(), addrs[0].getRawSize(), NEW_FD_FLAGS);
}
}).then([&lowLevel,&filter,addrs,authenticated](Own<AsyncIoStream>&& stream)
-> Promise<AuthenticatedStream> {
// Success, pass along.
AuthenticatedStream result;
result.stream = kj::mv(stream);
if (authenticated) {
result.peerIdentity = addrs[0].getIdentity(lowLevel, filter, *result.stream);
}
return kj::mv(result);
}, [&lowLevel,&filter,addrs,authenticated](Exception&& exception) mutable
-> Promise<AuthenticatedStream> {
// Connect failed.
if (addrs.size() > 1) {
// Try the next address instead.
return connectImpl(lowLevel, filter, addrs.slice(1, addrs.size()), authenticated);
} else {
// No more addresses to try, so propagate the exception.
return kj::mv(exception);
}
});
}
};
kj::Own<PeerIdentity> SocketAddress::getIdentity(kj::LowLevelAsyncIoProvider& llaiop,
LowLevelAsyncIoProvider::NetworkFilter& filter,
AsyncIoStream& stream) const {
switch (addr.generic.sa_family) {
case AF_INET:
case AF_INET6: {
auto builder = kj::heapArrayBuilder<SocketAddress>(1);
builder.add(*this);
return NetworkPeerIdentity::newInstance(
kj::heap<NetworkAddressImpl>(llaiop, filter, builder.finish()));
}
case AF_UNIX: {
LocalPeerIdentity::Credentials result;
// There is little documentation on what happens when the uid/pid can't be obtained, but I've
// seen vague references on the internet saying that a PID of 0 and a UID of uid_t(-1) are used
// as invalid values.
// OpenBSD defines SO_PEERCRED but uses a different interface for it, hence we fall back to
// LOCAL_PEERCRED.
#if defined(SO_PEERCRED) && !__OpenBSD__
struct ucred creds;
uint length = sizeof(creds);
stream.getsockopt(SOL_SOCKET, SO_PEERCRED, &creds, &length);
if (creds.pid > 0) {
result.pid = creds.pid;
}
if (creds.uid != static_cast<uid_t>(-1)) {
result.uid = creds.uid;
}
#elif defined(LOCAL_PEERCRED)
// MacOS / FreeBSD / OpenBSD
struct xucred creds;
uint length = sizeof(creds);
stream.getsockopt(SOL_LOCAL, LOCAL_PEERCRED, &creds, &length);
KJ_ASSERT(length == sizeof(creds));
if (creds.cr_uid != static_cast<uid_t>(-1)) {
result.uid = creds.cr_uid;
}
#if defined(LOCAL_PEERPID)
// MacOS only?
pid_t pid;
length = sizeof(pid);
stream.getsockopt(SOL_LOCAL, LOCAL_PEERPID, &pid, &length);
KJ_ASSERT(length == sizeof(pid));
if (pid > 0) {
result.pid = pid;
}
#endif
#endif
return LocalPeerIdentity::newInstance(result);
}
default:
return UnknownPeerIdentity::newInstance();
}
}
class SocketNetwork final: public Network {
public:
explicit SocketNetwork(LowLevelAsyncIoProvider& lowLevel): lowLevel(lowLevel) {}
explicit SocketNetwork(SocketNetwork& parent,
kj::ArrayPtr<const kj::StringPtr> allow,
kj::ArrayPtr<const kj::StringPtr> deny)
: lowLevel(parent.lowLevel), filter(allow, deny, parent.filter) {}
Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) override {
return evalLater(mvCapture(heapString(addr), [this,portHint](String&& addr) {
return SocketAddress::parse(lowLevel, addr, portHint, filter);
})).then([this](Array<SocketAddress> addresses) -> Own<NetworkAddress> {
return heap<NetworkAddressImpl>(lowLevel, filter, kj::mv(addresses));
});
}
Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) override {
auto array = kj::heapArrayBuilder<SocketAddress>(1);
array.add(SocketAddress(sockaddr, len));
KJ_REQUIRE(array[0].allowedBy(filter), "address blocked by restrictPeers()") { break; }
return Own<NetworkAddress>(heap<NetworkAddressImpl>(lowLevel, filter, array.finish()));
}
Own<Network> restrictPeers(
kj::ArrayPtr<const kj::StringPtr> allow,
kj::ArrayPtr<const kj::StringPtr> deny = nullptr) override {
return heap<SocketNetwork>(*this, allow, deny);
}
private:
LowLevelAsyncIoProvider& lowLevel;
_::NetworkFilter filter;
};
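// Sketch of down-scoping a network before handing it to less-trusted code (the filter string
// is an assumption here; see Network::restrictPeers() for the supported classes):
//
//   Network& network = ioProvider->getNetwork();
//   auto restricted = network.restrictPeers({"public"});
//   // `restricted` refuses to connect to or accept from non-public addresses.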
// =======================================================================================
Promise<size_t> DatagramPortImpl::send(
const void* buffer, size_t size, NetworkAddress& destination) {
auto& addr = downcast<NetworkAddressImpl>(destination).chooseOneAddress();
ssize_t n;
KJ_NONBLOCKING_SYSCALL(n = sendto(fd, buffer, size, 0, addr.getRaw(), addr.getRawSize()));
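// Note: KJ_NONBLOCKING_SYSCALL() swallows EAGAIN/EWOULDBLOCK rather than throwing, so n
// stays negative when the socket's send buffer is full.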
if (n < 0) {
// Write buffer full.
return observer.whenBecomesWritable().then([this, buffer, size, &destination]() {
return send(buffer, size, destination);
});
} else {
// If less than the whole message was sent, then it got truncated, and there's nothing we can
// do about it.
return n;
}
}
Promise<size_t> DatagramPortImpl::send(
ArrayPtr<const ArrayPtr<const byte>> pieces, NetworkAddress& destination) {
struct msghdr msg;
memset(&msg, 0, sizeof(msg));
auto& addr = downcast<NetworkAddressImpl>(destination).chooseOneAddress();
msg.msg_name = const_cast<void*>(implicitCast<const void*>(addr.getRaw()));
msg.msg_namelen = addr.getRawSize();
const size_t iovmax = kj::miniposix::iovMax();
KJ_STACK_ARRAY(struct iovec, iov, kj::min(pieces.size(), iovmax), 16, 64);
// Iterate over iov, not pieces: when pieces.size() > iovmax, iov is the shorter of the two,
// and the trailing pieces are folded into its last slot below.
for (size_t i: kj::indices(iov)) {
iov[i].iov_base = const_cast<void*>(implicitCast<const void*>(pieces[i].begin()));
iov[i].iov_len = pieces[i].size();
}
Array<byte> extra;
if (pieces.size() > iovmax) {
// Too many pieces, but we can't use multiple syscalls because they'd send separate
// datagrams. We'll have to copy the trailing pieces into a temporary array.
//
// TODO(perf): On Linux we could avoid the copy by building up the datagram across multiple
//   sendto() calls with MSG_MORE.
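// For example (numbers hypothetical): with iovmax == 1024 and 1500 pieces, iov slots
// 0..1022 point directly at pieces[0..1022], and the code below repoints slot 1023 at a
// heap buffer holding the concatenation of pieces[1023..1499].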
size_t extraSize = 0;
for (size_t i = iovmax - 1; i < pieces.size(); i++) {
extraSize += pieces[i].size();
}
extra = kj::heapArray<byte>(extraSize);
extraSize = 0;
for (size_t i = iovmax - 1; i < pieces.size(); i++) {
memcpy(extra.begin() + extraSize, pieces[i].begin(), pieces[i].size());
extraSize += pieces[i].size();
}
iov.back().iov_base = extra.begin();
iov.back().iov_len = extra.size();
}
msg.msg_iov = iov.begin();
msg.msg_iovlen = iov.size();
ssize_t n;
KJ_NONBLOCKING_SYSCALL(n = sendmsg(fd, &msg, 0));
if (n < 0) {
// Write buffer full.
return observer.whenBecomesWritable().then([this, pieces, &destination]() {
return send(pieces, destination);
});
} else {
// If less than the whole message was sent, then it was truncated, and there's nothing we can
// do about that now.
return n;
}
}
class DatagramPortImpl::ReceiverImpl final: public DatagramReceiver {
public:
explicit ReceiverImpl(DatagramPortImpl& port, Capacity capacity)
: port(port),
contentBuffer(heapArray<byte>(capacity.content)),
ancillaryBuffer(capacity.ancillary > 0 ? heapArray<byte>(capacity.ancillary)
: Array<byte>(nullptr)) {}
Promise<void> receive() override {
struct msghdr msg;
memset(&msg, 0, sizeof(msg));
struct sockaddr_storage addr;
memset(&addr, 0, sizeof(addr));
msg.msg_name = &addr;
msg.msg_namelen = sizeof(addr);
struct iovec iov;
iov.iov_base = contentBuffer.begin();
iov.iov_len = contentBuffer.size();
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = ancillaryBuffer.begin();
msg.msg_controllen = ancillaryBuffer.size();
ssize_t n;
KJ_NONBLOCKING_SYSCALL(n = recvmsg(port.fd, &msg, 0));
if (n < 0) {
// No data available. Wait.
return port.observer.whenBecomesReadable().then([this]() {
return receive();
});
} else {
if (!port.filter.shouldAllow(reinterpret_cast<const struct sockaddr*>(msg.msg_name),
msg.msg_namelen)) {
// Ignore message from disallowed source.
return receive();
}
receivedSize = n;
contentTruncated = msg.msg_flags & MSG_TRUNC;
source.emplace(port.lowLevel, port.filter, msg.msg_name, msg.msg_namelen);
ancillaryList.resize(0);
ancillaryTruncated = msg.msg_flags & MSG_CTRUNC;
for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
// On some platforms (OSX), a cmsghdr's length may cross the end of the ancillary buffer
// when truncated. On other platforms (Linux) the length in cmsghdr will itself be
// truncated to fit within the buffer.
#if __APPLE__
// On MacOS, `CMSG_SPACE(0)` triggers a bogus warning.
#pragma GCC diagnostic ignored "-Wnull-pointer-arithmetic"
#endif
const byte* pos = reinterpret_cast<const byte*>(cmsg);
size_t available = ancillaryBuffer.end() - pos;
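// CMSG_SPACE(0) is the size of a cmsghdr carrying zero bytes of payload, including
// alignment padding -- i.e. the minimum space a valid control message can occupy.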
if (available < CMSG_SPACE(0)) {
// The buffer ends in the middle of the header. We can't use this message.
// (On Linux, this never happens, because the message is not included if there isn't
// space for a header. I'm not sure how other systems behave, though, so let's be safe.)
break;
}
// OK, we know the cmsghdr is valid, at least.
// Find the start of the message payload.
const byte* begin = reinterpret_cast<const byte*>(CMSG_DATA(cmsg));
// Cap the message length to the available space.
const byte* end = pos + kj::min(available, cmsg->cmsg_len);
ancillaryList.add(AncillaryMessage(
cmsg->cmsg_level, cmsg->cmsg_type, arrayPtr(begin, end)));
}
return READY_NOW;
}
}
MaybeTruncated<ArrayPtr<const byte>> getContent() override {
return { contentBuffer.slice(0, receivedSize), contentTruncated };
}
MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() override {
return { ancillaryList.asPtr(), ancillaryTruncated };
}
NetworkAddress& getSource() override {
return KJ_REQUIRE_NONNULL(source, "Haven't received a message yet.").abstract;
}
private:
DatagramPortImpl& port;
Array<byte> contentBuffer;
Array<byte> ancillaryBuffer;
Vector<AncillaryMessage> ancillaryList;
size_t receivedSize = 0;
bool contentTruncated = false;
bool ancillaryTruncated = false;
struct StoredAddress {
StoredAddress(LowLevelAsyncIoProvider& lowLevel, LowLevelAsyncIoProvider::NetworkFilter& filter,
const void* sockaddr, uint length)
: raw(sockaddr, length),
abstract(lowLevel, filter, Array<SocketAddress>(&raw, 1, NullArrayDisposer::instance)) {}
SocketAddress raw;
NetworkAddressImpl abstract;
};
kj::Maybe<StoredAddress> source;
};
Own<DatagramReceiver> DatagramPortImpl::makeReceiver(DatagramReceiver::Capacity capacity) {
return kj::heap<ReceiverImpl>(*this, capacity);
}
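// A receive-loop sketch (assumes a DatagramPort `port` and a WaitScope `waitScope`):
//
//   auto receiver = port->makeReceiver();
//   receiver->receive().wait(waitScope);
//   auto content = receiver->getContent();
//   // content.value is the datagram payload; content.isTruncated reports whether it was
//   // cut off by the capacity passed to makeReceiver().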
// =======================================================================================
class AsyncIoProviderImpl final: public AsyncIoProvider {
public:
AsyncIoProviderImpl(LowLevelAsyncIoProvider& lowLevel)
: lowLevel(lowLevel), network(lowLevel) {}
OneWayPipe newOneWayPipe() override {
int fds[2];
#if __linux__ && !__BIONIC__
KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));
#else
KJ_SYSCALL(pipe(fds));
#endif
return OneWayPipe {
lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS),
lowLevel.wrapOutputFd(fds[1], NEW_FD_FLAGS)
};
}
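// A minimal usage sketch (assumes a WaitScope `waitScope`):
//
//   auto pipe = ioProvider->newOneWayPipe();
//   pipe.out->write("foo", 3).wait(waitScope);
//   pipe.out = nullptr;  // close the write end to signal EOF
//   KJ_ASSERT(pipe.in->readAllText().wait(waitScope) == "foo");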
TwoWayPipe newTwoWayPipe() override {
int fds[2];
int type = SOCK_STREAM;
#if __linux__ && !__BIONIC__
type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
return TwoWayPipe { {
lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS),
lowLevel.wrapSocketFd(fds[1], NEW_FD_FLAGS)
} };
}
CapabilityPipe newCapabilityPipe() override {
int fds[2];
int type = SOCK_STREAM;
#if __linux__ && !__BIONIC__
type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
return CapabilityPipe { {
lowLevel.wrapUnixSocketFd(fds[0], NEW_FD_FLAGS),
lowLevel.wrapUnixSocketFd(fds[1], NEW_FD_FLAGS)
} };
}
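// Note: the two ends are wrapped with wrapUnixSocketFd(), yielding AsyncCapabilityStreams;
// on Unix, file descriptors passed over such a stream travel as SCM_RIGHTS ancillary
// messages on this socketpair.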
Network& getNetwork() override {
return network;
}
PipeThread newPipeThread(
Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) override {
int fds[2];
int type = SOCK_STREAM;
#if __linux__ && !__BIONIC__
type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
int threadFd = fds[1];
KJ_ON_SCOPE_FAILURE(close(threadFd));
auto pipe = lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS);
auto thread = heap<Thread>(kj::mvCapture(startFunc,
[threadFd](Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)>&& startFunc) {
LowLevelAsyncIoProviderImpl lowLevel;
auto stream = lowLevel.wrapSocketFd(threadFd, NEW_FD_FLAGS);
AsyncIoProviderImpl ioProvider(lowLevel);
startFunc(ioProvider, *stream, lowLevel.getWaitScope());
}));
return { kj::mv(thread), kj::mv(pipe) };
}
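// Usage sketch (thread body hypothetical): the lambda runs on a fresh thread with its own
// event loop, and `pipeThread.pipe` is the calling thread's end of the socketpair.
// Destroying `pipeThread.thread` joins the thread.
//
//   auto pipeThread = ioProvider->newPipeThread(
//       [](AsyncIoProvider& provider, AsyncIoStream& stream, WaitScope& waitScope) {
//     stream.write("hi", 2).wait(waitScope);
//   });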
Timer& getTimer() override { return lowLevel.getTimer(); }
private:
LowLevelAsyncIoProvider& lowLevel;
SocketNetwork network;
};
} // namespace
Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel) {
return kj::heap<AsyncIoProviderImpl>(lowLevel);
}
AsyncIoContext setupAsyncIo() {
auto lowLevel = heap<LowLevelAsyncIoProviderImpl>();
auto ioProvider = kj::heap<AsyncIoProviderImpl>(*lowLevel);
auto& waitScope = lowLevel->getWaitScope();
auto& eventPort = lowLevel->getEventPort();
return { kj::mv(lowLevel), kj::mv(ioProvider), waitScope, eventPort };
}
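// A typical top-level usage sketch:
//
//   auto io = kj::setupAsyncIo();
//   auto addr = io.provider->getNetwork()
//       .parseAddress("example.com", 80).wait(io.waitScope);
//   auto connection = addr->connect().wait(io.waitScope);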
} // namespace kj
#endif // !_WIN32